This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c09e9fee1 Flink: Backport: Enable multiple flink sinks for the same table in the same job (#6536)
0c09e9fee1 is described below

commit 0c09e9fee15b5621086fc03e60a4deaa29d83c01
Author: pvary <[email protected]>
AuthorDate: Fri Jan 6 21:20:14 2023 +0100

    Flink: Backport: Enable multiple flink sinks for the same table in the same job (#6536)
---
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  47 +++--
 .../flink/sink/TestIcebergFilesCommitter.java      | 225 ++++++++++++++++-----
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  47 +++--
 .../flink/sink/TestIcebergFilesCommitter.java      | 225 ++++++++++++++++-----
 4 files changed, 422 insertions(+), 122 deletions(-)

diff --git 
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
 
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 9c5bbf89e3..b686a76c98 100644
--- 
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ 
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -69,6 +69,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
 
   private static final Logger LOG = 
LoggerFactory.getLogger(IcebergFilesCommitter.class);
   private static final String FLINK_JOB_ID = "flink.job-id";
+  private static final String OPERATOR_ID = "flink.operator-id";
 
   // The max checkpoint id we've committed to iceberg table. As the flink's 
checkpoint is always
   // increasing, so we could correctly commit all the data files whose 
checkpoint id is greater than
@@ -97,6 +98,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
 
   // It will have an unique identifier for one job.
   private transient String flinkJobId;
+  private transient String operatorUniqueId;
   private transient Table table;
   private transient IcebergFilesCommitterMetrics committerMetrics;
   private transient ManifestOutputFileFactory manifestOutputFileFactory;
@@ -134,6 +136,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
   public void initializeState(StateInitializationContext context) throws 
Exception {
     super.initializeState(context);
     this.flinkJobId = 
getContainingTask().getEnvironment().getJobID().toString();
+    this.operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
 
     // Open the table loader and load the table.
     this.tableLoader.open();
@@ -147,7 +150,6 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
 
     int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
     int attemptId = getRuntimeContext().getAttemptNumber();
-    String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
     this.manifestOutputFileFactory =
         FlinkManifestUtil.createOutputFileFactory(
             table, flinkJobId, operatorUniqueId, subTaskId, attemptId);
@@ -176,7 +178,8 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       // flink job even if it's restored from a snapshot created by another 
different flink job, so
       // it's safe to assign the max committed checkpoint id from restored 
flink job to the current
       // flink job.
-      this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, 
restoredFlinkJobId);
+      this.maxCommittedCheckpointId =
+          getMaxCommittedCheckpointId(table, restoredFlinkJobId, 
operatorUniqueId);
 
       NavigableMap<Long, byte[]> uncommittedDataFiles =
           Maps.newTreeMap(checkpointsState.get().iterator().next())
@@ -184,7 +187,8 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       if (!uncommittedDataFiles.isEmpty()) {
         // Committed all uncommitted data files from the old flink job to 
iceberg table.
         long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
-        commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, 
maxUncommittedCheckpointId);
+        commitUpToCheckpoint(
+            uncommittedDataFiles, restoredFlinkJobId, operatorUniqueId, 
maxUncommittedCheckpointId);
       }
     }
   }
@@ -226,7 +230,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     // the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be 
increasing.
     if (checkpointId > maxCommittedCheckpointId) {
-      commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+      commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, 
operatorUniqueId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
     } else {
       LOG.info(
@@ -237,7 +241,10 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
   }
 
   private void commitUpToCheckpoint(
-      NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long 
checkpointId)
+      NavigableMap<Long, byte[]> deltaManifestsMap,
+      String newFlinkJobId,
+      String operatorId,
+      long checkpointId)
       throws IOException {
     NavigableMap<Long, byte[]> pendingMap = 
deltaManifestsMap.headMap(checkpointId, true);
     List<ManifestFile> manifests = Lists.newArrayList();
@@ -257,7 +264,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     }
 
     CommitSummary summary = new CommitSummary(pendingResults);
-    commitPendingResult(pendingResults, summary, newFlinkJobId, checkpointId);
+    commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, 
checkpointId);
     committerMetrics.updateCommitSummary(summary);
     pendingMap.clear();
     deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
@@ -267,14 +274,15 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       NavigableMap<Long, WriteResult> pendingResults,
       CommitSummary summary,
       String newFlinkJobId,
+      String operatorId,
       long checkpointId) {
     long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
     continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints 
+ 1 : 0;
     if (totalFiles != 0 || continuousEmptyCheckpoints % 
maxContinuousEmptyCommits == 0) {
       if (replacePartitions) {
-        replacePartitions(pendingResults, summary, newFlinkJobId, 
checkpointId);
+        replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, 
checkpointId);
       } else {
-        commitDeltaTxn(pendingResults, summary, newFlinkJobId, checkpointId);
+        commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, 
checkpointId);
       }
       continuousEmptyCheckpoints = 0;
     }
@@ -305,6 +313,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       NavigableMap<Long, WriteResult> pendingResults,
       CommitSummary summary,
       String newFlinkJobId,
+      String operatorId,
       long checkpointId) {
     Preconditions.checkState(
         summary.deleteFilesCount() == 0, "Cannot overwrite partitions with 
delete files.");
@@ -317,13 +326,19 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     }
 
     commitOperation(
-        dynamicOverwrite, summary, "dynamic partition overwrite", 
newFlinkJobId, checkpointId);
+        dynamicOverwrite,
+        summary,
+        "dynamic partition overwrite",
+        newFlinkJobId,
+        operatorId,
+        checkpointId);
   }
 
   private void commitDeltaTxn(
       NavigableMap<Long, WriteResult> pendingResults,
       CommitSummary summary,
       String newFlinkJobId,
+      String operatorId,
       long checkpointId) {
     if (summary.deleteFilesCount() == 0) {
       // To be compatible with iceberg format V1.
@@ -334,7 +349,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
             "Should have no referenced data files for append.");
         Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
       }
-      commitOperation(appendFiles, summary, "append", newFlinkJobId, 
checkpointId);
+      commitOperation(appendFiles, summary, "append", newFlinkJobId, 
operatorId, checkpointId);
     } else {
       // To be compatible with iceberg format V2.
       for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
@@ -355,7 +370,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
 
         Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
         Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-        commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, 
e.getKey());
+        commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, 
operatorId, e.getKey());
       }
     }
   }
@@ -365,6 +380,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       CommitSummary summary,
       String description,
       String newFlinkJobId,
+      String operatorId,
       long checkpointId) {
     LOG.info(
         "Committing {} for checkpoint {} to table {} with summary: {}",
@@ -377,6 +393,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     // used by the sink.
     operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
     operation.set(FLINK_JOB_ID, newFlinkJobId);
+    operation.set(OPERATOR_ID, operatorId);
 
     long startNano = System.nanoTime();
     operation.commit(); // abort is automatically called if this fails.
@@ -402,7 +419,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     dataFilesPerCheckpoint.put(currentCheckpointId, 
writeToManifest(currentCheckpointId));
     writeResultsOfCurrentCkpt.clear();
 
-    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, 
currentCheckpointId);
+    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, 
currentCheckpointId);
   }
 
   /**
@@ -454,14 +471,16 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     return new ListStateDescriptor<>("iceberg-files-committer-state", 
sortedMapTypeInfo);
   }
 
-  static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
+  static long getMaxCommittedCheckpointId(Table table, String flinkJobId, 
String operatorId) {
     Snapshot snapshot = table.currentSnapshot();
     long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     while (snapshot != null) {
       Map<String, String> summary = snapshot.summary();
       String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
-      if (flinkJobId.equals(snapshotFlinkJobId)) {
+      String snapshotOperatorId = summary.get(OPERATOR_ID);
+      if (flinkJobId.equals(snapshotFlinkJobId)
+          && (snapshotOperatorId == null || 
snapshotOperatorId.equals(operatorId))) {
         String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
         if (value != null) {
           lastCommittedCheckpointId = Long.parseLong(value);
diff --git 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index ca643ad532..c4f93f0ec2 100644
--- 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -121,13 +122,15 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     long checkpointId = 0;
     long timestamp = 0;
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
       SimpleDataUtil.assertTableRows(table, Lists.newArrayList());
       assertSnapshotSize(0);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // It's better to advance the max-committed-checkpoint-id in iceberg 
snapshot, so that the
       // future flink job
@@ -140,7 +143,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
         assertFlinkManifests(0);
 
         assertSnapshotSize(i);
-        assertMaxCommittedCheckpointId(jobId, checkpointId);
+        assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
       }
     }
   }
@@ -183,9 +186,12 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     long timestamp = 0;
 
     JobID jobID = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobID)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
       assertSnapshotSize(0);
 
       List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
@@ -203,7 +209,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
         SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
         assertSnapshotSize(i);
-        assertMaxCommittedCheckpointId(jobID, i);
+        assertMaxCommittedCheckpointId(jobID, operatorId, i);
         Assert.assertEquals(
             TestIcebergFilesCommitter.class.getName(),
             table.currentSnapshot().summary().get("flink.test"));
@@ -221,17 +227,19 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     long timestamp = 0;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row1 = SimpleDataUtil.createRowData(1, "hello");
       DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
 
       harness.processElement(of(dataFile1), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 1. snapshotState for checkpoint#1
       long firstCheckpointId = 1;
@@ -241,7 +249,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       RowData row2 = SimpleDataUtil.createRowData(2, "world");
       DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
       harness.processElement(of(dataFile2), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 2. snapshotState for checkpoint#2
       long secondCheckpointId = 2;
@@ -251,13 +259,13 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(firstCheckpointId);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
-      assertMaxCommittedCheckpointId(jobId, firstCheckpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, firstCheckpointId);
       assertFlinkManifests(1);
 
       // 4. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(secondCheckpointId);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
-      assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
     }
   }
@@ -272,17 +280,19 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     long timestamp = 0;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row1 = SimpleDataUtil.createRowData(1, "hello");
       DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
 
       harness.processElement(of(dataFile1), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 1. snapshotState for checkpoint#1
       long firstCheckpointId = 1;
@@ -292,7 +302,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       RowData row2 = SimpleDataUtil.createRowData(2, "world");
       DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
       harness.processElement(of(dataFile2), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 2. snapshotState for checkpoint#2
       long secondCheckpointId = 2;
@@ -302,13 +312,13 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       // 3. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(secondCheckpointId);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
-      assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
 
       // 4. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(firstCheckpointId);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
-      assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
     }
   }
@@ -321,12 +331,14 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     OperatorSubtaskState snapshot;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
       assertSnapshotSize(0);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row = SimpleDataUtil.createRowData(1, "hello");
       expectedRows.add(row);
@@ -341,18 +353,19 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row));
       assertSnapshotSize(1);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
     }
 
     // Restore from the given snapshot
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
+      harness.getStreamConfig().setOperatorID(operatorId);
       harness.setup();
       harness.initializeState(snapshot);
       harness.open();
 
       SimpleDataUtil.assertTableRows(table, expectedRows);
       assertSnapshotSize(1);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
       RowData row = SimpleDataUtil.createRowData(2, "world");
       expectedRows.add(row);
@@ -367,7 +380,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
       SimpleDataUtil.assertTableRows(table, expectedRows);
       assertSnapshotSize(2);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
     }
   }
 
@@ -381,12 +394,14 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     OperatorSubtaskState snapshot;
     List<RowData> expectedRows = Lists.newArrayList();
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
       assertSnapshotSize(0);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row = SimpleDataUtil.createRowData(1, "hello");
       expectedRows.add(row);
@@ -395,11 +410,12 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
       snapshot = harness.snapshot(++checkpointId, ++timestamp);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of());
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
       assertFlinkManifests(1);
     }
 
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
+      harness.getStreamConfig().setOperatorID(operatorId);
       harness.setup();
       harness.initializeState(snapshot);
       harness.open();
@@ -409,7 +425,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       assertFlinkManifests(0);
 
       SimpleDataUtil.assertTableRows(table, expectedRows);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
       harness.snapshot(++checkpointId, ++timestamp);
       // Did not write any new record, so it won't generate new manifest.
@@ -420,7 +436,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
       SimpleDataUtil.assertTableRows(table, expectedRows);
       assertSnapshotSize(2);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
       RowData row = SimpleDataUtil.createRowData(2, "world");
       expectedRows.add(row);
@@ -438,13 +454,14 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       harness.setup();
       harness.initializeState(snapshot);
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
       // All flink manifests should be cleaned because it has committed the 
unfinished iceberg
       // transaction.
       assertFlinkManifests(0);
 
-      assertMaxCommittedCheckpointId(newJobId, -1);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(newJobId, operatorId, -1);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
       SimpleDataUtil.assertTableRows(table, expectedRows);
       assertSnapshotSize(3);
 
@@ -461,7 +478,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
       SimpleDataUtil.assertTableRows(table, expectedRows);
       assertSnapshotSize(4);
-      assertMaxCommittedCheckpointId(newJobId, checkpointId);
+      assertMaxCommittedCheckpointId(newJobId, operatorId, checkpointId);
     }
   }
 
@@ -473,13 +490,15 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     List<RowData> tableRows = Lists.newArrayList();
 
     JobID oldJobId = new JobID();
+    OperatorID oldOperatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
         createStreamSink(oldJobId)) {
       harness.setup();
       harness.open();
+      oldOperatorId = harness.getOperator().getOperatorID();
 
       assertSnapshotSize(0);
-      assertMaxCommittedCheckpointId(oldJobId, -1L);
+      assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, -1L);
 
       for (int i = 1; i <= 3; i++) {
         rows.add(SimpleDataUtil.createRowData(i, "hello" + i));
@@ -495,7 +514,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
         SimpleDataUtil.assertTableRows(table, tableRows);
         assertSnapshotSize(i);
-        assertMaxCommittedCheckpointId(oldJobId, checkpointId);
+        assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, checkpointId);
       }
     }
 
@@ -503,14 +522,16 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     checkpointId = 0;
     timestamp = 0;
     JobID newJobId = new JobID();
+    OperatorID newOperatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
         createStreamSink(newJobId)) {
       harness.setup();
       harness.open();
+      newOperatorId = harness.getOperator().getOperatorID();
 
       assertSnapshotSize(3);
-      assertMaxCommittedCheckpointId(oldJobId, 3);
-      assertMaxCommittedCheckpointId(newJobId, -1);
+      assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, 3);
+      assertMaxCommittedCheckpointId(newJobId, newOperatorId, -1);
 
       rows.add(SimpleDataUtil.createRowData(2, "world"));
       tableRows.addAll(rows);
@@ -524,7 +545,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       assertFlinkManifests(0);
       SimpleDataUtil.assertTableRows(table, tableRows);
       assertSnapshotSize(4);
-      assertMaxCommittedCheckpointId(newJobId, checkpointId);
+      assertMaxCommittedCheckpointId(newJobId, newOperatorId, checkpointId);
     }
   }
 
@@ -534,16 +555,20 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     List<RowData> tableRows = Lists.newArrayList();
 
     JobID[] jobs = new JobID[] {new JobID(), new JobID(), new JobID()};
+    OperatorID[] operatorIds =
+        new OperatorID[] {new OperatorID(), new OperatorID(), new 
OperatorID()};
     for (int i = 0; i < 20; i++) {
       int jobIndex = i % 3;
       int checkpointId = i / 3;
       JobID jobId = jobs[jobIndex];
+      OperatorID operatorId = operatorIds[jobIndex];
       try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
+        harness.getStreamConfig().setOperatorID(operatorId);
         harness.setup();
         harness.open();
 
         assertSnapshotSize(i);
-        assertMaxCommittedCheckpointId(jobId, checkpointId == 0 ? -1 : 
checkpointId);
+        assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId == 0 ? 
-1 : checkpointId);
 
         List<RowData> rows = 
Lists.newArrayList(SimpleDataUtil.createRowData(i, "word-" + i));
         tableRows.addAll(rows);
@@ -557,21 +582,121 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
         assertFlinkManifests(0);
         SimpleDataUtil.assertTableRows(table, tableRows);
         assertSnapshotSize(i + 1);
-        assertMaxCommittedCheckpointId(jobId, checkpointId + 1);
+        assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId + 1);
       }
     }
   }
 
+  @Test
+  public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception {
+    long checkpointId = 0;
+    long timestamp = 0;
+    List<RowData> expectedRows = Lists.newArrayList();
+    OperatorSubtaskState snapshot1;
+    OperatorSubtaskState snapshot2;
+
+    JobID jobId = new JobID();
+    OperatorID operatorId1 = new OperatorID();
+    OperatorID operatorId2 = new OperatorID();
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness1 = 
createStreamSink(jobId);
+        OneInputStreamOperatorTestHarness<WriteResult, Void> harness2 = 
createStreamSink(jobId)) {
+      harness1.getStreamConfig().setOperatorID(operatorId1);
+      harness1.setup();
+      harness1.open();
+      harness2.getStreamConfig().setOperatorID(operatorId2);
+      harness2.setup();
+      harness2.open();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(jobId, operatorId1, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId2, -1L);
+
+      RowData row1 = SimpleDataUtil.createRowData(1, "hello1");
+      expectedRows.add(row1);
+      DataFile dataFile1 = writeDataFile("data-1-1", ImmutableList.of(row1));
+
+      harness1.processElement(of(dataFile1), ++timestamp);
+      snapshot1 = harness1.snapshot(++checkpointId, ++timestamp);
+
+      RowData row2 = SimpleDataUtil.createRowData(1, "hello2");
+      expectedRows.add(row2);
+      DataFile dataFile2 = writeDataFile("data-1-2", ImmutableList.of(row2));
+
+      harness2.processElement(of(dataFile2), ++timestamp);
+      snapshot2 = harness2.snapshot(checkpointId, ++timestamp);
+      assertFlinkManifests(2);
+
+      // Only notify one of the committers
+      harness1.notifyOfCompletedCheckpoint(checkpointId);
+      assertFlinkManifests(1);
+
+      // Only the first row is committed at this point
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      assertSnapshotSize(1);
+      assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId2, -1);
+    }
+
+    // Restore from the given snapshot
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness1 = 
createStreamSink(jobId);
+        OneInputStreamOperatorTestHarness<WriteResult, Void> harness2 = 
createStreamSink(jobId)) {
+      harness1.getStreamConfig().setOperatorID(operatorId1);
+      harness1.setup();
+      harness1.initializeState(snapshot1);
+      harness1.open();
+
+      harness2.getStreamConfig().setOperatorID(operatorId2);
+      harness2.setup();
+      harness2.initializeState(snapshot2);
+      harness2.open();
+
+      // All flink manifests should be cleaned because it has committed the 
unfinished iceberg
+      // transaction.
+      assertFlinkManifests(0);
+
+      SimpleDataUtil.assertTableRows(table, expectedRows);
+      assertSnapshotSize(2);
+      assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
+
+      RowData row1 = SimpleDataUtil.createRowData(2, "world1");
+      expectedRows.add(row1);
+      DataFile dataFile1 = writeDataFile("data-2-1", ImmutableList.of(row1));
+
+      harness1.processElement(of(dataFile1), ++timestamp);
+      harness1.snapshot(++checkpointId, ++timestamp);
+
+      RowData row2 = SimpleDataUtil.createRowData(2, "world2");
+      expectedRows.add(row2);
+      DataFile dataFile2 = writeDataFile("data-2-2", ImmutableList.of(row2));
+      harness2.processElement(of(dataFile2), ++timestamp);
+      harness2.snapshot(checkpointId, ++timestamp);
+
+      assertFlinkManifests(2);
+
+      harness1.notifyOfCompletedCheckpoint(checkpointId);
+      harness2.notifyOfCompletedCheckpoint(checkpointId);
+      assertFlinkManifests(0);
+
+      SimpleDataUtil.assertTableRows(table, expectedRows);
+      assertSnapshotSize(4);
+      assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
+    }
+  }
+
   @Test
   public void testBoundedStream() throws Exception {
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
       assertFlinkManifests(0);
       assertSnapshotSize(0);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       List<RowData> tableRows = 
Lists.newArrayList(SimpleDataUtil.createRowData(1, "word-1"));
 
@@ -582,7 +707,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       assertFlinkManifests(0);
       SimpleDataUtil.assertTableRows(table, tableRows);
       assertSnapshotSize(1);
-      assertMaxCommittedCheckpointId(jobId, Long.MAX_VALUE);
+      assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE);
       Assert.assertEquals(
           TestIcebergFilesCommitter.class.getName(),
           table.currentSnapshot().summary().get("flink.test"));
@@ -595,23 +720,24 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     final long checkpoint = 10;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row1 = SimpleDataUtil.createRowData(1, "hello");
       DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
 
       harness.processElement(of(dataFile1), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 1. snapshotState for checkpoint#1
       harness.snapshot(checkpoint, ++timestamp);
       List<Path> manifestPaths = assertFlinkManifests(1);
       Path manifestPath = manifestPaths.get(0);
-      String operatorId = 
harness.getOneInputOperator().getOperatorID().toString();
       Assert.assertEquals(
           "File name should have the expected pattern.",
           String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, 
checkpoint, 1),
@@ -626,7 +752,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(checkpoint);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
-      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
     }
   }
@@ -639,24 +765,25 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     long checkpoint = 10;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     FileAppenderFactory<RowData> appenderFactory = 
createDeletableAppenderFactory();
 
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row1 = SimpleDataUtil.createInsert(1, "aaa");
       DataFile dataFile1 = writeDataFile("data-file-1", 
ImmutableList.of(row1));
       harness.processElement(of(dataFile1), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 1. snapshotState for checkpoint#1
       harness.snapshot(checkpoint, ++timestamp);
       List<Path> manifestPaths = assertFlinkManifests(1);
       Path manifestPath = manifestPaths.get(0);
-      String operatorId = 
harness.getOneInputOperator().getOperatorID().toString();
       Assert.assertEquals(
           "File name should have the expected pattern.",
           String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, 
checkpoint, 1),
@@ -671,7 +798,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(checkpoint);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
-      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
 
       // 4. process both data files and delete files.
@@ -684,7 +811,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       harness.processElement(
           
WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile1).build(),
           ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
 
       // 5. snapshotState for checkpoint#2
       harness.snapshot(++checkpoint, ++timestamp);
@@ -693,7 +820,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       // 6. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(checkpoint);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2));
-      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
     }
   }
@@ -706,13 +833,15 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     long checkpoint = 10;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     FileAppenderFactory<RowData> appenderFactory = 
createDeletableAppenderFactory();
 
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
       RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
@@ -742,7 +871,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       // Notify the 2nd snapshot to complete.
       harness.notifyOfCompletedCheckpoint(checkpoint);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, 
insert4));
-      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
       Assert.assertEquals(
           "Should have committed 2 txn.", 2, 
ImmutableList.copyOf(table.snapshots()).size());
@@ -817,9 +946,11 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
         table, table.schema(), table.spec(), CONF, tablePath, 
format.addExtension(filename), rows);
   }
 
-  private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
+  private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID 
operatorID, long expectedId) {
     table.refresh();
-    long actualId = IcebergFilesCommitter.getMaxCommittedCheckpointId(table, 
jobID.toString());
+    long actualId =
+        IcebergFilesCommitter.getMaxCommittedCheckpointId(
+            table, jobID.toString(), operatorID.toHexString());
     Assert.assertEquals(expectedId, actualId);
   }
 
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 93dc0049d1..d84e2cb706 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -69,6 +69,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
 
   private static final Logger LOG = 
LoggerFactory.getLogger(IcebergFilesCommitter.class);
   private static final String FLINK_JOB_ID = "flink.job-id";
+  private static final String OPERATOR_ID = "flink.operator-id";
 
   // The max checkpoint id we've committed to iceberg table. As the flink's 
checkpoint is always
   // increasing, so we could correctly commit all the data files whose 
checkpoint id is greater than
@@ -97,6 +98,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
 
   // It will have an unique identifier for one job.
   private transient String flinkJobId;
+  private transient String operatorUniqueId;
   private transient Table table;
   private transient IcebergFilesCommitterMetrics committerMetrics;
   private transient ManifestOutputFileFactory manifestOutputFileFactory;
@@ -134,6 +136,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
   public void initializeState(StateInitializationContext context) throws 
Exception {
     super.initializeState(context);
     this.flinkJobId = 
getContainingTask().getEnvironment().getJobID().toString();
+    this.operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
 
     // Open the table loader and load the table.
     this.tableLoader.open();
@@ -147,7 +150,6 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
 
     int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
     int attemptId = getRuntimeContext().getAttemptNumber();
-    String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
     this.manifestOutputFileFactory =
         FlinkManifestUtil.createOutputFileFactory(
             table, flinkJobId, operatorUniqueId, subTaskId, attemptId);
@@ -176,7 +178,8 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       // flink job even if it's restored from a snapshot created by another 
different flink job, so
       // it's safe to assign the max committed checkpoint id from restored 
flink job to the current
       // flink job.
-      this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, 
restoredFlinkJobId);
+      this.maxCommittedCheckpointId =
+          getMaxCommittedCheckpointId(table, restoredFlinkJobId, 
operatorUniqueId);
 
       NavigableMap<Long, byte[]> uncommittedDataFiles =
           Maps.newTreeMap(checkpointsState.get().iterator().next())
@@ -184,7 +187,8 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       if (!uncommittedDataFiles.isEmpty()) {
         // Committed all uncommitted data files from the old flink job to 
iceberg table.
         long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
-        commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, 
maxUncommittedCheckpointId);
+        commitUpToCheckpoint(
+            uncommittedDataFiles, restoredFlinkJobId, operatorUniqueId, 
maxUncommittedCheckpointId);
       }
     }
   }
@@ -226,7 +230,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     // the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be 
increasing.
     if (checkpointId > maxCommittedCheckpointId) {
-      commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+      commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, 
operatorUniqueId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
     } else {
       LOG.info(
@@ -237,7 +241,10 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
   }
 
   private void commitUpToCheckpoint(
-      NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long 
checkpointId)
+      NavigableMap<Long, byte[]> deltaManifestsMap,
+      String newFlinkJobId,
+      String operatorId,
+      long checkpointId)
       throws IOException {
     NavigableMap<Long, byte[]> pendingMap = 
deltaManifestsMap.headMap(checkpointId, true);
     List<ManifestFile> manifests = Lists.newArrayList();
@@ -257,7 +264,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     }
 
     CommitSummary summary = new CommitSummary(pendingResults);
-    commitPendingResult(pendingResults, summary, newFlinkJobId, checkpointId);
+    commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, 
checkpointId);
     committerMetrics.updateCommitSummary(summary);
     pendingMap.clear();
     deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
@@ -267,14 +274,15 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       NavigableMap<Long, WriteResult> pendingResults,
       CommitSummary summary,
       String newFlinkJobId,
+      String operatorId,
       long checkpointId) {
     long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
     continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints 
+ 1 : 0;
     if (totalFiles != 0 || continuousEmptyCheckpoints % 
maxContinuousEmptyCommits == 0) {
       if (replacePartitions) {
-        replacePartitions(pendingResults, summary, newFlinkJobId, 
checkpointId);
+        replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, 
checkpointId);
       } else {
-        commitDeltaTxn(pendingResults, summary, newFlinkJobId, checkpointId);
+        commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, 
checkpointId);
       }
       continuousEmptyCheckpoints = 0;
     } else {
@@ -307,6 +315,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       NavigableMap<Long, WriteResult> pendingResults,
       CommitSummary summary,
       String newFlinkJobId,
+      String operatorId,
       long checkpointId) {
     Preconditions.checkState(
         summary.deleteFilesCount() == 0, "Cannot overwrite partitions with 
delete files.");
@@ -319,13 +328,19 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     }
 
     commitOperation(
-        dynamicOverwrite, summary, "dynamic partition overwrite", 
newFlinkJobId, checkpointId);
+        dynamicOverwrite,
+        summary,
+        "dynamic partition overwrite",
+        newFlinkJobId,
+        operatorId,
+        checkpointId);
   }
 
   private void commitDeltaTxn(
       NavigableMap<Long, WriteResult> pendingResults,
       CommitSummary summary,
       String newFlinkJobId,
+      String operatorId,
       long checkpointId) {
     if (summary.deleteFilesCount() == 0) {
       // To be compatible with iceberg format V1.
@@ -336,7 +351,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
             "Should have no referenced data files for append.");
         Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
       }
-      commitOperation(appendFiles, summary, "append", newFlinkJobId, 
checkpointId);
+      commitOperation(appendFiles, summary, "append", newFlinkJobId, 
operatorId, checkpointId);
     } else {
       // To be compatible with iceberg format V2.
       for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
@@ -357,7 +372,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
 
         Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
         Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-        commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, 
e.getKey());
+        commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, 
operatorId, e.getKey());
       }
     }
   }
@@ -367,6 +382,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       CommitSummary summary,
       String description,
       String newFlinkJobId,
+      String operatorId,
       long checkpointId) {
     LOG.info(
         "Committing {} for checkpoint {} to table {} with summary: {}",
@@ -379,6 +395,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     // used by the sink.
     operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
     operation.set(FLINK_JOB_ID, newFlinkJobId);
+    operation.set(OPERATOR_ID, operatorId);
 
     long startNano = System.nanoTime();
     operation.commit(); // abort is automatically called if this fails.
@@ -404,7 +421,7 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     dataFilesPerCheckpoint.put(currentCheckpointId, 
writeToManifest(currentCheckpointId));
     writeResultsOfCurrentCkpt.clear();
 
-    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, 
currentCheckpointId);
+    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, 
currentCheckpointId);
   }
 
   /**
@@ -456,14 +473,16 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     return new ListStateDescriptor<>("iceberg-files-committer-state", 
sortedMapTypeInfo);
   }
 
-  static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
+  static long getMaxCommittedCheckpointId(Table table, String flinkJobId, 
String operatorId) {
     Snapshot snapshot = table.currentSnapshot();
     long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     while (snapshot != null) {
       Map<String, String> summary = snapshot.summary();
       String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
-      if (flinkJobId.equals(snapshotFlinkJobId)) {
+      String snapshotOperatorId = summary.get(OPERATOR_ID);
+      if (flinkJobId.equals(snapshotFlinkJobId)
+          && (snapshotOperatorId == null || 
snapshotOperatorId.equals(operatorId))) {
         String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
         if (value != null) {
           lastCommittedCheckpointId = Long.parseLong(value);
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 5a4cf6223c..66baaeb0e9 100644
--- 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -118,13 +119,15 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     long checkpointId = 0;
     long timestamp = 0;
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
       SimpleDataUtil.assertTableRows(table, Lists.newArrayList());
       assertSnapshotSize(0);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // It's better to advance the max-committed-checkpoint-id in iceberg 
snapshot, so that the
       // future flink job
@@ -137,7 +140,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
         assertFlinkManifests(0);
 
         assertSnapshotSize(i);
-        assertMaxCommittedCheckpointId(jobId, checkpointId);
+        assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
       }
     }
   }
@@ -180,9 +183,12 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     long timestamp = 0;
 
     JobID jobID = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobID)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
       assertSnapshotSize(0);
 
       List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
@@ -200,7 +206,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
         SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
         assertSnapshotSize(i);
-        assertMaxCommittedCheckpointId(jobID, i);
+        assertMaxCommittedCheckpointId(jobID, operatorId, i);
         Assert.assertEquals(
             TestIcebergFilesCommitter.class.getName(),
             table.currentSnapshot().summary().get("flink.test"));
@@ -218,17 +224,19 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     long timestamp = 0;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row1 = SimpleDataUtil.createRowData(1, "hello");
       DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
 
       harness.processElement(of(dataFile1), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 1. snapshotState for checkpoint#1
       long firstCheckpointId = 1;
@@ -238,7 +246,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       RowData row2 = SimpleDataUtil.createRowData(2, "world");
       DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
       harness.processElement(of(dataFile2), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 2. snapshotState for checkpoint#2
       long secondCheckpointId = 2;
@@ -248,13 +256,13 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(firstCheckpointId);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
-      assertMaxCommittedCheckpointId(jobId, firstCheckpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, firstCheckpointId);
       assertFlinkManifests(1);
 
       // 4. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(secondCheckpointId);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
-      assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
     }
   }
@@ -269,17 +277,19 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     long timestamp = 0;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row1 = SimpleDataUtil.createRowData(1, "hello");
       DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
 
       harness.processElement(of(dataFile1), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 1. snapshotState for checkpoint#1
       long firstCheckpointId = 1;
@@ -289,7 +299,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       RowData row2 = SimpleDataUtil.createRowData(2, "world");
       DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
       harness.processElement(of(dataFile2), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 2. snapshotState for checkpoint#2
       long secondCheckpointId = 2;
@@ -299,13 +309,13 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       // 3. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(secondCheckpointId);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
-      assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
 
       // 4. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(firstCheckpointId);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
-      assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
     }
   }
@@ -318,12 +328,14 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     OperatorSubtaskState snapshot;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
       assertSnapshotSize(0);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row = SimpleDataUtil.createRowData(1, "hello");
       expectedRows.add(row);
@@ -338,18 +350,19 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row));
       assertSnapshotSize(1);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
     }
 
     // Restore from the given snapshot
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
+      harness.getStreamConfig().setOperatorID(operatorId);
       harness.setup();
       harness.initializeState(snapshot);
       harness.open();
 
       SimpleDataUtil.assertTableRows(table, expectedRows);
       assertSnapshotSize(1);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
       RowData row = SimpleDataUtil.createRowData(2, "world");
       expectedRows.add(row);
@@ -364,7 +377,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
       SimpleDataUtil.assertTableRows(table, expectedRows);
       assertSnapshotSize(2);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
     }
   }
 
@@ -378,12 +391,14 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     OperatorSubtaskState snapshot;
     List<RowData> expectedRows = Lists.newArrayList();
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
       assertSnapshotSize(0);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row = SimpleDataUtil.createRowData(1, "hello");
       expectedRows.add(row);
@@ -392,11 +407,12 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
       snapshot = harness.snapshot(++checkpointId, ++timestamp);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of());
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
       assertFlinkManifests(1);
     }
 
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = 
createStreamSink(jobId)) {
+      harness.getStreamConfig().setOperatorID(operatorId);
       harness.setup();
       harness.initializeState(snapshot);
       harness.open();
@@ -406,7 +422,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       assertFlinkManifests(0);
 
       SimpleDataUtil.assertTableRows(table, expectedRows);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
       harness.snapshot(++checkpointId, ++timestamp);
       // Did not write any new record, so it won't generate new manifest.
@@ -417,7 +433,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
       SimpleDataUtil.assertTableRows(table, expectedRows);
       assertSnapshotSize(2);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
       RowData row = SimpleDataUtil.createRowData(2, "world");
       expectedRows.add(row);
@@ -435,13 +451,14 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       harness.setup();
       harness.initializeState(snapshot);
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
       // All flink manifests should be cleaned because it has committed the 
unfinished iceberg
       // transaction.
       assertFlinkManifests(0);
 
-      assertMaxCommittedCheckpointId(newJobId, -1);
-      assertMaxCommittedCheckpointId(jobId, checkpointId);
+      assertMaxCommittedCheckpointId(newJobId, operatorId, -1);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
       SimpleDataUtil.assertTableRows(table, expectedRows);
       assertSnapshotSize(3);
 
@@ -458,7 +475,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
       SimpleDataUtil.assertTableRows(table, expectedRows);
       assertSnapshotSize(4);
-      assertMaxCommittedCheckpointId(newJobId, checkpointId);
+      assertMaxCommittedCheckpointId(newJobId, operatorId, checkpointId);
     }
   }
 
@@ -470,13 +487,15 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     List<RowData> tableRows = Lists.newArrayList();
 
     JobID oldJobId = new JobID();
+    OperatorID oldOperatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
         createStreamSink(oldJobId)) {
       harness.setup();
       harness.open();
+      oldOperatorId = harness.getOperator().getOperatorID();
 
       assertSnapshotSize(0);
-      assertMaxCommittedCheckpointId(oldJobId, -1L);
+      assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, -1L);
 
       for (int i = 1; i <= 3; i++) {
         rows.add(SimpleDataUtil.createRowData(i, "hello" + i));
@@ -492,7 +511,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
 
         SimpleDataUtil.assertTableRows(table, tableRows);
         assertSnapshotSize(i);
-        assertMaxCommittedCheckpointId(oldJobId, checkpointId);
+        assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, checkpointId);
       }
     }
 
@@ -500,14 +519,16 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
     checkpointId = 0;
     timestamp = 0;
     JobID newJobId = new JobID();
+    OperatorID newOperatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
         createStreamSink(newJobId)) {
       harness.setup();
       harness.open();
+      newOperatorId = harness.getOperator().getOperatorID();
 
       assertSnapshotSize(3);
-      assertMaxCommittedCheckpointId(oldJobId, 3);
-      assertMaxCommittedCheckpointId(newJobId, -1);
+      assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, 3);
+      assertMaxCommittedCheckpointId(newJobId, newOperatorId, -1);
 
       rows.add(SimpleDataUtil.createRowData(2, "world"));
       tableRows.addAll(rows);
@@ -521,7 +542,7 @@ public class TestIcebergFilesCommitter extends 
TableTestBase {
       assertFlinkManifests(0);
       SimpleDataUtil.assertTableRows(table, tableRows);
       assertSnapshotSize(4);
-      assertMaxCommittedCheckpointId(newJobId, checkpointId);
+      assertMaxCommittedCheckpointId(newJobId, newOperatorId, checkpointId);
     }
   }
 
@@ -531,16 +552,20 @@ public class TestIcebergFilesCommitter extends TableTestBase {
     List<RowData> tableRows = Lists.newArrayList();
 
     JobID[] jobs = new JobID[] {new JobID(), new JobID(), new JobID()};
+    OperatorID[] operatorIds =
+        new OperatorID[] {new OperatorID(), new OperatorID(), new OperatorID()};
     for (int i = 0; i < 20; i++) {
       int jobIndex = i % 3;
       int checkpointId = i / 3;
       JobID jobId = jobs[jobIndex];
+      OperatorID operatorId = operatorIds[jobIndex];
       try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+        harness.getStreamConfig().setOperatorID(operatorId);
         harness.setup();
         harness.open();
 
         assertSnapshotSize(i);
-        assertMaxCommittedCheckpointId(jobId, checkpointId == 0 ? -1 : checkpointId);
+        assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId == 0 ? -1 : checkpointId);
 
         List<RowData> rows = Lists.newArrayList(SimpleDataUtil.createRowData(i, "word-" + i));
         tableRows.addAll(rows);
@@ -554,21 +579,121 @@ public class TestIcebergFilesCommitter extends TableTestBase {
         assertFlinkManifests(0);
         SimpleDataUtil.assertTableRows(table, tableRows);
         assertSnapshotSize(i + 1);
-        assertMaxCommittedCheckpointId(jobId, checkpointId + 1);
+        assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId + 1);
       }
     }
   }
 
+  @Test
+  public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception {
+    long checkpointId = 0;
+    long timestamp = 0;
+    List<RowData> expectedRows = Lists.newArrayList();
+    OperatorSubtaskState snapshot1;
+    OperatorSubtaskState snapshot2;
+
+    JobID jobId = new JobID();
+    OperatorID operatorId1 = new OperatorID();
+    OperatorID operatorId2 = new OperatorID();
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness1 = createStreamSink(jobId);
+        OneInputStreamOperatorTestHarness<WriteResult, Void> harness2 = createStreamSink(jobId)) {
+      harness1.getStreamConfig().setOperatorID(operatorId1);
+      harness1.setup();
+      harness1.open();
+      harness2.getStreamConfig().setOperatorID(operatorId2);
+      harness2.setup();
+      harness2.open();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(jobId, operatorId1, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId2, -1L);
+
+      RowData row1 = SimpleDataUtil.createRowData(1, "hello1");
+      expectedRows.add(row1);
+      DataFile dataFile1 = writeDataFile("data-1-1", ImmutableList.of(row1));
+
+      harness1.processElement(of(dataFile1), ++timestamp);
+      snapshot1 = harness1.snapshot(++checkpointId, ++timestamp);
+
+      RowData row2 = SimpleDataUtil.createRowData(1, "hello2");
+      expectedRows.add(row2);
+      DataFile dataFile2 = writeDataFile("data-1-2", ImmutableList.of(row2));
+
+      harness2.processElement(of(dataFile2), ++timestamp);
+      snapshot2 = harness2.snapshot(checkpointId, ++timestamp);
+      assertFlinkManifests(2);
+
+      // Only notify one of the committers
+      harness1.notifyOfCompletedCheckpoint(checkpointId);
+      assertFlinkManifests(1);
+
+      // Only the first row is committed at this point
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      assertSnapshotSize(1);
+      assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId2, -1);
+    }
+
+    // Restore from the given snapshot
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness1 = createStreamSink(jobId);
+        OneInputStreamOperatorTestHarness<WriteResult, Void> harness2 = createStreamSink(jobId)) {
+      harness1.getStreamConfig().setOperatorID(operatorId1);
+      harness1.setup();
+      harness1.initializeState(snapshot1);
+      harness1.open();
+
+      harness2.getStreamConfig().setOperatorID(operatorId2);
+      harness2.setup();
+      harness2.initializeState(snapshot2);
+      harness2.open();
+
+      // All flink manifests should be cleaned because it has committed the unfinished iceberg
+      // transaction.
+      assertFlinkManifests(0);
+
+      SimpleDataUtil.assertTableRows(table, expectedRows);
+      assertSnapshotSize(2);
+      assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
+
+      RowData row1 = SimpleDataUtil.createRowData(2, "world1");
+      expectedRows.add(row1);
+      DataFile dataFile1 = writeDataFile("data-2-1", ImmutableList.of(row1));
+
+      harness1.processElement(of(dataFile1), ++timestamp);
+      harness1.snapshot(++checkpointId, ++timestamp);
+
+      RowData row2 = SimpleDataUtil.createRowData(2, "world2");
+      expectedRows.add(row2);
+      DataFile dataFile2 = writeDataFile("data-2-2", ImmutableList.of(row2));
+      harness2.processElement(of(dataFile2), ++timestamp);
+      harness2.snapshot(checkpointId, ++timestamp);
+
+      assertFlinkManifests(2);
+
+      harness1.notifyOfCompletedCheckpoint(checkpointId);
+      harness2.notifyOfCompletedCheckpoint(checkpointId);
+      assertFlinkManifests(0);
+
+      SimpleDataUtil.assertTableRows(table, expectedRows);
+      assertSnapshotSize(4);
+      assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
+    }
+  }
+
   @Test
   public void testBoundedStream() throws Exception {
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
       assertFlinkManifests(0);
       assertSnapshotSize(0);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       List<RowData> tableRows = Lists.newArrayList(SimpleDataUtil.createRowData(1, "word-1"));
 
@@ -579,7 +704,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       assertFlinkManifests(0);
       SimpleDataUtil.assertTableRows(table, tableRows);
       assertSnapshotSize(1);
-      assertMaxCommittedCheckpointId(jobId, Long.MAX_VALUE);
+      assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE);
       Assert.assertEquals(
           TestIcebergFilesCommitter.class.getName(),
           table.currentSnapshot().summary().get("flink.test"));
@@ -592,23 +717,24 @@ public class TestIcebergFilesCommitter extends TableTestBase {
     final long checkpoint = 10;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row1 = SimpleDataUtil.createRowData(1, "hello");
       DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
 
       harness.processElement(of(dataFile1), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 1. snapshotState for checkpoint#1
       harness.snapshot(checkpoint, ++timestamp);
       List<Path> manifestPaths = assertFlinkManifests(1);
       Path manifestPath = manifestPaths.get(0);
-      String operatorId = harness.getOneInputOperator().getOperatorID().toString();
       Assert.assertEquals(
           "File name should have the expected pattern.",
           String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
@@ -623,7 +749,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(checkpoint);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
-      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
     }
   }
@@ -636,24 +762,25 @@ public class TestIcebergFilesCommitter extends TableTestBase {
     long checkpoint = 10;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
 
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData row1 = SimpleDataUtil.createInsert(1, "aaa");
       DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(row1));
       harness.processElement(of(dataFile1), ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       // 1. snapshotState for checkpoint#1
       harness.snapshot(checkpoint, ++timestamp);
       List<Path> manifestPaths = assertFlinkManifests(1);
       Path manifestPath = manifestPaths.get(0);
-      String operatorId = harness.getOneInputOperator().getOperatorID().toString();
       Assert.assertEquals(
           "File name should have the expected pattern.",
           String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
@@ -668,7 +795,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(checkpoint);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
-      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
 
       // 4. process both data files and delete files.
@@ -681,7 +808,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.processElement(
           WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile1).build(),
           ++timestamp);
-      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
 
       // 5. snapshotState for checkpoint#2
       harness.snapshot(++checkpoint, ++timestamp);
@@ -690,7 +817,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       // 6. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(checkpoint);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2));
-      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
     }
   }
@@ -703,13 +830,15 @@ public class TestIcebergFilesCommitter extends TableTestBase {
     long checkpoint = 10;
 
     JobID jobId = new JobID();
+    OperatorID operatorId;
     FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
 
     try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
       harness.setup();
       harness.open();
+      operatorId = harness.getOperator().getOperatorID();
 
-      assertMaxCommittedCheckpointId(jobId, -1L);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
       RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
       RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
@@ -739,7 +868,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       // Notify the 2nd snapshot to complete.
       harness.notifyOfCompletedCheckpoint(checkpoint);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4));
-      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
       Assert.assertEquals(
           "Should have committed 2 txn.", 2, ImmutableList.copyOf(table.snapshots()).size());
@@ -818,9 +947,11 @@ public class TestIcebergFilesCommitter extends TableTestBase {
         rows);
   }
 
-  private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
+  private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long expectedId) {
     table.refresh();
-    long actualId = IcebergFilesCommitter.getMaxCommittedCheckpointId(table, jobID.toString());
+    long actualId =
+        IcebergFilesCommitter.getMaxCommittedCheckpointId(
+            table, jobID.toString(), operatorID.toHexString());
     Assert.assertEquals(expectedId, actualId);
   }
 

Reply via email to