This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d831673c2c Kafka Connect: Use GenericFileWriterFactory instead of GenericAppenderFactory (#14328)
d831673c2c is described below
commit d831673c2ccba4213de9e1d859045526e3b2e333
Author: pvary <[email protected]>
AuthorDate: Wed Oct 15 17:46:41 2025 +0200
Kafka Connect: Use GenericFileWriterFactory instead of GenericAppenderFactory (#14328)
---
.../iceberg/data/GenericFileWriterFactory.java | 61 +++++++++++++++++-----
.../connect/data/PartitionedAppendWriter.java | 6 +--
.../apache/iceberg/connect/data/RecordUtils.java | 37 +++++++------
3 files changed, 73 insertions(+), 31 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
index 2a96f93df2..58ea9bafd6 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
@@ -34,9 +34,38 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
+public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
+ GenericFileWriterFactory(
+ Table table,
+ FileFormat dataFileFormat,
+ Schema dataSchema,
+ SortOrder dataSortOrder,
+ FileFormat deleteFileFormat,
+ int[] equalityFieldIds,
+ Schema equalityDeleteRowSchema,
+ SortOrder equalityDeleteSortOrder,
+ Schema positionDeleteRowSchema,
+ Map<String, String> writerProperties) {
+ super(
+ table,
+ dataFileFormat,
+ dataSchema,
+ dataSortOrder,
+ deleteFileFormat,
+ equalityFieldIds,
+ equalityDeleteRowSchema,
+ equalityDeleteSortOrder,
+ positionDeleteRowSchema,
+ writerProperties);
+ }
+
+ /**
+ * @deprecated as of 1.11.0; it will be removed in 1.12.0
+ */
+ @Deprecated
GenericFileWriterFactory(
Table table,
FileFormat dataFileFormat,
@@ -108,7 +137,7 @@ class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
builder.createWriterFunc(GenericOrcWriter::buildWriter);
}
- static class Builder {
+ public static class Builder {
private final Table table;
private FileFormat dataFileFormat;
private Schema dataSchema;
@@ -118,8 +147,9 @@ class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
private Schema equalityDeleteRowSchema;
private SortOrder equalityDeleteSortOrder;
private Schema positionDeleteRowSchema;
+ private Map<String, String> writerProperties = ImmutableMap.of();
- Builder(Table table) {
+ public Builder(Table table) {
this.table = table;
this.dataSchema = table.schema();
@@ -134,47 +164,53 @@ class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
this.deleteFileFormat = FileFormat.fromString(deleteFileFormatName);
}
- Builder dataFileFormat(FileFormat newDataFileFormat) {
+ public Builder dataFileFormat(FileFormat newDataFileFormat) {
this.dataFileFormat = newDataFileFormat;
return this;
}
- Builder dataSchema(Schema newDataSchema) {
+ public Builder dataSchema(Schema newDataSchema) {
this.dataSchema = newDataSchema;
return this;
}
- Builder dataSortOrder(SortOrder newDataSortOrder) {
+ public Builder dataSortOrder(SortOrder newDataSortOrder) {
this.dataSortOrder = newDataSortOrder;
return this;
}
- Builder deleteFileFormat(FileFormat newDeleteFileFormat) {
+ public Builder deleteFileFormat(FileFormat newDeleteFileFormat) {
this.deleteFileFormat = newDeleteFileFormat;
return this;
}
- Builder equalityFieldIds(int[] newEqualityFieldIds) {
+ public Builder equalityFieldIds(int[] newEqualityFieldIds) {
this.equalityFieldIds = newEqualityFieldIds;
return this;
}
- Builder equalityDeleteRowSchema(Schema newEqualityDeleteRowSchema) {
+ public Builder equalityDeleteRowSchema(Schema newEqualityDeleteRowSchema) {
this.equalityDeleteRowSchema = newEqualityDeleteRowSchema;
return this;
}
- Builder equalityDeleteSortOrder(SortOrder newEqualityDeleteSortOrder) {
+ public Builder equalityDeleteSortOrder(SortOrder newEqualityDeleteSortOrder) {
this.equalityDeleteSortOrder = newEqualityDeleteSortOrder;
return this;
}
+ /** Sets default writer properties. */
+ public Builder writerProperties(Map<String, String> newWriterProperties) {
+ this.writerProperties = newWriterProperties;
+ return this;
+ }
+
Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
this.positionDeleteRowSchema = newPositionDeleteRowSchema;
return this;
}
- GenericFileWriterFactory build() {
+ public GenericFileWriterFactory build() {
boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null;
boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null;
Preconditions.checkArgument(
@@ -190,7 +226,8 @@ class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- positionDeleteRowSchema);
+ positionDeleteRowSchema,
+ writerProperties);
}
}
}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java
index ad8b5715a9..3fe9e60650 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java
@@ -24,8 +24,8 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;
@@ -37,12 +37,12 @@ class PartitionedAppendWriter extends PartitionedFanoutWriter<Record> {
PartitionedAppendWriter(
PartitionSpec spec,
FileFormat format,
- FileAppenderFactory<Record> appenderFactory,
+ FileWriterFactory<Record> fileWriterFactory,
OutputFileFactory fileFactory,
FileIO io,
long targetFileSize,
Schema schema) {
- super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+ super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);
this.wrapper = new InternalRecordWrapper(schema.asStruct());
}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
index 5ac9307397..d4bf4ce2a4 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
@@ -27,9 +27,9 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.connect.IcebergSinkConfig;
-import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericFileWriterFactory;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
@@ -128,20 +128,25 @@ class RecordUtils {
.collect(Collectors.toSet());
}
- FileAppenderFactory<Record> appenderFactory;
+ FileWriterFactory<Record> writerFactory;
if (identifierFieldIds == null || identifierFieldIds.isEmpty()) {
- appenderFactory =
- new GenericAppenderFactory(table.schema(), table.spec(), null, null, null)
- .setAll(tableProps);
+ writerFactory =
+ new GenericFileWriterFactory.Builder(table)
+ .dataSchema(table.schema())
+ .dataFileFormat(format)
+ .writerProperties(tableProps)
+ .build();
} else {
- appenderFactory =
- new GenericAppenderFactory(
- table.schema(),
- table.spec(),
- Ints.toArray(identifierFieldIds),
- TypeUtil.select(table.schema(), Sets.newHashSet(identifierFieldIds)),
- null)
- .setAll(tableProps);
+ writerFactory =
+ new GenericFileWriterFactory.Builder(table)
+ .dataSchema(table.schema())
+ .dataFileFormat(format)
+ .equalityFieldIds(Ints.toArray(identifierFieldIds))
+ .equalityDeleteRowSchema(
+ TypeUtil.select(table.schema(), Sets.newHashSet(identifierFieldIds)))
+ .deleteFileFormat(format)
+ .writerProperties(tableProps)
+ .build();
}
// (partition ID + task ID + operation ID) must be unique
@@ -156,13 +161,13 @@ class RecordUtils {
if (table.spec().isUnpartitioned()) {
writer =
new UnpartitionedWriter<>(
- table.spec(), format, appenderFactory, fileFactory, table.io(), targetFileSize);
+ table.spec(), format, writerFactory, fileFactory, table.io(), targetFileSize);
} else {
writer =
new PartitionedAppendWriter(
table.spec(),
format,
- appenderFactory,
+ writerFactory,
fileFactory,
table.io(),
targetFileSize,