stevenzwu commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545471359



##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -184,78 +185,106 @@ public void notifyCheckpointComplete(long checkpointId) 
throws Exception {
     }
   }
 
-  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
+  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> 
deltaManifestsMap,
                                     String newFlinkJobId,
                                     long checkpointId) throws IOException {
-    NavigableMap<Long, byte[]> pendingManifestMap = 
manifestsMap.headMap(checkpointId, true);
+    NavigableMap<Long, byte[]> pendingMap = 
deltaManifestsMap.headMap(checkpointId, true);
 
-    List<ManifestFile> manifestFiles = Lists.newArrayList();
-    List<DataFile> pendingDataFiles = Lists.newArrayList();
-    for (byte[] manifestData : pendingManifestMap.values()) {
-      if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) {
+    List<ManifestFile> manifests = Lists.newArrayList();
+    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
+    for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
+      if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
         // Skip the empty flink manifest.
         continue;
       }
 
-      ManifestFile manifestFile =
-          
SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE,
 manifestData);
-
-      manifestFiles.add(manifestFile);
-      pendingDataFiles.addAll(FlinkManifestUtil.readDataFiles(manifestFile, 
table.io()));
+      DeltaManifests deltaManifests = SimpleVersionedSerialization
+          .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, 
e.getValue());
+      pendingResults.put(e.getKey(), 
FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
+      Iterables.addAll(manifests, deltaManifests);
     }
 
     if (replacePartitions) {
-      replacePartitions(pendingDataFiles, newFlinkJobId, checkpointId);
+      replacePartitions(pendingResults, newFlinkJobId, checkpointId);
     } else {
-      append(pendingDataFiles, newFlinkJobId, checkpointId);
+      commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
     }
 
-    pendingManifestMap.clear();
+    pendingMap.clear();
 
-    // Delete the committed manifests and clear the committed data files from 
dataFilesPerCheckpoint.
-    for (ManifestFile manifestFile : manifestFiles) {
+    // Delete the committed manifests.
+    for (ManifestFile manifest : manifests) {
       try {
-        table.io().deleteFile(manifestFile.path());
+        table.io().deleteFile(manifest.path());
       } catch (Exception e) {
         // The flink manifests cleaning failure shouldn't abort the completed 
checkpoint.
         String details = MoreObjects.toStringHelper(this)
             .add("flinkJobId", newFlinkJobId)
             .add("checkpointId", checkpointId)
-            .add("manifestPath", manifestFile.path())
+            .add("manifestPath", manifest.path())
             .toString();
         LOG.warn("The iceberg transaction has been committed, but we failed to 
clean the temporary flink manifests: {}",
             details, e);
       }
     }
   }
 
-  private void replacePartitions(List<DataFile> dataFiles, String 
newFlinkJobId, long checkpointId) {
+  private void replacePartitions(NavigableMap<Long, WriteResult> 
pendingResults, String newFlinkJobId,
+                                 long checkpointId) {
+    // Partition overwrite does not support delete files.
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> 
r.deleteFiles().length).sum();
+    Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions 
with delete files.");
+
+    // Commit the overwrite transaction.
     ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
     int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      dynamicOverwrite.addFile(file);
+    for (WriteResult result : pendingResults.values()) {
+      numFiles += result.dataFiles().length;
+      Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
     }
 
-    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", 
newFlinkJobId, checkpointId);
+    commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition 
overwrite", newFlinkJobId, checkpointId);
   }
 
-  private void append(List<DataFile> dataFiles, String newFlinkJobId, long 
checkpointId) {
-    AppendFiles appendFiles = table.newAppend();
+  private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, 
String newFlinkJobId, long checkpointId) {
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> 
r.deleteFiles().length).sum();
 
-    int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      appendFiles.appendFile(file);
-    }
+    if (deleteFilesNum == 0) {
+      // To be compatible with iceberg format V1.
+      AppendFiles appendFiles = table.newAppend();
 
-    commitOperation(appendFiles, numFiles, "append", newFlinkJobId, 
checkpointId);
+      int numFiles = 0;
+      for (WriteResult result : pendingResults.values()) {

Review comment:
       here we are merging data files potentially from multiple checkpoint 
cycles/manifests into a single manifest file.
   
 We are using this API from the `AppendFiles` interface. 
   ```
     AppendFiles appendManifest(ManifestFile file);
   ```
   
   When we had an extended outage and accumulated a few hundred 
transactions/manifests in the Flink checkpoint state, this helps avoid rewriting many 
manifest files. @rdblue can probably explain it better than I do.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to