This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 3615fae615 Flink:Backport DynamicSink support dvs to Flink 2.0 and 
1.20 (#14623)
3615fae615 is described below

commit 3615fae6158e76e1ccf61c03ca5e91f5cfc538dd
Author: GuoYu <[email protected]>
AuthorDate: Wed Nov 19 20:47:45 2025 +0800

    Flink:Backport DynamicSink support dvs to Flink 2.0 and 1.20 (#14623)
---
 .../flink/sink/dynamic/DynamicCommitter.java       |  20 +-
 .../sink/dynamic/DynamicWriteResultAggregator.java |  36 ++--
 .../iceberg/flink/sink/dynamic/DynamicWriter.java  |   5 -
 .../flink/sink/dynamic/TestDynamicCommitter.java   | 137 ++++++++++++++
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 205 +++++++++++++++++++++
 .../flink/sink/dynamic/DynamicCommitter.java       |  20 +-
 .../sink/dynamic/DynamicWriteResultAggregator.java |  36 ++--
 .../iceberg/flink/sink/dynamic/DynamicWriter.java  |   5 -
 .../flink/sink/dynamic/TestDynamicCommitter.java   | 137 ++++++++++++++
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 205 +++++++++++++++++++++
 10 files changed, 766 insertions(+), 40 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index d60b42b95d..61b20cb27b 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
@@ -39,6 +41,7 @@ import org.apache.iceberg.SnapshotAncestryValidator;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -52,6 +55,7 @@ import 
org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
@@ -214,9 +218,23 @@ class DynamicCommitter implements 
Committer<DynamicCommittable> {
           DeltaManifests deltaManifests =
               SimpleVersionedSerialization.readVersionAndDeSerialize(
                   DeltaManifestsSerializer.INSTANCE, 
committable.getCommittable().manifest());
+
+          WriteResult writeResult =
+              FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), 
table.specs());
+          if (TableUtil.formatVersion(table) > 2) {
+            for (DeleteFile deleteFile : writeResult.deleteFiles()) {
+              if (deleteFile.content() == FileContent.POSITION_DELETES) {
+                Preconditions.checkArgument(
+                    ContentFileUtil.isDV(deleteFile),
+                    "Can't add position delete file to the %s table. 
Concurrent table upgrade to V3 is not supported.",
+                    table.name());
+              }
+            }
+          }
+
           pendingResults
               .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
-              .add(FlinkManifestUtil.readCompletedFiles(deltaManifests, 
table.io(), table.specs()));
+              .add(writeResult);
           manifests.addAll(deltaManifests.manifests());
         }
       }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index 77bd2a0f97..927491fa89 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
@@ -32,6 +33,7 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.CatalogLoader;
@@ -61,7 +63,8 @@ class DynamicWriteResultAggregator
   private final int cacheMaximumSize;
   private transient Map<WriteTarget, Collection<DynamicWriteResult>> results;
   private transient Map<String, Map<Integer, PartitionSpec>> specs;
-  private transient Map<String, ManifestOutputFileFactory> outputFileFactories;
+  private transient Map<String, Tuple2<ManifestOutputFileFactory, Integer>>
+      outputFileFactoriesAndFormatVersions;
   private transient String flinkJobId;
   private transient String operatorId;
   private transient int subTaskId;
@@ -81,7 +84,7 @@ class DynamicWriteResultAggregator
     this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
     this.results = Maps.newHashMap();
     this.specs = new LRUCache<>(cacheMaximumSize);
-    this.outputFileFactories = new LRUCache<>(cacheMaximumSize);
+    this.outputFileFactoriesAndFormatVersions = new 
LRUCache<>(cacheMaximumSize);
     this.catalog = catalogLoader.loadCatalog();
   }
 
@@ -137,12 +140,14 @@ class DynamicWriteResultAggregator
     writeResults.forEach(w -> builder.add(w.writeResult()));
     WriteResult result = builder.build();
 
+    Tuple2<ManifestOutputFileFactory, Integer> outputFileFactoryAndVersion =
+        outputFileFactoryAndFormatVersion(key.tableName());
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
             result,
-            () -> outputFileFactory(key.tableName()).create(checkpointId),
+            () -> outputFileFactoryAndVersion.f0.create(checkpointId),
             spec(key.tableName(), key.specId()),
-            2);
+            outputFileFactoryAndVersion.f1);
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
@@ -160,8 +165,9 @@ class DynamicWriteResultAggregator
     }
   }
 
-  private ManifestOutputFileFactory outputFileFactory(String tableName) {
-    return outputFileFactories.computeIfAbsent(
+  private Tuple2<ManifestOutputFileFactory, Integer> 
outputFileFactoryAndFormatVersion(
+      String tableName) {
+    return outputFileFactoriesAndFormatVersions.computeIfAbsent(
         tableName,
         unused -> {
           Table table = catalog.loadTable(TableIdentifier.parse(tableName));
@@ -169,14 +175,16 @@ class DynamicWriteResultAggregator
           // Make sure to append an identifier to avoid file clashes in case 
the factory was to get
           // re-created during a checkpoint, i.e. due to cache eviction.
           String fileSuffix = UUID.randomUUID().toString();
-          return FlinkManifestUtil.createOutputFileFactory(
-              () -> table,
-              table.properties(),
-              flinkJobId,
-              operatorId,
-              subTaskId,
-              attemptId,
-              fileSuffix);
+          ManifestOutputFileFactory outputFileFactory =
+              FlinkManifestUtil.createOutputFileFactory(
+                  () -> table,
+                  table.properties(),
+                  flinkJobId,
+                  operatorId,
+                  subTaskId,
+                  attemptId,
+                  fileSuffix);
+          return Tuple2.of(outputFileFactory, TableUtil.formatVersion(table));
         });
   }
 
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index 5ed9da8623..c2a3032858 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -118,10 +117,6 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
                               !equalityFieldIds.isEmpty(),
                               "Equality field columns shouldn't be empty when 
configuring to use UPSERT data.");
 
-                          Preconditions.checkArgument(
-                              !(TableUtil.formatVersion(table) > 2),
-                              "Dynamic Sink writer does not support upsert 
mode in tables (V3+)");
-
                           if (!table.spec().isUnpartitioned()) {
                             for (PartitionField partitionField : 
table.spec().fields()) {
                               Preconditions.checkState(
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 3dbec22794..1497458e60 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -44,6 +44,9 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
@@ -338,6 +341,140 @@ class TestDynamicCommitter {
                 .build());
   }
 
+  @Test
+  void testCommitDeleteInDifferentFormatVersion() throws Exception {
+    Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table1.snapshots()).isEmpty();
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new 
DynamicCommitterMetrics(metricGroup);
+    DynamicCommitter dynamicCommitter =
+        new DynamicCommitter(
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    WriteTarget writeTarget =
+        new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 10;
+
+    byte[] deltaManifest =
+        aggregator.writeToManifest(
+            writeTarget,
+            Sets.newHashSet(
+                new DynamicWriteResult(
+                    writeTarget,
+                    WriteResult.builder()
+                        .addDataFiles(DATA_FILE)
+                        .addDeleteFiles(DELETE_FILE)
+                        .build())),
+            checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget, deltaManifest, jobId, 
operatorId, checkpointId));
+
+    // Upgrade the table version
+    UpdateProperties updateApi = table1.updateProperties();
+    updateApi.set(
+        TableProperties.FORMAT_VERSION, 
String.valueOf(TableUtil.formatVersion(table1) + 1));
+    updateApi.commit();
+
+    assertThatThrownBy(() -> 
dynamicCommitter.commit(Sets.newHashSet(commitRequest)))
+        .hasMessage(
+            "Can't add position delete file to the %s table. Concurrent table 
upgrade to V3 is not supported.",
+            table1.name())
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  void testCommitOnlyDataInDifferentFormatVersion() throws Exception {
+    Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table1.snapshots()).isEmpty();
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new 
DynamicCommitterMetrics(metricGroup);
+    DynamicCommitter dynamicCommitter =
+        new DynamicCommitter(
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    WriteTarget writeTarget =
+        new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 10;
+
+    byte[] deltaManifest =
+        aggregator.writeToManifest(
+            writeTarget,
+            Sets.newHashSet(
+                new DynamicWriteResult(
+                    writeTarget, 
WriteResult.builder().addDataFiles(DATA_FILE).build())),
+            checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget, deltaManifest, jobId, 
operatorId, checkpointId));
+
+    dynamicCommitter.commit(Sets.newHashSet(commitRequest));
+
+    // Upgrade the table version
+    UpdateProperties updateApi = table1.updateProperties();
+    updateApi.set(
+        TableProperties.FORMAT_VERSION, 
String.valueOf(TableUtil.formatVersion(table1) + 1));
+    updateApi.commit();
+
+    table1.refresh();
+    assertThat(table1.snapshots()).hasSize(1);
+    Snapshot first = Iterables.getFirst(table1.snapshots(), null);
+    assertThat(first.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
   @Test
   void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
     Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 777082dcb9..b660d8e285 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -59,6 +59,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.IcebergGenerics;
@@ -76,6 +77,7 @@ import 
org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.FailBeforeAndA
 import org.apache.iceberg.inmemory.InMemoryInputFile;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -527,6 +529,209 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     }
   }
 
+  @Test
+  void testUpsertV3() throws Exception {
+    ImmutableMap<String, String> properties = 
ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");
+    CATALOG_EXTENSION
+        .catalog()
+        .createTable(
+            TableIdentifier.of(DATABASE, "t1"),
+            SimpleDataUtil.SCHEMA,
+            PartitionSpec.unpartitioned(),
+            null,
+            properties);
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            // Insert one rows
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                false),
+            // Remaining rows are duplicates
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true));
+
+    executeDynamicSink(rows, env, true, 1, null);
+
+    try (CloseableIterable<Record> iterable =
+        IcebergGenerics.read(
+                
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
+            .build()) {
+      List<Record> records = Lists.newArrayList();
+      for (Record record : iterable) {
+        records.add(record);
+      }
+
+      assertThat(records).hasSize(1);
+      Record actual = records.get(0);
+      DynamicIcebergDataImpl input = rows.get(0);
+      assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+      assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+    }
+  }
+
+  @Test
+  void testMultiFormatVersion() throws Exception {
+    ImmutableMap<String, String> properties = 
ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");
+    CATALOG_EXTENSION
+        .catalog()
+        .createTable(
+            TableIdentifier.of(DATABASE, "t1"),
+            SimpleDataUtil.SCHEMA,
+            PartitionSpec.unpartitioned(),
+            null,
+            properties);
+
+    ImmutableMap<String, String> properties1 = 
ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+    CATALOG_EXTENSION
+        .catalog()
+        .createTable(
+            TableIdentifier.of(DATABASE, "t2"),
+            SimpleDataUtil.SCHEMA,
+            PartitionSpec.unpartitioned(),
+            null,
+            properties1);
+
+    List<DynamicIcebergDataImpl> rowsForTable1 =
+        Lists.newArrayList(
+            // Insert one rows
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                false),
+            // Remaining rows are duplicates
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true));
+
+    List<DynamicIcebergDataImpl> rowsForTable2 =
+        Lists.newArrayList(
+            // Insert one rows
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t2",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                false),
+            // Remaining rows are duplicates
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t2",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t2",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t2",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true));
+
+    List<DynamicIcebergDataImpl> rows = Lists.newArrayList();
+    rows.addAll(rowsForTable1);
+    rows.addAll(rowsForTable2);
+
+    executeDynamicSink(rows, env, true, 1, null);
+
+    try (CloseableIterable<Record> iterable =
+        IcebergGenerics.read(
+                
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
+            .build()) {
+      List<Record> records = Lists.newArrayList();
+      for (Record record : iterable) {
+        records.add(record);
+      }
+
+      assertThat(records).hasSize(1);
+      Record actual = records.get(0);
+      DynamicIcebergDataImpl input = rowsForTable1.get(0);
+      assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+      assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+    }
+
+    try (CloseableIterable<Record> iterable =
+        IcebergGenerics.read(
+                
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t2")))
+            .build()) {
+      List<Record> records = Lists.newArrayList();
+      for (Record record : iterable) {
+        records.add(record);
+      }
+
+      assertThat(records).hasSize(1);
+      Record actual = records.get(0);
+      DynamicIcebergDataImpl input = rowsForTable2.get(0);
+      assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+      assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+    }
+  }
+
   @Test
   void testCommitFailedBeforeOrAfterCommit() throws Exception {
     // Configure a Restart strategy to allow recovery
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index 54d506b663..d5774b66af 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
@@ -38,6 +40,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.sink.CommitSummary;
@@ -50,6 +53,7 @@ import 
org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
@@ -211,9 +215,23 @@ class DynamicCommitter implements 
Committer<DynamicCommittable> {
           DeltaManifests deltaManifests =
               SimpleVersionedSerialization.readVersionAndDeSerialize(
                   DeltaManifestsSerializer.INSTANCE, 
committable.getCommittable().manifest());
+
+          WriteResult writeResult =
+              FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), 
table.specs());
+          if (TableUtil.formatVersion(table) > 2) {
+            for (DeleteFile deleteFile : writeResult.deleteFiles()) {
+              if (deleteFile.content() == FileContent.POSITION_DELETES) {
+                Preconditions.checkArgument(
+                    ContentFileUtil.isDV(deleteFile),
+                    "Can't add position delete file to the %s table. 
Concurrent table upgrade to V3 is not supported.",
+                    table.name());
+              }
+            }
+          }
+
           pendingResults
               .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
-              .add(FlinkManifestUtil.readCompletedFiles(deltaManifests, 
table.io(), table.specs()));
+              .add(writeResult);
           manifests.addAll(deltaManifests.manifests());
         }
       }
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index 77bd2a0f97..927491fa89 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
@@ -32,6 +33,7 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.CatalogLoader;
@@ -61,7 +63,8 @@ class DynamicWriteResultAggregator
   private final int cacheMaximumSize;
   private transient Map<WriteTarget, Collection<DynamicWriteResult>> results;
   private transient Map<String, Map<Integer, PartitionSpec>> specs;
-  private transient Map<String, ManifestOutputFileFactory> outputFileFactories;
+  private transient Map<String, Tuple2<ManifestOutputFileFactory, Integer>>
+      outputFileFactoriesAndFormatVersions;
   private transient String flinkJobId;
   private transient String operatorId;
   private transient int subTaskId;
@@ -81,7 +84,7 @@ class DynamicWriteResultAggregator
     this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
     this.results = Maps.newHashMap();
     this.specs = new LRUCache<>(cacheMaximumSize);
-    this.outputFileFactories = new LRUCache<>(cacheMaximumSize);
+    this.outputFileFactoriesAndFormatVersions = new 
LRUCache<>(cacheMaximumSize);
     this.catalog = catalogLoader.loadCatalog();
   }
 
@@ -137,12 +140,14 @@ class DynamicWriteResultAggregator
     writeResults.forEach(w -> builder.add(w.writeResult()));
     WriteResult result = builder.build();
 
+    Tuple2<ManifestOutputFileFactory, Integer> outputFileFactoryAndVersion =
+        outputFileFactoryAndFormatVersion(key.tableName());
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
             result,
-            () -> outputFileFactory(key.tableName()).create(checkpointId),
+            () -> outputFileFactoryAndVersion.f0.create(checkpointId),
             spec(key.tableName(), key.specId()),
-            2);
+            outputFileFactoryAndVersion.f1);
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
@@ -160,8 +165,9 @@ class DynamicWriteResultAggregator
     }
   }
 
-  private ManifestOutputFileFactory outputFileFactory(String tableName) {
-    return outputFileFactories.computeIfAbsent(
+  private Tuple2<ManifestOutputFileFactory, Integer> 
outputFileFactoryAndFormatVersion(
+      String tableName) {
+    return outputFileFactoriesAndFormatVersions.computeIfAbsent(
         tableName,
         unused -> {
           Table table = catalog.loadTable(TableIdentifier.parse(tableName));
@@ -169,14 +175,16 @@ class DynamicWriteResultAggregator
          // Make sure to append an identifier to avoid file clashes in case 
the factory were to get
           // re-created during a checkpoint, i.e. due to cache eviction.
           String fileSuffix = UUID.randomUUID().toString();
-          return FlinkManifestUtil.createOutputFileFactory(
-              () -> table,
-              table.properties(),
-              flinkJobId,
-              operatorId,
-              subTaskId,
-              attemptId,
-              fileSuffix);
+          ManifestOutputFileFactory outputFileFactory =
+              FlinkManifestUtil.createOutputFileFactory(
+                  () -> table,
+                  table.properties(),
+                  flinkJobId,
+                  operatorId,
+                  subTaskId,
+                  attemptId,
+                  fileSuffix);
+          return Tuple2.of(outputFileFactory, TableUtil.formatVersion(table));
         });
   }
 
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index 5ed9da8623..c2a3032858 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -118,10 +117,6 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
                               !equalityFieldIds.isEmpty(),
                               "Equality field columns shouldn't be empty when 
configuring to use UPSERT data.");
 
-                          Preconditions.checkArgument(
-                              !(TableUtil.formatVersion(table) > 2),
-                              "Dynamic Sink writer does not support upsert 
mode in tables (V3+)");
-
                           if (!table.spec().isUnpartitioned()) {
                             for (PartitionField partitionField : 
table.spec().fields()) {
                               Preconditions.checkState(
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 7894428a78..d2c688e28c 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -43,6 +43,9 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
@@ -335,6 +338,140 @@ class TestDynamicCommitter {
                 .build());
   }
 
+  @Test
+  void testCommitDeleteInDifferentFormatVersion() throws Exception {
+    Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table1.snapshots()).isEmpty();
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new 
DynamicCommitterMetrics(metricGroup);
+    DynamicCommitter dynamicCommitter =
+        new DynamicCommitter(
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    WriteTarget writeTarget =
+        new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 10;
+
+    byte[] deltaManifest =
+        aggregator.writeToManifest(
+            writeTarget,
+            Sets.newHashSet(
+                new DynamicWriteResult(
+                    writeTarget,
+                    WriteResult.builder()
+                        .addDataFiles(DATA_FILE)
+                        .addDeleteFiles(DELETE_FILE)
+                        .build())),
+            checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget, deltaManifest, jobId, 
operatorId, checkpointId));
+
+    // Upgrade the table version
+    UpdateProperties updateApi = table1.updateProperties();
+    updateApi.set(
+        TableProperties.FORMAT_VERSION, 
String.valueOf(TableUtil.formatVersion(table1) + 1));
+    updateApi.commit();
+
+    assertThatThrownBy(() -> 
dynamicCommitter.commit(Sets.newHashSet(commitRequest)))
+        .hasMessage(
+            "Can't add position delete file to the %s table. Concurrent table 
upgrade to V3 is not supported.",
+            table1.name())
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  void testCommitOnlyDataInDifferentFormatVersion() throws Exception {
+    Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table1.snapshots()).isEmpty();
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new 
DynamicCommitterMetrics(metricGroup);
+    DynamicCommitter dynamicCommitter =
+        new DynamicCommitter(
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    WriteTarget writeTarget =
+        new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 10;
+
+    byte[] deltaManifest =
+        aggregator.writeToManifest(
+            writeTarget,
+            Sets.newHashSet(
+                new DynamicWriteResult(
+                    writeTarget, 
WriteResult.builder().addDataFiles(DATA_FILE).build())),
+            checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget, deltaManifest, jobId, 
operatorId, checkpointId));
+
+    dynamicCommitter.commit(Sets.newHashSet(commitRequest));
+
+    // Upgrade the table version
+    UpdateProperties updateApi = table1.updateProperties();
+    updateApi.set(
+        TableProperties.FORMAT_VERSION, 
String.valueOf(TableUtil.formatVersion(table1) + 1));
+    updateApi.commit();
+
+    table1.refresh();
+    assertThat(table1.snapshots()).hasSize(1);
+    Snapshot first = Iterables.getFirst(table1.snapshots(), null);
+    assertThat(first.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
   @Test
   void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
     Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 20fae212b4..f0cc46df46 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.IcebergGenerics;
@@ -73,6 +74,7 @@ import 
org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.FailBeforeAndA
 import org.apache.iceberg.inmemory.InMemoryInputFile;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -522,6 +524,209 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     }
   }
 
+  @Test
+  void testUpsertV3() throws Exception {
+    ImmutableMap<String, String> properties = 
ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");
+    CATALOG_EXTENSION
+        .catalog()
+        .createTable(
+            TableIdentifier.of(DATABASE, "t1"),
+            SimpleDataUtil.SCHEMA,
+            PartitionSpec.unpartitioned(),
+            null,
+            properties);
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            // Insert one row
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                false),
+            // Remaining rows are duplicates
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true));
+
+    executeDynamicSink(rows, env, true, 1, null);
+
+    try (CloseableIterable<Record> iterable =
+        IcebergGenerics.read(
+                
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
+            .build()) {
+      List<Record> records = Lists.newArrayList();
+      for (Record record : iterable) {
+        records.add(record);
+      }
+
+      assertThat(records).hasSize(1);
+      Record actual = records.get(0);
+      DynamicIcebergDataImpl input = rows.get(0);
+      assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+      assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+    }
+  }
+
+  @Test
+  void testMultiFormatVersion() throws Exception {
+    ImmutableMap<String, String> properties = 
ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");
+    CATALOG_EXTENSION
+        .catalog()
+        .createTable(
+            TableIdentifier.of(DATABASE, "t1"),
+            SimpleDataUtil.SCHEMA,
+            PartitionSpec.unpartitioned(),
+            null,
+            properties);
+
+    ImmutableMap<String, String> properties1 = 
ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+    CATALOG_EXTENSION
+        .catalog()
+        .createTable(
+            TableIdentifier.of(DATABASE, "t2"),
+            SimpleDataUtil.SCHEMA,
+            PartitionSpec.unpartitioned(),
+            null,
+            properties1);
+
+    List<DynamicIcebergDataImpl> rowsForTable1 =
+        Lists.newArrayList(
+            // Insert one row
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                false),
+            // Remaining rows are duplicates
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true));
+
+    List<DynamicIcebergDataImpl> rowsForTable2 =
+        Lists.newArrayList(
+            // Insert one row
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t2",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                false),
+            // Remaining rows are duplicates
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t2",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t2",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t2",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true));
+
+    List<DynamicIcebergDataImpl> rows = Lists.newArrayList();
+    rows.addAll(rowsForTable1);
+    rows.addAll(rowsForTable2);
+
+    executeDynamicSink(rows, env, true, 1, null);
+
+    try (CloseableIterable<Record> iterable =
+        IcebergGenerics.read(
+                
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
+            .build()) {
+      List<Record> records = Lists.newArrayList();
+      for (Record record : iterable) {
+        records.add(record);
+      }
+
+      assertThat(records).hasSize(1);
+      Record actual = records.get(0);
+      DynamicIcebergDataImpl input = rowsForTable1.get(0);
+      assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+      assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+    }
+
+    try (CloseableIterable<Record> iterable =
+        IcebergGenerics.read(
+                
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t2")))
+            .build()) {
+      List<Record> records = Lists.newArrayList();
+      for (Record record : iterable) {
+        records.add(record);
+      }
+
+      assertThat(records).hasSize(1);
+      Record actual = records.get(0);
+      DynamicIcebergDataImpl input = rowsForTable2.get(0);
+      assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+      assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+    }
+  }
+
   @Test
   void testCommitFailedBeforeOrAfterCommit() throws Exception {
     // Configure a Restart strategy to allow recovery

Reply via email to