mxm commented on code in PR #14559:
URL: https://github.com/apache/iceberg/pull/14559#discussion_r2513719871


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##########
@@ -302,30 +303,58 @@ private void commitDeltaTxn(
       CommitSummary summary,
       String newFlinkJobId,
       String operatorId) {
-    for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      long checkpointId = e.getKey();
-      List<WriteResult> writeResults = e.getValue();

Review Comment:
   Can we keep the loop structure? We'll need it for both snapshot types. Something like this should work:
   
   ```java
     for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
       long checkpointId = e.getKey();
       List<WriteResult> writeResults = e.getValue();

       boolean appendOnly = true;
       for (WriteResult writeResult : writeResults) {
         if (writeResult.deleteFiles().length > 0) {
           appendOnly = false;
           break;
         }
       }

       final SnapshotUpdate<?> snapshotUpdate;
       if (appendOnly) {
         AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool);
         for (WriteResult result : writeResults) {
           Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
         }
         snapshotUpdate = appendFiles;
       } else {
         RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
         for (WriteResult result : writeResults) {
           Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
           Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
         }
         snapshotUpdate = rowDelta;
       }

       commitOperation(
           table,
           branch,
           snapshotUpdate,
           summary,
           appendOnly ? "append" : "rowDelta",
           newFlinkJobId,
           operatorId,
           checkpointId);
     }
   ```
   



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##########
@@ -302,30 +303,58 @@ private void commitDeltaTxn(
       CommitSummary summary,
       String newFlinkJobId,
       String operatorId) {
-    for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      long checkpointId = e.getKey();
-      List<WriteResult> writeResults = e.getValue();
-
-      RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
-      for (WriteResult result : writeResults) {
-        // Row delta validations are not needed for streaming changes that write equality deletes.
-        // Equality deletes are applied to data in all previous sequence numbers, so retries may
-        // push deletes further in the future, but do not affect correctness. Position deletes
-        // committed to the table in this path are used only to delete rows from data files that are
-        // being added in this commit. There is no way for data files added along with the delete
-        // files to be concurrently removed, so there is no need to validate the files referenced by
-        // the position delete files that are being committed.
-        Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
-        Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
+    if (summary.deleteFilesCount() == 0) {

Review Comment:
   I'm not sure about the granularity of this value: each pending result may or may not contain deletes, while the summary aggregates across all of them. Probably best to check the WriteResults directly, per checkpoint.
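   The per-checkpoint check being suggested can be sketched as follows. This is a minimal, self-contained illustration: `WriteResultCheck`, `isAppendOnly`, and the nested `WriteResult` class are hypothetical stand-ins for Iceberg's `org.apache.iceberg.io.WriteResult`, which similarly exposes its delete files as an array.

```java
import java.util.List;

class WriteResultCheck {
  // Hypothetical stand-in for org.apache.iceberg.io.WriteResult; only the
  // deleteFiles() accessor matters for this sketch.
  static class WriteResult {
    private final int deleteFileCount;

    WriteResult(int deleteFileCount) {
      this.deleteFileCount = deleteFileCount;
    }

    int[] deleteFiles() {
      return new int[deleteFileCount];
    }
  }

  // A checkpoint is append-only iff none of its WriteResults carry delete
  // files. This inspects the WriteResults of one checkpoint directly instead
  // of relying on a summary aggregated across all pending results.
  static boolean isAppendOnly(List<WriteResult> writeResults) {
    for (WriteResult result : writeResults) {
      if (result.deleteFiles().length > 0) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<WriteResult> appendOnly = List.of(new WriteResult(0), new WriteResult(0));
    List<WriteResult> withDeletes = List.of(new WriteResult(0), new WriteResult(2));
    System.out.println(isAppendOnly(appendOnly));   // prints true
    System.out.println(isAppendOnly(withDeletes));  // prints false
  }
}
```

   With such a predicate, the committer loop above can decide per checkpoint whether to build an `AppendFiles` or a `RowDelta`, rather than making one global decision for all pending results.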



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

