This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new bfcb979438 Data: Moving GenericFileWriterFactory to the new
FormatModel API (#15334)
bfcb979438 is described below
commit bfcb97943831591f1340c584467b08995ae19965
Author: pvary <[email protected]>
AuthorDate: Tue Feb 17 12:40:17 2026 +0100
Data: Moving GenericFileWriterFactory to the new FormatModel API (#15334)
---
.../apache/iceberg/data/BaseFileWriterFactory.java | 15 +-
.../iceberg/data/GenericFileWriterFactory.java | 157 ++++++++++++++++--
.../data/RegistryBasedFileWriterFactory.java | 182 +++++++++++++++++++++
3 files changed, 332 insertions(+), 22 deletions(-)
diff --git
a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
index 55f3b5701e..444c0d0226 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -40,7 +40,13 @@ import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-/** A base writer factory to be extended by query engine integrations. */
+/**
+ * A base writer factory to be extended by query engine integrations.
+ *
+ * @deprecated since version 1.11.0 and will be removed in 1.12.0. Use {@link
+ * RegistryBasedFileWriterFactory}
+ */
+@Deprecated
public abstract class BaseFileWriterFactory<T> implements
FileWriterFactory<T>, Serializable {
private final Table table;
private final FileFormat dataFileFormat;
@@ -75,13 +81,6 @@ public abstract class BaseFileWriterFactory<T> implements
FileWriterFactory<T>,
this.positionDeleteRowSchema = null;
}
- /**
- * @deprecated This constructor is deprecated as of version 1.11.0 and will
be removed in 1.12.0.
- * Position deletes that include row data are no longer supported. Use
{@link
- * #BaseFileWriterFactory(Table, FileFormat, Schema, SortOrder,
FileFormat, int[], Schema,
- * SortOrder, Map)} instead.
- */
- @Deprecated
protected BaseFileWriterFactory(
Table table,
FileFormat dataFileFormat,
diff --git
a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
index e6872cc6e1..1e75b9eda9 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
@@ -22,21 +22,37 @@ import static
org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
+public class GenericFileWriterFactory extends
RegistryBasedFileWriterFactory<Record, Schema> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GenericFileWriterFactory.class);
+
+ private Table table;
+ private FileFormat format;
+ private Schema positionDeleteRowSchema;
+ private Map<String, String> writerProperties;
GenericFileWriterFactory(
Table table,
@@ -50,13 +66,16 @@ public class GenericFileWriterFactory extends
BaseFileWriterFactory<Record> {
super(
table,
dataFileFormat,
+ Record.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- ImmutableMap.of());
+ ImmutableMap.of(),
+ null,
+ null);
}
/**
@@ -80,14 +99,20 @@ public class GenericFileWriterFactory extends
BaseFileWriterFactory<Record> {
super(
table,
dataFileFormat,
+ Record.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- positionDeleteRowSchema,
- writerProperties);
+ writerProperties,
+ null,
+ null);
+ this.table = table;
+ this.format = dataFileFormat;
+ this.positionDeleteRowSchema = positionDeleteRowSchema;
+ this.writerProperties = writerProperties != null ? writerProperties :
ImmutableMap.of();
}
/**
@@ -107,64 +132,168 @@ public class GenericFileWriterFactory extends
BaseFileWriterFactory<Record> {
super(
table,
dataFileFormat,
+ Record.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- positionDeleteRowSchema);
+ ImmutableMap.of(),
+ dataSchema,
+ equalityDeleteRowSchema);
+ this.table = table;
+ this.format = dataFileFormat;
+ this.positionDeleteRowSchema = positionDeleteRowSchema;
}
+ /** Returns a new {@link Builder} constructed for the given table. */
static Builder builderFor(Table table) {
return new Builder(table);
}
- @Override
+ /**
+ * Sets {@code DataWriter::create} as the writer function of the Avro data write builder.
+ *
+ * @param builder the Avro data write builder to configure
+ * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
+ * the configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureDataWrite(Avro.DataWriteBuilder builder) {
builder.createWriterFunc(DataWriter::create);
}
- @Override
+ /**
+ * Sets {@code DataWriter::create} as the writer function of the Avro delete write builder used
+ * for equality deletes.
+ *
+ * @param builder the Avro delete write builder to configure
+ * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
+ * the configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
builder.createWriterFunc(DataWriter::create);
}
- @Override
+ /**
+ * Sets {@code DataWriter::create} as the writer function of the Avro delete write builder used
+ * for position deletes.
+ *
+ * @param builder the Avro delete write builder to configure
+ * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
+ * the configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
builder.createWriterFunc(DataWriter::create);
}
- @Override
+ /**
+ * Sets {@code GenericParquetWriter::create} as the writer function of the Parquet data write
+ * builder.
+ *
+ * @param builder the Parquet data write builder to configure
+ * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
+ * the configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::create);
}
- @Override
+ /**
+ * Sets {@code GenericParquetWriter::create} as the writer function of the Parquet delete write
+ * builder used for equality deletes.
+ *
+ * @param builder the Parquet delete write builder to configure
+ * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
+ * the configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::create);
}
- @Override
+ /**
+ * Sets {@code GenericParquetWriter::create} as the writer function of the Parquet delete write
+ * builder used for position deletes.
+ *
+ * @param builder the Parquet delete write builder to configure
+ * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
+ * the configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::create);
}
- @Override
+ /**
+ * Sets {@code GenericOrcWriter::buildWriter} as the writer function of the ORC data write
+ * builder.
+ *
+ * @param builder the ORC data write builder to configure
+ * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
+ * the configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureDataWrite(ORC.DataWriteBuilder builder) {
builder.createWriterFunc(GenericOrcWriter::buildWriter);
}
- @Override
+ /**
+ * Sets {@code GenericOrcWriter::buildWriter} as the writer function of the ORC delete write
+ * builder used for equality deletes.
+ *
+ * @param builder the ORC delete write builder to configure
+ * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
+ * the configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(GenericOrcWriter::buildWriter);
}
- @Override
+ /**
+ * Sets {@code GenericOrcWriter::buildWriter} as the writer function of the ORC delete write
+ * builder used for position deletes.
+ *
+ * @param builder the ORC delete write builder to configure
+ * @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
+ * the configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(GenericOrcWriter::buildWriter);
}
+ @Override
+ public PositionDeleteWriter<Record> newPositionDeleteWriter(
+ EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+ if (positionDeleteRowSchema == null) {
+ return super.newPositionDeleteWriter(file, spec, partition);
+ } else {
+ LOG.warn(
+ "Deprecated feature used. Position delete row schema is used to
create the position delete writer.");
+ Map<String, String> properties = table == null ? ImmutableMap.of() :
table.properties();
+ MetricsConfig metricsConfig =
+ table == null
+ ? MetricsConfig.forPositionDelete()
+ : MetricsConfig.forPositionDelete(table);
+
+ try {
+ return switch (format) {
+ case AVRO ->
+ Avro.writeDeletes(file)
+ .setAll(properties)
+ .setAll(writerProperties)
+ .metricsConfig(metricsConfig)
+ .createWriterFunc(DataWriter::create)
+ .withPartition(partition)
+ .overwrite()
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .buildPositionWriter();
+ case ORC ->
+ ORC.writeDeletes(file)
+ .setAll(properties)
+ .setAll(writerProperties)
+ .metricsConfig(metricsConfig)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .withPartition(partition)
+ .overwrite()
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .buildPositionWriter();
+ case PARQUET ->
+ Parquet.writeDeletes(file)
+ .setAll(properties)
+ .setAll(writerProperties)
+ .metricsConfig(metricsConfig)
+ .createWriterFunc(GenericParquetWriter::create)
+ .withPartition(partition)
+ .overwrite()
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .buildPositionWriter();
+ default ->
+ throw new UnsupportedOperationException(
+ "Cannot write pos-deletes for unsupported file format: " +
format);
+ };
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create new position delete
writer", e);
+ }
+ }
+ }
+
public static class Builder {
private final Table table;
private FileFormat dataFileFormat;
diff --git
a/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java
b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java
new file mode 100644
index 0000000000..868b41f584
--- /dev/null
+++
b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.formats.FileWriterBuilder;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * A base writer factory to be extended by query engine integrations.
+ *
+ * @param <T> row type
+ */
+public abstract class RegistryBasedFileWriterFactory<T, S>
+ implements FileWriterFactory<T>, Serializable {
+ private final Table table;
+ private final FileFormat dataFileFormat;
+ private final Class<T> inputType;
+ private final Schema dataSchema;
+ private final SortOrder dataSortOrder;
+ private final FileFormat deleteFileFormat;
+ private final int[] equalityFieldIds;
+ private final Schema equalityDeleteRowSchema;
+ private final SortOrder equalityDeleteSortOrder;
+ private final Map<String, String> writerProperties;
+ private final S inputSchema;
+ private final S equalityDeleteInputSchema;
+
+ protected RegistryBasedFileWriterFactory(
+ Table table,
+ FileFormat dataFileFormat,
+ Class<T> inputType,
+ Schema dataSchema,
+ SortOrder dataSortOrder,
+ FileFormat deleteFileFormat,
+ int[] equalityFieldIds,
+ Schema equalityDeleteRowSchema,
+ SortOrder equalityDeleteSortOrder,
+ Map<String, String> writerProperties,
+ S inputSchema,
+ S equalityDeleteInputSchema) {
+ this.table = table;
+ this.dataFileFormat = dataFileFormat;
+ this.inputType = inputType;
+ this.dataSchema = dataSchema;
+ this.dataSortOrder = dataSortOrder;
+ this.deleteFileFormat = deleteFileFormat;
+ this.equalityFieldIds = equalityFieldIds;
+ this.equalityDeleteRowSchema = equalityDeleteRowSchema;
+ this.equalityDeleteSortOrder = equalityDeleteSortOrder;
+ this.writerProperties = writerProperties != null ? writerProperties :
ImmutableMap.of();
+ this.inputSchema = inputSchema;
+ this.equalityDeleteInputSchema = equalityDeleteInputSchema;
+ }
+
+ protected S inputSchema() {
+ return inputSchema;
+ }
+
+ protected S equalityDeleteInputSchema() {
+ return equalityDeleteInputSchema;
+ }
+
+ @Override
+ public DataWriter<T> newDataWriter(
+ EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+ Preconditions.checkArgument(dataSchema != null, "Invalid data schema:
null");
+ EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+ Map<String, String> properties = table != null ? table.properties() :
ImmutableMap.of();
+ MetricsConfig metricsConfig =
+ table != null ? MetricsConfig.forTable(table) :
MetricsConfig.getDefault();
+
+ try {
+ FileWriterBuilder<DataWriter<T>, S> builder =
+ FormatModelRegistry.dataWriteBuilder(dataFileFormat, inputType,
file);
+ return builder
+ .schema(dataSchema)
+ .engineSchema(inputSchema())
+ .setAll(properties)
+ .setAll(writerProperties)
+ .metricsConfig(metricsConfig)
+ .spec(spec)
+ .partition(partition)
+ .keyMetadata(keyMetadata)
+ .sortOrder(dataSortOrder)
+ .overwrite()
+ .build();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create new data writer", e);
+ }
+ }
+
+ @Override
+ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
+ EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+ Preconditions.checkArgument(equalityDeleteRowSchema != null, "Invalid
delete schema: null");
+
+ EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+ Map<String, String> properties = table != null ? table.properties() :
ImmutableMap.of();
+ MetricsConfig metricsConfig =
+ table != null ? MetricsConfig.forTable(table) :
MetricsConfig.getDefault();
+
+ try {
+ FileWriterBuilder<EqualityDeleteWriter<T>, S> builder =
+ FormatModelRegistry.equalityDeleteWriteBuilder(deleteFileFormat,
inputType, file);
+ return builder
+ .setAll(properties)
+ .setAll(writerProperties)
+ .metricsConfig(metricsConfig)
+ .schema(equalityDeleteRowSchema)
+ .engineSchema(equalityDeleteInputSchema())
+ .equalityFieldIds(equalityFieldIds)
+ .spec(spec)
+ .partition(partition)
+ .keyMetadata(keyMetadata)
+ .sortOrder(equalityDeleteSortOrder)
+ .overwrite()
+ .build();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create new equality delete
writer", e);
+ }
+ }
+
+ @Override
+ public PositionDeleteWriter<T> newPositionDeleteWriter(
+ EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+ EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+ Map<String, String> properties = table != null ? table.properties() :
ImmutableMap.of();
+ MetricsConfig metricsConfig =
+ table != null ? MetricsConfig.forPositionDelete(table) :
MetricsConfig.forPositionDelete();
+
+ try {
+ FileWriterBuilder<PositionDeleteWriter<T>, ?> builder =
+ FormatModelRegistry.positionDeleteWriteBuilder(deleteFileFormat,
file);
+ return builder
+ .setAll(properties)
+ .setAll(writerProperties)
+ .metricsConfig(metricsConfig)
+ .spec(spec)
+ .partition(partition)
+ .keyMetadata(keyMetadata)
+ .overwrite()
+ .build();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create new position delete
writer", e);
+ }
+ }
+}