This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 76972ef77b Flink: Dynamic Iceberg Sink: Add HashKeyGenerator / RowDataEvolver / TableUpdateOperator (#13277)
76972ef77b is described below

commit 76972ef77b96f864712bdde2749a170f9494b001
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Wed Jun 11 17:52:56 2025 +0200

    Flink: Dynamic Iceberg Sink: Add HashKeyGenerator / RowDataEvolver / TableUpdateOperator (#13277)
---
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |   4 +-
 .../flink/sink/EqualityFieldKeySelector.java       |  12 +-
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |   8 +-
 .../org/apache/iceberg/flink/sink/IcebergSink.java |   9 +-
 .../iceberg/flink/sink/PartitionKeySelector.java   |   6 +-
 .../iceberg/flink/sink/PartitionedDeltaWriter.java |   4 +-
 .../flink/sink/RowDataTaskWriterFactory.java       |  17 +-
 .../org/apache/iceberg/flink/sink/SinkUtil.java    |   7 +-
 .../flink/sink/UnpartitionedDeltaWriter.java       |   4 +-
 .../iceberg/flink/sink/dynamic/DynamicRecord.java  |   8 +-
 .../flink/sink/dynamic/DynamicRecordInternal.java  |  10 +-
 .../dynamic/DynamicRecordInternalSerializer.java   |  16 +-
 .../flink/sink/dynamic/DynamicSinkUtil.java        |  65 ++++
 .../sink/dynamic/DynamicTableUpdateOperator.java   |  78 +++++
 .../iceberg/flink/sink/dynamic/DynamicWriter.java  |  10 +-
 .../flink/sink/dynamic/HashKeyGenerator.java       | 379 +++++++++++++++++++++
 .../iceberg/flink/sink/dynamic/RowDataEvolver.java | 190 +++++++++++
 .../iceberg/flink/sink/dynamic/WriteTarget.java    |  16 +-
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    |  18 +-
 .../DynamicRecordInternalSerializerTestBase.java   |   2 +-
 .../dynamic/TestDynamicCommittableSerializer.java  |   6 +-
 .../flink/sink/dynamic/TestDynamicCommitter.java   |  34 +-
 .../dynamic/TestDynamicTableUpdateOperator.java    | 112 ++++++
 .../dynamic/TestDynamicWriteResultAggregator.java  |   7 +-
 .../dynamic/TestDynamicWriteResultSerializer.java  |   6 +-
 .../flink/sink/dynamic/TestDynamicWriter.java      |   4 +-
 .../flink/sink/dynamic/TestHashKeyGenerator.java   | 354 +++++++++++++++++++
 .../flink/sink/dynamic/TestRowDataEvolver.java     | 256 ++++++++++++++
 .../flink/source/TestProjectMetaColumn.java        |  11 +-
 29 files changed, 1547 insertions(+), 106 deletions(-)

diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index e8a46c5bec..d845046cd2 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -19,7 +19,7 @@
 package org.apache.iceberg.flink.sink;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Set;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
@@ -56,7 +56,7 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
       long targetFileSize,
       Schema schema,
       RowType flinkSchema,
-      List<Integer> equalityFieldIds,
+      Set<Integer> equalityFieldIds,
       boolean upsert) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.schema = schema;
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
index 18b269d6c3..92e47792c1 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
@@ -18,13 +18,13 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import java.util.List;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.StructLikeWrapper;
 import org.apache.iceberg.util.StructProjection;
@@ -33,7 +33,8 @@ import org.apache.iceberg.util.StructProjection;
  * Create a {@link KeySelector} to shuffle by equality fields, to ensure same 
equality fields record
  * will be emitted to same writer in order.
  */
-class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
+@Internal
+public class EqualityFieldKeySelector implements KeySelector<RowData, Integer> 
{
 
   private final Schema schema;
   private final RowType flinkSchema;
@@ -43,10 +44,11 @@ class EqualityFieldKeySelector implements 
KeySelector<RowData, Integer> {
   private transient StructProjection structProjection;
   private transient StructLikeWrapper structLikeWrapper;
 
-  EqualityFieldKeySelector(Schema schema, RowType flinkSchema, List<Integer> 
equalityFieldIds) {
+  public EqualityFieldKeySelector(
+      Schema schema, RowType flinkSchema, Set<Integer> equalityFieldIds) {
     this.schema = schema;
     this.flinkSchema = flinkSchema;
-    this.deleteSchema = TypeUtil.select(schema, 
Sets.newHashSet(equalityFieldIds));
+    this.deleteSchema = TypeUtil.select(schema, equalityFieldIds);
   }
 
   /**
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 3f2b265f7b..c42e4a015b 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -419,7 +419,7 @@ public class FlinkSink {
       flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);
 
       // Find out the equality field id list based on the user-provided 
equality field column names.
-      List<Integer> equalityFieldIds =
+      Set<Integer> equalityFieldIds =
           SinkUtil.checkAndGetEqualityFieldIds(table, equalityFieldColumns);
 
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
@@ -524,7 +524,7 @@ public class FlinkSink {
     private SingleOutputStreamOperator<FlinkWriteResult> appendWriter(
         DataStream<RowData> input,
         RowType flinkRowType,
-        List<Integer> equalityFieldIds,
+        Set<Integer> equalityFieldIds,
         int writerParallelism) {
       // Validate the equality fields and partition fields if we enable the 
upsert mode.
       if (flinkWriteConf.upsertMode()) {
@@ -575,7 +575,7 @@ public class FlinkSink {
 
     private DataStream<RowData> distributeDataStream(
         DataStream<RowData> input,
-        List<Integer> equalityFieldIds,
+        Set<Integer> equalityFieldIds,
         RowType flinkRowType,
         int writerParallelism) {
       DistributionMode writeMode = flinkWriteConf.distributionMode();
@@ -711,7 +711,7 @@ public class FlinkSink {
       SerializableSupplier<Table> tableSupplier,
       FlinkWriteConf flinkWriteConf,
       RowType flinkRowType,
-      List<Integer> equalityFieldIds) {
+      Set<Integer> equalityFieldIds) {
     Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier 
shouldn't be null");
 
     Table initTable = tableSupplier.get();
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 52db4d4340..a2aec53a74 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -31,6 +31,7 @@ import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import org.apache.flink.annotation.Experimental;
@@ -152,7 +153,7 @@ public class IcebergSink
   private final RowType flinkRowType;
   private final SerializableSupplier<Table> tableSupplier;
   private final transient FlinkWriteConf flinkWriteConf;
-  private final List<Integer> equalityFieldIds;
+  private final Set<Integer> equalityFieldIds;
   private final boolean upsertMode;
   private final FileFormat dataFileFormat;
   private final long targetDataFileSize;
@@ -163,7 +164,7 @@ public class IcebergSink
   private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
 
   private final Table table;
-  private final List<String> equalityFieldColumns = null;
+  private final Set<String> equalityFieldColumns = null;
 
   private IcebergSink(
       TableLoader tableLoader,
@@ -174,7 +175,7 @@ public class IcebergSink
       RowType flinkRowType,
       SerializableSupplier<Table> tableSupplier,
       FlinkWriteConf flinkWriteConf,
-      List<Integer> equalityFieldIds,
+      Set<Integer> equalityFieldIds,
       String branch,
       boolean overwriteMode,
       FlinkMaintenanceConfig flinkMaintenanceConfig) {
@@ -617,7 +618,7 @@ public class IcebergSink
       boolean overwriteMode = flinkWriteConf.overwriteMode();
 
       // Validate the equality fields and partition fields if we enable the 
upsert mode.
-      List<Integer> equalityFieldIds =
+      Set<Integer> equalityFieldIds =
           SinkUtil.checkAndGetEqualityFieldIds(table, equalityFieldColumns);
 
       if (flinkWriteConf.upsertMode()) {
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java
index df951684b4..17c8233e1f 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -31,7 +32,8 @@ import org.apache.iceberg.flink.RowDataWrapper;
  * wrote by only one task. That will reduce lots of small files in partitioned 
fanout write policy
  * for {@link FlinkSink}.
  */
-class PartitionKeySelector implements KeySelector<RowData, String> {
+@Internal
+public class PartitionKeySelector implements KeySelector<RowData, String> {
 
   private final Schema schema;
   private final PartitionKey partitionKey;
@@ -39,7 +41,7 @@ class PartitionKeySelector implements KeySelector<RowData, 
String> {
 
   private transient RowDataWrapper rowDataWrapper;
 
-  PartitionKeySelector(PartitionSpec spec, Schema schema, RowType flinkSchema) 
{
+  public PartitionKeySelector(PartitionSpec spec, Schema schema, RowType 
flinkSchema) {
     this.schema = schema;
     this.partitionKey = new PartitionKey(spec, schema);
     this.flinkSchema = flinkSchema;
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
index 38062dd1a2..3eb4dba802 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -20,8 +20,8 @@ package org.apache.iceberg.flink.sink;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
@@ -49,7 +49,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
       long targetFileSize,
       Schema schema,
       RowType flinkSchema,
-      List<Integer> equalityFieldIds,
+      Set<Integer> equalityFieldIds,
       boolean upsert) {
     super(
         spec,
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 8dc8d38869..7c11b20c44 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -18,8 +18,9 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -48,7 +49,7 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
   private final PartitionSpec spec;
   private final long targetFileSizeBytes;
   private final FileFormat format;
-  private final List<Integer> equalityFieldIds;
+  private final Set<Integer> equalityFieldIds;
   private final boolean upsert;
   private final FileAppenderFactory<RowData> appenderFactory;
 
@@ -60,7 +61,7 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
       long targetFileSizeBytes,
       FileFormat format,
       Map<String, String> writeProperties,
-      List<Integer> equalityFieldIds,
+      Collection<Integer> equalityFieldIds,
       boolean upsert) {
     this(
         () -> table,
@@ -78,7 +79,7 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
       long targetFileSizeBytes,
       FileFormat format,
       Map<String, String> writeProperties,
-      List<Integer> equalityFieldIds,
+      Collection<Integer> equalityFieldIds,
       boolean upsert) {
     this(
         tableSupplier,
@@ -98,7 +99,7 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
       long targetFileSizeBytes,
       FileFormat format,
       Map<String, String> writeProperties,
-      List<Integer> equalityFieldIds,
+      Collection<Integer> equalityFieldIds,
       boolean upsert,
       Schema schema,
       PartitionSpec spec) {
@@ -117,7 +118,7 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
     this.spec = spec;
     this.targetFileSizeBytes = targetFileSizeBytes;
     this.format = format;
-    this.equalityFieldIds = equalityFieldIds;
+    this.equalityFieldIds = equalityFieldIds != null ? 
Sets.newHashSet(equalityFieldIds) : null;
     this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
@@ -137,7 +138,7 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
               flinkSchema,
               writeProperties,
               spec,
-              ArrayUtil.toIntArray(equalityFieldIds),
+              ArrayUtil.toPrimitive(equalityFieldIds.toArray(new Integer[0])),
               TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)),
               null);
     } else {
@@ -148,7 +149,7 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
               flinkSchema,
               writeProperties,
               spec,
-              ArrayUtil.toIntArray(equalityFieldIds),
+              ArrayUtil.toPrimitive(equalityFieldIds.toArray(new Integer[0])),
               schema,
               null);
     }
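
With equalityFieldIds now a Set, the factory has to unbox the ids into the primitive int[] expected by the appender factories, which is what the ArrayUtil.toPrimitive(...) call above does. For illustration only, a minimal standalone sketch of the same conversion using plain java.util streams instead of Iceberg's ArrayUtil (the class name and values below are illustrative, not part of this commit):

    import java.util.Set;
    import java.util.TreeSet;

    public class EqualityFieldIdArrays {
      public static void main(String[] args) {
        // Equality field ids as a Set, as RowDataTaskWriterFactory now receives them.
        Set<Integer> equalityFieldIds = new TreeSet<>(Set.of(1, 3, 4));

        // Unbox into the primitive int[] form expected by the appender factories.
        int[] ids = equalityFieldIds.stream().mapToInt(Integer::intValue).toArray();

        System.out.println(java.util.Arrays.toString(ids)); // [1, 3, 4]
      }
    }
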
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java
index 0a2a7c1b88..3f60b45a1f 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,8 +41,8 @@ class SinkUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(SinkUtil.class);
 
-  static List<Integer> checkAndGetEqualityFieldIds(Table table, List<String> 
equalityFieldColumns) {
-    List<Integer> equalityFieldIds = 
Lists.newArrayList(table.schema().identifierFieldIds());
+  static Set<Integer> checkAndGetEqualityFieldIds(Table table, List<String> 
equalityFieldColumns) {
+    Set<Integer> equalityFieldIds = 
Sets.newHashSet(table.schema().identifierFieldIds());
     if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) {
       Set<Integer> equalityFieldSet = 
Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
       for (String column : equalityFieldColumns) {
@@ -63,7 +62,7 @@ class SinkUtil {
             equalityFieldSet,
             table.schema().identifierFieldIds());
       }
-      equalityFieldIds = Lists.newArrayList(equalityFieldSet);
+      equalityFieldIds = Sets.newHashSet(equalityFieldSet);
     }
     return equalityFieldIds;
   }
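
The reworked SinkUtil#checkAndGetEqualityFieldIds starts from the schema's identifier field ids and, when the caller supplies explicit equality columns, resolves each column name to its field id and uses that set instead. A simplified, framework-free sketch of that resolution, with a plain Map standing in for the Iceberg Schema (all names and values below are illustrative, not part of this commit):

    import java.util.LinkedHashMap;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class EqualityFieldResolution {
      public static void main(String[] args) {
        // Stand-in for the table schema: column name -> field id, plus identifier field ids.
        Map<String, Integer> columns = new LinkedHashMap<>();
        columns.put("id", 1);
        columns.put("region", 2);
        columns.put("payload", 3);
        Set<Integer> identifierFieldIds = Set.of(1);

        // User-provided equality columns take precedence over the identifier fields.
        List<String> equalityFieldColumns = List.of("id", "region");

        Set<Integer> equalityFieldIds = new LinkedHashSet<>(identifierFieldIds);
        if (!equalityFieldColumns.isEmpty()) {
          Set<Integer> resolved = new LinkedHashSet<>();
          for (String column : equalityFieldColumns) {
            Integer fieldId = columns.get(column);
            if (fieldId == null) {
              throw new IllegalArgumentException("Equality field column not found: " + column);
            }
            resolved.add(fieldId);
          }
          equalityFieldIds = resolved;
        }

        System.out.println(equalityFieldIds); // [1, 2]
      }
    }
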
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index 7680fb933b..b6ad03514b 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -19,7 +19,7 @@
 package org.apache.iceberg.flink.sink;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Set;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
@@ -41,7 +41,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
       long targetFileSize,
       Schema schema,
       RowType flinkSchema,
-      List<Integer> equalityFieldIds,
+      Set<Integer> equalityFieldIds,
       boolean upsert) {
     super(
         spec,
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
index 994f7a0865..600a4d8b95 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.DistributionMode;
@@ -37,7 +37,7 @@ public class DynamicRecord {
   private DistributionMode distributionMode;
   private int writeParallelism;
   private boolean upsertMode;
-  @Nullable private List<String> equalityFields;
+  @Nullable private Set<String> equalityFields;
 
   public DynamicRecord(
       TableIdentifier tableIdentifier,
@@ -120,11 +120,11 @@ public class DynamicRecord {
     this.upsertMode = upsertMode;
   }
 
-  public List<String> equalityFields() {
+  public Set<String> equalityFields() {
     return equalityFields;
   }
 
-  public void setEqualityFields(List<String> equalityFields) {
+  public void setEqualityFields(Set<String> equalityFields) {
     this.equalityFields = equalityFields;
   }
 }
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java
index 958a1e8539..fe1f4cdac9 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -37,7 +37,7 @@ class DynamicRecordInternal {
   private int writerKey;
   private RowData rowData;
   private boolean upsertMode;
-  private List<Integer> equalityFieldIds;
+  private Set<Integer> equalityFieldIds;
 
   // Required for serialization instantiation
   DynamicRecordInternal() {}
@@ -50,7 +50,7 @@ class DynamicRecordInternal {
       PartitionSpec spec,
       int writerKey,
       boolean upsertMode,
-      List<Integer> equalityFieldsIds) {
+      Set<Integer> equalityFieldsIds) {
     this.tableName = tableName;
     this.branch = branch;
     this.schema = schema;
@@ -117,11 +117,11 @@ class DynamicRecordInternal {
     this.upsertMode = upsertMode;
   }
 
-  public List<Integer> equalityFields() {
+  public Set<Integer> equalityFields() {
     return equalityFieldIds;
   }
 
-  public void setEqualityFieldIds(List<Integer> equalityFieldIds) {
+  public void setEqualityFieldIds(Set<Integer> equalityFieldIds) {
     this.equalityFieldIds = equalityFieldIds;
   }
 
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
index a6f38f7d6b..d0a335b182 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
@@ -20,8 +20,8 @@ package org.apache.iceberg.flink.sink.dynamic;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -31,11 +31,11 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.hadoop.util.Sets;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 @Internal
 class DynamicRecordInternalSerializer extends 
TypeSerializer<DynamicRecordInternal> {
@@ -126,11 +126,11 @@ class DynamicRecordInternalSerializer extends 
TypeSerializer<DynamicRecordIntern
     RowData rowData = rowDataSerializer.deserialize(dataInputView);
     boolean upsertMode = dataInputView.readBoolean();
     int numEqualityFields = dataInputView.readInt();
-    final List<Integer> equalityFieldIds;
+    final Set<Integer> equalityFieldIds;
     if (numEqualityFields > 0) {
-      equalityFieldIds = Lists.newArrayList();
+      equalityFieldIds = Sets.newHashSetWithExpectedSize(numEqualityFields);
     } else {
-      equalityFieldIds = Collections.emptyList();
+      equalityFieldIds = Collections.emptySet();
     }
 
     for (int i = 0; i < numEqualityFields; i++) {
@@ -173,11 +173,11 @@ class DynamicRecordInternalSerializer extends 
TypeSerializer<DynamicRecordIntern
     RowData rowData = rowDataSerializer.deserialize(dataInputView);
     boolean upsertMode = dataInputView.readBoolean();
     int numEqualityFields = dataInputView.readInt();
-    final List<Integer> equalityFieldIds;
+    final Set<Integer> equalityFieldIds;
     if (numEqualityFields > 0) {
-      equalityFieldIds = Lists.newArrayList();
+      equalityFieldIds = Sets.newHashSetWithExpectedSize(numEqualityFields);
     } else {
-      equalityFieldIds = Collections.emptyList();
+      equalityFieldIds = Collections.emptySet();
     }
     for (int i = 0; i < numEqualityFields; i++) {
       equalityFieldIds.add(dataInputView.readInt());
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
new file mode 100644
index 0000000000..6ea6dcab86
--- /dev/null
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.hadoop.util.Sets;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+class DynamicSinkUtil {
+
+  private DynamicSinkUtil() {}
+
+  static Set<Integer> getEqualityFieldIds(Set<String> equalityFields, Schema 
schema) {
+    if (equalityFields == null || equalityFields.isEmpty()) {
+      if (!schema.identifierFieldIds().isEmpty()) {
+        return schema.identifierFieldIds();
+      } else {
+        return Collections.emptySet();
+      }
+    }
+
+    Set<Integer> equalityFieldIds = 
Sets.newHashSetWithExpectedSize(equalityFields.size());
+    for (String equalityField : equalityFields) {
+      Types.NestedField field = schema.findField(equalityField);
+      Preconditions.checkNotNull(
+          field, "Equality field %s does not exist in schema", equalityField);
+      equalityFieldIds.add(field.fieldId());
+    }
+
+    return equalityFieldIds;
+  }
+
+  static int safeAbs(int input) {
+    if (input >= 0) {
+      return input;
+    }
+
+    if (input == Integer.MIN_VALUE) {
+      // -Integer.MIN_VALUE would be Integer.MIN_VALUE due to integer overflow. Map to
+      // Integer.MAX_VALUE instead!
+      return Integer.MAX_VALUE;
+    }
+
+    return -input;
+  }
+}
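
The safeAbs helper above exists because negating Integer.MIN_VALUE overflows back to Integer.MIN_VALUE (as Math.abs does), which would later yield a negative array index. A small standalone sketch of the same idea (the class name is illustrative, not part of this commit):

    public class SafeAbsDemo {
      // Same idea as DynamicSinkUtil#safeAbs: avoid the Math.abs(Integer.MIN_VALUE) overflow.
      static int safeAbs(int input) {
        if (input >= 0) {
          return input;
        }
        if (input == Integer.MIN_VALUE) {
          return Integer.MAX_VALUE;
        }
        return -input;
      }

      public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE)); // -2147483648 (overflow)
        System.out.println(safeAbs(Integer.MIN_VALUE));  // 2147483647
        System.out.println(safeAbs(-42));                // 42
      }
    }
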
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java
new file mode 100644
index 0000000000..c37532714d
--- /dev/null
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+
+/**
+ * An optional operator that performs table updates (e.g. schema updates) in a non-concurrent
+ * way. Records must be keyed / routed to this operator by table name to ensure non-concurrent
+ * updates. The operator itself forwards each record after updating the schema / spec of the
+ * table. The update is also reflected in the record.
+ */
+@Internal
+class DynamicTableUpdateOperator
+    extends RichMapFunction<DynamicRecordInternal, DynamicRecordInternal> {
+  private final CatalogLoader catalogLoader;
+  private final int cacheMaximumSize;
+  private final long cacheRefreshMs;
+  private transient TableUpdater updater;
+
+  DynamicTableUpdateOperator(
+      CatalogLoader catalogLoader, int cacheMaximumSize, long cacheRefreshMs) {
+    this.catalogLoader = catalogLoader;
+    this.cacheMaximumSize = cacheMaximumSize;
+    this.cacheRefreshMs = cacheRefreshMs;
+  }
+
+  @Override
+  public void open(OpenContext openContext) throws Exception {
+    super.open(openContext);
+    Catalog catalog = catalogLoader.loadCatalog();
+    this.updater =
+        new TableUpdater(
+            new TableMetadataCache(catalog, cacheMaximumSize, cacheRefreshMs), 
catalog);
+  }
+
+  @Override
+  public DynamicRecordInternal map(DynamicRecordInternal data) throws 
Exception {
+    Tuple3<Schema, CompareSchemasVisitor.Result, PartitionSpec> newData =
+        updater.update(
+            TableIdentifier.parse(data.tableName()), data.branch(), 
data.schema(), data.spec());
+
+    data.setSchema(newData.f0);
+    data.setSpec(newData.f2);
+
+    if (newData.f1 == CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED) {
+      RowData newRowData = RowDataEvolver.convert(data.rowData(), 
data.schema(), newData.f0);
+      data.setRowData(newRowData);
+    }
+
+    return data;
+  }
+}
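
The operator's Javadoc requires records to be keyed by table name so that schema/spec updates for a given table never run concurrently. A framework-free sketch of that routing guarantee, using a simple hash-modulo stand-in for Flink's actual key-group assignment (the class and table names below are illustrative, not part of this commit):

    public class RouteByTableName {
      // Simplified stand-in for keyBy(table name): all records of one table land on one subtask.
      static int subtaskFor(String tableName, int parallelism) {
        return Math.floorMod(tableName.hashCode(), parallelism);
      }

      public static void main(String[] args) {
        int parallelism = 4;
        String[] records = {"db.orders", "db.orders", "db.customers", "db.orders"};
        for (String tableName : records) {
          System.out.println(tableName + " -> subtask " + subtaskFor(tableName, parallelism));
        }
        // Every "db.orders" record prints the same subtask index, so its table updates are serialized.
      }
    }
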
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index 3851dbf956..c4c4e61de1 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -114,7 +114,7 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
                             Maps.newHashMap(commonWriteProperties);
                         tableWriteProperties.putAll(table.properties());
 
-                        List<Integer> equalityFieldIds =
+                        Set<Integer> equalityFieldIds =
                             getEqualityFields(table, element.equalityFields());
                         if (element.upsertMode()) {
                           Preconditions.checkState(
@@ -138,7 +138,7 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
                             targetDataFileSize,
                             dataFileFormat,
                             tableWriteProperties,
-                            equalityFieldIds,
+                            Lists.newArrayList(equalityFieldIds),
                             element.upsertMode(),
                             element.schema(),
                             element.spec());
@@ -199,15 +199,15 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
     return result;
   }
 
-  private static List<Integer> getEqualityFields(Table table, List<Integer> 
equalityFieldIds) {
+  private static Set<Integer> getEqualityFields(Table table, Set<Integer> 
equalityFieldIds) {
     if (equalityFieldIds != null && !equalityFieldIds.isEmpty()) {
       return equalityFieldIds;
     }
     Set<Integer> identifierFieldIds = table.schema().identifierFieldIds();
     if (identifierFieldIds != null && !identifierFieldIds.isEmpty()) {
-      return Lists.newArrayList(identifierFieldIds);
+      return identifierFieldIds;
     }
-    return Collections.emptyList();
+    return Collections.emptySet();
   }
 
   @VisibleForTesting
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
new file mode 100644
index 0000000000..6cb1f46089
--- /dev/null
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.EqualityFieldKeySelector;
+import org.apache.iceberg.flink.sink.PartitionKeySelector;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The HashKeyGenerator is responsible for creating the appropriate hash key for Flink's keyBy
+ * operation. The hash key is generated depending on the user-provided DynamicRecord and the
+ * table metadata. Under the hood, we maintain a set of Flink {@link KeySelector}s which
+ * implement the appropriate Iceberg {@link DistributionMode}. For every table, we randomly
+ * select a consistent subset of writer subtasks which receive data via their associated keys,
+ * depending on the chosen DistributionMode.
+ *
+ * <p>Caching ensures that a new key selector is also created when the table metadata (e.g.
+ * schema, spec) or the user-provided metadata changes (e.g. distribution mode, write
+ * parallelism).
+ *
+ * <p>Note: The hashing must be deterministic given the same parameters of the KeySelector and
+ * the same provided values.
+ */
+class HashKeyGenerator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HashKeyGenerator.class);
+
+  private final int maxWriteParallelism;
+  private final Cache<SelectorKey, KeySelector<RowData, Integer>> 
keySelectorCache;
+
+  HashKeyGenerator(int maxCacheSize, int maxWriteParallelism) {
+    this.maxWriteParallelism = maxWriteParallelism;
+    this.keySelectorCache = 
Caffeine.newBuilder().maximumSize(maxCacheSize).build();
+  }
+
+  int generateKey(DynamicRecord dynamicRecord) throws Exception {
+    return generateKey(dynamicRecord, null, null, null);
+  }
+
+  int generateKey(
+      DynamicRecord dynamicRecord,
+      @Nullable Schema tableSchema,
+      @Nullable PartitionSpec tableSpec,
+      @Nullable RowData overrideRowData)
+      throws Exception {
+    String tableIdent = dynamicRecord.tableIdentifier().toString();
+    SelectorKey cacheKey =
+        new SelectorKey(
+            tableIdent,
+            dynamicRecord.branch(),
+            tableSchema != null ? tableSchema.schemaId() : null,
+            tableSpec != null ? tableSpec.specId() : null,
+            dynamicRecord.schema(),
+            dynamicRecord.spec(),
+            dynamicRecord.equalityFields());
+    return keySelectorCache
+        .get(
+            cacheKey,
+            k ->
+                getKeySelector(
+                    tableIdent,
+                    MoreObjects.firstNonNull(tableSchema, 
dynamicRecord.schema()),
+                    MoreObjects.firstNonNull(tableSpec, dynamicRecord.spec()),
+                    MoreObjects.firstNonNull(
+                        dynamicRecord.distributionMode(), 
DistributionMode.NONE),
+                    MoreObjects.firstNonNull(
+                        dynamicRecord.equalityFields(), 
Collections.emptySet()),
+                    dynamicRecord.writeParallelism()))
+        .getKey(overrideRowData != null ? overrideRowData : 
dynamicRecord.rowData());
+  }
+
+  private KeySelector<RowData, Integer> getKeySelector(
+      String tableName,
+      Schema schema,
+      PartitionSpec spec,
+      DistributionMode mode,
+      Set<String> equalityFields,
+      int writeParallelism) {
+    LOG.debug(
+        "Creating new KeySelector for table '{}' with distribution mode '{}'", 
tableName, mode);
+    switch (mode) {
+      case NONE:
+        if (equalityFields.isEmpty()) {
+          return tableKeySelector(tableName, writeParallelism, 
maxWriteParallelism);
+        } else {
+          LOG.info(
+              "{}: Distribute rows by equality fields, because there are 
equality fields set",
+              tableName);
+          return equalityFieldKeySelector(
+              tableName, schema, equalityFields, writeParallelism, 
maxWriteParallelism);
+        }
+
+      case HASH:
+        if (equalityFields.isEmpty()) {
+          if (spec.isUnpartitioned()) {
+            LOG.warn(
+                "{}: Fallback to use 'none' distribution mode, because there 
are no equality fields set "
+                    + "and table is unpartitioned",
+                tableName);
+            return tableKeySelector(tableName, writeParallelism, 
maxWriteParallelism);
+          } else {
+            return partitionKeySelector(
+                tableName, schema, spec, writeParallelism, 
maxWriteParallelism);
+          }
+        } else {
+          if (spec.isUnpartitioned()) {
+            LOG.info(
+                "{}: Distribute rows by equality fields, because there are 
equality fields set "
+                    + "and table is unpartitioned",
+                tableName);
+            return equalityFieldKeySelector(
+                tableName, schema, equalityFields, writeParallelism, 
maxWriteParallelism);
+          } else {
+            for (PartitionField partitionField : spec.fields()) {
+              Preconditions.checkState(
+                  equalityFields.contains(partitionField.name()),
+                  "%s: In 'hash' distribution mode with equality fields set, 
partition field '%s' "
+                      + "should be included in equality fields: '%s'",
+                  tableName,
+                  partitionField,
+                  schema.columns().stream()
+                      .filter(c -> equalityFields.contains(c.name()))
+                      .collect(Collectors.toList()));
+            }
+            return partitionKeySelector(
+                tableName, schema, spec, writeParallelism, 
maxWriteParallelism);
+          }
+        }
+
+      case RANGE:
+        if (schema.identifierFieldIds().isEmpty()) {
+          LOG.warn(
+              "{}: Fallback to use 'none' distribution mode, because there are 
no equality fields set "
+                  + "and {}='range' is not supported yet in flink",
+              tableName,
+              WRITE_DISTRIBUTION_MODE);
+          return tableKeySelector(tableName, writeParallelism, 
maxWriteParallelism);
+        } else {
+          LOG.info(
+              "{}: Distribute rows by equality fields, because there are 
equality fields set "
+                  + "and {}='range' is not supported yet in flink",
+              tableName,
+              WRITE_DISTRIBUTION_MODE);
+          return equalityFieldKeySelector(
+              tableName, schema, equalityFields, writeParallelism, 
maxWriteParallelism);
+        }
+
+      default:
+        throw new IllegalArgumentException(
+            tableName + ": Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + 
mode);
+    }
+  }
+
+  private static KeySelector<RowData, Integer> equalityFieldKeySelector(
+      String tableName,
+      Schema schema,
+      Set<String> equalityFields,
+      int writeParallelism,
+      int maxWriteParallelism) {
+    return new TargetLimitedKeySelector(
+        new EqualityFieldKeySelector(
+            schema,
+            FlinkSchemaUtil.convert(schema),
+            DynamicSinkUtil.getEqualityFieldIds(equalityFields, schema)),
+        tableName,
+        writeParallelism,
+        maxWriteParallelism);
+  }
+
+  private static KeySelector<RowData, Integer> partitionKeySelector(
+      String tableName,
+      Schema schema,
+      PartitionSpec spec,
+      int writeParallelism,
+      int maxWriteParallelism) {
+    KeySelector<RowData, String> inner =
+        new PartitionKeySelector(spec, schema, 
FlinkSchemaUtil.convert(schema));
+    return new TargetLimitedKeySelector(
+        in -> inner.getKey(in).hashCode(), tableName, writeParallelism, 
maxWriteParallelism);
+  }
+
+  private static KeySelector<RowData, Integer> tableKeySelector(
+      String tableName, int writeParallelism, int maxWriteParallelism) {
+    return new TargetLimitedKeySelector(
+        new RoundRobinKeySelector<>(writeParallelism),
+        tableName,
+        writeParallelism,
+        maxWriteParallelism);
+  }
+
+  /**
+   * Generates a new key using the salt as a base, and reduces the target key range of the
+   * {@link #wrapped} {@link KeySelector} to {@link #writeParallelism}.
+   */
+  private static class TargetLimitedKeySelector implements 
KeySelector<RowData, Integer> {
+    private final KeySelector<RowData, Integer> wrapped;
+    private final int writeParallelism;
+    private final int[] distinctKeys;
+
+    @SuppressWarnings("checkstyle:ParameterAssignment")
+    TargetLimitedKeySelector(
+        KeySelector<RowData, Integer> wrapped,
+        String tableName,
+        int writeParallelism,
+        int maxWriteParallelism) {
+      if (writeParallelism > maxWriteParallelism) {
+        LOG.warn(
+            "{}: writeParallelism {} is greater than maxWriteParallelism {}. 
Capping writeParallelism at {}",
+            tableName,
+            writeParallelism,
+            maxWriteParallelism,
+            maxWriteParallelism);
+        writeParallelism = maxWriteParallelism;
+      }
+      this.wrapped = wrapped;
+      this.writeParallelism = writeParallelism;
+      this.distinctKeys = new int[writeParallelism];
+
+      // Ensures that the generated keys always result in a unique slotId
+      Set<Integer> targetSlots = 
Sets.newHashSetWithExpectedSize(writeParallelism);
+      int nextKey = tableName.hashCode();
+      for (int i = 0; i < writeParallelism; ++i) {
+        int subtaskId = subtaskId(nextKey, writeParallelism, 
maxWriteParallelism);
+        while (targetSlots.contains(subtaskId)) {
+          ++nextKey;
+          subtaskId = subtaskId(nextKey, writeParallelism, 
maxWriteParallelism);
+        }
+
+        targetSlots.add(subtaskId);
+        distinctKeys[i] = nextKey;
+        ++nextKey;
+      }
+    }
+
+    @Override
+    public Integer getKey(RowData value) throws Exception {
+      return distinctKeys[
+          DynamicSinkUtil.safeAbs(wrapped.getKey(value).hashCode()) % 
writeParallelism];
+    }
+
+    private static int subtaskId(int key, int writeParallelism, int 
maxWriteParallelism) {
+      return KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+          maxWriteParallelism,
+          writeParallelism,
+          KeyGroupRangeAssignment.computeKeyGroupForKeyHash(key, 
maxWriteParallelism));
+    }
+  }
+
+  /**
+   * Generates evenly distributed keys in the [0..{@link #maxTarget}) range using a round-robin
+   * algorithm.
+   *
+   * @param <T> unused input for key generation
+   */
+  private static class RoundRobinKeySelector<T> implements KeySelector<T, 
Integer> {
+    private final int maxTarget;
+    private int lastTarget = 0;
+
+    RoundRobinKeySelector(int maxTarget) {
+      this.maxTarget = maxTarget;
+    }
+
+    @Override
+    public Integer getKey(T value) {
+      lastTarget = (lastTarget + 1) % maxTarget;
+      return lastTarget;
+    }
+  }
+
+  /**
+   * Cache key for the {@link KeySelector}. Only contains the {@link Schema} and the {@link
+   * PartitionSpec} if their ids are not provided.
+   */
+  static class SelectorKey {
+    private final String tableName;
+    private final String branch;
+    private final Integer schemaId;
+    private final Integer specId;
+    private final Schema schema;
+    private final PartitionSpec spec;
+    private final Set<String> equalityFields;
+
+    SelectorKey(
+        String tableName,
+        String branch,
+        @Nullable Integer tableSchemaId,
+        @Nullable Integer tableSpecId,
+        Schema schema,
+        PartitionSpec spec,
+        Set<String> equalityFields) {
+      this.tableName = tableName;
+      this.branch = branch;
+      this.schemaId = tableSchemaId;
+      this.specId = tableSpecId;
+      this.schema = tableSchemaId == null ? schema : null;
+      this.spec = tableSpecId == null ? spec : null;
+      this.equalityFields = equalityFields;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      SelectorKey that = (SelectorKey) other;
+      return Objects.equals(tableName, that.tableName)
+          && Objects.equals(branch, that.branch)
+          && Objects.equals(schemaId, that.schemaId)
+          && Objects.equals(specId, that.specId)
+          && Objects.equals(schema, that.schema)
+          && Objects.equals(spec, that.spec)
+          && Objects.equals(equalityFields, that.equalityFields);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tableName, branch, schemaId, specId, schema, spec, 
equalityFields);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("tableName", tableName)
+          .add("branch", branch)
+          .add("schemaId", schemaId)
+          .add("specId", specId)
+          .add("schema", schema)
+          .add("spec", spec)
+          .add("equalityFields", equalityFields)
+          .toString();
+    }
+  }
+
+  @VisibleForTesting
+  Cache<SelectorKey, KeySelector<RowData, Integer>> getKeySelectorCache() {
+    return keySelectorCache;
+  }
+}
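
The TargetLimitedKeySelector above precomputes writeParallelism distinct keys, seeded by the table name hash, so that each table only ever reaches a bounded, stable subset of writer subtasks. A simplified sketch of that capping idea, using a plain hash-modulo assignment in place of Flink's KeyGroupRangeAssignment (all names and values below are illustrative, not part of this commit):

    import java.util.HashSet;
    import java.util.Set;

    public class CappedKeySelection {
      // Simplified stand-in for Flink's key-group -> subtask mapping.
      static int subtaskFor(int key, int writeParallelism) {
        return Math.floorMod(key, writeParallelism);
      }

      public static void main(String[] args) {
        int writeParallelism = 3;
        String tableName = "db.orders";

        // Precompute writeParallelism keys that map to distinct subtasks, seeded by the table name.
        int[] distinctKeys = new int[writeParallelism];
        Set<Integer> usedSubtasks = new HashSet<>();
        int nextKey = tableName.hashCode();
        for (int i = 0; i < writeParallelism; i++) {
          while (usedSubtasks.contains(subtaskFor(nextKey, writeParallelism))) {
            nextKey++;
          }
          usedSubtasks.add(subtaskFor(nextKey, writeParallelism));
          distinctKeys[i] = nextKey++;
        }

        // Route a record hash onto one of the precomputed keys, capping the effective parallelism.
        int recordHash = "row-42".hashCode();
        int key = distinctKeys[Math.floorMod(recordHash, writeParallelism)];
        System.out.println("record routed via key " + key
            + " -> subtask " + subtaskFor(key, writeParallelism));
      }
    }
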
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java
new file mode 100644
index 0000000000..fe670c54eb
--- /dev/null
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * A RowDataEvolver is responsible for changing the input RowData to make it compatible with the
+ * target schema. This is done when
+ *
+ * <ol>
+ *   <li>The input schema has fewer fields than the target schema.
+ *   <li>The table types are wider than the input types.
+ *   <li>The field order differs between the source and target schema.
+ * </ol>
+ *
+ * <p>The resolution is as follows:
+ *
+ * <ol>
+ *   <li>In the first case, we add a null value for the missing field (if the field is optional).
+ *   <li>In the second case, we convert the data for the input field to the wider type, e.g. int
+ *       (input type) => long (table type).
+ *   <li>In the third case, we rearrange the input data to match the target table.
+ * </ol>
+ */
+class RowDataEvolver {
+  private RowDataEvolver() {}
+
+  public static RowData convert(RowData sourceData, Schema sourceSchema, 
Schema targetSchema) {
+    return convertStruct(
+        sourceData, FlinkSchemaUtil.convert(sourceSchema), 
FlinkSchemaUtil.convert(targetSchema));
+  }
+
+  private static Object convert(Object object, LogicalType sourceType, 
LogicalType targetType) {
+    if (object == null) {
+      return null;
+    }
+
+    switch (targetType.getTypeRoot()) {
+      case BOOLEAN:
+      case INTEGER:
+      case FLOAT:
+      case VARCHAR:
+      case DATE:
+      case TIME_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+      case BINARY:
+      case VARBINARY:
+        return object;
+      case DOUBLE:
+        if (object instanceof Float) {
+          return ((Float) object).doubleValue();
+        } else {
+          return object;
+        }
+      case BIGINT:
+        if (object instanceof Integer) {
+          return ((Integer) object).longValue();
+        } else {
+          return object;
+        }
+      case DECIMAL:
+        DecimalType toDecimalType = (DecimalType) targetType;
+        DecimalData decimalData = (DecimalData) object;
+        if (((DecimalType) sourceType).getPrecision() == 
toDecimalType.getPrecision()) {
+          return object;
+        } else {
+          return DecimalData.fromBigDecimal(
+              decimalData.toBigDecimal(), toDecimalType.getPrecision(), 
toDecimalType.getScale());
+        }
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        if (object instanceof Integer) {
+          LocalDateTime dateTime =
+              LocalDateTime.of(LocalDate.ofEpochDay((Integer) object), 
LocalTime.MIN);
+          return TimestampData.fromLocalDateTime(dateTime);
+        } else {
+          return object;
+        }
+      case ROW:
+        return convertStruct((RowData) object, (RowType) sourceType, (RowType) 
targetType);
+      case ARRAY:
+        return convertArray((ArrayData) object, (ArrayType) sourceType, 
(ArrayType) targetType);
+      case MAP:
+        return convertMap((MapData) object, (MapType) sourceType, (MapType) 
targetType);
+      default:
+        throw new UnsupportedOperationException("Not a supported type: " + 
targetType);
+    }
+  }
+
+  private static RowData convertStruct(RowData sourceData, RowType sourceType, 
RowType targetType) {
+    GenericRowData targetData = new 
GenericRowData(targetType.getFields().size());
+    List<RowType.RowField> targetFields = targetType.getFields();
+    for (int i = 0; i < targetFields.size(); i++) {
+      RowType.RowField targetField = targetFields.get(i);
+
+      int sourceFieldId = sourceType.getFieldIndex(targetField.getName());
+      if (sourceFieldId == -1) {
+        if (targetField.getType().isNullable()) {
+          targetData.setField(i, null);
+        } else {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Field %s in target schema %s is non-nullable but does not 
exist in source schema.",
+                  i + 1, targetType));
+        }
+      } else {
+        RowData.FieldGetter getter =
+            RowData.createFieldGetter(sourceType.getTypeAt(sourceFieldId), 
sourceFieldId);
+        targetData.setField(
+            i,
+            convert(
+                getter.getFieldOrNull(sourceData),
+                sourceType.getFields().get(sourceFieldId).getType(),
+                targetField.getType()));
+      }
+    }
+
+    return targetData;
+  }
+
+  private static ArrayData convertArray(
+      ArrayData sourceData, ArrayType sourceType, ArrayType targetType) {
+    LogicalType fromElementType = sourceType.getElementType();
+    LogicalType toElementType = targetType.getElementType();
+    ArrayData.ElementGetter elementGetter = 
ArrayData.createElementGetter(fromElementType);
+    Object[] convertedArray = new Object[sourceData.size()];
+    for (int i = 0; i < convertedArray.length; i++) {
+      convertedArray[i] =
+          convert(elementGetter.getElementOrNull(sourceData, i), 
fromElementType, toElementType);
+    }
+
+    return new GenericArrayData(convertedArray);
+  }
+
+  private static MapData convertMap(MapData sourceData, MapType sourceType, 
MapType targetType) {
+    LogicalType fromMapKeyType = sourceType.getKeyType();
+    LogicalType fromMapValueType = sourceType.getValueType();
+    LogicalType toMapKeyType = targetType.getKeyType();
+    LogicalType toMapValueType = targetType.getValueType();
+    ArrayData keyArray = sourceData.keyArray();
+    ArrayData valueArray = sourceData.valueArray();
+    ArrayData.ElementGetter keyGetter = 
ArrayData.createElementGetter(fromMapKeyType);
+    ArrayData.ElementGetter valueGetter = 
ArrayData.createElementGetter(fromMapValueType);
+    Map<Object, Object> convertedMap = Maps.newLinkedHashMap();
+    for (int i = 0; i < keyArray.size(); ++i) {
+      convertedMap.put(
+          convert(keyGetter.getElementOrNull(keyArray, i), fromMapKeyType, 
toMapKeyType),
+          convert(valueGetter.getElementOrNull(valueArray, i), 
fromMapValueType, toMapValueType));
+    }
+
+    return new GenericMapData(convertedMap);
+  }
+}
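
RowDataEvolver reorders fields, widens types, and fills missing optional fields with nulls. A minimal sketch of those three rules on plain Java maps, without Flink's RowData types (field names and types below are illustrative, not part of this commit):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class RowEvolutionSketch {
      public static void main(String[] args) {
        // Source row: field name -> value, in source field order.
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("id", 7);            // int in the source schema
        source.put("name", "flink");

        // Target schema: field name -> Java type, in target field order (has an extra optional field).
        Map<String, Class<?>> target = new LinkedHashMap<>();
        target.put("name", String.class);
        target.put("id", Long.class);         // widened to long in the target schema
        target.put("comment", String.class);  // missing in the source: filled with null

        Map<String, Object> converted = new LinkedHashMap<>();
        for (Map.Entry<String, Class<?>> field : target.entrySet()) {
          Object value = source.get(field.getKey());
          if (value instanceof Integer && field.getValue() == Long.class) {
            value = ((Integer) value).longValue(); // int -> long widening
          }
          converted.put(field.getKey(), value);    // reordering + null for missing fields
        }

        System.out.println(converted); // {name=flink, id=7, comment=null}
      }
    }
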
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java
index 0a43404d13..afd5b637e9 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java
@@ -20,12 +20,12 @@ package org.apache.iceberg.flink.sink.dynamic;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.util.Sets;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 class WriteTarget implements Serializable {
 
@@ -34,7 +34,7 @@ class WriteTarget implements Serializable {
   private final Integer schemaId;
   private final Integer specId;
   private final boolean upsertMode;
-  private final List<Integer> equalityFields;
+  private final Set<Integer> equalityFields;
 
   WriteTarget(
       String tableName,
@@ -42,7 +42,7 @@ class WriteTarget implements Serializable {
       Integer schemaId,
       Integer specId,
       boolean upsertMode,
-      List<Integer> equalityFields) {
+      Set<Integer> equalityFields) {
     this.tableName = tableName;
     this.branch = branch != null ? branch : "main";
     this.schemaId = schemaId;
@@ -71,7 +71,7 @@ class WriteTarget implements Serializable {
     return upsertMode;
   }
 
-  List<Integer> equalityFields() {
+  Set<Integer> equalityFields() {
     return equalityFields;
   }
 
@@ -94,12 +94,12 @@ class WriteTarget implements Serializable {
         view.readInt(),
         view.readInt(),
         view.readBoolean(),
-        readList(view));
+        readSet(view));
   }
 
-  private static List<Integer> readList(DataInputView view) throws IOException {
+  private static Set<Integer> readSet(DataInputView view) throws IOException {
     int numFields = view.readInt();
-    List<Integer> equalityFields = Lists.newArrayList();
+    Set<Integer> equalityFields = Sets.newHashSetWithExpectedSize(numFields);
     for (int i = 0; i < numFields; i++) {
       equalityFields.add(view.readInt());
     }
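
The visible effect of the List-to-Set switch at call sites, as a hedged sketch (Sets here stands for any HashSet factory; the tests below use the relocated Guava Sets):

    // Equality field IDs are now an unordered Set<Integer>; duplicates collapse.
    WriteTarget target =
        new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2));
    Set<Integer> equalityFields = target.equalityFields(); // {1, 2}
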
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 7df167ec32..a21c51c378 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -34,6 +34,7 @@ import java.nio.file.Paths;
 import java.time.OffsetDateTime;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -63,6 +64,7 @@ import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
@@ -100,7 +102,7 @@ public class TestDeltaTaskWriter extends TestBase {
   }
 
   private void testCdcEvents(boolean partitioned) throws IOException {
-    List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId());
+    Set<Integer> equalityFieldIds = Sets.newHashSet(idFieldId());
     TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds);
     taskWriterFactory.initialize(1, 1);
 
@@ -178,7 +180,7 @@ public class TestDeltaTaskWriter extends TestBase {
 
   private void testWritePureEqDeletes(boolean partitioned) throws IOException {
     createAndInitTable(partitioned);
-    List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId());
+    Set<Integer> equalityFieldIds = Sets.newHashSet(idFieldId());
     TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds);
     taskWriterFactory.initialize(1, 1);
 
@@ -207,7 +209,7 @@ public class TestDeltaTaskWriter extends TestBase {
 
   private void testAbort(boolean partitioned) throws IOException {
     createAndInitTable(partitioned);
-    List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId());
+    Set<Integer> equalityFieldIds = Sets.newHashSet(idFieldId());
     TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds);
     taskWriterFactory.initialize(1, 1);
 
@@ -247,7 +249,7 @@ public class TestDeltaTaskWriter extends TestBase {
   @TestTemplate
   public void testPartitionedTableWithDataAsKey() throws IOException {
     createAndInitTable(true);
-    List<Integer> equalityFieldIds = Lists.newArrayList(dataFieldId());
+    Set<Integer> equalityFieldIds = Sets.newHashSet(dataFieldId());
     TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds);
     taskWriterFactory.initialize(1, 1);
 
@@ -290,7 +292,7 @@ public class TestDeltaTaskWriter extends TestBase {
   @TestTemplate
   public void testPartitionedTableWithDataAndIdAsKey() throws IOException {
     createAndInitTable(true);
-    List<Integer> equalityFieldIds = Lists.newArrayList(dataFieldId(), idFieldId());
+    Set<Integer> equalityFieldIds = Sets.newHashSet(dataFieldId(), idFieldId());
     TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds);
     taskWriterFactory.initialize(1, 1);
 
@@ -325,7 +327,7 @@ public class TestDeltaTaskWriter extends TestBase {
     this.table = create(tableSchema, PartitionSpec.unpartitioned());
     initTable(table);
 
-    List<Integer> equalityIds = ImmutableList.of(table.schema().findField("ts").fieldId());
+    Set<Integer> equalityIds = ImmutableSet.of(table.schema().findField("ts").fieldId());
     TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(flinkType, equalityIds);
     taskWriterFactory.initialize(1, 1);
 
@@ -383,7 +385,7 @@ public class TestDeltaTaskWriter extends TestBase {
     return SimpleDataUtil.actualRowSet(table, columns);
   }
 
-  private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalityFieldIds) {
+  private TaskWriterFactory<RowData> createTaskWriterFactory(Set<Integer> equalityFieldIds) {
     return new RowDataTaskWriterFactory(
         SerializableTable.copyOf(table),
         FlinkSchemaUtil.convert(table.schema()),
@@ -395,7 +397,7 @@ public class TestDeltaTaskWriter extends TestBase {
   }
 
   private TaskWriterFactory<RowData> createTaskWriterFactory(
-      RowType flinkType, List<Integer> equalityFieldIds) {
+      RowType flinkType, Set<Integer> equalityFieldIds) {
     return new RowDataTaskWriterFactory(
         SerializableTable.copyOf(table),
         flinkType,
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java
index 243b7f9095..30782e8d41 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java
@@ -80,7 +80,7 @@ abstract class DynamicRecordInternalSerializerTestBase
 
     return new DynamicRecordInternal[] {
       new DynamicRecordInternal(
-          TABLE, BRANCH, SCHEMA, rowData, SPEC, 42, false, Collections.emptyList())
+          TABLE, BRANCH, SCHEMA, rowData, SPEC, 42, false, Collections.emptySet())
     };
   }
 
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java
index f4109a6476..13a06d3627 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java
@@ -24,7 +24,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.io.IOException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.jupiter.api.Test;
 
 class TestDynamicCommittableSerializer {
@@ -33,7 +33,7 @@ class TestDynamicCommittableSerializer {
   void testRoundtrip() throws IOException {
     DynamicCommittable committable =
         new DynamicCommittable(
-            new WriteTarget("table", "branch", 42, 23, false, 
Lists.newArrayList(1, 2)),
+            new WriteTarget("table", "branch", 42, 23, false, 
Sets.newHashSet(1, 2)),
             new byte[] {3, 4},
             JobID.generate().toHexString(),
             new OperatorID().toHexString(),
@@ -48,7 +48,7 @@ class TestDynamicCommittableSerializer {
   void testUnsupportedVersion() throws IOException {
     DynamicCommittable committable =
         new DynamicCommittable(
-            new WriteTarget("table", "branch", 42, 23, false, 
Lists.newArrayList(1, 2)),
+            new WriteTarget("table", "branch", 42, 23, false, 
Sets.newHashSet(1, 2)),
             new byte[] {3, 4},
             JobID.generate().toHexString(),
             new OperatorID().toHexString(),
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index d9129d6eac..99a5465362 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -41,8 +41,8 @@ import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -105,11 +105,11 @@ class TestDynamicCommitter {
             committerMetrics);
 
     WriteTarget writeTarget1 =
-        new WriteTarget(TABLE1, "branch", 42, 0, true, Lists.newArrayList(1, 
2));
+        new WriteTarget(TABLE1, "branch", 42, 0, true, Sets.newHashSet(1, 2));
     WriteTarget writeTarget2 =
-        new WriteTarget(TABLE1, "branch2", 43, 0, true, Lists.newArrayList(1, 
2));
+        new WriteTarget(TABLE1, "branch2", 43, 0, true, Sets.newHashSet(1, 2));
     WriteTarget writeTarget3 =
-        new WriteTarget(TABLE2, "branch2", 43, 0, true, Lists.newArrayList(1, 
2));
+        new WriteTarget(TABLE2, "branch2", 43, 0, true, Sets.newHashSet(1, 2));
 
     DynamicWriteResultAggregator aggregator =
         new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
@@ -120,21 +120,21 @@ class TestDynamicCommitter {
     byte[] deltaManifest1 =
         aggregator.writeToManifest(
             writeTarget1,
-            Lists.newArrayList(
+            Sets.newHashSet(
                 new DynamicWriteResult(
                     writeTarget1, WriteResult.builder().addDataFiles(DATA_FILE).build())),
             0);
     byte[] deltaManifest2 =
         aggregator.writeToManifest(
             writeTarget2,
-            Lists.newArrayList(
+            Sets.newHashSet(
                 new DynamicWriteResult(
                     writeTarget2, WriteResult.builder().addDataFiles(DATA_FILE).build())),
             0);
     byte[] deltaManifest3 =
         aggregator.writeToManifest(
             writeTarget3,
-            Lists.newArrayList(
+            Sets.newHashSet(
                 new DynamicWriteResult(
                     writeTarget3, WriteResult.builder().addDataFiles(DATA_FILE).build())),
             0);
@@ -155,7 +155,7 @@ class TestDynamicCommitter {
         new MockCommitRequest<>(
             new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId));
 
-    dynamicCommitter.commit(Lists.newArrayList(commitRequest1, commitRequest2, commitRequest3));
+    dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2, commitRequest3));
 
     table1.refresh();
     assertThat(table1.snapshots()).hasSize(2);
@@ -238,7 +238,7 @@ class TestDynamicCommitter {
             committerMetrics);
 
     WriteTarget writeTarget =
-        new WriteTarget(TABLE1, "branch", 42, 0, false, Lists.newArrayList(1, 
2));
+        new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
 
     DynamicWriteResultAggregator aggregator =
         new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
@@ -253,7 +253,7 @@ class TestDynamicCommitter {
     byte[] deltaManifest =
         aggregator.writeToManifest(
             writeTarget,
-            Lists.newArrayList(
+            Sets.newHashSet(
                 new DynamicWriteResult(
                     writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
             checkpointId);
@@ -262,7 +262,7 @@ class TestDynamicCommitter {
         new MockCommitRequest<>(
             new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId));
 
-    dynamicCommitter.commit(Lists.newArrayList(commitRequest));
+    dynamicCommitter.commit(Sets.newHashSet(commitRequest));
 
     CommitRequest<DynamicCommittable> oldCommitRequest =
         new MockCommitRequest<>(
@@ -270,7 +270,7 @@ class TestDynamicCommitter {
                 writeTarget, deltaManifest, jobId, operatorId, checkpointId - 1));
 
     // Old commits requests shouldn't affect the result
-    dynamicCommitter.commit(Lists.newArrayList(oldCommitRequest));
+    dynamicCommitter.commit(Sets.newHashSet(oldCommitRequest));
 
     table1.refresh();
     assertThat(table1.snapshots()).hasSize(1);
@@ -315,7 +315,7 @@ class TestDynamicCommitter {
             committerMetrics);
 
     WriteTarget writeTarget =
-        new WriteTarget(TABLE1, "branch", 42, 0, false, Lists.newArrayList(1, 
2));
+        new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
 
     DynamicWriteResultAggregator aggregator =
         new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
@@ -330,7 +330,7 @@ class TestDynamicCommitter {
     byte[] deltaManifest =
         aggregator.writeToManifest(
             writeTarget,
-            Lists.newArrayList(
+            Sets.newHashSet(
                 new DynamicWriteResult(
                     writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
             checkpointId);
@@ -339,12 +339,12 @@ class TestDynamicCommitter {
         new MockCommitRequest<>(
             new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId));
 
-    dynamicCommitter.commit(Lists.newArrayList(commitRequest));
+    dynamicCommitter.commit(Sets.newHashSet(commitRequest));
 
     byte[] overwriteManifest =
         aggregator.writeToManifest(
             writeTarget,
-            Lists.newArrayList(
+            Sets.newHashSet(
                 new DynamicWriteResult(
                     writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
             checkpointId + 1);
@@ -354,7 +354,7 @@ class TestDynamicCommitter {
             new DynamicCommittable(
                 writeTarget, overwriteManifest, jobId, operatorId, checkpointId + 1));
 
-    dynamicCommitter.commit(Lists.newArrayList(overwriteRequest));
+    dynamicCommitter.commit(Sets.newHashSet(overwriteRequest));
 
     table1.refresh();
     assertThat(table1.snapshots()).hasSize(2);
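
Taken together, these tests exercise the aggregate-then-commit path. A condensed sketch, reusing the fixtures defined in TestDynamicCommitter above (aggregator, dynamicCommitter, TABLE1, DATA_FILE, jobId, operatorId, checkpointId are test fields, not new API):

    // Sketch: fold write results into a delta manifest, then commit it for one checkpoint.
    WriteTarget target = new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
    byte[] manifest =
        aggregator.writeToManifest(
            target,
            Sets.newHashSet(
                new DynamicWriteResult(target, WriteResult.builder().addDataFiles(DATA_FILE).build())),
            checkpointId);
    dynamicCommitter.commit(
        Sets.newHashSet(
            new MockCommitRequest<>(
                new DynamicCommittable(target, manifest, jobId, operatorId, checkpointId))));
    // Re-committing an older checkpoint for the same target does not create an extra snapshot.
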
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java
new file mode 100644
index 0000000000..aa483b3e53
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.apache.iceberg.flink.TestFixtures.TABLE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collections;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class TestDynamicTableUpdateOperator {
+
+  @RegisterExtension
+  private static final HadoopCatalogExtension CATALOG_EXTENSION =
+      new HadoopCatalogExtension(DATABASE, TABLE);
+
+  private static final Schema SCHEMA1 =
+      new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
+
+  private static final Schema SCHEMA2 =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+  @Test
+  void testDynamicTableUpdateOperatorNewTable() throws Exception {
+    int cacheMaximumSize = 10;
+    int cacheRefreshMs = 1000;
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier table = TableIdentifier.of(TABLE);
+
+    assertThat(catalog.tableExists(table)).isFalse();
+    DynamicTableUpdateOperator operator =
+        new DynamicTableUpdateOperator(
+            CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize, cacheRefreshMs);
+    operator.open(null);
+
+    DynamicRecordInternal input =
+        new DynamicRecordInternal(
+            TABLE,
+            "branch",
+            SCHEMA1,
+            GenericRowData.of(1, "test"),
+            PartitionSpec.unpartitioned(),
+            42,
+            false,
+            Collections.emptySet());
+    DynamicRecordInternal output = operator.map(input);
+
+    assertThat(catalog.tableExists(table)).isTrue();
+    assertThat(input).isEqualTo(output);
+  }
+
+  @Test
+  void testDynamicTableUpdateOperatorSchemaChange() throws Exception {
+    int cacheMaximumSize = 10;
+    int cacheRefreshMs = 1000;
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier table = TableIdentifier.of(TABLE);
+
+    DynamicTableUpdateOperator operator =
+        new DynamicTableUpdateOperator(
+            CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize, cacheRefreshMs);
+    operator.open(null);
+
+    catalog.createTable(table, SCHEMA1);
+    DynamicRecordInternal input =
+        new DynamicRecordInternal(
+            TABLE,
+            "branch",
+            SCHEMA2,
+            GenericRowData.of(1, "test"),
+            PartitionSpec.unpartitioned(),
+            42,
+            false,
+            Collections.emptySet());
+    DynamicRecordInternal output = operator.map(input);
+
+    assertThat(catalog.loadTable(table).schema().sameSchema(SCHEMA2)).isTrue();
+    assertThat(input).isEqualTo(output);
+
+    // Process the same input again
+    DynamicRecordInternal output2 = operator.map(input);
+    assertThat(output2).isEqualTo(output);
+    assertThat(catalog.loadTable(table).schema().schemaId()).isEqualTo(output.schema().schemaId());
+  }
+}
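
In short, the operator creates the target table on first sight, evolves its schema to match the incoming record, and forwards the record unchanged. A hedged sketch of the call pattern used above (catalogLoader and record stand in for a real CatalogLoader and an incoming DynamicRecordInternal):

    // Sketch: table creation/schema update happens as a side effect of map().
    DynamicTableUpdateOperator operator =
        new DynamicTableUpdateOperator(catalogLoader, /* cacheMaximumSize= */ 10, /* cacheRefreshMs= */ 1000);
    operator.open(null);
    DynamicRecordInternal out = operator.map(record); // table now exists and matches the record's schema
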
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
index 137b87bb17..713c67da17 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
@@ -24,11 +24,11 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.util.Sets;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.io.WriteResult;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -49,12 +49,11 @@ class TestDynamicWriteResultAggregator {
         testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) {
       testHarness.open();
 
-      WriteTarget writeTarget1 =
-          new WriteTarget("table", "branch", 42, 0, true, Lists.newArrayList());
+      WriteTarget writeTarget1 = new WriteTarget("table", "branch", 42, 0, true, Sets.newHashSet());
       DynamicWriteResult dynamicWriteResult1 =
           new DynamicWriteResult(writeTarget1, WriteResult.builder().build());
       WriteTarget writeTarget2 =
-          new WriteTarget("table2", "branch", 42, 0, true, 
Lists.newArrayList(1, 2));
+          new WriteTarget("table2", "branch", 42, 0, true, Sets.newHashSet(1, 
2));
       DynamicWriteResult dynamicWriteResult2 =
           new DynamicWriteResult(writeTarget2, WriteResult.builder().build());
 
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java
index bede5d42b9..a3a9691107 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java
@@ -23,13 +23,13 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.hadoop.util.Sets;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.Test;
 
 class TestDynamicWriteResultSerializer {
@@ -53,7 +53,7 @@ class TestDynamicWriteResultSerializer {
   void testRoundtrip() throws IOException {
     DynamicWriteResult dynamicWriteResult =
         new DynamicWriteResult(
-            new WriteTarget("table", "branch", 42, 23, false, 
Lists.newArrayList(1, 2)),
+            new WriteTarget("table", "branch", 42, 23, false, 
Sets.newHashSet(1, 2)),
             WriteResult.builder().addDataFiles(DATA_FILE).build());
 
     DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer();
@@ -71,7 +71,7 @@ class TestDynamicWriteResultSerializer {
   void testUnsupportedVersion() throws IOException {
     DynamicWriteResult dynamicWriteResult =
         new DynamicWriteResult(
-            new WriteTarget("table", "branch", 42, 23, false, 
Lists.newArrayList(1, 2)),
+            new WriteTarget("table", "branch", 42, 23, false, 
Sets.newHashSet(1, 2)),
             WriteResult.builder().addDataFiles(DATA_FILE).build());
 
     DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer();
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 0a723c9d57..0c223f3237 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -33,7 +33,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
 import org.apache.iceberg.io.WriteResult;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Test;
 
@@ -118,7 +118,7 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
 
     DynamicRecordInternal record = getDynamicRecordInternal(table1);
     record.setUpsertMode(true);
-    record.setEqualityFieldIds(Lists.newArrayList(1));
+    record.setEqualityFieldIds(Sets.newHashSet(1));
 
     dyamicWriter.write(record, null);
     dyamicWriter.prepareCommit();
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
new file mode 100644
index 0000000000..b8d86feb99
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+class TestHashKeyGenerator {
+
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "data", Types.StringType.get()));
+
+  private static final String BRANCH = "main";
+  private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("default", "table");
+
+  @Test
+  void testRoundRobinWithDistributionModeNone() throws Exception {
+    int writeParallelism = 10;
+    int maxWriteParallelism = 2;
+    HashKeyGenerator generator = new HashKeyGenerator(1, maxWriteParallelism);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+
+    GenericRowData row = GenericRowData.of(1, StringData.fromString("z"));
+    int writeKey1 =
+        getWriteKey(
+            generator, spec, DistributionMode.NONE, writeParallelism, Collections.emptySet(), row);
+    int writeKey2 =
+        getWriteKey(
+            generator, spec, DistributionMode.NONE, writeParallelism, Collections.emptySet(), row);
+    int writeKey3 =
+        getWriteKey(
+            generator, spec, DistributionMode.NONE, writeParallelism, Collections.emptySet(), row);
+    int writeKey4 =
+        getWriteKey(
+            generator, spec, DistributionMode.NONE, writeParallelism, Collections.emptySet(), row);
+
+    assertThat(writeKey1).isNotEqualTo(writeKey2);
+    assertThat(writeKey3).isEqualTo(writeKey1);
+    assertThat(writeKey4).isEqualTo(writeKey2);
+
+    assertThat(getSubTaskId(writeKey1, writeParallelism, maxWriteParallelism)).isEqualTo(0);
+    assertThat(getSubTaskId(writeKey2, writeParallelism, maxWriteParallelism)).isEqualTo(5);
+    assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(0);
+    assertThat(getSubTaskId(writeKey4, writeParallelism, maxWriteParallelism)).isEqualTo(5);
+  }
+
+  @Test
+  void testBucketingWithDistributionModeHash() throws Exception {
+    int writeParallelism = 3;
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(1, maxWriteParallelism);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+
+    GenericRowData row1 = GenericRowData.of(1, StringData.fromString("a"));
+    GenericRowData row2 = GenericRowData.of(1, StringData.fromString("b"));
+    GenericRowData row3 = GenericRowData.of(2, StringData.fromString("c"));
+    GenericRowData row4 = GenericRowData.of(2, StringData.fromString("d"));
+
+    int writeKey1 =
+        getWriteKey(
+            generator, spec, DistributionMode.HASH, writeParallelism, Collections.emptySet(), row1);
+    int writeKey2 =
+        getWriteKey(
+            generator, spec, DistributionMode.HASH, writeParallelism, Collections.emptySet(), row2);
+    int writeKey3 =
+        getWriteKey(
+            generator, spec, DistributionMode.HASH, writeParallelism, Collections.emptySet(), row3);
+    int writeKey4 =
+        getWriteKey(
+            generator, spec, DistributionMode.HASH, writeParallelism, Collections.emptySet(), row4);
+
+    assertThat(writeKey1).isEqualTo(writeKey2);
+    assertThat(writeKey3).isNotEqualTo(writeKey1);
+    assertThat(writeKey4).isEqualTo(writeKey3);
+
+    assertThat(getSubTaskId(writeKey1, writeParallelism, maxWriteParallelism)).isEqualTo(0);
+    assertThat(getSubTaskId(writeKey2, writeParallelism, maxWriteParallelism)).isEqualTo(0);
+    assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(1);
+    assertThat(getSubTaskId(writeKey4, writeParallelism, maxWriteParallelism)).isEqualTo(1);
+  }
+
+  @Test
+  void testEqualityKeys() throws Exception {
+    int writeParallelism = 2;
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+
+    GenericRowData row1 = GenericRowData.of(1, StringData.fromString("foo"));
+    GenericRowData row2 = GenericRowData.of(1, StringData.fromString("bar"));
+    GenericRowData row3 = GenericRowData.of(2, StringData.fromString("baz"));
+    Set<String> equalityColumns = Collections.singleton("id");
+
+    int writeKey1 =
+        getWriteKey(
+            generator,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism,
+            equalityColumns,
+            row1);
+    int writeKey2 =
+        getWriteKey(
+            generator,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism,
+            equalityColumns,
+            row2);
+    int writeKey3 =
+        getWriteKey(
+            generator,
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism,
+            equalityColumns,
+            row3);
+
+    assertThat(writeKey1).isEqualTo(writeKey2);
+    assertThat(writeKey2).isNotEqualTo(writeKey3);
+
+    assertThat(getSubTaskId(writeKey1, writeParallelism, maxWriteParallelism)).isEqualTo(1);
+    assertThat(getSubTaskId(writeKey2, writeParallelism, maxWriteParallelism)).isEqualTo(1);
+    assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(0);
+  }
+
+  @Test
+  void testCapAtMaxWriteParallelism() throws Exception {
+    int writeParallelism = 10;
+    int maxWriteParallelism = 5;
+    HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+
+    Set<Integer> writeKeys = Sets.newHashSet();
+    for (int i = 0; i < 20; i++) {
+      GenericRowData row = GenericRowData.of(i, StringData.fromString("z"));
+      writeKeys.add(
+          getWriteKey(
+              generator,
+              unpartitioned,
+              DistributionMode.NONE,
+              writeParallelism,
+              Collections.emptySet(),
+              row));
+    }
+
+    assertThat(writeKeys).hasSize(maxWriteParallelism);
+    assertThat(
+            writeKeys.stream()
+                .map(key -> getSubTaskId(key, writeParallelism, writeParallelism))
+                .distinct()
+                .count())
+        .isEqualTo(maxWriteParallelism);
+  }
+
+  @Test
+  void testHashModeWithoutEqualityFieldsFallsBackToNone() throws Exception {
+    int writeParallelism = 2;
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+    Schema noIdSchema = new Schema(Types.NestedField.required(1, "x", 
Types.StringType.get()));
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+
+    DynamicRecord record =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            noIdSchema,
+            GenericRowData.of(StringData.fromString("v")),
+            unpartitioned,
+            DistributionMode.HASH,
+            writeParallelism);
+
+    int writeKey1 = generator.generateKey(record);
+    int writeKey2 = generator.generateKey(record);
+    int writeKey3 = generator.generateKey(record);
+    assertThat(writeKey1).isNotEqualTo(writeKey2);
+    assertThat(writeKey3).isEqualTo(writeKey1);
+
+    assertThat(getSubTaskId(writeKey1, writeParallelism, maxWriteParallelism)).isEqualTo(1);
+    assertThat(getSubTaskId(writeKey2, writeParallelism, maxWriteParallelism)).isEqualTo(0);
+    assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(1);
+  }
+
+  @Test
+  void testSchemaSpecOverrides() throws Exception {
+    int maxCacheSize = 10;
+    int writeParallelism = 5;
+    int maxWriteParallelism = 10;
+    HashKeyGenerator generator = new HashKeyGenerator(maxCacheSize, maxWriteParallelism);
+
+    DynamicRecord record =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            SCHEMA,
+            GenericRowData.of(1, StringData.fromString("foo")),
+            PartitionSpec.unpartitioned(),
+            DistributionMode.NONE,
+            writeParallelism);
+
+    int writeKey1 = generator.generateKey(record);
+    int writeKey2 = generator.generateKey(record);
+    // Assert that we are bucketing via NONE (round-robin)
+    assertThat(writeKey1).isNotEqualTo(writeKey2);
+
+    // Schema has different id
+    Schema overrideSchema = new Schema(42, SCHEMA.columns());
+    // Spec has different id
+    PartitionSpec overrideSpec = PartitionSpec.builderFor(SCHEMA).withSpecId(42).build();
+    RowData overrideData = GenericRowData.of(1L, StringData.fromString("foo"));
+
+    // We get a new key selector for the schema which starts off on the same offset
+    assertThat(generator.generateKey(record, overrideSchema, null, null)).isEqualTo(writeKey1);
+    // We get a new key selector for the spec which starts off on the same offset
+    assertThat(generator.generateKey(record, null, overrideSpec, null)).isEqualTo(writeKey1);
+    // We get the same key selector which yields a different result for the overridden data
+    assertThat(generator.generateKey(record, null, null, overrideData)).isNotEqualTo(writeKey1);
+  }
+
+  @Test
+  void testMultipleTables() throws Exception {
+    int maxCacheSize = 10;
+    int writeParallelism = 2;
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(maxCacheSize, maxWriteParallelism);
+
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+
+    GenericRowData rowData = GenericRowData.of(1, StringData.fromString("foo"));
+
+    DynamicRecord record1 =
+        new DynamicRecord(
+            TableIdentifier.of("a", "table"),
+            BRANCH,
+            SCHEMA,
+            rowData,
+            unpartitioned,
+            DistributionMode.HASH,
+            writeParallelism);
+    record1.setEqualityFields(Collections.singleton("id"));
+    DynamicRecord record2 =
+        new DynamicRecord(
+            TableIdentifier.of("my", "other", "table"),
+            BRANCH,
+            SCHEMA,
+            rowData,
+            unpartitioned,
+            DistributionMode.HASH,
+            writeParallelism);
+    record2.setEqualityFields(Collections.singleton("id"));
+
+    // Consistent hashing for the same record due to HASH distribution mode
+    int writeKeyRecord1 = generator.generateKey(record1);
+    assertThat(writeKeyRecord1).isEqualTo(generator.generateKey(record1));
+    int writeKeyRecord2 = generator.generateKey(record2);
+    assertThat(writeKeyRecord2).isEqualTo(generator.generateKey(record2));
+
+    // But the write keys are for different tables and should not be equal
+    assertThat(writeKeyRecord1).isNotEqualTo(writeKeyRecord2);
+
+    assertThat(getSubTaskId(writeKeyRecord1, writeParallelism, maxWriteParallelism)).isEqualTo(1);
+    assertThat(getSubTaskId(writeKeyRecord2, writeParallelism, maxWriteParallelism)).isEqualTo(0);
+  }
+
+  @Test
+  void testCaching() throws Exception {
+    int maxCacheSize = 1;
+    int writeParallelism = 2;
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(maxCacheSize, maxWriteParallelism);
+    Cache<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+        generator.getKeySelectorCache();
+
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+    DynamicRecord record =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            SCHEMA,
+            GenericRowData.of(1, StringData.fromString("foo")),
+            unpartitioned,
+            DistributionMode.NONE,
+            writeParallelism);
+
+    int writeKey1 = generator.generateKey(record);
+    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+
+    int writeKey2 = generator.generateKey(record);
+    assertThat(writeKey2).isNotEqualTo(writeKey1);
+    // Manually clean up because the cleanup is not always triggered
+    keySelectorCache.cleanUp();
+    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+
+    int writeKey3 = generator.generateKey(record);
+    // Manually clean up because the cleanup is not always triggered
+    keySelectorCache.cleanUp();
+    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    // We create a new key selector which will start off at the same position
+    assertThat(writeKey1).isEqualTo(writeKey3);
+  }
+
+  private static int getWriteKey(
+      HashKeyGenerator generator,
+      PartitionSpec spec,
+      DistributionMode mode,
+      int writeParallelism,
+      Set<String> equalityFields,
+      GenericRowData row)
+      throws Exception {
+    DynamicRecord record =
+        new DynamicRecord(TABLE_IDENTIFIER, BRANCH, SCHEMA, row, spec, mode, writeParallelism);
+    record.setEqualityFields(equalityFields);
+    return generator.generateKey(record);
+  }
+
+  private static int getSubTaskId(int writeKey1, int writeParallelism, int maxWriteParallelism) {
+    return KeyGroupRangeAssignment.assignKeyToParallelOperator(
+        writeKey1, maxWriteParallelism, writeParallelism);
+  }
+}
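
As these tests illustrate, HashKeyGenerator turns each record into an integer write key, and Flink's KeyGroupRangeAssignment maps that key onto a writer subtask: DistributionMode.NONE round-robins, HASH keeps rows with the same partition/equality key together, and the spread is capped at maxWriteParallelism. A hedged sketch (row stands in for an incoming RowData; generateKey may throw, as in the test helpers above):

    // Sketch: generate a write key and resolve the writer subtask it maps to.
    HashKeyGenerator generator = new HashKeyGenerator(/* maxCacheSize= */ 16, /* maxWriteParallelism= */ 8);
    DynamicRecord record =
        new DynamicRecord(
            TABLE_IDENTIFIER, BRANCH, SCHEMA, row, PartitionSpec.unpartitioned(),
            DistributionMode.HASH, /* writeParallelism= */ 2);
    record.setEqualityFields(Collections.singleton("id"));
    int writeKey = generator.generateKey(record);
    int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(writeKey, /* maxParallelism= */ 8, /* parallelism= */ 2);
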
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java
new file mode 100644
index 0000000000..2553575f18
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.math.BigDecimal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.DataGenerator;
+import org.apache.iceberg.flink.DataGenerators;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.junit.jupiter.api.Test;
+
+class TestRowDataEvolver {
+
+  static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+  static final Schema SCHEMA2 =
+      new Schema(
+          Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()),
+          Types.NestedField.optional(3, "onemore", Types.DoubleType.get()));
+
+  @Test
+  void testPrimitiveTypes() {
+    DataGenerator generator = new DataGenerators.Primitives();
+    assertThat(
+            RowDataEvolver.convert(
+                generator.generateFlinkRowData(),
+                generator.icebergSchema(),
+                generator.icebergSchema()))
+        .isEqualTo(generator.generateFlinkRowData());
+  }
+
+  @Test
+  void testAddColumn() {
+    assertThat(RowDataEvolver.convert(SimpleDataUtil.createRowData(1, "a"), 
SCHEMA, SCHEMA2))
+        .isEqualTo(GenericRowData.of(1, StringData.fromString("a"), null));
+  }
+
+  @Test
+  void testAddRequiredColumn() {
+    Schema currentSchema = new Schema(Types.NestedField.optional(1, "id", 
Types.IntegerType.get()));
+    Schema targetSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> RowDataEvolver.convert(GenericRowData.of(42), currentSchema, targetSchema));
+  }
+
+  @Test
+  void testIntToLong() {
+    Schema schemaWithLong =
+        new Schema(
+            Types.NestedField.optional(2, "id", Types.LongType.get()),
+            Types.NestedField.optional(4, "data", Types.StringType.get()));
+
+    assertThat(
+            RowDataEvolver.convert(
+                SimpleDataUtil.createRowData(1, "a"), SimpleDataUtil.SCHEMA, 
schemaWithLong))
+        .isEqualTo(GenericRowData.of(1L, StringData.fromString("a")));
+  }
+
+  @Test
+  void testFloatToDouble() {
+    Schema schemaWithFloat =
+        new Schema(Types.NestedField.optional(1, "float2double", Types.FloatType.get()));
+    Schema schemaWithDouble =
+        new Schema(Types.NestedField.optional(2, "float2double", Types.DoubleType.get()));
+
+    assertThat(RowDataEvolver.convert(GenericRowData.of(1.5f), schemaWithFloat, schemaWithDouble))
+        .isEqualTo(GenericRowData.of(1.5d));
+  }
+
+  @Test
+  void testDateToTimestamp() {
+    Schema schemaWithFloat =
+        new Schema(Types.NestedField.optional(1, "date2timestamp", Types.DateType.get()));
+    Schema schemaWithDouble =
+        new Schema(
+            Types.NestedField.optional(2, "date2timestamp", Types.TimestampType.withoutZone()));
+
+    DateTime time = new DateTime(2022, 1, 10, 0, 0, 0, 0, DateTimeZone.UTC);
+    int days =
+        Days.daysBetween(new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeZone.UTC), time).getDays();
+
+    assertThat(RowDataEvolver.convert(GenericRowData.of(days), schemaWithFloat, schemaWithDouble))
+        .isEqualTo(GenericRowData.of(TimestampData.fromEpochMillis(time.getMillis())));
+  }
+
+  @Test
+  void testIncreasePrecision() {
+    Schema before =
+        new Schema(Types.NestedField.required(14, "decimal_field", Types.DecimalType.of(9, 2)));
+    Schema after =
+        new Schema(Types.NestedField.required(14, "decimal_field", Types.DecimalType.of(10, 2)));
+
+    assertThat(
+            RowDataEvolver.convert(
+                GenericRowData.of(DecimalData.fromBigDecimal(new BigDecimal("-1.50"), 9, 2)),
+                before,
+                after))
+        .isEqualTo(GenericRowData.of(DecimalData.fromBigDecimal(new BigDecimal("-1.50"), 10, 2)));
+  }
+
+  @Test
+  void testStructAddOptionalFields() {
+    DataGenerator generator = new DataGenerators.StructOfPrimitive();
+    RowData oldData = generator.generateFlinkRowData();
+    Schema oldSchema = generator.icebergSchema();
+    Types.NestedField structField = oldSchema.columns().get(1);
+    Schema newSchema =
+        new Schema(
+            oldSchema.columns().get(0),
+            Types.NestedField.required(
+                10,
+                structField.name(),
+                Types.StructType.of(
+                    required(101, "id", Types.IntegerType.get()),
+                    optional(103, "optional", Types.StringType.get()),
+                    required(102, "name", Types.StringType.get()))));
+    RowData newData =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"),
+            GenericRowData.of(1, null, StringData.fromString("Jane")));
+
+    assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData);
+  }
+
+  @Test
+  void testStructAddRequiredFieldsWithOptionalRoot() {
+    DataGenerator generator = new DataGenerators.StructOfPrimitive();
+    RowData oldData = generator.generateFlinkRowData();
+    Schema oldSchema = generator.icebergSchema();
+    Types.NestedField structField = oldSchema.columns().get(1);
+    Schema newSchema =
+        new Schema(
+            oldSchema.columns().get(0),
+            Types.NestedField.optional(
+                10,
+                "newFieldOptionalField",
+                Types.StructType.of(
+                    Types.NestedField.optional(
+                        structField.fieldId(),
+                        structField.name(),
+                        Types.StructType.of(
+                            optional(101, "id", Types.IntegerType.get()),
+                            // Required columns which leads to nulling the entire struct
+                            required(103, "required", Types.StringType.get()),
+                            required(102, "name", Types.StringType.get()))))));
+
+    RowData expectedData = GenericRowData.of(StringData.fromString("row_id_value"), null);
+
+    assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(expectedData);
+  }
+
+  @Test
+  void testStructAddRequiredFields() {
+    DataGenerator generator = new DataGenerators.StructOfPrimitive();
+    RowData oldData = generator.generateFlinkRowData();
+    Schema oldSchema = generator.icebergSchema();
+    Types.NestedField structField = oldSchema.columns().get(1);
+    Schema newSchema =
+        new Schema(
+            oldSchema.columns().get(0),
+            Types.NestedField.required(
+                10,
+                structField.name(),
+                Types.StructType.of(
+                    required(101, "id", Types.IntegerType.get()),
+                    required(103, "required", Types.StringType.get()),
+                    required(102, "name", Types.StringType.get()))));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> RowDataEvolver.convert(oldData, oldSchema, newSchema));
+  }
+
+  @Test
+  void testMap() {
+    DataGenerator generator = new DataGenerators.MapOfPrimitives();
+    RowData oldData = generator.generateFlinkRowData();
+    Schema oldSchema = generator.icebergSchema();
+    Types.NestedField mapField = oldSchema.columns().get(1);
+    Schema newSchema =
+        new Schema(
+            oldSchema.columns().get(0),
+            Types.NestedField.optional(
+                10,
+                mapField.name(),
+                Types.MapType.ofRequired(101, 102, Types.StringType.get(), Types.LongType.get())));
+    RowData newData =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"),
+            new GenericMapData(
+                ImmutableMap.of(
+                    StringData.fromString("Jane"), 1L, 
StringData.fromString("Joe"), 2L)));
+
+    assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData);
+  }
+
+  @Test
+  void testArray() {
+    DataGenerator generator = new DataGenerators.ArrayOfPrimitive();
+    RowData oldData = generator.generateFlinkRowData();
+    Schema oldSchema = generator.icebergSchema();
+    Types.NestedField arrayField = oldSchema.columns().get(1);
+    Schema newSchema =
+        new Schema(
+            oldSchema.columns().get(0),
+            Types.NestedField.optional(
+                10, arrayField.name(), Types.ListType.ofOptional(101, Types.LongType.get())));
+    RowData newData =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"), new GenericArrayData(new 
Long[] {1L, 2L, 3L}));
+
+    assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData);
+  }
+}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
index ce9054ad49..ef8380c216 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
+import java.util.Set;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.DataFile;
@@ -46,6 +47,7 @@ import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -81,7 +83,7 @@ public class TestProjectMetaColumn {
             SimpleDataUtil.createInsert(1, "AAA"),
             SimpleDataUtil.createInsert(2, "BBB"),
             SimpleDataUtil.createInsert(3, "CCC"));
-    writeAndCommit(table, ImmutableList.of(), false, rows);
+    writeAndCommit(table, ImmutableSet.of(), false, rows);
 
     FlinkInputFormat input =
         FlinkSource.forRowData().tableLoader(TableLoader.fromHadoopTable(location)).buildFormat();
@@ -124,7 +126,7 @@ public class TestProjectMetaColumn {
             SimpleDataUtil.createInsert(2, "AAA"),
             SimpleDataUtil.createInsert(2, "BBB"));
     int eqFieldId = table.schema().findField("data").fieldId();
-    writeAndCommit(table, ImmutableList.of(eqFieldId), true, rows);
+    writeAndCommit(table, ImmutableSet.of(eqFieldId), true, rows);
 
     FlinkInputFormat input =
         FlinkSource.forRowData().tableLoader(TableLoader.fromHadoopTable(location)).buildFormat();
@@ -147,8 +149,7 @@ public class TestProjectMetaColumn {
   }
 
   private void writeAndCommit(
-      Table table, List<Integer> eqFieldIds, boolean upsert, List<RowData> rows)
-      throws IOException {
+      Table table, Set<Integer> eqFieldIds, boolean upsert, List<RowData> rows) throws IOException {
     TaskWriter<RowData> writer = createTaskWriter(table, eqFieldIds, upsert);
     try (TaskWriter<RowData> io = writer) {
       for (RowData row : rows) {
@@ -171,7 +172,7 @@ public class TestProjectMetaColumn {
   }
 
   private TaskWriter<RowData> createTaskWriter(
-      Table table, List<Integer> equalityFieldIds, boolean upsert) {
+      Table table, Set<Integer> equalityFieldIds, boolean upsert) {
     TaskWriterFactory<RowData> taskWriterFactory =
         new RowDataTaskWriterFactory(
             SerializableTable.copyOf(table),
