This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 11274f65dc Flink: Backport fix commit duplication in DynamicIcebergSink (#14637)
11274f65dc is described below

commit 11274f65dc5972a9d260b3c64e7026ab97fd08f0
Author: aiborodin <[email protected]>
AuthorDate: Thu Nov 20 20:00:39 2025 +1100

    Flink: Backport fix commit duplication in DynamicIcebergSink (#14637)
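
    The fix registers a snapshot-ancestry validator on every table update, so a
    retried commit whose changes already landed on the branch is detected at
    commit time and skipped instead of being applied twice. A condensed,
    illustrative sketch of that pattern follows (simplified from the
    DynamicCommitter code in the diff below; the flink.* summary keys are the
    ones asserted on in the tests):

        import java.util.Map;
        import org.apache.iceberg.Snapshot;
        import org.apache.iceberg.SnapshotAncestryValidator;
        import org.apache.iceberg.exceptions.ValidationException;

        // Registered via SnapshotUpdate#validateWith(...) so it runs against the
        // branch ancestry as of the commit attempt, not as of commit planning.
        class CheckpointAlreadyCommittedValidator implements SnapshotAncestryValidator {
          private final long stagedCheckpointId;
          private final String jobId;
          private final String operatorId;

          CheckpointAlreadyCommittedValidator(long stagedCheckpointId, String jobId, String operatorId) {
            this.stagedCheckpointId = stagedCheckpointId;
            this.jobId = jobId;
            this.operatorId = operatorId;
          }

          @Override
          public Boolean apply(Iterable<Snapshot> baseSnapshots) {
            for (Snapshot ancestor : baseSnapshots) {
              Map<String, String> summary = ancestor.summary();
              if (jobId.equals(summary.get("flink.job-id"))
                  && operatorId.equals(summary.get("flink.operator-id"))) {
                String committed = summary.get("flink.max-committed-checkpoint-id");
                if (committed != null && Long.parseLong(committed) >= stagedCheckpointId) {
                  // An ancestor already carries this checkpoint's changes; failing
                  // validation lets the committer log and skip the duplicate.
                  throw new ValidationException("Table already contains staged changes.");
                }
                break; // only the newest snapshot written by this job/operator matters
              }
            }
            return true;
          }
        }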
---
 .../flink/sink/dynamic/DynamicCommitter.java       |  69 +++++++++--
 .../flink/sink/dynamic/TestDynamicCommitter.java   | 130 ++++++++++++++++++---
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 126 +++++++++++++++++---
 .../flink/sink/dynamic/DynamicCommitter.java       |  69 +++++++++--
 .../flink/sink/dynamic/TestDynamicCommitter.java   | 130 ++++++++++++++++++---
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 126 +++++++++++++++++---
 6 files changed, 574 insertions(+), 76 deletions(-)

diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index d5774b66af..61b20cb27b 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -37,12 +37,14 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotAncestryValidator;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.flink.sink.DeltaManifests;
 import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
@@ -55,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,9 +144,13 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
         commitRequestMap.entrySet()) {
       Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
       DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
+      Snapshot latestSnapshot = table.snapshot(entry.getKey().branch());
+      Iterable<Snapshot> ancestors =
+          latestSnapshot != null
+              ? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot)
+              : List.of();
       long maxCommittedCheckpointId =
-          getMaxCommittedCheckpointId(
-              table, last.jobId(), last.operatorId(), entry.getKey().branch());
+          getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId());
       // Mark the already committed FilesCommittable(s) as finished
       entry
           .getValue()
@@ -160,12 +167,11 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
   }
 
   private static long getMaxCommittedCheckpointId(
-      Table table, String flinkJobId, String operatorId, String branch) {
-    Snapshot snapshot = table.snapshot(branch);
+      Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
     long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
-    while (snapshot != null) {
-      Map<String, String> summary = snapshot.summary();
+    for (Snapshot ancestor : ancestors) {
+      Map<String, String> summary = ancestor.summary();
       String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
       String snapshotOperatorId = summary.get(OPERATOR_ID);
       if (flinkJobId.equals(snapshotFlinkJobId)
@@ -176,9 +182,6 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
           break;
         }
       }
-
-      Long parentSnapshotId = snapshot.parentId();
-      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
     }
 
     return lastCommittedCheckpointId;
@@ -347,6 +350,36 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
     }
   }
 
+  private static class MaxCommittedCheckpointMismatchException extends ValidationException {
+    private MaxCommittedCheckpointMismatchException() {
+      super("Table already contains staged changes.");
+    }
+  }
+
+  private static class MaxCommittedCheckpointIdValidator implements SnapshotAncestryValidator {
+    private final long stagedCheckpointId;
+    private final String flinkJobId;
+    private final String flinkOperatorId;
+
+    private MaxCommittedCheckpointIdValidator(
+        long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
+      this.stagedCheckpointId = stagedCheckpointId;
+      this.flinkJobId = flinkJobId;
+      this.flinkOperatorId = flinkOperatorId;
+    }
+
+    @Override
+    public Boolean apply(Iterable<Snapshot> baseSnapshots) {
+      long maxCommittedCheckpointId =
+          getMaxCommittedCheckpointId(baseSnapshots, flinkJobId, flinkOperatorId);
+      if (maxCommittedCheckpointId >= stagedCheckpointId) {
+        throw new MaxCommittedCheckpointMismatchException();
+      }
+
+      return true;
+    }
+  }
+
   @VisibleForTesting
   void commitOperation(
       Table table,
@@ -372,9 +405,25 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
     operation.set(FLINK_JOB_ID, newFlinkJobId);
     operation.set(OPERATOR_ID, operatorId);
     operation.toBranch(branch);
+    operation.validateWith(
+        new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));
 
     long startNano = System.nanoTime();
-    operation.commit(); // abort is automatically called if this fails.
+    try {
+      operation.commit(); // abort is automatically called if this fails.
+    } catch (MaxCommittedCheckpointMismatchException e) {
+      LOG.info(
+          "Skipping commit operation {} because the {} branch of the {} table 
already contains changes for checkpoint {}."
+              + " This can occur when a failure prevents the committer from 
receiving confirmation of a"
+              + " successful commit, causing the Flink job to retry committing 
the same set of changes.",
+          description,
+          branch,
+          table.name(),
+          checkpointId,
+          e);
+      return;
+    }
+
     long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNano);
     LOG.info(
         "Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index d2c688e28c..1497458e60 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -59,6 +60,8 @@ import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 class TestDynamicCommitter {
 
@@ -669,11 +672,15 @@ class TestDynamicCommitter {
     assertThatThrownBy(commitExecutable);
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
 
-    // Second fail during commit
+    // Second fail before table update
     assertThatThrownBy(commitExecutable);
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
 
-    // Third fail after commit
+    // Third fail after table update
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
+
+    // Fourth fail after commit
     assertThatThrownBy(commitExecutable);
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
 
@@ -812,18 +819,101 @@ class TestDynamicCommitter {
                 .build());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 1;
+    final String branch = SnapshotRef.MAIN_BRANCH;
+
+    WriteTarget writeTarget = new WriteTarget(TABLE1, branch, 42, 0, false, Sets.newHashSet(1, 2));
+    byte[] manifest =
+        aggregator.writeToManifest(
+            writeTarget,
+            Sets.newHashSet(
+                new DynamicWriteResult(
+                    writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
+            checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget, manifest, jobId, operatorId, checkpointId));
+    Collection<CommitRequest<DynamicCommittable>> commitRequests = Sets.newHashSet(commitRequest1);
+
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+
+    CommitHook commitHook =
+        new TestDynamicIcebergSink.DuplicateCommitHook(
+            () ->
+                new DynamicCommitter(
+                    CATALOG_EXTENSION.catalog(),
+                    Map.of(),
+                    overwriteMode,
+                    workerPoolSize,
+                    sinkId,
+                    committerMetrics));
+
+    DynamicCommitter mainCommitter =
+        new CommitHookEnabledDynamicCommitter(
+            commitHook,
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    mainCommitter.commit(commitRequests);
+
+    // Only one commit should succeed
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(1);
+    assertThat(table.currentSnapshot().summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", 
String.valueOf(checkpointId))
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
   interface CommitHook extends Serializable {
-    void beforeCommit();
+    default void beforeCommit(Collection<CommitRequest<DynamicCommittable>> commitRequests) {}
 
-    void duringCommit();
+    default void beforeCommitOperation() {}
 
-    void afterCommit();
+    default void afterCommitOperation() {}
+
+    default void afterCommit() {}
   }
 
   static class FailBeforeAndAfterCommit implements CommitHook {
 
     static boolean failedBeforeCommit;
-    static boolean failedDuringCommit;
+    static boolean failedBeforeCommitOperation;
+    static boolean failedAfterCommitOperation;
     static boolean failedAfterCommit;
 
     FailBeforeAndAfterCommit() {
@@ -831,7 +921,7 @@ class TestDynamicCommitter {
     }
 
     @Override
-    public void beforeCommit() {
+    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> ignored) {
       if (!failedBeforeCommit) {
         failedBeforeCommit = true;
         throw new RuntimeException("Failing before commit");
@@ -839,10 +929,18 @@ class TestDynamicCommitter {
     }
 
     @Override
-    public void duringCommit() {
-      if (!failedDuringCommit) {
-        failedDuringCommit = true;
-        throw new RuntimeException("Failing during commit");
+    public void beforeCommitOperation() {
+      if (!failedBeforeCommitOperation) {
+        failedBeforeCommitOperation = true;
+        throw new RuntimeException("Failing before commit operation");
+      }
+    }
+
+    @Override
+    public void afterCommitOperation() {
+      if (!failedAfterCommitOperation) {
+        failedAfterCommitOperation = true;
+        throw new RuntimeException("Failing after commit operation");
       }
     }
 
@@ -856,7 +954,8 @@ class TestDynamicCommitter {
 
     static void reset() {
       failedBeforeCommit = false;
-      failedDuringCommit = false;
+      failedBeforeCommitOperation = false;
+      failedAfterCommitOperation = false;
       failedAfterCommit = false;
     }
   }
@@ -880,7 +979,7 @@ class TestDynamicCommitter {
     @Override
     public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
         throws IOException, InterruptedException {
-      commitHook.beforeCommit();
+      commitHook.beforeCommit(commitRequests);
       super.commit(commitRequests);
       commitHook.afterCommit();
     }
@@ -895,9 +994,10 @@ class TestDynamicCommitter {
         String newFlinkJobId,
         String operatorId,
         long checkpointId) {
+      commitHook.beforeCommitOperation();
       super.commitOperation(
           table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId);
-      commitHook.duringCommit();
+      commitHook.afterCommitOperation();
     }
   }
 }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index f0cc46df46..b660d8e285 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.fail;
 import java.io.IOException;
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,6 +51,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.SerializableSupplier;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DistributionMode;
@@ -81,6 +84,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
 
@@ -732,8 +737,8 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     // Configure a Restart strategy to allow recovery
     Configuration configuration = new Configuration();
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-    // Allow max 3 retries to make up for the three failures we are simulating here
-    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
+    // Allow max 4 retries to make up for the four failures we are simulating here
+    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 4);
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);
     env.configure(configuration);
 
@@ -746,13 +751,15 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
 
     final CommitHook commitHook = new FailBeforeAndAfterCommit();
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isFalse();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
 
     executeDynamicSink(rows, env, true, 1, commitHook);
 
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
   }
 
@@ -775,6 +782,90 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     executeDynamicSink(rows, env, true, 1, commitHook);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws Exception {
+    TableIdentifier tableId = TableIdentifier.of(DATABASE, "t1");
+    List<DynamicIcebergDataImpl> records =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()));
+
+    CommitHook duplicateCommit =
+        new DuplicateCommitHook(
+            () ->
+                new DynamicCommitter(
+                    CATALOG_EXTENSION.catalogLoader().loadCatalog(),
+                    Collections.emptyMap(),
+                    overwriteMode,
+                    10,
+                    "sinkId",
+                    new DynamicCommitterMetrics(new UnregisteredMetricsGroup())));
+
+    executeDynamicSink(records, env, true, 2, duplicateCommit, overwriteMode);
+
+    Table table = CATALOG_EXTENSION.catalog().loadTable(tableId);
+
+    if (!overwriteMode) {
+      verifyResults(records);
+      assertThat(table.currentSnapshot().summary())
+          .containsAllEntriesOf(Map.of("total-records", 
String.valueOf(records.size())));
+    }
+
+    long totalAddedRecords =
+        Lists.newArrayList(table.snapshots()).stream()
+            .map(snapshot -> snapshot.summary().getOrDefault("added-records", "0"))
+            .mapToLong(Long::valueOf)
+            .sum();
+    assertThat(totalAddedRecords).isEqualTo(records.size());
+  }
+
+  /**
+   * Represents a concurrent duplicate commit during an ongoing commit operation, which can happen
+   * in production scenarios when using REST catalog.
+   */
+  static class DuplicateCommitHook implements CommitHook {
+    // Static to maintain state after Flink restarts
+    private static boolean hasTriggered = false;
+
+    private final SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier;
+    private final List<Committer.CommitRequest<DynamicCommittable>> commitRequests;
+
+    DuplicateCommitHook(SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier) {
+      this.duplicateCommitterSupplier = duplicateCommitterSupplier;
+      this.commitRequests = Lists.newArrayList();
+
+      resetState();
+    }
+
+    private static void resetState() {
+      hasTriggered = false;
+    }
+
+    @Override
+    public void beforeCommit(Collection<Committer.CommitRequest<DynamicCommittable>> requests) {
+      if (!hasTriggered) {
+        this.commitRequests.addAll(requests);
+      }
+    }
+
+    @Override
+    public void beforeCommitOperation() {
+      if (!hasTriggered) {
+        try {
+          duplicateCommitterSupplier.get().commit(commitRequests);
+        } catch (final IOException | InterruptedException e) {
+          throw new RuntimeException("Duplicate committer failed", e);
+        }
+
+        commitRequests.clear();
+        hasTriggered = true;
+      }
+    }
+  }
+
   private static class AppendRightBeforeCommit implements CommitHook {
 
     final String tableIdentifier;
@@ -784,10 +875,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     }
 
     @Override
-    public void beforeCommit() {}
-
-    @Override
-    public void duringCommit() {
+    public void beforeCommitOperation() {
       // Create a conflict
       Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(tableIdentifier));
       DataFile dataFile =
@@ -798,9 +886,6 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
               .build();
       table.newAppend().appendFile(dataFile).commit();
     }
-
-    @Override
-    public void afterCommit() {}
   }
 
   private void runTest(List<DynamicIcebergDataImpl> dynamicData) throws Exception {
@@ -831,8 +916,19 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
       int parallelism,
       @Nullable CommitHook commitHook)
       throws Exception {
+    executeDynamicSink(dynamicData, env, immediateUpdate, parallelism, commitHook, false);
+  }
+
+  private void executeDynamicSink(
+      List<DynamicIcebergDataImpl> dynamicData,
+      StreamExecutionEnvironment env,
+      boolean immediateUpdate,
+      int parallelism,
+      @Nullable CommitHook commitHook,
+      boolean overwrite)
+      throws Exception {
     DataStream<DynamicIcebergDataImpl> dataStream =
-        env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {}));
+        env.fromData(dynamicData, TypeInformation.of(new TypeHint<>() {}));
     env.setParallelism(parallelism);
 
     if (commitHook != null) {
@@ -843,6 +939,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
           .writeParallelism(parallelism)
           .immediateTableUpdate(immediateUpdate)
           .setSnapshotProperty("commit.retry.num-retries", "0")
+          .overwrite(overwrite)
           .append();
     } else {
       DynamicIcebergSink.forInput(dataStream)
@@ -850,6 +947,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
           .catalogLoader(CATALOG_EXTENSION.catalogLoader())
           .writeParallelism(parallelism)
           .immediateTableUpdate(immediateUpdate)
+          .overwrite(overwrite)
           .append();
     }
 
@@ -881,6 +979,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
   static class CommitHookDynamicIcebergSink extends DynamicIcebergSink {
 
     private final CommitHook commitHook;
+    private final boolean overwriteMode;
 
     CommitHookDynamicIcebergSink(
         CommitHook commitHook,
@@ -898,6 +997,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
           flinkWriteConf,
           cacheMaximumSize);
       this.commitHook = commitHook;
+      this.overwriteMode = flinkWriteConf.overwriteMode();
     }
 
     @Override
@@ -906,7 +1006,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
           commitHook,
           CATALOG_EXTENSION.catalogLoader().loadCatalog(),
           Collections.emptyMap(),
-          false,
+          overwriteMode,
           10,
           "sinkId",
           new DynamicCommitterMetrics(context.metricGroup()));
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index d5774b66af..61b20cb27b 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -37,12 +37,14 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotAncestryValidator;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.flink.sink.DeltaManifests;
 import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
@@ -55,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,9 +144,13 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
         commitRequestMap.entrySet()) {
       Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
       DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
+      Snapshot latestSnapshot = table.snapshot(entry.getKey().branch());
+      Iterable<Snapshot> ancestors =
+          latestSnapshot != null
+              ? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot)
+              : List.of();
       long maxCommittedCheckpointId =
-          getMaxCommittedCheckpointId(
-              table, last.jobId(), last.operatorId(), entry.getKey().branch());
+          getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId());
       // Mark the already committed FilesCommittable(s) as finished
       entry
           .getValue()
@@ -160,12 +167,11 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
   }
 
   private static long getMaxCommittedCheckpointId(
-      Table table, String flinkJobId, String operatorId, String branch) {
-    Snapshot snapshot = table.snapshot(branch);
+      Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
     long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
-    while (snapshot != null) {
-      Map<String, String> summary = snapshot.summary();
+    for (Snapshot ancestor : ancestors) {
+      Map<String, String> summary = ancestor.summary();
       String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
       String snapshotOperatorId = summary.get(OPERATOR_ID);
       if (flinkJobId.equals(snapshotFlinkJobId)
@@ -176,9 +182,6 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
           break;
         }
       }
-
-      Long parentSnapshotId = snapshot.parentId();
-      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
     }
 
     return lastCommittedCheckpointId;
@@ -347,6 +350,36 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
     }
   }
 
+  private static class MaxCommittedCheckpointMismatchException extends ValidationException {
+    private MaxCommittedCheckpointMismatchException() {
+      super("Table already contains staged changes.");
+    }
+  }
+
+  private static class MaxCommittedCheckpointIdValidator implements SnapshotAncestryValidator {
+    private final long stagedCheckpointId;
+    private final String flinkJobId;
+    private final String flinkOperatorId;
+
+    private MaxCommittedCheckpointIdValidator(
+        long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
+      this.stagedCheckpointId = stagedCheckpointId;
+      this.flinkJobId = flinkJobId;
+      this.flinkOperatorId = flinkOperatorId;
+    }
+
+    @Override
+    public Boolean apply(Iterable<Snapshot> baseSnapshots) {
+      long maxCommittedCheckpointId =
+          getMaxCommittedCheckpointId(baseSnapshots, flinkJobId, flinkOperatorId);
+      if (maxCommittedCheckpointId >= stagedCheckpointId) {
+        throw new MaxCommittedCheckpointMismatchException();
+      }
+
+      return true;
+    }
+  }
+
   @VisibleForTesting
   void commitOperation(
       Table table,
@@ -372,9 +405,25 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
     operation.set(FLINK_JOB_ID, newFlinkJobId);
     operation.set(OPERATOR_ID, operatorId);
     operation.toBranch(branch);
+    operation.validateWith(
+        new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));
 
     long startNano = System.nanoTime();
-    operation.commit(); // abort is automatically called if this fails.
+    try {
+      operation.commit(); // abort is automatically called if this fails.
+    } catch (MaxCommittedCheckpointMismatchException e) {
+      LOG.info(
+          "Skipping commit operation {} because the {} branch of the {} table 
already contains changes for checkpoint {}."
+              + " This can occur when a failure prevents the committer from 
receiving confirmation of a"
+              + " successful commit, causing the Flink job to retry committing 
the same set of changes.",
+          description,
+          branch,
+          table.name(),
+          checkpointId,
+          e);
+      return;
+    }
+
     long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNano);
     LOG.info(
         "Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 16c0cb8e6c..5f938d4e88 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -59,6 +60,8 @@ import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 class TestDynamicCommitter {
 
@@ -669,11 +672,15 @@ class TestDynamicCommitter {
     assertThatThrownBy(commitExecutable);
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
 
-    // Second fail during commit
+    // Second fail before table update
     assertThatThrownBy(commitExecutable);
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
 
-    // Third fail after commit
+    // Third fail after table update
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
+
+    // Fourth fail after commit
     assertThatThrownBy(commitExecutable);
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
 
@@ -877,18 +884,101 @@ class TestDynamicCommitter {
                 .build());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 1;
+    final String branch = SnapshotRef.MAIN_BRANCH;
+
+    WriteTarget writeTarget = new WriteTarget(TABLE1, branch, 42, 0, false, Sets.newHashSet(1, 2));
+    byte[] manifest =
+        aggregator.writeToManifest(
+            writeTarget,
+            Sets.newHashSet(
+                new DynamicWriteResult(
+                    writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
+            checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget, manifest, jobId, operatorId, checkpointId));
+    Collection<CommitRequest<DynamicCommittable>> commitRequests = Sets.newHashSet(commitRequest1);
+
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+
+    CommitHook commitHook =
+        new TestDynamicIcebergSink.DuplicateCommitHook(
+            () ->
+                new DynamicCommitter(
+                    CATALOG_EXTENSION.catalog(),
+                    Map.of(),
+                    overwriteMode,
+                    workerPoolSize,
+                    sinkId,
+                    committerMetrics));
+
+    DynamicCommitter mainCommitter =
+        new CommitHookEnabledDynamicCommitter(
+            commitHook,
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    mainCommitter.commit(commitRequests);
+
+    // Only one commit should succeed
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(1);
+    assertThat(table.currentSnapshot().summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", 
String.valueOf(checkpointId))
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
   interface CommitHook extends Serializable {
-    void beforeCommit();
+    default void beforeCommit(Collection<CommitRequest<DynamicCommittable>> commitRequests) {}
 
-    void duringCommit();
+    default void beforeCommitOperation() {}
 
-    void afterCommit();
+    default void afterCommitOperation() {}
+
+    default void afterCommit() {}
   }
 
   static class FailBeforeAndAfterCommit implements CommitHook {
 
     static boolean failedBeforeCommit;
-    static boolean failedDuringCommit;
+    static boolean failedBeforeCommitOperation;
+    static boolean failedAfterCommitOperation;
     static boolean failedAfterCommit;
 
     FailBeforeAndAfterCommit() {
@@ -896,7 +986,7 @@ class TestDynamicCommitter {
     }
 
     @Override
-    public void beforeCommit() {
+    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> ignored) {
       if (!failedBeforeCommit) {
         failedBeforeCommit = true;
         throw new RuntimeException("Failing before commit");
@@ -904,10 +994,18 @@ class TestDynamicCommitter {
     }
 
     @Override
-    public void duringCommit() {
-      if (!failedDuringCommit) {
-        failedDuringCommit = true;
-        throw new RuntimeException("Failing during commit");
+    public void beforeCommitOperation() {
+      if (!failedBeforeCommitOperation) {
+        failedBeforeCommitOperation = true;
+        throw new RuntimeException("Failing before commit operation");
+      }
+    }
+
+    @Override
+    public void afterCommitOperation() {
+      if (!failedAfterCommitOperation) {
+        failedAfterCommitOperation = true;
+        throw new RuntimeException("Failing after commit operation");
       }
     }
 
@@ -921,7 +1019,8 @@ class TestDynamicCommitter {
 
     static void reset() {
       failedBeforeCommit = false;
-      failedDuringCommit = false;
+      failedBeforeCommitOperation = false;
+      failedAfterCommitOperation = false;
       failedAfterCommit = false;
     }
   }
@@ -945,7 +1044,7 @@ class TestDynamicCommitter {
     @Override
     public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
         throws IOException, InterruptedException {
-      commitHook.beforeCommit();
+      commitHook.beforeCommit(commitRequests);
       super.commit(commitRequests);
       commitHook.afterCommit();
     }
@@ -960,9 +1059,10 @@ class TestDynamicCommitter {
         String newFlinkJobId,
         String operatorId,
         long checkpointId) {
+      commitHook.beforeCommitOperation();
       super.commitOperation(
           table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId);
-      commitHook.duringCommit();
+      commitHook.afterCommitOperation();
     }
   }
 }
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index f0cc46df46..b660d8e285 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.fail;
 import java.io.IOException;
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,6 +51,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.SerializableSupplier;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DistributionMode;
@@ -81,6 +84,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
 
@@ -732,8 +737,8 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     // Configure a Restart strategy to allow recovery
     Configuration configuration = new Configuration();
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-    // Allow max 3 retries to make up for the three failures we are simulating here
-    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
+    // Allow max 4 retries to make up for the four failures we are simulating here
+    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 4);
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);
     env.configure(configuration);
 
@@ -746,13 +751,15 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
 
     final CommitHook commitHook = new FailBeforeAndAfterCommit();
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isFalse();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
 
     executeDynamicSink(rows, env, true, 1, commitHook);
 
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
   }
 
@@ -775,6 +782,90 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     executeDynamicSink(rows, env, true, 1, commitHook);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws Exception {
+    TableIdentifier tableId = TableIdentifier.of(DATABASE, "t1");
+    List<DynamicIcebergDataImpl> records =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()));
+
+    CommitHook duplicateCommit =
+        new DuplicateCommitHook(
+            () ->
+                new DynamicCommitter(
+                    CATALOG_EXTENSION.catalogLoader().loadCatalog(),
+                    Collections.emptyMap(),
+                    overwriteMode,
+                    10,
+                    "sinkId",
+                    new DynamicCommitterMetrics(new UnregisteredMetricsGroup())));
+
+    executeDynamicSink(records, env, true, 2, duplicateCommit, overwriteMode);
+
+    Table table = CATALOG_EXTENSION.catalog().loadTable(tableId);
+
+    if (!overwriteMode) {
+      verifyResults(records);
+      assertThat(table.currentSnapshot().summary())
+          .containsAllEntriesOf(Map.of("total-records", String.valueOf(records.size())));
+    }
+
+    long totalAddedRecords =
+        Lists.newArrayList(table.snapshots()).stream()
+            .map(snapshot -> snapshot.summary().getOrDefault("added-records", "0"))
+            .mapToLong(Long::valueOf)
+            .sum();
+    assertThat(totalAddedRecords).isEqualTo(records.size());
+  }
+
+  /**
+   * Represents a concurrent duplicate commit during an ongoing commit operation, which can happen
+   * in production scenarios when using REST catalog.
+   */
+  static class DuplicateCommitHook implements CommitHook {
+    // Static to maintain state after Flink restarts
+    private static boolean hasTriggered = false;
+
+    private final SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier;
+    private final List<Committer.CommitRequest<DynamicCommittable>> commitRequests;
+
+    DuplicateCommitHook(SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier) {
+      this.duplicateCommitterSupplier = duplicateCommitterSupplier;
+      this.commitRequests = Lists.newArrayList();
+
+      resetState();
+    }
+
+    private static void resetState() {
+      hasTriggered = false;
+    }
+
+    @Override
+    public void beforeCommit(Collection<Committer.CommitRequest<DynamicCommittable>> requests) {
+      if (!hasTriggered) {
+        this.commitRequests.addAll(requests);
+      }
+    }
+
+    @Override
+    public void beforeCommitOperation() {
+      if (!hasTriggered) {
+        try {
+          duplicateCommitterSupplier.get().commit(commitRequests);
+        } catch (final IOException | InterruptedException e) {
+          throw new RuntimeException("Duplicate committer failed", e);
+        }
+
+        commitRequests.clear();
+        hasTriggered = true;
+      }
+    }
+  }
+
   private static class AppendRightBeforeCommit implements CommitHook {
 
     final String tableIdentifier;
@@ -784,10 +875,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     }
 
     @Override
-    public void beforeCommit() {}
-
-    @Override
-    public void duringCommit() {
+    public void beforeCommitOperation() {
       // Create a conflict
       Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(tableIdentifier));
       DataFile dataFile =
@@ -798,9 +886,6 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
               .build();
       table.newAppend().appendFile(dataFile).commit();
     }
-
-    @Override
-    public void afterCommit() {}
   }
 
   private void runTest(List<DynamicIcebergDataImpl> dynamicData) throws Exception {
@@ -831,8 +916,19 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
       int parallelism,
       @Nullable CommitHook commitHook)
       throws Exception {
+    executeDynamicSink(dynamicData, env, immediateUpdate, parallelism, commitHook, false);
+  }
+
+  private void executeDynamicSink(
+      List<DynamicIcebergDataImpl> dynamicData,
+      StreamExecutionEnvironment env,
+      boolean immediateUpdate,
+      int parallelism,
+      @Nullable CommitHook commitHook,
+      boolean overwrite)
+      throws Exception {
     DataStream<DynamicIcebergDataImpl> dataStream =
-        env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {}));
+        env.fromData(dynamicData, TypeInformation.of(new TypeHint<>() {}));
     env.setParallelism(parallelism);
 
     if (commitHook != null) {
@@ -843,6 +939,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
           .writeParallelism(parallelism)
           .immediateTableUpdate(immediateUpdate)
           .setSnapshotProperty("commit.retry.num-retries", "0")
+          .overwrite(overwrite)
           .append();
     } else {
       DynamicIcebergSink.forInput(dataStream)
@@ -850,6 +947,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
           .catalogLoader(CATALOG_EXTENSION.catalogLoader())
           .writeParallelism(parallelism)
           .immediateTableUpdate(immediateUpdate)
+          .overwrite(overwrite)
           .append();
     }
 
@@ -881,6 +979,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
   static class CommitHookDynamicIcebergSink extends DynamicIcebergSink {
 
     private final CommitHook commitHook;
+    private final boolean overwriteMode;
 
     CommitHookDynamicIcebergSink(
         CommitHook commitHook,
@@ -898,6 +997,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
           flinkWriteConf,
           cacheMaximumSize);
       this.commitHook = commitHook;
+      this.overwriteMode = flinkWriteConf.overwriteMode();
     }
 
     @Override
@@ -906,7 +1006,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
           commitHook,
           CATALOG_EXTENSION.catalogLoader().loadCatalog(),
           Collections.emptyMap(),
-          false,
+          overwriteMode,
           10,
           "sinkId",
           new DynamicCommitterMetrics(context.metricGroup()));
