This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new ccf4d91e1c Flink: Dynamic Sink: Resolve effective write config at 
runtime (#15237)
ccf4d91e1c is described below

commit ccf4d91e1c0d3db1b723cb2cebb51253b0e05790
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Feb 5 16:11:10 2026 +0100

    Flink: Dynamic Sink: Resolve effective write config at runtime (#15237)
---
 .../flink/sink/dynamic/DynamicIcebergSink.java     |  42 ++++-----
 .../iceberg/flink/sink/dynamic/DynamicWriter.java  |  31 +++----
 .../flink/sink/dynamic/TestDynamicIcebergSink.java |  10 +-
 .../flink/sink/dynamic/TestDynamicWriter.java      | 101 ++++++++++++++++++++-
 4 files changed, 134 insertions(+), 50 deletions(-)

diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index 61b1f84a43..afafbe5b59 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -46,13 +46,11 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.OutputTag;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.sink.IcebergSink;
-import org.apache.iceberg.flink.sink.SinkUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
@@ -78,11 +76,7 @@ public class DynamicIcebergSink
   private final String uidPrefix;
   private final String sinkId;
   private final Map<String, String> writeProperties;
-  private final transient FlinkWriteConf flinkWriteConf;
-  private final FileFormat dataFileFormat;
-  private final long targetDataFileSize;
-  private final boolean overwriteMode;
-  private final int workerPoolSize;
+  private final Configuration flinkConfig;
   private final int cacheMaximumSize;
 
   DynamicIcebergSink(
@@ -90,17 +84,13 @@ public class DynamicIcebergSink
       Map<String, String> snapshotProperties,
       String uidPrefix,
       Map<String, String> writeProperties,
-      FlinkWriteConf flinkWriteConf,
+      Configuration flinkConfig,
       int cacheMaximumSize) {
     this.catalogLoader = catalogLoader;
     this.snapshotProperties = snapshotProperties;
     this.uidPrefix = uidPrefix;
     this.writeProperties = writeProperties;
-    this.flinkWriteConf = flinkWriteConf;
-    this.dataFileFormat = flinkWriteConf.dataFileFormat();
-    this.targetDataFileSize = flinkWriteConf.targetDataFileSize();
-    this.overwriteMode = flinkWriteConf.overwriteMode();
-    this.workerPoolSize = flinkWriteConf.workerPoolSize();
+    this.flinkConfig = flinkConfig;
     this.cacheMaximumSize = cacheMaximumSize;
     // We generate a random UUID every time when a sink is created.
     // This is used to separate files generated by different sinks writing the 
same table.
@@ -112,9 +102,8 @@ public class DynamicIcebergSink
   public SinkWriter<DynamicRecordInternal> createWriter(WriterInitContext 
context) {
     return new DynamicWriter(
         catalogLoader.loadCatalog(),
-        dataFileFormat,
-        targetDataFileSize,
         writeProperties,
+        flinkConfig,
         cacheMaximumSize,
         new DynamicWriterMetrics(context.metricGroup()),
         context.getTaskInfo().getIndexOfThisSubtask(),
@@ -123,12 +112,13 @@ public class DynamicIcebergSink
 
   @Override
   public Committer<DynamicCommittable> createCommitter(CommitterInitContext 
context) {
+    FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeProperties, 
flinkConfig);
     DynamicCommitterMetrics metrics = new 
DynamicCommitterMetrics(context.metricGroup());
     return new DynamicCommitter(
         catalogLoader.loadCatalog(),
         snapshotProperties,
-        overwriteMode,
-        workerPoolSize,
+        flinkWriteConf.overwriteMode(),
+        flinkWriteConf.workerPoolSize(),
         sinkId,
         metrics);
   }
@@ -373,17 +363,19 @@ public class DynamicIcebergSink
           generator != null, "Please use withGenerator() to convert the input 
DataStream.");
       Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be 
null");
 
-      FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, 
readableConfig);
-      Map<String, String> writeProperties =
-          SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), 
flinkWriteConf, null);
       uidPrefix = Optional.ofNullable(uidPrefix).orElse("");
 
-      return instantiateSink(writeProperties, flinkWriteConf);
+      Configuration flinkConfig =
+          readableConfig instanceof Configuration
+              ? (Configuration) readableConfig
+              : Configuration.fromMap(readableConfig.toMap());
+
+      return instantiateSink(writeOptions, flinkConfig);
     }
 
     @VisibleForTesting
     DynamicIcebergSink instantiateSink(
-        Map<String, String> writeProperties, FlinkWriteConf flinkWriteConf) {
+        Map<String, String> writeProperties, Configuration flinkWriteConf) {
       return new DynamicIcebergSink(
           catalogLoader,
           snapshotSummary,
@@ -441,8 +433,10 @@ public class DynamicIcebergSink
               .union(converted)
               .sinkTo(sink)
               .uid(prefixIfNotNull(uidPrefix, "-sink"));
-      if (sink.flinkWriteConf.writeParallelism() != null) {
-        
rowDataDataStreamSink.setParallelism(sink.flinkWriteConf.writeParallelism());
+
+      FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, 
readableConfig);
+      if (flinkWriteConf.writeParallelism() != null) {
+        
rowDataDataStreamSink.setParallelism(flinkWriteConf.writeParallelism());
       }
 
       return rowDataDataStreamSink;
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index 8425ea747f..fcd0d08270 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -28,14 +28,16 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.SinkUtil;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
@@ -56,27 +58,24 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
 
   private final Map<WriteTarget, RowDataTaskWriterFactory> taskWriterFactories;
   private final Map<WriteTarget, TaskWriter<RowData>> writers;
+  private final Configuration flinkConfig;
+  private final Map<String, String> commonWriteProperties;
   private final DynamicWriterMetrics metrics;
   private final int subTaskId;
   private final int attemptId;
   private final Catalog catalog;
-  private final FileFormat dataFileFormat;
-  private final long targetDataFileSize;
-  private final Map<String, String> commonWriteProperties;
 
   DynamicWriter(
       Catalog catalog,
-      FileFormat dataFileFormat,
-      long targetDataFileSize,
       Map<String, String> commonWriteProperties,
+      Configuration flinkConfig,
       int cacheMaximumSize,
       DynamicWriterMetrics metrics,
       int subTaskId,
       int attemptId) {
     this.catalog = catalog;
-    this.dataFileFormat = dataFileFormat;
-    this.targetDataFileSize = targetDataFileSize;
     this.commonWriteProperties = commonWriteProperties;
+    this.flinkConfig = flinkConfig;
     this.metrics = metrics;
     this.subTaskId = subTaskId;
     this.attemptId = attemptId;
@@ -106,10 +105,6 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
                         Table table =
                             
catalog.loadTable(TableIdentifier.parse(factoryKey.tableName()));
 
-                        Map<String, String> tableWriteProperties =
-                            Maps.newHashMap(table.properties());
-                        tableWriteProperties.putAll(commonWriteProperties);
-
                         Set<Integer> equalityFieldIds =
                             getEqualityFields(table, element.equalityFields());
                         if (element.upsertMode()) {
@@ -128,12 +123,18 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
                           }
                         }
 
+                        FlinkWriteConf flinkWriteConf =
+                            new FlinkWriteConf(table, commonWriteProperties, 
flinkConfig);
+                        Map<String, String> tableWriteProperties =
+                            SinkUtil.writeProperties(
+                                flinkWriteConf.dataFileFormat(), 
flinkWriteConf, table);
+
                         LOG.debug("Creating new writer factory for table 
'{}'", table.name());
                         return new RowDataTaskWriterFactory(
                             () -> table,
                             FlinkSchemaUtil.convert(element.schema()),
-                            targetDataFileSize,
-                            dataFileFormat,
+                            flinkWriteConf.targetDataFileSize(),
+                            flinkWriteConf.dataFileFormat(),
                             tableWriteProperties,
                             Lists.newArrayList(equalityFieldIds),
                             element.upsertMode(),
@@ -165,8 +166,6 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
     return MoreObjects.toStringHelper(this)
         .add("subtaskId", subTaskId)
         .add("attemptId", attemptId)
-        .add("dataFileFormat", dataFileFormat)
-        .add("targetDataFileSize", targetDataFileSize)
         .add("writeProperties", commonWriteProperties)
         .toString();
   }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index dc05d0f327..ad4a13d561 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -1296,14 +1296,14 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
 
     @Override
     DynamicIcebergSink instantiateSink(
-        Map<String, String> writeProperties, FlinkWriteConf flinkWriteConf) {
+        Map<String, String> writeProperties, Configuration flinkConfig) {
       return new CommitHookDynamicIcebergSink(
           commitHook,
           CATALOG_EXTENSION.catalogLoader(),
           Collections.emptyMap(),
           "uidPrefix",
           writeProperties,
-          flinkWriteConf,
+          flinkConfig,
           100);
     }
   }
@@ -1319,17 +1319,17 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
         Map<String, String> snapshotProperties,
         String uidPrefix,
         Map<String, String> writeProperties,
-        FlinkWriteConf flinkWriteConf,
+        Configuration flinkConfig,
         int cacheMaximumSize) {
       super(
           catalogLoader,
           snapshotProperties,
           uidPrefix,
           writeProperties,
-          flinkWriteConf,
+          flinkConfig,
           cacheMaximumSize);
       this.commitHook = commitHook;
-      this.overwriteMode = flinkWriteConf.overwriteMode();
+      this.overwriteMode = new FlinkWriteConf(writeProperties, 
flinkConfig).overwriteMode();
     }
 
     @Override
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index d17848225f..8e346cd8a1 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.Map;
 import javax.annotation.Nonnull;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.FileFormat;
@@ -34,6 +35,7 @@ import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.common.DynFields;
 import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
 import org.apache.iceberg.io.BaseTaskWriter;
@@ -140,7 +142,7 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
   }
 
   @Test
-  void testDynamicWriterPropertiesPriority() throws Exception {
+  void testFlinkConfigOverridesTableProperties() throws Exception {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     Table table1 =
         catalog.createTable(
@@ -149,11 +151,45 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
             null,
             ImmutableMap.of("write.parquet.compression-codec", "zstd"));
 
+    Configuration flinkConfig = new Configuration();
+    flinkConfig.set(FlinkWriteOptions.COMPRESSION_CODEC, "snappy");
+
     DynamicWriter dynamicWriter =
-        createDynamicWriter(catalog, 
ImmutableMap.of("write.parquet.compression-codec", "gzip"));
+        new DynamicWriter(
+            catalog,
+            Map.of(),
+            flinkConfig,
+            100,
+            new 
DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
+            0,
+            0);
     DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
 
-    assertThat(getNumDataFiles(table1)).isEqualTo(0);
+    dynamicWriter.write(record1, null);
+    Map<String, String> properties = properties(dynamicWriter);
+    assertThat(properties).containsEntry("write.parquet.compression-codec", 
"snappy");
+
+    dynamicWriter.close();
+  }
+
+  @Test
+  void testWritePropertiesOverrideFlinkConfig() throws Exception {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA);
+
+    Configuration flinkConfig = new Configuration();
+    flinkConfig.set(FlinkWriteOptions.COMPRESSION_CODEC, "snappy");
+
+    DynamicWriter dynamicWriter =
+        new DynamicWriter(
+            catalog,
+            ImmutableMap.of("compression-codec", "gzip"),
+            flinkConfig,
+            100,
+            new 
DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
+            0,
+            0);
+    DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
 
     dynamicWriter.write(record1, null);
     Map<String, String> properties = properties(dynamicWriter);
@@ -162,6 +198,62 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
     dynamicWriter.close();
   }
 
+  @Test
+  void testFlinkConfigFileFormat() throws Exception {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA);
+
+    Configuration flinkConfig = new Configuration();
+    flinkConfig.set(FlinkWriteOptions.WRITE_FORMAT, "orc");
+
+    DynamicWriter dynamicWriter =
+        new DynamicWriter(
+            catalog,
+            Map.of(),
+            flinkConfig,
+            100,
+            new 
DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
+            0,
+            0);
+    DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
+
+    dynamicWriter.write(record1, null);
+    dynamicWriter.prepareCommit();
+
+    File dataDir = new File(URI.create(table1.location()).getPath(), "data");
+    File[] files = dataDir.listFiles((dir, name) -> name.endsWith(".orc"));
+    assertThat(files).isNotNull().hasSize(1);
+
+    dynamicWriter.close();
+  }
+
+  @Test
+  void testFlinkConfigTargetFileSize() throws Exception {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA);
+
+    Configuration flinkConfig = new Configuration();
+    flinkConfig.set(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES, 2048L);
+
+    DynamicWriter dynamicWriter =
+        new DynamicWriter(
+            catalog,
+            Map.of(),
+            flinkConfig,
+            100,
+            new 
DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
+            0,
+            0);
+    DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
+
+    dynamicWriter.write(record1, null);
+    dynamicWriter.prepareCommit();
+
+    assertThat(getNumDataFiles(table1)).isEqualTo(1);
+
+    dynamicWriter.close();
+  }
+
   @Test
   void testDynamicWriterUpsert() throws Exception {
     Catalog catalog = CATALOG_EXTENSION.catalog();
@@ -239,9 +331,8 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
     DynamicWriter dynamicWriter =
         new DynamicWriter(
             catalog,
-            FileFormat.PARQUET,
-            1024L,
             properties,
+            new Configuration(),
             100,
             new 
DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
             0,

Reply via email to