mxm commented on code in PR #15913:
URL: https://github.com/apache/iceberg/pull/15913#discussion_r3066168737


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -423,21 +423,53 @@ public void endInput() throws IOException {
   }
 
  private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
-    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+
+    if (!dataFilesPerCheckpoint.containsKey(checkpointId)) {
       dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
     }
 
-    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
-        writeResultsSinceLastSnapshot.entrySet()) {
-      dataFilesPerCheckpoint.put(
-          writeResultsOfCheckpoint.getKey(),
-          writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
+    Map<Long, List<WriteResult>> pendingWriteResults = Maps.newHashMap();
+    for (Map.Entry<Long, List<WriteResult>> entry : writeResultsSinceLastSnapshot.entrySet()) {
+      long assignedCheckpointId = computeCheckpointId(checkpointId, entry);
+      pendingWriteResults
+          .computeIfAbsent(assignedCheckpointId, k -> Lists.newArrayList())
+          .addAll(entry.getValue());
+    }
+
+    for (Map.Entry<Long, List<WriteResult>> entry : pendingWriteResults.entrySet()) {
+      dataFilesPerCheckpoint.put(entry.getKey(), writeToManifest(entry.getKey(), entry.getValue()));
     }
 
     // Clear the local buffer for current checkpoint.
     writeResultsSinceLastSnapshot.clear();
   }
 
+  /**
+   * in case of unaligned checkpoints, data files that were part of checkpoint N in the writer may

Review Comment:
   NIT
   ```suggestion
      * In case of unaligned checkpoints, data files that were part of checkpoint N in the writer may
   ```



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -423,21 +423,53 @@ public void endInput() throws IOException {
   }
 
  private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
-    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+
+    if (!dataFilesPerCheckpoint.containsKey(checkpointId)) {
       dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
     }
 
-    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
-        writeResultsSinceLastSnapshot.entrySet()) {
-      dataFilesPerCheckpoint.put(
-          writeResultsOfCheckpoint.getKey(),
-          writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
+    Map<Long, List<WriteResult>> pendingWriteResults = Maps.newHashMap();
+    for (Map.Entry<Long, List<WriteResult>> entry : writeResultsSinceLastSnapshot.entrySet()) {
+      long assignedCheckpointId = computeCheckpointId(checkpointId, entry);
+      pendingWriteResults
+          .computeIfAbsent(assignedCheckpointId, k -> Lists.newArrayList())
+          .addAll(entry.getValue());
+    }
+
+    for (Map.Entry<Long, List<WriteResult>> entry : pendingWriteResults.entrySet()) {
+      dataFilesPerCheckpoint.put(entry.getKey(), writeToManifest(entry.getKey(), entry.getValue()));
     }
 
     // Clear the local buffer for current checkpoint.
     writeResultsSinceLastSnapshot.clear();
   }
 
+  /**
+   * in case of unaligned checkpoints, data files that were part of checkpoint N in the writer may
+   * have to become part of a later checkpoint in the committer if:
+   *
+   * <ul>
+   *   <li>previous files were already committed for checkpoint N. We have to keep the manifests for
+   *       new files under a later key, otherwise they are discarded during recovery after a crash
+   *   <li>we already have a manifest of files to be committed for checkpoint N, even though it
+   *       might not have been committed yet. In this case, we must not overwrite the manifests we
+   *       already have, and we must keep them consistent with our checkpoint
+   * </ul>
+   */
+  private long computeCheckpointId(long checkpointId, Map.Entry<Long, List<WriteResult>> entry) {
+    long sourceCheckpointId = entry.getKey();
+
+    boolean sourceCheckpointIdAlreadyCommitted = sourceCheckpointId <= maxCommittedCheckpointId;
+    boolean sourceCheckpointIdHasDataInSnapshot =
+        dataFilesPerCheckpoint.containsKey(sourceCheckpointId);
+    // for aligned checkpoints, both conditions will be false and the upstream operator's checkpoint
+    // ID
+    // will be chosen.
+    return sourceCheckpointIdAlreadyCommitted || sourceCheckpointIdHasDataInSnapshot
+        ? checkpointId
+        : sourceCheckpointId;

Review Comment:
   Is this correct under all scenarios? This will mean that we include files of an older snapshot into the current checkpoint, but IMHO they need to be in the following snapshot (`sourceCheckpointId + 1`), not in the latest. There can be outstanding WriteResults for multiple snapshots, which have not been committed to Iceberg.
   
   Otherwise, we violate the order in case of deletions.
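   A minimal standalone sketch of the remapping proposed here (all names are illustrative, not the PR's actual code): write results from a source checkpoint that is already committed, or that already has a manifest, shift to the next checkpoint id rather than the latest one, so results for checkpoint N still commit before those of N+1, N+2, and so on.

   ```java
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical sketch of the proposed alternative, not the PR's code.
   // A complete fix would also have to handle the shifted slot itself
   // already being occupied by an existing manifest.
   public class CheckpointRemapSketch {

     static long reassign(
         long sourceCheckpointId,
         long latestCheckpointId,
         long maxCommittedCheckpointId,
         Map<Long, String> manifestsPerCheckpoint) {
       boolean alreadyCommitted = sourceCheckpointId <= maxCommittedCheckpointId;
       boolean alreadyManifested = manifestsPerCheckpoint.containsKey(sourceCheckpointId);
       if (alreadyCommitted || alreadyManifested) {
         // Proposed: sourceCheckpointId + 1, capped at the checkpoint being taken
         // now, instead of jumping straight to the latest checkpoint.
         return Math.min(sourceCheckpointId + 1, latestCheckpointId);
       }
       return sourceCheckpointId;
     }

     public static void main(String[] args) {
       Map<Long, String> manifests = new TreeMap<>();
       manifests.put(5L, "manifest-5"); // checkpoint 5 already has a manifest

       // Latest checkpoint is 7; everything up to checkpoint 4 was committed.
       System.out.println(reassign(4L, 7L, 4L, manifests)); // committed  -> 5
       System.out.println(reassign(5L, 7L, 4L, manifests)); // manifested -> 6
       System.out.println(reassign(6L, 7L, 4L, manifests)); // untouched  -> 6
     }
   }
   ```

   The point of the `+ 1` is ordering: delete files recorded for checkpoint N must not be committed after data files of checkpoints between N and the latest one.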



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -423,21 +423,53 @@ public void endInput() throws IOException {
   }
 
  private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
-    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+
+    if (!dataFilesPerCheckpoint.containsKey(checkpointId)) {
       dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
     }
 
-    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
-        writeResultsSinceLastSnapshot.entrySet()) {
-      dataFilesPerCheckpoint.put(
-          writeResultsOfCheckpoint.getKey(),
-          writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
+    Map<Long, List<WriteResult>> pendingWriteResults = Maps.newHashMap();
+    for (Map.Entry<Long, List<WriteResult>> entry : writeResultsSinceLastSnapshot.entrySet()) {
+      long assignedCheckpointId = computeCheckpointId(checkpointId, entry);
+      pendingWriteResults
+          .computeIfAbsent(assignedCheckpointId, k -> Lists.newArrayList())
+          .addAll(entry.getValue());
+    }
+
+    for (Map.Entry<Long, List<WriteResult>> entry : pendingWriteResults.entrySet()) {
+      dataFilesPerCheckpoint.put(entry.getKey(), writeToManifest(entry.getKey(), entry.getValue()));
     }
 
     // Clear the local buffer for current checkpoint.
     writeResultsSinceLastSnapshot.clear();
   }
 
+  /**
+   * in case of unaligned checkpoints, data files that were part of checkpoint N in the writer may
+   * have to become part of a later checkpoint in the committer if:
+   *
+   * <ul>
+   *   <li>previous files were already committed for checkpoint N. We have to keep the manifests for
+   *       new files under a later key, otherwise they are discarded during recovery after a crash
+   *   <li>we already have a manifest of files to be committed for checkpoint N, even though it
+   *       might not have been committed yet. In this case, we must not overwrite the manifests we
+   *       already have, and we must keep them consistent with our checkpoint
+   * </ul>
+   */
+  private long computeCheckpointId(long checkpointId, Map.Entry<Long, List<WriteResult>> entry) {
+    long sourceCheckpointId = entry.getKey();
+
+    boolean sourceCheckpointIdAlreadyCommitted = sourceCheckpointId <= maxCommittedCheckpointId;
+    boolean sourceCheckpointIdHasDataInSnapshot =
+        dataFilesPerCheckpoint.containsKey(sourceCheckpointId);
+    // for aligned checkpoints, both conditions will be false and the upstream operator's checkpoint
+    // ID
+    // will be chosen.

Review Comment:
   ```suggestion
       // ID will be chosen.
   ```
   NIT



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

