This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d831673c2c Kafka Connect: Use GenericFileWriterFactory instead of GenericAppenderFactory (#14328)
d831673c2c is described below

commit d831673c2ccba4213de9e1d859045526e3b2e333
Author: pvary <[email protected]>
AuthorDate: Wed Oct 15 17:46:41 2025 +0200

    Kafka Connect: Use GenericFileWriterFactory instead of GenericAppenderFactory (#14328)
---
 .../iceberg/data/GenericFileWriterFactory.java     | 61 +++++++++++++++++-----
 .../connect/data/PartitionedAppendWriter.java      |  6 +--
 .../apache/iceberg/connect/data/RecordUtils.java   | 37 +++++++------
 3 files changed, 73 insertions(+), 31 deletions(-)
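
For reference, a minimal sketch (not part of this commit) of how the now-public GenericFileWriterFactory.Builder could be used by code outside the data module; the example class, the Table instance, and the sample compression property are illustrative assumptions:

    // Illustrative only; assumes an existing org.apache.iceberg.Table on the classpath.
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.GenericFileWriterFactory;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.FileWriterFactory;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    public class WriterFactoryExample {
      static FileWriterFactory<Record> newWriterFactory(Table table) {
        return new GenericFileWriterFactory.Builder(table)
            .dataSchema(table.schema())
            .dataFileFormat(FileFormat.PARQUET)
            // writerProperties(...) takes over the role of the setAll(tableProps) call
            // previously made on GenericAppenderFactory (see the RecordUtils change below)
            .writerProperties(ImmutableMap.of("write.parquet.compression-codec", "zstd"))
            .build();
      }
    }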

diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
index 2a96f93df2..58ea9bafd6 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
@@ -34,9 +34,38 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 
-class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
+public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
 
+  GenericFileWriterFactory(
+      Table table,
+      FileFormat dataFileFormat,
+      Schema dataSchema,
+      SortOrder dataSortOrder,
+      FileFormat deleteFileFormat,
+      int[] equalityFieldIds,
+      Schema equalityDeleteRowSchema,
+      SortOrder equalityDeleteSortOrder,
+      Schema positionDeleteRowSchema,
+      Map<String, String> writerProperties) {
+    super(
+        table,
+        dataFileFormat,
+        dataSchema,
+        dataSortOrder,
+        deleteFileFormat,
+        equalityFieldIds,
+        equalityDeleteRowSchema,
+        equalityDeleteSortOrder,
+        positionDeleteRowSchema,
+        writerProperties);
+  }
+
+  /**
+   * @deprecated as of 1.11.0; it will be removed in 1.12.0
+   */
+  @Deprecated
   GenericFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
@@ -108,7 +137,7 @@ class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
     builder.createWriterFunc(GenericOrcWriter::buildWriter);
   }
 
-  static class Builder {
+  public static class Builder {
     private final Table table;
     private FileFormat dataFileFormat;
     private Schema dataSchema;
@@ -118,8 +147,9 @@ class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
     private Schema equalityDeleteRowSchema;
     private SortOrder equalityDeleteSortOrder;
     private Schema positionDeleteRowSchema;
+    private Map<String, String> writerProperties = ImmutableMap.of();
 
-    Builder(Table table) {
+    public Builder(Table table) {
       this.table = table;
       this.dataSchema = table.schema();
 
@@ -134,47 +164,53 @@ class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
       this.deleteFileFormat = FileFormat.fromString(deleteFileFormatName);
     }
 
-    Builder dataFileFormat(FileFormat newDataFileFormat) {
+    public Builder dataFileFormat(FileFormat newDataFileFormat) {
       this.dataFileFormat = newDataFileFormat;
       return this;
     }
 
-    Builder dataSchema(Schema newDataSchema) {
+    public Builder dataSchema(Schema newDataSchema) {
       this.dataSchema = newDataSchema;
       return this;
     }
 
-    Builder dataSortOrder(SortOrder newDataSortOrder) {
+    public Builder dataSortOrder(SortOrder newDataSortOrder) {
       this.dataSortOrder = newDataSortOrder;
       return this;
     }
 
-    Builder deleteFileFormat(FileFormat newDeleteFileFormat) {
+    public Builder deleteFileFormat(FileFormat newDeleteFileFormat) {
       this.deleteFileFormat = newDeleteFileFormat;
       return this;
     }
 
-    Builder equalityFieldIds(int[] newEqualityFieldIds) {
+    public Builder equalityFieldIds(int[] newEqualityFieldIds) {
       this.equalityFieldIds = newEqualityFieldIds;
       return this;
     }
 
-    Builder equalityDeleteRowSchema(Schema newEqualityDeleteRowSchema) {
+    public Builder equalityDeleteRowSchema(Schema newEqualityDeleteRowSchema) {
       this.equalityDeleteRowSchema = newEqualityDeleteRowSchema;
       return this;
     }
 
-    Builder equalityDeleteSortOrder(SortOrder newEqualityDeleteSortOrder) {
+    public Builder equalityDeleteSortOrder(SortOrder 
newEqualityDeleteSortOrder) {
       this.equalityDeleteSortOrder = newEqualityDeleteSortOrder;
       return this;
     }
 
+    /** Sets default writer properties. */
+    public Builder writerProperties(Map<String, String> newWriterProperties) {
+      this.writerProperties = newWriterProperties;
+      return this;
+    }
+
     Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
       this.positionDeleteRowSchema = newPositionDeleteRowSchema;
       return this;
     }
 
-    GenericFileWriterFactory build() {
+    public GenericFileWriterFactory build() {
       boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null;
       boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null;
       Preconditions.checkArgument(
@@ -190,7 +226,8 @@ class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
           equalityFieldIds,
           equalityDeleteRowSchema,
           equalityDeleteSortOrder,
-          positionDeleteRowSchema);
+          positionDeleteRowSchema,
+          writerProperties);
     }
   }
 }
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java
index ad8b5715a9..3fe9e60650 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java
@@ -24,8 +24,8 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.PartitionedFanoutWriter;
 
@@ -37,12 +37,12 @@ class PartitionedAppendWriter extends PartitionedFanoutWriter<Record> {
   PartitionedAppendWriter(
       PartitionSpec spec,
       FileFormat format,
-      FileAppenderFactory<Record> appenderFactory,
+      FileWriterFactory<Record> fileWriterFactory,
       OutputFileFactory fileFactory,
       FileIO io,
       long targetFileSize,
       Schema schema) {
-    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize);
     this.partitionKey = new PartitionKey(spec, schema);
     this.wrapper = new InternalRecordWrapper(schema.asStruct());
   }
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
index 5ac9307397..d4bf4ce2a4 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
@@ -27,9 +27,9 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.connect.IcebergSinkConfig;
-import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericFileWriterFactory;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
@@ -128,20 +128,25 @@ class RecordUtils {
               .collect(Collectors.toSet());
     }
 
-    FileAppenderFactory<Record> appenderFactory;
+    FileWriterFactory<Record> writerFactory;
     if (identifierFieldIds == null || identifierFieldIds.isEmpty()) {
-      appenderFactory =
-          new GenericAppenderFactory(table.schema(), table.spec(), null, null, null)
-              .setAll(tableProps);
+      writerFactory =
+          new GenericFileWriterFactory.Builder(table)
+              .dataSchema(table.schema())
+              .dataFileFormat(format)
+              .writerProperties(tableProps)
+              .build();
     } else {
-      appenderFactory =
-          new GenericAppenderFactory(
-                  table.schema(),
-                  table.spec(),
-                  Ints.toArray(identifierFieldIds),
-                  TypeUtil.select(table.schema(), Sets.newHashSet(identifierFieldIds)),
-                  null)
-              .setAll(tableProps);
+      writerFactory =
+          new GenericFileWriterFactory.Builder(table)
+              .dataSchema(table.schema())
+              .dataFileFormat(format)
+              .equalityFieldIds(Ints.toArray(identifierFieldIds))
+              .equalityDeleteRowSchema(
+                  TypeUtil.select(table.schema(), Sets.newHashSet(identifierFieldIds)))
+              .deleteFileFormat(format)
+              .writerProperties(tableProps)
+              .build();
     }
 
     // (partition ID + task ID + operation ID) must be unique
@@ -156,13 +161,13 @@ class RecordUtils {
     if (table.spec().isUnpartitioned()) {
       writer =
           new UnpartitionedWriter<>(
-              table.spec(), format, appenderFactory, fileFactory, table.io(), targetFileSize);
+              table.spec(), format, writerFactory, fileFactory, table.io(), targetFileSize);
     } else {
       writer =
           new PartitionedAppendWriter(
               table.spec(),
               format,
-              appenderFactory,
+              writerFactory,
               fileFactory,
               table.io(),
               targetFileSize,
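
Similarly, a minimal sketch (again not part of this commit) of the equality-delete configuration through the same Builder, mirroring the new RecordUtils code path above; the example class and the identifier field ids (1, 2) are illustrative assumptions:

    // Illustrative only; assumes the table's identifier fields have ids 1 and 2.
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.GenericFileWriterFactory;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.FileWriterFactory;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.TypeUtil;

    public class DeltaWriterFactoryExample {
      static FileWriterFactory<Record> newDeltaWriterFactory(Table table) {
        return new GenericFileWriterFactory.Builder(table)
            .dataSchema(table.schema())
            .dataFileFormat(FileFormat.PARQUET)
            // per the Preconditions in build(), equality field ids and the matching
            // delete row schema must be configured together
            .equalityFieldIds(new int[] {1, 2})
            .equalityDeleteRowSchema(TypeUtil.select(table.schema(), Sets.newHashSet(1, 2)))
            .deleteFileFormat(FileFormat.PARQUET)
            .writerProperties(table.properties())
            .build();
      }
    }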
