openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r544343472



##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -229,33 +232,71 @@ private void commitUpToCheckpoint(NavigableMap<Long, 
byte[]> manifestsMap,
     }
   }
 
-  private void replacePartitions(List<DataFile> dataFiles, String 
newFlinkJobId, long checkpointId) {
+  private void replacePartitions(NavigableMap<Long, WriteResult> 
pendingResults, String newFlinkJobId,
+                                 long checkpointId) {
+    // Merge all the pending results into a single write result.
+    WriteResult result = 
WriteResult.builder().add(pendingResults.values()).build();
+
+    // Partition overwrite does not support delete files.
+    Preconditions.checkArgument(result.deleteFiles().length == 0,
+        "Cannot overwrite partitions with delete files.");
     ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
+    // Commit the overwrite transaction.
     int numFiles = 0;
-    for (DataFile file : dataFiles) {
+    for (DataFile file : result.dataFiles()) {
       numFiles += 1;
       dynamicOverwrite.addFile(file);
     }
 
-    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", 
newFlinkJobId, checkpointId);
+    commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition 
overwrite", newFlinkJobId, checkpointId);
   }
 
-  private void append(List<DataFile> dataFiles, String newFlinkJobId, long 
checkpointId) {
-    AppendFiles appendFiles = table.newAppend();
+  private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, 
String newFlinkJobId, long checkpointId) {
+    // Merge all pending results into a single write result.
+    WriteResult mergedResult = 
WriteResult.builder().add(pendingResults.values()).build();
 
-    int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      appendFiles.appendFile(file);
-    }
+    if (mergedResult.deleteFiles().length < 1) {
+      // To be compatible with iceberg format V1.
+      AppendFiles appendFiles = table.newAppend();
+
+      int numFiles = 0;
+      for (DataFile file : mergedResult.dataFiles()) {
+        numFiles += 1;
+        appendFiles.appendFile(file);
+      }
+
+      commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, 
checkpointId);
+    } else {
+      // To be compatible with iceberg format V2.
+      for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
+        // We don't commit the merged result into a single transaction because 
for the sequential transaction txn1 and

Review comment:
       I will provide an unit test to address it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to