This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new ccf4d91e1c Flink: Dynamic Sink: Resolve effective write config at runtime (#15237)
ccf4d91e1c is described below
commit ccf4d91e1c0d3db1b723cb2cebb51253b0e05790
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Feb 5 16:11:10 2026 +0100
Flink: Dynamic Sink: Resolve effective write config at runtime (#15237)
---
.../flink/sink/dynamic/DynamicIcebergSink.java | 42 ++++-----
.../iceberg/flink/sink/dynamic/DynamicWriter.java | 31 +++----
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 10 +-
.../flink/sink/dynamic/TestDynamicWriter.java | 101 ++++++++++++++++++++-
4 files changed, 134 insertions(+), 50 deletions(-)
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index 61b1f84a43..afafbe5b59 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -46,13 +46,11 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.OutputTag;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.sink.IcebergSink;
-import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -78,11 +76,7 @@ public class DynamicIcebergSink
private final String uidPrefix;
private final String sinkId;
private final Map<String, String> writeProperties;
- private final transient FlinkWriteConf flinkWriteConf;
- private final FileFormat dataFileFormat;
- private final long targetDataFileSize;
- private final boolean overwriteMode;
- private final int workerPoolSize;
+ private final Configuration flinkConfig;
private final int cacheMaximumSize;
DynamicIcebergSink(
@@ -90,17 +84,13 @@ public class DynamicIcebergSink
Map<String, String> snapshotProperties,
String uidPrefix,
Map<String, String> writeProperties,
- FlinkWriteConf flinkWriteConf,
+ Configuration flinkConfig,
int cacheMaximumSize) {
this.catalogLoader = catalogLoader;
this.snapshotProperties = snapshotProperties;
this.uidPrefix = uidPrefix;
this.writeProperties = writeProperties;
- this.flinkWriteConf = flinkWriteConf;
- this.dataFileFormat = flinkWriteConf.dataFileFormat();
- this.targetDataFileSize = flinkWriteConf.targetDataFileSize();
- this.overwriteMode = flinkWriteConf.overwriteMode();
- this.workerPoolSize = flinkWriteConf.workerPoolSize();
+ this.flinkConfig = flinkConfig;
this.cacheMaximumSize = cacheMaximumSize;
// We generate a random UUID every time when a sink is created.
// This is used to separate files generated by different sinks writing the same table.
@@ -112,9 +102,8 @@ public class DynamicIcebergSink
public SinkWriter<DynamicRecordInternal> createWriter(WriterInitContext context) {
return new DynamicWriter(
catalogLoader.loadCatalog(),
- dataFileFormat,
- targetDataFileSize,
writeProperties,
+ flinkConfig,
cacheMaximumSize,
new DynamicWriterMetrics(context.metricGroup()),
context.getTaskInfo().getIndexOfThisSubtask(),
@@ -123,12 +112,13 @@ public class DynamicIcebergSink
@Override
public Committer<DynamicCommittable> createCommitter(CommitterInitContext context) {
+ FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeProperties, flinkConfig);
DynamicCommitterMetrics metrics = new DynamicCommitterMetrics(context.metricGroup());
return new DynamicCommitter(
catalogLoader.loadCatalog(),
snapshotProperties,
- overwriteMode,
- workerPoolSize,
+ flinkWriteConf.overwriteMode(),
+ flinkWriteConf.workerPoolSize(),
sinkId,
metrics);
}
@@ -373,17 +363,19 @@ public class DynamicIcebergSink
generator != null, "Please use withGenerator() to convert the input DataStream.");
Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null");
- FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig);
- Map<String, String> writeProperties =
- SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, null);
uidPrefix = Optional.ofNullable(uidPrefix).orElse("");
- return instantiateSink(writeProperties, flinkWriteConf);
+ Configuration flinkConfig =
+ readableConfig instanceof Configuration
+ ? (Configuration) readableConfig
+ : Configuration.fromMap(readableConfig.toMap());
+
+ return instantiateSink(writeOptions, flinkConfig);
}
@VisibleForTesting
DynamicIcebergSink instantiateSink(
- Map<String, String> writeProperties, FlinkWriteConf flinkWriteConf) {
+ Map<String, String> writeProperties, Configuration flinkWriteConf) {
return new DynamicIcebergSink(
catalogLoader,
snapshotSummary,
@@ -441,8 +433,10 @@ public class DynamicIcebergSink
.union(converted)
.sinkTo(sink)
.uid(prefixIfNotNull(uidPrefix, "-sink"));
- if (sink.flinkWriteConf.writeParallelism() != null) {
- rowDataDataStreamSink.setParallelism(sink.flinkWriteConf.writeParallelism());
+
+ FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig);
+ if (flinkWriteConf.writeParallelism() != null) {
+ rowDataDataStreamSink.setParallelism(flinkWriteConf.writeParallelism());
}
return rowDataDataStreamSink;
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index 8425ea747f..fcd0d08270 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -28,14 +28,16 @@ import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
@@ -56,27 +58,24 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam
private final Map<WriteTarget, RowDataTaskWriterFactory> taskWriterFactories;
private final Map<WriteTarget, TaskWriter<RowData>> writers;
+ private final Configuration flinkConfig;
+ private final Map<String, String> commonWriteProperties;
private final DynamicWriterMetrics metrics;
private final int subTaskId;
private final int attemptId;
private final Catalog catalog;
- private final FileFormat dataFileFormat;
- private final long targetDataFileSize;
- private final Map<String, String> commonWriteProperties;
DynamicWriter(
Catalog catalog,
- FileFormat dataFileFormat,
- long targetDataFileSize,
Map<String, String> commonWriteProperties,
+ Configuration flinkConfig,
int cacheMaximumSize,
DynamicWriterMetrics metrics,
int subTaskId,
int attemptId) {
this.catalog = catalog;
- this.dataFileFormat = dataFileFormat;
- this.targetDataFileSize = targetDataFileSize;
this.commonWriteProperties = commonWriteProperties;
+ this.flinkConfig = flinkConfig;
this.metrics = metrics;
this.subTaskId = subTaskId;
this.attemptId = attemptId;
@@ -106,10 +105,6 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam
Table table =
catalog.loadTable(TableIdentifier.parse(factoryKey.tableName()));
- Map<String, String> tableWriteProperties =
- Maps.newHashMap(table.properties());
- tableWriteProperties.putAll(commonWriteProperties);
-
Set<Integer> equalityFieldIds =
getEqualityFields(table, element.equalityFields());
if (element.upsertMode()) {
@@ -128,12 +123,18 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam
}
}
+ FlinkWriteConf flinkWriteConf =
+ new FlinkWriteConf(table, commonWriteProperties, flinkConfig);
+ Map<String, String> tableWriteProperties =
+ SinkUtil.writeProperties(
+ flinkWriteConf.dataFileFormat(), flinkWriteConf, table);
+
LOG.debug("Creating new writer factory for table
'{}'", table.name());
return new RowDataTaskWriterFactory(
() -> table,
FlinkSchemaUtil.convert(element.schema()),
- targetDataFileSize,
- dataFileFormat,
+ flinkWriteConf.targetDataFileSize(),
+ flinkWriteConf.dataFileFormat(),
tableWriteProperties,
Lists.newArrayList(equalityFieldIds),
element.upsertMode(),
@@ -165,8 +166,6 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam
return MoreObjects.toStringHelper(this)
.add("subtaskId", subTaskId)
.add("attemptId", attemptId)
- .add("dataFileFormat", dataFileFormat)
- .add("targetDataFileSize", targetDataFileSize)
.add("writeProperties", commonWriteProperties)
.toString();
}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index dc05d0f327..ad4a13d561 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -1296,14 +1296,14 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
@Override
DynamicIcebergSink instantiateSink(
- Map<String, String> writeProperties, FlinkWriteConf flinkWriteConf) {
+ Map<String, String> writeProperties, Configuration flinkConfig) {
return new CommitHookDynamicIcebergSink(
commitHook,
CATALOG_EXTENSION.catalogLoader(),
Collections.emptyMap(),
"uidPrefix",
writeProperties,
- flinkWriteConf,
+ flinkConfig,
100);
}
}
@@ -1319,17 +1319,17 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
Map<String, String> snapshotProperties,
String uidPrefix,
Map<String, String> writeProperties,
- FlinkWriteConf flinkWriteConf,
+ Configuration flinkConfig,
int cacheMaximumSize) {
super(
catalogLoader,
snapshotProperties,
uidPrefix,
writeProperties,
- flinkWriteConf,
+ flinkConfig,
cacheMaximumSize);
this.commitHook = commitHook;
- this.overwriteMode = flinkWriteConf.overwriteMode();
+ this.overwriteMode = new FlinkWriteConf(writeProperties, flinkConfig).overwriteMode();
}
@Override
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index d17848225f..8e346cd8a1 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -26,6 +26,7 @@ import java.net.URI;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nonnull;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileFormat;
@@ -34,6 +35,7 @@ import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
import org.apache.iceberg.io.BaseTaskWriter;
@@ -140,7 +142,7 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
}
@Test
- void testDynamicWriterPropertiesPriority() throws Exception {
+ void testFlinkConfigOverridesTableProperties() throws Exception {
Catalog catalog = CATALOG_EXTENSION.catalog();
Table table1 =
catalog.createTable(
@@ -149,11 +151,45 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
null,
ImmutableMap.of("write.parquet.compression-codec", "zstd"));
+ Configuration flinkConfig = new Configuration();
+ flinkConfig.set(FlinkWriteOptions.COMPRESSION_CODEC, "snappy");
+
DynamicWriter dynamicWriter =
- createDynamicWriter(catalog, ImmutableMap.of("write.parquet.compression-codec", "gzip"));
+ new DynamicWriter(
+ catalog,
+ Map.of(),
+ flinkConfig,
+ 100,
+ new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
+ 0,
+ 0);
DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
- assertThat(getNumDataFiles(table1)).isEqualTo(0);
+ dynamicWriter.write(record1, null);
+ Map<String, String> properties = properties(dynamicWriter);
+ assertThat(properties).containsEntry("write.parquet.compression-codec", "snappy");
+
+ dynamicWriter.close();
+ }
+
+ @Test
+ void testWritePropertiesOverrideFlinkConfig() throws Exception {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA);
+
+ Configuration flinkConfig = new Configuration();
+ flinkConfig.set(FlinkWriteOptions.COMPRESSION_CODEC, "snappy");
+
+ DynamicWriter dynamicWriter =
+ new DynamicWriter(
+ catalog,
+ ImmutableMap.of("compression-codec", "gzip"),
+ flinkConfig,
+ 100,
+ new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
+ 0,
+ 0);
+ DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
dynamicWriter.write(record1, null);
Map<String, String> properties = properties(dynamicWriter);
@@ -162,6 +198,62 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
dynamicWriter.close();
}
+ @Test
+ void testFlinkConfigFileFormat() throws Exception {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA);
+
+ Configuration flinkConfig = new Configuration();
+ flinkConfig.set(FlinkWriteOptions.WRITE_FORMAT, "orc");
+
+ DynamicWriter dynamicWriter =
+ new DynamicWriter(
+ catalog,
+ Map.of(),
+ flinkConfig,
+ 100,
+ new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
+ 0,
+ 0);
+ DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
+
+ dynamicWriter.write(record1, null);
+ dynamicWriter.prepareCommit();
+
+ File dataDir = new File(URI.create(table1.location()).getPath(), "data");
+ File[] files = dataDir.listFiles((dir, name) -> name.endsWith(".orc"));
+ assertThat(files).isNotNull().hasSize(1);
+
+ dynamicWriter.close();
+ }
+
+ @Test
+ void testFlinkConfigTargetFileSize() throws Exception {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA);
+
+ Configuration flinkConfig = new Configuration();
+ flinkConfig.set(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES, 2048L);
+
+ DynamicWriter dynamicWriter =
+ new DynamicWriter(
+ catalog,
+ Map.of(),
+ flinkConfig,
+ 100,
+ new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
+ 0,
+ 0);
+ DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
+
+ dynamicWriter.write(record1, null);
+ dynamicWriter.prepareCommit();
+
+ assertThat(getNumDataFiles(table1)).isEqualTo(1);
+
+ dynamicWriter.close();
+ }
+
@Test
void testDynamicWriterUpsert() throws Exception {
Catalog catalog = CATALOG_EXTENSION.catalog();
@@ -239,9 +331,8 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
DynamicWriter dynamicWriter =
new DynamicWriter(
catalog,
- FileFormat.PARQUET,
- 1024L,
properties,
+ new Configuration(),
100,
new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
0,
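
For context, a minimal sketch of the precedence this change introduces (the class name and the option values below are illustrative, not part of the patch): the effective write config is no longer fixed when the sink is built, but derived from the sink's write options, the Flink Configuration and, where a table is available, the table's own properties at the point a writer or committer needs it. Per the new tests, write options override the Flink config, which in turn overrides table properties.

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;

public class DynamicSinkWriteConfSketch {

  // Per-table resolution, mirroring what DynamicWriter now does when it creates a
  // writer factory: the loaded table's properties participate, overridden by the
  // Flink config, overridden in turn by the write options given to the sink builder.
  static FileFormat effectiveFormat(
      Table table, Map<String, String> writeOptions, Configuration flinkConfig) {
    FlinkWriteConf writeConf = new FlinkWriteConf(table, writeOptions, flinkConfig);
    return writeConf.dataFileFormat();
  }

  public static void main(String[] args) {
    Configuration flinkConfig = new Configuration();
    flinkConfig.set(FlinkWriteOptions.WRITE_FORMAT, "orc");
    flinkConfig.set(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES, 2048L);

    // Table-less resolution, as DynamicIcebergSink#createCommitter now does it.
    Map<String, String> writeOptions = Map.of("write-format", "parquet");
    FlinkWriteConf writeConf = new FlinkWriteConf(writeOptions, flinkConfig);

    // The explicit write option wins over the Flink config, so this prints PARQUET;
    // the target file size falls back to the Flink config value, 2048.
    System.out.println(writeConf.dataFileFormat());
    System.out.println(writeConf.targetDataFileSize());
  }
}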