rdblue commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545994813



##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -184,78 +185,106 @@ public void notifyCheckpointComplete(long checkpointId) 
throws Exception {
     }
   }
 
-  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
+  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> 
deltaManifestsMap,
                                     String newFlinkJobId,
                                     long checkpointId) throws IOException {
-    NavigableMap<Long, byte[]> pendingManifestMap = 
manifestsMap.headMap(checkpointId, true);
+    NavigableMap<Long, byte[]> pendingMap = 
deltaManifestsMap.headMap(checkpointId, true);
 
-    List<ManifestFile> manifestFiles = Lists.newArrayList();
-    List<DataFile> pendingDataFiles = Lists.newArrayList();
-    for (byte[] manifestData : pendingManifestMap.values()) {
-      if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) {
+    List<ManifestFile> manifests = Lists.newArrayList();
+    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
+    for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
+      if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
         // Skip the empty flink manifest.
         continue;
       }
 
-      ManifestFile manifestFile =
-          
SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE,
 manifestData);
-
-      manifestFiles.add(manifestFile);
-      pendingDataFiles.addAll(FlinkManifestUtil.readDataFiles(manifestFile, 
table.io()));
+      DeltaManifests deltaManifests = SimpleVersionedSerialization
+          .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, 
e.getValue());
+      pendingResults.put(e.getKey(), 
FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
+      Iterables.addAll(manifests, deltaManifests);
     }
 
     if (replacePartitions) {
-      replacePartitions(pendingDataFiles, newFlinkJobId, checkpointId);
+      replacePartitions(pendingResults, newFlinkJobId, checkpointId);
     } else {
-      append(pendingDataFiles, newFlinkJobId, checkpointId);
+      commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
     }
 
-    pendingManifestMap.clear();
+    pendingMap.clear();
 
-    // Delete the committed manifests and clear the committed data files from 
dataFilesPerCheckpoint.
-    for (ManifestFile manifestFile : manifestFiles) {
+    // Delete the committed manifests.
+    for (ManifestFile manifest : manifests) {
       try {
-        table.io().deleteFile(manifestFile.path());
+        table.io().deleteFile(manifest.path());
       } catch (Exception e) {
         // The flink manifests cleaning failure shouldn't abort the completed 
checkpoint.
         String details = MoreObjects.toStringHelper(this)
             .add("flinkJobId", newFlinkJobId)
             .add("checkpointId", checkpointId)
-            .add("manifestPath", manifestFile.path())
+            .add("manifestPath", manifest.path())
             .toString();
         LOG.warn("The iceberg transaction has been committed, but we failed to 
clean the temporary flink manifests: {}",
             details, e);
       }
     }
   }
 
-  private void replacePartitions(List<DataFile> dataFiles, String 
newFlinkJobId, long checkpointId) {
+  private void replacePartitions(NavigableMap<Long, WriteResult> 
pendingResults, String newFlinkJobId,
+                                 long checkpointId) {
+    // Partition overwrite does not support delete files.
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> 
r.deleteFiles().length).sum();
+    Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions 
with delete files.");
+
+    // Commit the overwrite transaction.
     ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
     int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      dynamicOverwrite.addFile(file);
+    for (WriteResult result : pendingResults.values()) {
+      numFiles += result.dataFiles().length;
+      Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
     }
 
-    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", 
newFlinkJobId, checkpointId);
+    commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition 
overwrite", newFlinkJobId, checkpointId);
   }
 
-  private void append(List<DataFile> dataFiles, String newFlinkJobId, long 
checkpointId) {
-    AppendFiles appendFiles = table.newAppend();
+  private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, 
String newFlinkJobId, long checkpointId) {
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> 
r.deleteFiles().length).sum();
 
-    int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      appendFiles.appendFile(file);
-    }
+    if (deleteFilesNum == 0) {
+      // To be compatible with iceberg format V1.
+      AppendFiles appendFiles = table.newAppend();
 
-    commitOperation(appendFiles, numFiles, "append", newFlinkJobId, 
checkpointId);
+      int numFiles = 0;
+      for (WriteResult result : pendingResults.values()) {

Review comment:
       > Maybe we can add a similar API in DeleteFiles interface?
   
    We don't currently do this because we need delete entries to exist when we 
delete files. That way we can track when something was deleted and clean it up 
incrementally in `ExpireSnapshots`. If we did add a method like this, it would 
either have to always rewrite the manifest with deletes, or it would need to 
ensure that the added manifest contains only deletes — and neither requirement 
is very obvious to callers. I think it is better to pass the deleted files 
through the existing methods.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to