This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 68651a3498 Flink: Dynamic Iceberg Sink: Add sink / core processing logic / benchmarking (#13304)
68651a3498 is described below

commit 68651a34987ebd66422fe79ceaff9f987f32d6d6
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Tue Jun 17 17:20:15 2025 +0200

    Flink: Dynamic Iceberg Sink: Add sink / core processing logic / benchmarking (#13304)
---
 ...namicRecordSerializerDeserializerBenchmark.java | 138 ++++
 .../org/apache/iceberg/flink/FlinkConfParser.java  |   7 +
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |   4 +
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  49 +-
 .../org/apache/iceberg/flink/sink/IcebergSink.java |  49 +-
 .../org/apache/iceberg/flink/sink/SinkUtil.java    |  59 +-
 .../flink/sink/dynamic/DynamicCommitter.java       |   3 +-
 .../flink/sink/dynamic/DynamicIcebergSink.java     | 406 ++++++++++
 .../flink/sink/dynamic/DynamicRecordGenerator.java |  34 +
 .../flink/sink/dynamic/DynamicRecordProcessor.java | 171 +++++
 .../flink/sink/dynamic/HashKeyGenerator.java       |  16 +-
 .../org/apache/iceberg/flink/SimpleDataUtil.java   |  22 +-
 .../org/apache/iceberg/flink/TestFlinkFilters.java |   1 -
 .../java/org/apache/iceberg/flink/TestHelpers.java |   2 +-
 .../flink/sink/TestFlinkIcebergSinkBase.java       |   5 +-
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 831 +++++++++++++++++++++
 .../sink/dynamic/TestDynamicIcebergSinkPerf.java   | 245 ++++++
 17 files changed, 1933 insertions(+), 109 deletions(-)
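
For context, a minimal usage sketch of the new sink added by this commit. It is assembled only
from the builder methods and interfaces shown in the diff below; the input stream, the generator
instance, and the catalog loader are placeholders supplied by the caller, not part of the commit:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.sink.dynamic.DynamicIcebergSink;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecordGenerator;

class DynamicSinkWiring {
  // Appends the dynamic sink to an existing stream; all inputs come from the caller.
  static <T> void wire(
      DataStream<T> events, DynamicRecordGenerator<T> generator, CatalogLoader catalogLoader) {
    DynamicIcebergSink.<T>forInput(events)
        .generator(generator)            // converts each T into zero or more DynamicRecords
        .catalogLoader(catalogLoader)    // loads the Iceberg catalog inside the Flink tasks
        .writeParallelism(2)
        .immediateTableUpdate(true)      // create/update tables inline instead of via the side output
        .append();
  }
}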

diff --git a/flink/v2.0/flink/src/jmh/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordSerializerDeserializerBenchmark.java b/flink/v2.0/flink/src/jmh/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordSerializerDeserializerBenchmark.java
new file mode 100644
index 0000000000..d7c3a7b32b
--- /dev/null
+++ b/flink/v2.0/flink/src/jmh/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordSerializerDeserializerBenchmark.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class DynamicRecordSerializerDeserializerBenchmark {
+  private static final int SAMPLE_SIZE = 100_000;
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "name2", Types.StringType.get()),
+          Types.NestedField.required(3, "name3", Types.StringType.get()),
+          Types.NestedField.required(4, "name4", Types.StringType.get()),
+          Types.NestedField.required(5, "name5", Types.StringType.get()),
+          Types.NestedField.required(6, "name6", Types.StringType.get()),
+          Types.NestedField.required(7, "name7", Types.StringType.get()),
+          Types.NestedField.required(8, "name8", Types.StringType.get()),
+          Types.NestedField.required(9, "name9", Types.StringType.get()));
+
+  private List<DynamicRecordInternal> rows = Lists.newArrayListWithExpectedSize(SAMPLE_SIZE);
+  private DynamicRecordInternalType type;
+
+  public static void main(String[] args) throws RunnerException {
+    Options options =
+        new OptionsBuilder()
+            .include(DynamicRecordSerializerDeserializerBenchmark.class.getSimpleName())
+            .build();
+    new Runner(options).run();
+  }
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    List<Record> records = RandomGenericData.generate(SCHEMA, SAMPLE_SIZE, 1L);
+    this.rows =
+        records.stream()
+            .map(
+                r ->
+                    new DynamicRecordInternal(
+                        "t",
+                        "main",
+                        SCHEMA,
+                        RowDataConverter.convert(SCHEMA, r),
+                        PartitionSpec.unpartitioned(),
+                        1,
+                        false,
+                        Collections.emptySet()))
+            .collect(Collectors.toList());
+
+    File warehouse = Files.createTempFile("perf-bench", null).toFile();
+    CatalogLoader catalogLoader =
+        CatalogLoader.hadoop(
+            "hadoop",
+            new Configuration(),
+            ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getPath()));
+    this.type = new DynamicRecordInternalType(catalogLoader, true, 100);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void testSerialize(Blackhole blackhole) throws IOException {
+    TypeSerializer<DynamicRecordInternal> serializer =
+        type.createSerializer((SerializerConfig) null);
+    DataOutputSerializer outputView = new DataOutputSerializer(1024);
+    for (int i = 0; i < SAMPLE_SIZE; ++i) {
+      serializer.serialize(rows.get(i), outputView);
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void testSerializeAndDeserialize(Blackhole blackhole) throws IOException {
+    TypeSerializer<DynamicRecordInternal> serializer =
+        type.createSerializer((SerializerConfig) null);
+
+    DataOutputSerializer outputView = new DataOutputSerializer(1024);
+    for (int i = 0; i < SAMPLE_SIZE; ++i) {
+      serializer.serialize(rows.get(i), outputView);
+      serializer.deserialize(new DataInputDeserializer(outputView.getSharedBuffer()));
+    }
+  }
+}
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
index d6d2fd92f4..e0672811cf 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.TimeUtils;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 @Internal
@@ -43,6 +44,12 @@ public class FlinkConfParser {
     this.readableConfig = readableConfig;
   }
 
+  FlinkConfParser(Map<String, String> options, ReadableConfig readableConfig) {
+    this.tableProperties = ImmutableMap.of();
+    this.options = options;
+    this.readableConfig = readableConfig;
+  }
+
   public BooleanConfParser booleanConf() {
     return new BooleanConfParser();
   }
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 12ad7989c3..222a1e8104 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -55,6 +55,10 @@ public class FlinkWriteConf {
     this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
   }
 
+  public FlinkWriteConf(Map<String, String> writeOptions, ReadableConfig readableConfig) {
+    this.confParser = new FlinkConfParser(writeOptions, readableConfig);
+  }
+
   public boolean overwriteMode() {
     return confParser
         .booleanConf()
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index c42e4a015b..8da97df037 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -18,12 +18,6 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
-import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
-import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
-import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 
 import java.io.IOException;
@@ -722,51 +716,10 @@ public class FlinkSink {
             flinkRowType,
             flinkWriteConf.targetDataFileSize(),
             format,
-            writeProperties(initTable, format, flinkWriteConf),
+            SinkUtil.writeProperties(format, flinkWriteConf, initTable),
             equalityFieldIds,
             flinkWriteConf.upsertMode());
 
     return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory);
   }
-
-  /**
-   * Based on the {@link FileFormat} overwrites the table level compression properties for the table
-   * write.
-   *
-   * @param table The table to get the table level settings
-   * @param format The FileFormat to use
-   * @param conf The write configuration
-   * @return The properties to use for writing
-   */
-  private static Map<String, String> writeProperties(
-      Table table, FileFormat format, FlinkWriteConf conf) {
-    Map<String, String> writeProperties = Maps.newHashMap(table.properties());
-
-    switch (format) {
-      case PARQUET:
-        writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
-        String parquetCompressionLevel = conf.parquetCompressionLevel();
-        if (parquetCompressionLevel != null) {
-          writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
-        }
-
-        break;
-      case AVRO:
-        writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
-        String avroCompressionLevel = conf.avroCompressionLevel();
-        if (avroCompressionLevel != null) {
-          writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel());
-        }
-
-        break;
-      case ORC:
-        writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
-        writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
-        break;
-      default:
-        throw new IllegalArgumentException(String.format("Unknown file format %s", format));
-    }
-
-    return writeProperties;
-  }
 }
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 8f33a8e58d..9ab7a7730c 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -18,12 +18,6 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
-import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
-import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
-import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 
 import java.io.IOException;
@@ -649,7 +643,7 @@ public class IcebergSink
           table,
           snapshotSummary,
           uidSuffix,
-          writeProperties(table, flinkWriteConf.dataFileFormat(), flinkWriteConf),
+          SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, table),
           toFlinkRowType(table.schema(), tableSchema),
           tableSupplier,
           flinkWriteConf,
@@ -728,47 +722,6 @@ public class IcebergSink
     }
   }
 
-  /**
-   * Based on the {@link FileFormat} overwrites the table level compression properties for the table
-   * write.
-   *
-   * @param table The table to get the table level settings
-   * @param format The FileFormat to use
-   * @param conf The write configuration
-   * @return The properties to use for writing
-   */
-  private static Map<String, String> writeProperties(
-      Table table, FileFormat format, FlinkWriteConf conf) {
-    Map<String, String> writeProperties = Maps.newHashMap(table.properties());
-
-    switch (format) {
-      case PARQUET:
-        writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
-        String parquetCompressionLevel = conf.parquetCompressionLevel();
-        if (parquetCompressionLevel != null) {
-          writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
-        }
-
-        break;
-      case AVRO:
-        writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
-        String avroCompressionLevel = conf.avroCompressionLevel();
-        if (avroCompressionLevel != null) {
-          writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel());
-        }
-
-        break;
-      case ORC:
-        writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
-        writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
-        break;
-      default:
-        throw new IllegalArgumentException(String.format("Unknown file format %s", format));
-    }
-
-    return writeProperties;
-  }
-
   private DataStream<RowData> distributeDataStream(DataStream<RowData> input) {
     DistributionMode mode = flinkWriteConf.distributionMode();
     Schema schema = table.schema();
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java
index 3f60b45a1f..b3a9ac6ba2 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java
@@ -18,17 +18,30 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SinkUtil {
+@Internal
+public class SinkUtil {
 
   private static final long INITIAL_CHECKPOINT_ID = -1L;
 
@@ -90,4 +103,48 @@ class SinkUtil {
 
     return lastCommittedCheckpointId;
   }
+
+  /**
+   * Based on the {@link FileFormat} overwrites the table level compression properties for the table
+   * write.
+   *
+   * @param format The FileFormat to use
+   * @param conf The write configuration
+   * @param table The table to get the table level settings
+   * @return The properties to use for writing
+   */
+  public static Map<String, String> writeProperties(
+      FileFormat format, FlinkWriteConf conf, @Nullable Table table) {
+    Map<String, String> writeProperties = Maps.newHashMap();
+    if (table != null) {
+      writeProperties.putAll(table.properties());
+    }
+
+    switch (format) {
+      case PARQUET:
+        writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
+        String parquetCompressionLevel = conf.parquetCompressionLevel();
+        if (parquetCompressionLevel != null) {
+          writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
+        }
+
+        break;
+      case AVRO:
+        writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
+        String avroCompressionLevel = conf.avroCompressionLevel();
+        if (avroCompressionLevel != null) {
+          writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel());
+        }
+
+        break;
+      case ORC:
+        writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
+        writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
+        break;
+      default:
+        throw new IllegalArgumentException(String.format("Unknown file format %s", format));
+    }
+
+    return writeProperties;
+  }
 }
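
With writeProperties now public on SinkUtil, both existing sinks and the new dynamic sink share the
same compression-property resolution. A small sketch of calling it without a table, as the dynamic
sink builder does below; the "write-format" option key is an assumption taken from FlinkWriteOptions:

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class WritePropertiesSketch {
  static Map<String, String> resolve() {
    // Table-less FlinkWriteConf built from write options only, as in the dynamic sink builder.
    FlinkWriteConf conf =
        new FlinkWriteConf(ImmutableMap.of("write-format", "parquet"), new Configuration());
    // With a null table, only the format-specific compression settings from the conf are applied.
    return SinkUtil.writeProperties(conf.dataFileFormat(), conf, null);
  }
}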
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index 3e051dc5d5..8f9ce802d1 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -325,7 +325,8 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
     }
   }
 
-  private void commitOperation(
+  @VisibleForTesting
+  void commitOperation(
       Table table,
       String branch,
       SnapshotUpdate<?> operation,
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
new file mode 100644
index 0000000000..8d62e93a30
--- /dev/null
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkWriteConf;
+import org.apache.iceberg.flink.FlinkWriteOptions;
+import org.apache.iceberg.flink.sink.IcebergSink;
+import org.apache.iceberg.flink.sink.SinkUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Dynamic version of the IcebergSink which supports:
+ *
+ * <ul>
+ *   <li>Writing to any number of tables (No more 1:1 sink/topic relationship).
+ *   <li>Creating and updating tables based on the user-supplied routing.
+ *   <li>Updating the schema and partition spec of tables based on the user-supplied specification.
+ * </ul>
+ */
+@Experimental
+public class DynamicIcebergSink
+    implements Sink<DynamicRecordInternal>,
+        SupportsPreWriteTopology<DynamicRecordInternal>,
+        SupportsCommitter<DynamicCommittable>,
+        SupportsPreCommitTopology<DynamicWriteResult, DynamicCommittable>,
+        SupportsPostCommitTopology<DynamicCommittable> {
+
+  private final CatalogLoader catalogLoader;
+  private final Map<String, String> snapshotProperties;
+  private final String uidPrefix;
+  private final String sinkId;
+  private final Map<String, String> writeProperties;
+  private final transient FlinkWriteConf flinkWriteConf;
+  private final FileFormat dataFileFormat;
+  private final long targetDataFileSize;
+  private final boolean overwriteMode;
+  private final int workerPoolSize;
+  private final int cacheMaximumSize;
+
+  DynamicIcebergSink(
+      CatalogLoader catalogLoader,
+      Map<String, String> snapshotProperties,
+      String uidPrefix,
+      Map<String, String> writeProperties,
+      FlinkWriteConf flinkWriteConf,
+      int cacheMaximumSize) {
+    this.catalogLoader = catalogLoader;
+    this.snapshotProperties = snapshotProperties;
+    this.uidPrefix = uidPrefix;
+    this.writeProperties = writeProperties;
+    this.flinkWriteConf = flinkWriteConf;
+    this.dataFileFormat = flinkWriteConf.dataFileFormat();
+    this.targetDataFileSize = flinkWriteConf.targetDataFileSize();
+    this.overwriteMode = flinkWriteConf.overwriteMode();
+    this.workerPoolSize = flinkWriteConf.workerPoolSize();
+    this.cacheMaximumSize = cacheMaximumSize;
+    // We generate a random UUID every time when a sink is created.
+    // This is used to separate files generated by different sinks writing the same table.
+    // Also used to generate the aggregator operator name
+    this.sinkId = UUID.randomUUID().toString();
+  }
+
+  @Override
+  public SinkWriter<DynamicRecordInternal> createWriter(WriterInitContext context) {
+    return new DynamicWriter(
+        catalogLoader.loadCatalog(),
+        dataFileFormat,
+        targetDataFileSize,
+        writeProperties,
+        cacheMaximumSize,
+        new DynamicWriterMetrics(context.metricGroup()),
+        context.getTaskInfo().getIndexOfThisSubtask(),
+        context.getTaskInfo().getAttemptNumber());
+  }
+
+  @Override
+  public Committer<DynamicCommittable> createCommitter(CommitterInitContext context) {
+    DynamicCommitterMetrics metrics = new DynamicCommitterMetrics(context.metricGroup());
+    return new DynamicCommitter(
+        catalogLoader.loadCatalog(),
+        snapshotProperties,
+        overwriteMode,
+        workerPoolSize,
+        sinkId,
+        metrics);
+  }
+
+  @Override
+  public SimpleVersionedSerializer<DynamicCommittable> getCommittableSerializer() {
+    return new DynamicCommittableSerializer();
+  }
+
+  @Override
+  public void addPostCommitTopology(
+      DataStream<CommittableMessage<DynamicCommittable>> committables) {}
+
+  @Override
+  public DataStream<DynamicRecordInternal> addPreWriteTopology(
+      DataStream<DynamicRecordInternal> inputDataStream) {
+    return distributeDataStream(inputDataStream);
+  }
+
+  @Override
+  public DataStream<CommittableMessage<DynamicCommittable>> addPreCommitTopology(
+      DataStream<CommittableMessage<DynamicWriteResult>> writeResults) {
+    TypeInformation<CommittableMessage<DynamicCommittable>> typeInformation =
+        CommittableMessageTypeInfo.of(this::getCommittableSerializer);
+
+    return writeResults
+        .keyBy(
+            committable -> {
+              if (committable instanceof CommittableSummary) {
+                return "__summary";
+              } else {
+                CommittableWithLineage<DynamicWriteResult> result =
+                    (CommittableWithLineage<DynamicWriteResult>) committable;
+                return result.getCommittable().key().tableName();
+              }
+            })
+        .transform(
+            prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
+            typeInformation,
+            new DynamicWriteResultAggregator(catalogLoader))
+        .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
+  }
+
+  @Override
+  public SimpleVersionedSerializer<DynamicWriteResult> getWriteResultSerializer() {
+    return new DynamicWriteResultSerializer();
+  }
+
+  public static class Builder<T> {
+    private DataStream<T> input;
+    private DynamicRecordGenerator<T> generator;
+    private CatalogLoader catalogLoader;
+    private String uidPrefix = null;
+    private final Map<String, String> writeOptions = Maps.newHashMap();
+    private final Map<String, String> snapshotSummary = Maps.newHashMap();
+    private ReadableConfig readableConfig = new Configuration();
+    private boolean immediateUpdate = false;
+    private int cacheMaximumSize = 100;
+    private long cacheRefreshMs = 1_000;
+
+    Builder() {}
+
+    public Builder<T> forInput(DataStream<T> inputStream) {
+      this.input = inputStream;
+      return this;
+    }
+
+    public Builder<T> generator(DynamicRecordGenerator<T> inputGenerator) {
+      this.generator = inputGenerator;
+      return this;
+    }
+
+    /**
+     * The catalog loader is used for loading tables in {@link DynamicCommitter} lazily. We need
+     * this loader because {@link Table} is not serializable, so the table loaded in the builder
+     * cannot simply be reused in the remote task manager.
+     *
+     * @param newCatalogLoader to load iceberg table inside tasks.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder<T> catalogLoader(CatalogLoader newCatalogLoader) {
+      this.catalogLoader = newCatalogLoader;
+      return this;
+    }
+
+    /**
+     * Set the write properties for IcebergSink. View the supported properties in {@link
+     * FlinkWriteOptions}
+     */
+    public Builder<T> set(String property, String value) {
+      writeOptions.put(property, value);
+      return this;
+    }
+
+    /**
+     * Set the write properties for IcebergSink. View the supported properties in {@link
+     * FlinkWriteOptions}
+     */
+    public Builder<T> setAll(Map<String, String> properties) {
+      writeOptions.putAll(properties);
+      return this;
+    }
+
+    public Builder<T> overwrite(boolean newOverwrite) {
+      writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
+      return this;
+    }
+
+    public Builder<T> flinkConf(ReadableConfig config) {
+      this.readableConfig = config;
+      return this;
+    }
+
+    /**
+     * Configuring the write parallel number for iceberg stream writer.
+     *
+     * @param newWriteParallelism the number of parallel iceberg stream writer.
+     * @return {@link DynamicIcebergSink.Builder} to connect the iceberg table.
+     */
+    public Builder<T> writeParallelism(int newWriteParallelism) {
+      writeOptions.put(
+          FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
+      return this;
+    }
+
+    /**
+     * Set the uid prefix for IcebergSink operators. Note that IcebergSink internally consists of
+     * multiple operators (like writer, committer, aggregator). Actual operator uid will be appended
+     * with a suffix like "uidPrefix-writer".
+     *
+     * <p>If provided, this prefix is also applied to operator names.
+     *
+     * <p>Flink auto generates operator uid if not set explicitly. It is a recommended <a
+     * href="https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/">
+     * best-practice to set uid for all operators</a> before deploying to production. Flink has an
+     * option to {@code pipeline.auto-generate-uid=false} to disable auto-generation and force
+     * explicit setting of all operator uid.
+     *
+     * <p>Be careful with setting this for an existing job, because now we are changing the operator
+     * uid from an auto-generated one to this new value. When deploying the change with a
+     * checkpoint, Flink won't be able to restore the previous IcebergSink operator state (more
+     * specifically the committer operator state). You need to use {@code --allowNonRestoredState}
+     * to ignore the previous sink state. During restore IcebergSink state is used to check if last
+     * commit was actually successful or not. {@code --allowNonRestoredState} can lead to data loss
+     * if the Iceberg commit failed in the last completed checkpoint.
+     *
+     * @param newPrefix prefix for Flink sink operator uid and name
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder<T> uidPrefix(String newPrefix) {
+      this.uidPrefix = newPrefix;
+      return this;
+    }
+
+    public Builder<T> snapshotProperties(Map<String, String> properties) {
+      snapshotSummary.putAll(properties);
+      return this;
+    }
+
+    public Builder<T> setSnapshotProperty(String property, String value) {
+      snapshotSummary.put(property, value);
+      return this;
+    }
+
+    public Builder<T> toBranch(String branch) {
+      writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
+      return this;
+    }
+
+    public Builder<T> immediateTableUpdate(boolean newImmediateUpdate) {
+      this.immediateUpdate = newImmediateUpdate;
+      return this;
+    }
+
+    /** Maximum size of the caches used in Dynamic Sink for table data and serializers. */
+    public Builder<T> cacheMaxSize(int maxSize) {
+      this.cacheMaximumSize = maxSize;
+      return this;
+    }
+
+    /** Maximum interval for cache items renewals. */
+    public Builder<T> cacheRefreshMs(long refreshMs) {
+      this.cacheRefreshMs = refreshMs;
+      return this;
+    }
+
+    private String operatorName(String suffix) {
+      return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+    }
+
+    private DynamicIcebergSink build() {
+
+      Preconditions.checkArgument(
+          generator != null, "Please use generator() to convert the input DataStream.");
+      Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null");
+
+      FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig);
+      Map<String, String> writeProperties =
+          SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, null);
+      uidPrefix = Optional.ofNullable(uidPrefix).orElse("");
+
+      return instantiateSink(writeProperties, flinkWriteConf);
+    }
+
+    @VisibleForTesting
+    DynamicIcebergSink instantiateSink(
+        Map<String, String> writeProperties, FlinkWriteConf flinkWriteConf) {
+      return new DynamicIcebergSink(
+          catalogLoader,
+          snapshotSummary,
+          uidPrefix,
+          writeProperties,
+          flinkWriteConf,
+          cacheMaximumSize);
+    }
+
+    /**
+     * Append the iceberg sink operators to write records to iceberg table.
+     *
+     * @return {@link DataStreamSink} for sink.
+     */
+    public DataStreamSink<DynamicRecordInternal> append() {
+      DynamicRecordInternalType type =
+          new DynamicRecordInternalType(catalogLoader, false, cacheMaximumSize);
+      DynamicIcebergSink sink = build();
+      SingleOutputStreamOperator<DynamicRecordInternal> converted =
+          input
+              .process(
+                  new DynamicRecordProcessor<>(
+                      generator, catalogLoader, immediateUpdate, cacheMaximumSize, cacheRefreshMs))
+              .uid(prefixIfNotNull(uidPrefix, "-generator"))
+              .name(operatorName("generator"))
+              .returns(type);
+
+      DataStreamSink<DynamicRecordInternal> rowDataDataStreamSink =
+          converted
+              .getSideOutput(
+                  new OutputTag<>(
+                      DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM,
+                      new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)))
+              .keyBy((KeySelector<DynamicRecordInternal, String>) DynamicRecordInternal::tableName)
+              .map(new DynamicTableUpdateOperator(catalogLoader, cacheMaximumSize, cacheRefreshMs))
+              .uid(prefixIfNotNull(uidPrefix, "-updater"))
+              .name(operatorName("Updater"))
+              .returns(type)
+              .union(converted)
+              .sinkTo(sink)
+              .uid(prefixIfNotNull(uidPrefix, "-sink"));
+      if (sink.flinkWriteConf.writeParallelism() != null) {
+        rowDataDataStreamSink.setParallelism(sink.flinkWriteConf.writeParallelism());
+      }
+
+      return rowDataDataStreamSink;
+    }
+  }
+
+  DataStream<DynamicRecordInternal> distributeDataStream(DataStream<DynamicRecordInternal> input) {
+    return input.keyBy(DynamicRecordInternal::writerKey);
+  }
+
+  private static String prefixIfNotNull(String uidPrefix, String suffix) {
+    return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+  }
+
+  /**
+   * Initialize a {@link IcebergSink.Builder} to export the data from input data stream with {@link
+   * RowData}s into iceberg table.
+   *
+   * @param input the source input data stream with {@link RowData}s.
+   * @return {@link IcebergSink.Builder} to connect the iceberg table.
+   */
+  public static <T> Builder<T> forInput(DataStream<T> input) {
+    return new Builder<T>().forInput(input);
+  }
+}
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java
new file mode 100644
index 0000000000..637dd1307d
--- /dev/null
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.io.Serializable;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.util.Collector;
+
+/** A generator to yield {@link DynamicRecord} from the provided input. */
+public interface DynamicRecordGenerator<T> extends Serializable {
+  default void open(OpenContext openContext) throws Exception {}
+
+  /**
+   * Takes the user-defined input and yields zero, one, or multiple {@link DynamicRecord}s using the
+   * {@link Collector}.
+   */
+  void convert(T inputRecord, Collector<DynamicRecord> out) throws Exception;
+}
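
A minimal sketch of a DynamicRecordGenerator implementation. The input type and the per-record
table routing are hypothetical; the DynamicRecord constructor arguments mirror the ones used in
TestDynamicIcebergSink further down:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Collector;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecord;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecordGenerator;
import org.apache.iceberg.types.Types;

// Hypothetical generator: routes each (table, id, data) triple to its own table in database "db".
class SampleGenerator implements DynamicRecordGenerator<String[]> {
  private static final Schema SCHEMA =
      new Schema(
          Types.NestedField.required(1, "id", Types.IntegerType.get()),
          Types.NestedField.required(2, "data", Types.StringType.get()));

  @Override
  public void convert(String[] input, Collector<DynamicRecord> out) {
    out.collect(
        new DynamicRecord(
            TableIdentifier.of("db", input[0]),    // target table taken from the record itself
            "main",                                // branch
            SCHEMA,
            GenericRowData.of(Integer.parseInt(input[1]), StringData.fromString(input[2])),
            PartitionSpec.unpartitioned(),
            DistributionMode.NONE,
            1));                                   // write parallelism
  }
}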
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
new file mode 100644
index 0000000000..bc569633cc
--- /dev/null
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.flink.CatalogLoader;
+
+@Internal
+class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal>
+    implements Collector<DynamicRecord> {
+  @VisibleForTesting
+  static final String DYNAMIC_TABLE_UPDATE_STREAM = "dynamic-table-update-stream";
+
+  private final DynamicRecordGenerator<T> generator;
+  private final CatalogLoader catalogLoader;
+  private final boolean immediateUpdate;
+  private final int cacheMaximumSize;
+  private final long cacheRefreshMs;
+
+  private transient TableMetadataCache tableCache;
+  private transient HashKeyGenerator hashKeyGenerator;
+  private transient TableUpdater updater;
+  private transient OutputTag<DynamicRecordInternal> updateStream;
+  private transient Collector<DynamicRecordInternal> collector;
+  private transient Context context;
+
+  DynamicRecordProcessor(
+      DynamicRecordGenerator<T> generator,
+      CatalogLoader catalogLoader,
+      boolean immediateUpdate,
+      int cacheMaximumSize,
+      long cacheRefreshMs) {
+    this.generator = generator;
+    this.catalogLoader = catalogLoader;
+    this.immediateUpdate = immediateUpdate;
+    this.cacheMaximumSize = cacheMaximumSize;
+    this.cacheRefreshMs = cacheRefreshMs;
+  }
+
+  @Override
+  public void open(OpenContext openContext) throws Exception {
+    super.open(openContext);
+    Catalog catalog = catalogLoader.loadCatalog();
+    this.tableCache = new TableMetadataCache(catalog, cacheMaximumSize, cacheRefreshMs);
+    this.hashKeyGenerator =
+        new HashKeyGenerator(
+            cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks());
+    if (immediateUpdate) {
+      updater = new TableUpdater(tableCache, catalog);
+    } else {
+      updateStream =
+          new OutputTag<>(
+              DYNAMIC_TABLE_UPDATE_STREAM,
+              new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)) {};
+    }
+
+    generator.open(openContext);
+  }
+
+  @Override
+  public void processElement(T element, Context ctx, Collector<DynamicRecordInternal> out)
+      throws Exception {
+    this.context = ctx;
+    this.collector = out;
+    generator.convert(element, this);
+  }
+
+  @Override
+  public void collect(DynamicRecord data) {
+    boolean exists = tableCache.exists(data.tableIdentifier()).f0;
+    String foundBranch = exists ? tableCache.branch(data.tableIdentifier(), data.branch()) : null;
+
+    Tuple2<Schema, CompareSchemasVisitor.Result> foundSchema =
+        exists
+            ? tableCache.schema(data.tableIdentifier(), data.schema())
+            : TableMetadataCache.NOT_FOUND;
+
+    PartitionSpec foundSpec = exists ? tableCache.spec(data.tableIdentifier(), data.spec()) : null;
+
+    if (!exists
+        || foundBranch == null
+        || foundSpec == null
+        || foundSchema.f1 == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
+      if (immediateUpdate) {
+        Tuple3<Schema, CompareSchemasVisitor.Result, PartitionSpec> newData =
+            updater.update(data.tableIdentifier(), data.branch(), data.schema(), data.spec());
+        emit(collector, data, newData.f0, newData.f1, newData.f2);
+      } else {
+        int writerKey =
+            hashKeyGenerator.generateKey(
+                data,
+                foundSchema.f0 != null ? foundSchema.f0 : data.schema(),
+                foundSpec != null ? foundSpec : data.spec(),
+                data.rowData());
+        context.output(
+            updateStream,
+            new DynamicRecordInternal(
+                data.tableIdentifier().toString(),
+                data.branch(),
+                data.schema(),
+                data.rowData(),
+                data.spec(),
+                writerKey,
+                data.upsertMode(),
+                DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), data.schema())));
+      }
+    } else {
+      emit(collector, data, foundSchema.f0, foundSchema.f1, foundSpec);
+    }
+  }
+
+  private void emit(
+      Collector<DynamicRecordInternal> out,
+      DynamicRecord data,
+      Schema schema,
+      CompareSchemasVisitor.Result result,
+      PartitionSpec spec) {
+    RowData rowData =
+        result == CompareSchemasVisitor.Result.SAME
+            ? data.rowData()
+            : RowDataEvolver.convert(data.rowData(), data.schema(), schema);
+    int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData);
+    String tableName = data.tableIdentifier().toString();
+    out.collect(
+        new DynamicRecordInternal(
+            tableName,
+            data.branch(),
+            schema,
+            rowData,
+            spec,
+            writerKey,
+            data.upsertMode(),
+            DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), schema)));
+  }
+
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 6cb1f46089..d0909e0605 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -77,8 +77,7 @@ class HashKeyGenerator {
       DynamicRecord dynamicRecord,
       @Nullable Schema tableSchema,
       @Nullable PartitionSpec tableSpec,
-      @Nullable RowData overrideRowData)
-      throws Exception {
+      @Nullable RowData overrideRowData) {
     String tableIdent = dynamicRecord.tableIdentifier().toString();
     SelectorKey cacheKey =
         new SelectorKey(
@@ -89,8 +88,8 @@ class HashKeyGenerator {
             dynamicRecord.schema(),
             dynamicRecord.spec(),
             dynamicRecord.equalityFields());
-    return keySelectorCache
-        .get(
+    KeySelector<RowData, Integer> keySelector =
+        keySelectorCache.get(
             cacheKey,
             k ->
                 getKeySelector(
@@ -101,8 +100,13 @@ class HashKeyGenerator {
                         dynamicRecord.distributionMode(), DistributionMode.NONE),
                     MoreObjects.firstNonNull(
                         dynamicRecord.equalityFields(), Collections.emptySet()),
-                    dynamicRecord.writeParallelism()))
-        .getKey(overrideRowData != null ? overrideRowData : dynamicRecord.rowData());
+                    dynamicRecord.writeParallelism()));
+    try {
+      return keySelector.getKey(
+          overrideRowData != null ? overrideRowData : dynamicRecord.rowData());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   private KeySelector<RowData, Integer> getKeySelector(
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 7f53215a5e..0d39a665cf 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -82,12 +82,19 @@ public class SimpleDataUtil {
           Types.NestedField.optional(1, "id", Types.IntegerType.get()),
           Types.NestedField.optional(2, "data", Types.StringType.get()));
 
+  public static final Schema SCHEMA2 =
+      new Schema(
+          Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()),
+          Types.NestedField.optional(3, "extra", Types.StringType.get()));
+
   public static final TableSchema FLINK_SCHEMA =
       TableSchema.builder().field("id", DataTypes.INT()).field("data", DataTypes.STRING()).build();
 
   public static final RowType ROW_TYPE = (RowType) FLINK_SCHEMA.toRowDataType().getLogicalType();
 
   public static final Record RECORD = GenericRecord.create(SCHEMA);
+  public static final Record RECORD2 = GenericRecord.create(SCHEMA2);
 
   public static Table createTable(
       String path, Map<String, String> properties, boolean partitioned) {
@@ -107,6 +114,14 @@ public class SimpleDataUtil {
     return record;
   }
 
+  public static Record createRecord(Integer id, String data, String extra) {
+    Record record = RECORD2.copy();
+    record.setField("id", id);
+    record.setField("data", data);
+    record.setField("extra", extra);
+    return record;
+  }
+
   public static RowData createRowData(Integer id, String data) {
     return GenericRowData.of(id, StringData.fromString(data));
   }
@@ -224,7 +239,12 @@ public class SimpleDataUtil {
     for (RowData row : rows) {
       Integer id = row.isNullAt(0) ? null : row.getInt(0);
       String data = row.isNullAt(1) ? null : row.getString(1).toString();
-      records.add(createRecord(id, data));
+      if (row.getArity() == 2) {
+        records.add(createRecord(id, data));
+      } else {
+        String extra = row.isNullAt(2) ? null : row.getString(2).toString();
+        records.add(createRecord(id, data, extra));
+      }
     }
     return records;
   }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java
index 8188a8bcdc..b47a7920fe 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java
@@ -427,7 +427,6 @@ public class TestFlinkFilters {
                 unresolvedCall.getChildren().stream()
                     .map(e -> (ResolvedExpression) e.accept(this))
                     .collect(Collectors.toList());
-            // TODO mxm false?
             return new CallExpression(
                 false,
                 unresolvedCall.getFunctionIdentifier().orElse(null),
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 960849fb4f..d8d3c5dc24 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -132,7 +132,7 @@ public class TestHelpers {
         .collect(Collectors.toList());
   }
 
-  private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
+  public static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
     List<Row> expected = Lists.newArrayList();
     @SuppressWarnings("unchecked")
     DataStructureConverter<RowData, Row> converter =
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
index 9513cd1e48..de098f826d 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.sink;
 import static org.apache.iceberg.flink.TestFixtures.DATABASE;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -64,8 +65,8 @@ public class TestFlinkIcebergSinkBase {
   protected Table table;
   protected StreamExecutionEnvironment env;
 
-  protected BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
-    return new BoundedTestSource<>(rows.toArray(new Row[0]));
+  protected <T> BoundedTestSource<T> createBoundedSource(List<T> rows) {
+    return new BoundedTestSource<>(Collections.singletonList(rows));
   }
 
   protected List<Row> createRows(String prefix) {
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
new file mode 100644
index 0000000000..f94990cc15
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.legacy.api.TableSchema;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteConf;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.sink.CommitSummary;
+import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
+import org.apache.iceberg.inmemory.InMemoryInputFile;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
+
+  private static long seed;
+
+  @BeforeEach
+  void before() {
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100)
+            .setParallelism(2);
+    seed = 0;
+  }
+
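+  /**
+   * Test data holder pairing the row/schema handed to the sink ("provided") with the row/schema
+   * expected to be read back after the write ("expected"), plus the target table, branch,
+   * partition spec, and upsert settings.
+   */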
+  private static class DynamicIcebergDataImpl implements Serializable {
+    Row rowProvided;
+    Row rowExpected;
+    Schema schemaProvided;
+    Schema schemaExpected;
+    String tableName;
+    String branch;
+    PartitionSpec partitionSpec;
+    boolean upsertMode;
+    Set<String> equalityFields;
+
+    private DynamicIcebergDataImpl(
+        Schema schemaProvided, String tableName, String branch, PartitionSpec partitionSpec) {
+      this(
+          schemaProvided,
+          schemaProvided,
+          tableName,
+          branch,
+          partitionSpec,
+          false,
+          Collections.emptySet(),
+          false);
+    }
+
+    private DynamicIcebergDataImpl(
+        Schema schemaProvided,
+        Schema schemaExpected,
+        String tableName,
+        String branch,
+        PartitionSpec partitionSpec) {
+      this(
+          schemaProvided,
+          schemaExpected,
+          tableName,
+          branch,
+          partitionSpec,
+          false,
+          Collections.emptySet(),
+          false);
+    }
+
+    private DynamicIcebergDataImpl(
+        Schema schemaProvided,
+        String tableName,
+        String branch,
+        PartitionSpec partitionSpec,
+        boolean upsertMode,
+        Set<String> equalityFields,
+        boolean isDuplicate) {
+      this(
+          schemaProvided,
+          schemaProvided,
+          tableName,
+          branch,
+          partitionSpec,
+          upsertMode,
+          equalityFields,
+          isDuplicate);
+    }
+
+    private DynamicIcebergDataImpl(
+        Schema schemaProvided,
+        Schema schemaExpected,
+        String tableName,
+        String branch,
+        PartitionSpec partitionSpec,
+        boolean upsertMode,
+        Set<String> equalityFields,
+        boolean isDuplicate) {
+      this.rowProvided = randomRow(schemaProvided, isDuplicate ? seed : ++seed);
+      this.rowExpected = isDuplicate ? null : rowProvided;
+      this.schemaProvided = schemaProvided;
+      this.schemaExpected = schemaExpected;
+      this.tableName = tableName;
+      this.branch = branch;
+      this.partitionSpec = partitionSpec;
+      this.upsertMode = upsertMode;
+      this.equalityFields = equalityFields;
+    }
+  }
+
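+  /** Converts the test data holder into a {@link DynamicRecord} routed to its target table and branch. */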
+  private static class Generator implements DynamicRecordGenerator<DynamicIcebergDataImpl> {
+
+    @Override
+    public void convert(DynamicIcebergDataImpl row, Collector<DynamicRecord> out) {
+      TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE, row.tableName);
+      String branch = row.branch;
+      Schema schema = row.schemaProvided;
+      PartitionSpec spec = row.partitionSpec;
+      DynamicRecord dynamicRecord =
+          new DynamicRecord(
+              tableIdentifier,
+              branch,
+              schema,
+              converter(schema).toInternal(row.rowProvided),
+              spec,
+              spec.isPartitioned() ? DistributionMode.HASH : DistributionMode.NONE,
+              10);
+      dynamicRecord.setUpsertMode(row.upsertMode);
+      dynamicRecord.setEqualityFields(row.equalityFields);
+      out.collect(dynamicRecord);
+    }
+  }
+
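+  /** Builds a converter between Flink {@link Row} and internal row data for the given Iceberg schema. */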
+  private static DataFormatConverters.RowConverter converter(Schema schema) {
+    RowType rowType = FlinkSchemaUtil.convert(schema);
+    TableSchema tableSchema = FlinkSchemaUtil.toSchema(rowType);
+    return new DataFormatConverters.RowConverter(tableSchema.getFieldDataTypes());
+  }
+
+  @Test
+  void testWrite() throws Exception {
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()));
+
+    runTest(rows);
+  }
+
+  @Test
+  void testWritePartitioned() throws Exception {
+    PartitionSpec spec = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build();
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec));
+
+    runTest(rows);
+  }
+
+  @Test
+  void testWritePartitionedAdjustSchemaIdsInSpec() throws Exception {
+    Schema schema =
+        new Schema(
+            // Use zero-based schema field ids
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(1, "data", Types.StringType.get()));
+    PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id", 10).build();
+    Schema schema2 =
+        new Schema(
+            // Use zero-based schema field ids
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(1, "data", Types.StringType.get()),
+            Types.NestedField.optional(2, "extra", Types.StringType.get()));
+    PartitionSpec spec2 = PartitionSpec.builderFor(schema2).bucket("extra", 23).build();
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(schema, "t1", "main", spec),
+            new DynamicIcebergDataImpl(schema, "t1", "main", spec),
+            new DynamicIcebergDataImpl(schema, "t1", "main", spec),
+            new DynamicIcebergDataImpl(schema2, "t1", "main", spec2),
+            new DynamicIcebergDataImpl(schema2, "t1", "main", spec2));
+
+    runTest(rows);
+  }
+
+  @Test
+  void testSchemaEvolutionFieldOrderChanges() throws Exception {
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(1, "data", Types.StringType.get()));
+    Schema expectedSchema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()));
+
+    Schema schema2 =
+        new Schema(
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(1, "extra", Types.StringType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()));
+    Schema expectedSchema2 =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(3, "extra", Types.StringType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()));
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                schema, expectedSchema, "t1", "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                schema, expectedSchema, "t1", "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                schema, expectedSchema, "t1", "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                schema2, expectedSchema2, "t1", "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                schema2, expectedSchema2, "t1", "main", PartitionSpec.unpartitioned()));
+
+    for (DynamicIcebergDataImpl row : rows) {
+      if (row.schemaExpected == expectedSchema) {
+        // We manually adjust the expected Row to match the second expected schema
+        row.rowExpected = Row.of(row.rowProvided.getField(0), null, row.rowProvided.getField(1));
+      }
+    }
+
+    runTest(rows);
+  }
+
+  @Test
+  void testMultipleTables() throws Exception {
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t2", "main", 
PartitionSpec.unpartitioned()));
+
+    runTest(rows);
+  }
+
+  @Test
+  void testMultipleTablesPartitioned() throws Exception {
+    PartitionSpec spec = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build();
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t2", "main", spec));
+
+    runTest(rows);
+  }
+
+  @Test
+  void testSchemaEvolutionAddField() throws Exception {
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA2, "t1", "main", 
PartitionSpec.unpartitioned()));
+
+    runTest(rows, this.env, 1);
+  }
+
+  @Test
+  void testRowEvolutionNullMissingOptionalField() throws Exception {
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA2, "t1", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()));
+
+    runTest(rows, this.env, 1);
+  }
+
+  @Test
+  void testSchemaEvolutionNonBackwardsCompatible() throws Exception {
+    Schema backwardsIncompatibleSchema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()));
+    // Required column is missing in this schema
+    Schema erroringSchema =
+        new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                backwardsIncompatibleSchema, "t1", "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                erroringSchema, "t1", "main", PartitionSpec.unpartitioned()));
+
+    try {
+      runTest(rows, StreamExecutionEnvironment.getExecutionEnvironment(), 1);
+      fail("Expected the job to fail because of the incompatible schema");
+    } catch (JobExecutionException e) {
+      assertThat(
+              ExceptionUtils.findThrowable(
+                  e,
+                  t ->
+                      t.getMessage()
+                          .contains(
+                              "Field 2 in target schema ROW<`id` INT NOT NULL, 
`data` STRING NOT NULL> is non-nullable but does not exist in source schema.")))
+          .isNotEmpty();
+    }
+  }
+
+  @Test
+  void testPartitionSpecEvolution() throws Exception {
+    PartitionSpec spec1 = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build();
+    PartitionSpec spec2 =
+        PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 5).identity("data").build();
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec1),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec1),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec1),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2));
+
+    runTest(rows);
+  }
+
+  @Test
+  void testMultipleBranches() throws Exception {
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "branch1", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()));
+
+    runTest(rows);
+  }
+
+  @Test
+  void testWriteMultipleTablesWithSchemaChanges() throws Exception {
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t2", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA2, "t2", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t2", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA2, "t2", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()));
+
+    runTest(rows);
+  }
+
+  @Test
+  void testUpsert() throws Exception {
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            // Insert one row
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                false),
+            // Remaining rows are duplicates
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true));
+
+    executeDynamicSink(rows, env, true, 1, null);
+
+    try (CloseableIterable<Record> iterable =
+        IcebergGenerics.read(
+                CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
+            .build()) {
+      List<Record> records = Lists.newArrayList();
+      for (Record record : iterable) {
+        records.add(record);
+      }
+
+      assertThat(records.size()).isEqualTo(1);
+      Record actual = records.get(0);
+      DynamicIcebergDataImpl input = rows.get(0);
+      assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+      assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+      // There is an additional _pos field which gets added
+    }
+  }
+
+  @Test
+  void testCommitFailedBeforeOrAfterCommit() throws Exception {
+    // Configure a Restart strategy to allow recovery
+    Configuration configuration = new Configuration();
+    configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 2);
+    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);
+    env.configure(configuration);
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t2", "main", 
PartitionSpec.unpartitioned()));
+
+    FailBeforeAndAfterCommit.reset();
+    final CommitHook commitHook = new FailBeforeAndAfterCommit();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
+
+    executeDynamicSink(rows, env, true, 1, commitHook);
+
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
+  }
+
+  @Test
+  void testCommitConcurrency() throws Exception {
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t1", "main", 
PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, "t2", "main", 
PartitionSpec.unpartitioned()));
+
+    TableIdentifier tableIdentifier = TableIdentifier.of("default", "t1");
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    catalog.createTable(tableIdentifier, new Schema());
+
+    final CommitHook commitHook = new AppendRightBeforeCommit(tableIdentifier.toString());
+
+    executeDynamicSink(rows, env, true, 1, commitHook);
+  }
+
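+  /** Callbacks invoked around the commit, used to inject failures or concurrent table changes. */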
+  interface CommitHook extends Serializable {
+    void beforeCommit();
+
+    void duringCommit();
+
+    void afterCommit();
+  }
+
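+  /** Throws once from beforeCommit() and once from afterCommit() to exercise commit failure handling. */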
+  private static class FailBeforeAndAfterCommit implements CommitHook {
+
+    static boolean failedBeforeCommit;
+    static boolean failedAfterCommit;
+
+    @Override
+    public void beforeCommit() {
+      if (!failedBeforeCommit) {
+        failedBeforeCommit = true;
+        throw new RuntimeException("Failing before commit");
+      }
+    }
+
+    @Override
+    public void duringCommit() {}
+
+    @Override
+    public void afterCommit() {
+      if (!failedAfterCommit) {
+        failedAfterCommit = true;
+        throw new RuntimeException("Failing before commit");
+      }
+    }
+
+    static void reset() {
+      failedBeforeCommit = false;
+      failedAfterCommit = false;
+    }
+  }
+
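+  /** Appends a conflicting data file to the target table while a commit is in progress. */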
+  private static class AppendRightBeforeCommit implements CommitHook {
+
+    final String tableIdentifier;
+
+    private AppendRightBeforeCommit(String tableIdentifier) {
+      this.tableIdentifier = tableIdentifier;
+    }
+
+    @Override
+    public void beforeCommit() {}
+
+    @Override
+    public void duringCommit() {
+      // Create a conflict
+      Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(tableIdentifier));
+      DataFile dataFile =
+          DataFiles.builder(PartitionSpec.unpartitioned())
+              .withInputFile(new InMemoryInputFile(new byte[] {1, 2, 3}))
+              .withFormat(FileFormat.AVRO)
+              .withRecordCount(3)
+              .build();
+      table.newAppend().appendFile(dataFile).commit();
+    }
+
+    @Override
+    public void afterCommit() {}
+  }
+
+  private void runTest(List<DynamicIcebergDataImpl> dynamicData) throws Exception {
+    runTest(dynamicData, this.env, 2);
+  }
+
+  private void runTest(
+      List<DynamicIcebergDataImpl> dynamicData, StreamExecutionEnvironment env, int parallelism)
+      throws Exception {
+    runTest(dynamicData, env, true, parallelism);
+    runTest(dynamicData, env, false, parallelism);
+  }
+
+  private void runTest(
+      List<DynamicIcebergDataImpl> dynamicData,
+      StreamExecutionEnvironment env,
+      boolean immediateUpdate,
+      int parallelism)
+      throws Exception {
+    executeDynamicSink(dynamicData, env, immediateUpdate, parallelism, null);
+    verifyResults(dynamicData);
+  }
+
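+  /** Builds the job with the dynamic sink (optionally wrapped with a commit hook) and executes it. */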
+  private void executeDynamicSink(
+      List<DynamicIcebergDataImpl> dynamicData,
+      StreamExecutionEnvironment env,
+      boolean immediateUpdate,
+      int parallelism,
+      @Nullable CommitHook commitHook)
+      throws Exception {
+    DataStream<DynamicIcebergDataImpl> dataStream =
+        env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {}));
+    env.setParallelism(parallelism);
+
+    if (commitHook != null) {
+      new CommitHookEnabledDynamicIcebergSink(commitHook)
+          .forInput(dataStream)
+          .generator(new Generator())
+          .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+          .writeParallelism(parallelism)
+          .immediateTableUpdate(immediateUpdate)
+          .setSnapshotProperty("commit.retry.num-retries", "0")
+          .append();
+    } else {
+      DynamicIcebergSink.forInput(dataStream)
+          .generator(new Generator())
+          .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+          .writeParallelism(parallelism)
+          .immediateTableUpdate(immediateUpdate)
+          .append();
+    }
+
+    // Write the data
+    env.execute("Test Iceberg DataStream");
+  }
+
+  static class CommitHookEnabledDynamicIcebergSink<T> extends DynamicIcebergSink.Builder<T> {
+    private final CommitHook commitHook;
+
+    CommitHookEnabledDynamicIcebergSink(CommitHook commitHook) {
+      this.commitHook = commitHook;
+    }
+
+    @Override
+    DynamicIcebergSink instantiateSink(
+        Map<String, String> writeProperties, FlinkWriteConf flinkWriteConf) {
+      return new CommitHookDynamicIcebergSink(
+          commitHook,
+          CATALOG_EXTENSION.catalogLoader(),
+          Collections.emptyMap(),
+          "uidPrefix",
+          writeProperties,
+          flinkWriteConf,
+          100);
+    }
+  }
+
+  static class CommitHookDynamicIcebergSink extends DynamicIcebergSink {
+
+    private final CommitHook commitHook;
+
+    CommitHookDynamicIcebergSink(
+        CommitHook commitHook,
+        CatalogLoader catalogLoader,
+        Map<String, String> snapshotProperties,
+        String uidPrefix,
+        Map<String, String> writeProperties,
+        FlinkWriteConf flinkWriteConf,
+        int cacheMaximumSize) {
+      super(
+          catalogLoader,
+          snapshotProperties,
+          uidPrefix,
+          writeProperties,
+          flinkWriteConf,
+          cacheMaximumSize);
+      this.commitHook = commitHook;
+    }
+
+    @Override
+    public Committer<DynamicCommittable> createCommitter(CommitterInitContext context) {
+      return new CommitHookEnabledDynamicCommitter(
+          commitHook,
+          CATALOG_EXTENSION.catalogLoader().loadCatalog(),
+          Collections.emptyMap(),
+          false,
+          10,
+          "sinkId",
+          new DynamicCommitterMetrics(context.metricGroup()));
+    }
+  }
+
+  static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
+    private final CommitHook commitHook;
+
+    CommitHookEnabledDynamicCommitter(
+        CommitHook commitHook,
+        Catalog catalog,
+        Map<String, String> snapshotProperties,
+        boolean replacePartitions,
+        int workerPoolSize,
+        String sinkId,
+        DynamicCommitterMetrics committerMetrics) {
+      super(
+          catalog, snapshotProperties, replacePartitions, workerPoolSize, sinkId, committerMetrics);
+      this.commitHook = commitHook;
+    }
+
+    @Override
+    public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
+        throws IOException, InterruptedException {
+      commitHook.beforeCommit();
+      super.commit(commitRequests);
+      commitHook.afterCommit();
+    }
+
+    @Override
+    void commitOperation(
+        Table table,
+        String branch,
+        SnapshotUpdate<?> operation,
+        CommitSummary summary,
+        String description,
+        String newFlinkJobId,
+        String operatorId,
+        long checkpointId) {
+      commitHook.duringCommit();
+      super.commitOperation(
+          table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId);
+    }
+  }
+
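+  /**
+   * Groups the expected rows by (table, branch), using the widest schema seen per table, and
+   * compares them against the committed table contents.
+   */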
+  private void verifyResults(List<DynamicIcebergDataImpl> dynamicData) throws IOException {
+    // Calculate the expected result
+    Map<Tuple2<String, String>, List<RowData>> expectedData = Maps.newHashMap();
+    Map<String, Schema> expectedSchema = Maps.newHashMap();
+    dynamicData.forEach(
+        r -> {
+          Schema oldSchema = expectedSchema.get(r.tableName);
+          if (oldSchema == null || oldSchema.columns().size() < r.schemaProvided.columns().size()) {
+            expectedSchema.put(r.tableName, r.schemaExpected);
+          }
+        });
+
+    dynamicData.forEach(
+        r -> {
+          List<RowData> data =
+              expectedData.computeIfAbsent(
+                  Tuple2.of(r.tableName, r.branch), unused -> Lists.newArrayList());
+          data.addAll(
+              convertToRowData(expectedSchema.get(r.tableName), ImmutableList.of(r.rowExpected)));
+        });
+
+    // Check the expected result
+    int count = dynamicData.size();
+    for (Map.Entry<Tuple2<String, String>, List<RowData>> e : expectedData.entrySet()) {
+      SimpleDataUtil.assertTableRows(
+          CATALOG_EXTENSION
+              .catalogLoader()
+              .loadCatalog()
+              .loadTable(TableIdentifier.of(DATABASE, e.getKey().f0)),
+          e.getValue(),
+          e.getKey().f1);
+      count -= e.getValue().size();
+    }
+
+    // Found every record
+    assertThat(count).isZero();
+  }
+
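+  /** Converts expected rows to RowData, padding any missing trailing columns with nulls. */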
+  private List<RowData> convertToRowData(Schema schema, List<Row> rows) {
+    DataFormatConverters.RowConverter converter = converter(schema);
+    return rows.stream()
+        .map(
+            r -> {
+              Row updateRow = r;
+              // We need conversion to generate the missing columns
+              if (r.getArity() != schema.columns().size()) {
+                updateRow = new Row(schema.columns().size());
+                for (int i = 0; i < r.getArity(); ++i) {
+                  updateRow.setField(i, r.getField(i));
+                }
+              }
+              return converter.toInternal(updateRow);
+            })
+        .collect(Collectors.toList());
+  }
+
+  private static Row randomRow(Schema schema, long seedOverride) {
+    return TestHelpers.convertRecordToRow(
+            RandomGenericData.generate(schema, 1, seedOverride), schema)
+        .get(0);
+  }
+}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java
new file mode 100644
index 0000000000..6e943efb62
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.apache.iceberg.flink.TestFixtures.TABLE;
+import static org.apache.iceberg.flink.sink.dynamic.DynamicCommitter.MAX_CONTINUOUS_EMPTY_COMMITS;
+
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.IcebergSink;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performance test class to compare {@link DynamicIcebergSink} against {@link IcebergSink} and
+ * measure their throughput.
+ *
+ * <p>The test dynamically generates input for multiple tables, then writes to these tables. For
+ * the DynamicSink, a single sink is used to write all tables. For the IcebergSink, one sink is
+ * used per table. The test logs the written record counts and the elapsed time based on the
+ * Iceberg snapshot metadata.
+ *
+ * <h2>Usage</h2>
+ *
+ * <ul>
+ *   <li>Set the SAMPLE_SIZE, RECORD_SIZE, and TABLE_NUM.
+ *   <li>Run the unit tests and review the logs for the performance results.
+ * </ul>
+ *
+ * <p>Note: This test is disabled by default and should be enabled manually when performance
+ * testing is needed. It is not intended as a standard unit test.
+ */
+@Disabled("Please enable manually for performance testing.")
+class TestDynamicIcebergSinkPerf {
+  private static final Logger LOG = LoggerFactory.getLogger(TestDynamicIcebergSinkPerf.class);
+
+  @RegisterExtension
+  protected static final HadoopCatalogExtension CATALOG_EXTENSION =
+      new HadoopCatalogExtension(DATABASE, TABLE);
+
+  private static final int SAMPLE_SIZE = 50_000;
+  private static final int RECORD_SIZE = 5_000_000;
+  private static final int TABLE_NUM = 3;
+  private static final int PARALLELISM = 2;
+  private static final int WRITE_PARALLELISM = 2;
+  private static final TableIdentifier[] IDENTIFIERS = new TableIdentifier[TABLE_NUM];
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "name2", Types.StringType.get()),
+          Types.NestedField.required(3, "name3", Types.StringType.get()),
+          Types.NestedField.required(4, "name4", Types.StringType.get()),
+          Types.NestedField.required(5, "name5", Types.StringType.get()),
+          Types.NestedField.required(6, "name6", Types.StringType.get()),
+          Types.NestedField.required(7, "name7", Types.StringType.get()),
+          Types.NestedField.required(8, "name8", Types.StringType.get()),
+          Types.NestedField.required(9, "name9", Types.StringType.get()));
+  private static final List<Integer> RANGE =
+      IntStream.range(0, RECORD_SIZE).boxed().collect(Collectors.toList());
+
+  private static List<DynamicRecord> rows;
+  private StreamExecutionEnvironment env;
+
+  @BeforeEach
+  void before() {
+    for (int i = 0; i < TABLE_NUM; ++i) {
+      // Multiply by 13 so the table name hashes differ by more than 1
+      IDENTIFIERS[i] = TableIdentifier.of(DATABASE, TABLE + "_" + (i * 13));
+
+      Table table =
+          CATALOG_EXTENSION
+              .catalog()
+              .createTable(
+                  IDENTIFIERS[i],
+                  SCHEMA,
+                  PartitionSpec.unpartitioned(),
+                  ImmutableMap.of(MAX_CONTINUOUS_EMPTY_COMMITS, "100000"));
+
+      table.manageSnapshots().createBranch("main").commit();
+    }
+
+    List<Record> records = RandomGenericData.generate(SCHEMA, SAMPLE_SIZE, 1L);
+    rows = Lists.newArrayListWithCapacity(records.size());
+    for (int i = 0; i < records.size(); ++i) {
+      rows.add(
+          new DynamicRecord(
+              IDENTIFIERS[i % TABLE_NUM],
+              "main",
+              SCHEMA,
+              RowDataConverter.convert(SCHEMA, records.get(i)),
+              PartitionSpec.unpartitioned(),
+              DistributionMode.NONE,
+              WRITE_PARALLELISM));
+    }
+
+    Configuration configuration = MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
+    configuration.setString("rest.flamegraph.enabled", "true");
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(configuration)
+            .enableCheckpointing(100)
+            .setParallelism(PARALLELISM)
+            .setMaxParallelism(PARALLELISM);
+    env.getConfig().enableObjectReuse();
+  }
+
+  @AfterEach
+  void after() {
+    for (TableIdentifier identifier : IDENTIFIERS) {
+      CATALOG_EXTENSION.catalog().dropTable(identifier);
+    }
+  }
+
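+  /** Maps an integer id onto one of the pre-generated {@link DynamicRecord}s. */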
+  private static class IdBasedGenerator implements DynamicRecordGenerator<Integer> {
+
+    @Override
+    public void convert(Integer id, Collector<DynamicRecord> out) {
+      out.collect(rows.get(id % SAMPLE_SIZE));
+    }
+  }
+
+  @Test
+  void testDynamicSink() throws Exception {
+    // Ensure that the number of writer threads is the same for the two tests
+    env.setMaxParallelism(PARALLELISM * TABLE_NUM * 2);
+    env.setParallelism(PARALLELISM * TABLE_NUM * 2);
+    runTest(
+        s -> {
+          DynamicIcebergSink.forInput(s)
+              .generator(new IdBasedGenerator())
+              .immediateTableUpdate(true)
+              .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+              .append();
+        });
+  }
+
+  @Test
+  void testIcebergSink() throws Exception {
+    runTest(
+        s -> {
+          for (int i = 0; i < IDENTIFIERS.length; ++i) {
+            TableLoader tableLoader =
+                TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), IDENTIFIERS[i]);
+            final int finalInt = i;
+            IcebergSink.forRowData(
+                    s.flatMap(
+                            (FlatMapFunction<Integer, RowData>)
+                                (input, collector) -> {
+                                  if (input % TABLE_NUM == finalInt) {
+                                    collector.collect(rows.get(input % SAMPLE_SIZE).rowData());
+                                  }
+                                })
+                        .returns(InternalTypeInfo.of(FlinkSchemaUtil.convert(SCHEMA)))
+                        .rebalance())
+                .tableLoader(tableLoader)
+                .uidSuffix("Uid" + i)
+                .writeParallelism(WRITE_PARALLELISM)
+                .append();
+          }
+        });
+  }
+
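+  /** Runs the given sink over the bounded integer source and logs per-snapshot record counts and timings. */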
+  private void runTest(Consumer<DataStream<Integer>> sink) throws Exception {
+    DataStream<Integer> dataStream =
+        env.addSource(
+            new BoundedTestSource<>(
+                ImmutableList.of(
+                    RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE),
+                true),
+            TypeInformation.of(Integer.class));
+
+    sink.accept(dataStream);
+
+    long before = System.currentTimeMillis();
+    env.execute();
+
+    for (TableIdentifier identifier : IDENTIFIERS) {
+      Table table = CATALOG_EXTENSION.catalog().loadTable(identifier);
+      for (Snapshot snapshot : table.snapshots()) {
+        long records = 0;
+        for (DataFile dataFile : snapshot.addedDataFiles(table.io())) {
+          records += dataFile.recordCount();
+        }
+
+        LOG.info(
+            "TEST RESULT: For table {} snapshot {} written {} records in {} 
ms",
+            identifier,
+            snapshot.snapshotId(),
+            records,
+            snapshot.timestampMillis() - before);
+        before = snapshot.timestampMillis();
+      }
+    }
+  }
+}

