shangxinli commented on code in PR #14182:
URL: https://github.com/apache/iceberg/pull/14182#discussion_r2378994041


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##########
@@ -137,38 +139,68 @@ public void 
commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
         commitRequestMap.entrySet()) {
       Table table = 
catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
       DynamicCommittable last = 
entry.getValue().lastEntry().getValue().get(0).getCommittable();
-      long maxCommittedCheckpointId =
+
+      CheckpointInfo maxCommittedCheckpointIds =
           getMaxCommittedCheckpointId(
               table, last.jobId(), last.operatorId(), entry.getKey().branch());
       // Mark the already committed FilesCommittable(s) as finished
       entry
           .getValue()
-          .headMap(maxCommittedCheckpointId, true)
+          .headMap(maxCommittedCheckpointIds.maxProcessedCheckpointId, false)
           .values()
           .forEach(list -> 
list.forEach(CommitRequest::signalAlreadyCommitted));
+      // Filter out all committables until the last checkpoint id, which may 
be partially committed.
       NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> uncommitted =
-          entry.getValue().tailMap(maxCommittedCheckpointId, false);
+          
entry.getValue().tailMap(maxCommittedCheckpointIds.maxProcessedCheckpointId, 
true);
+
       if (!uncommitted.isEmpty()) {
         commitPendingRequests(
-            table, entry.getKey().branch(), uncommitted, last.jobId(), 
last.operatorId());
+            table,
+            entry.getKey().branch(),
+            uncommitted,
+            last.jobId(),
+            last.operatorId(),
+            maxCommittedCheckpointIds);
+      }
+    }
+  }
+
+  private static class CheckpointInfo {
+    private final long maxProcessedCheckpointId;
+    private final int maxCommittedWriteResultIndex;
+
+    private CheckpointInfo(long maxProcessedCheckpointId, int 
maxCommittedWriteResultIndex) {
+      this.maxProcessedCheckpointId = maxProcessedCheckpointId;
+      this.maxCommittedWriteResultIndex = maxCommittedWriteResultIndex;
+    }
+
+    int writeResultIndexFor(long checkpointId) {
+      if (checkpointId == this.maxProcessedCheckpointId) {
+        return this.maxCommittedWriteResultIndex;
       }
+      return -1;
     }
   }
 
-  private static long getMaxCommittedCheckpointId(
+  private static CheckpointInfo getMaxCommittedCheckpointId(
       Table table, String flinkJobId, String operatorId, String branch) {
     Snapshot snapshot = table.snapshot(branch);
     long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+    int writeResultIndex = INITIAL_WRITE_RESULT_INDEX;
 
     while (snapshot != null) {
       Map<String, String> summary = snapshot.summary();
       String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
       String snapshotOperatorId = summary.get(OPERATOR_ID);
       if (flinkJobId.equals(snapshotFlinkJobId)
           && (snapshotOperatorId == null || 
snapshotOperatorId.equals(operatorId))) {
-        String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
-        if (value != null) {
-          lastCommittedCheckpointId = Long.parseLong(value);
+        String maxCheckpointIdString = 
summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+        if (maxCheckpointIdString != null) {
+          lastCommittedCheckpointId = Long.parseLong(maxCheckpointIdString);
+          String writeResultIndexString = summary.get(MAX_WRITE_RESULT_INDEX);
+          if (writeResultIndexString != null) {
+            writeResultIndex = Integer.parseInt(writeResultIndexString);
+          }

Review Comment:
   Not sure if we have a test to cover the case where 'writeResultIndexString == null'.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to