ahmedabu98 commented on code in PR #34264:
URL: https://github.com/apache/beam/pull/34264#discussion_r2005518737


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java:
##########
@@ -211,5 +213,31 @@ private ManifestWriter<DataFile> createManifestWriter(
                   tableLocation, manifestFilePrefix, uuid, spec.specId()));
       return ManifestFiles.write(spec, io.newOutputFile(location));
     }
+
+    // If bundle fails following a successful commit and gets retried, it may attempt to re-commit
+    // the same data.
+    // To mitigate, we check the files in this bundle and remove anything that was already
+    // committed in the last successful snapshot.
+    //
+    // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the
+    //  "last successful snapshot" might reflect commits from other sources. Ideally, we would make
+    //  this stateful, but that is update incompatible.
+    // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing
+    private Iterable<FileWriteResult> removeAlreadyCommittedFiles(
+        Table table, Iterable<FileWriteResult> fileWriteResults) {
+      if (table.currentSnapshot() == null) {
+        return fileWriteResults;
+      }
+
+      List<String> committedFiles =
+          Streams.stream(table.currentSnapshot().addedDataFiles(table.io()))

Review Comment:
   Thanks for bringing this up. I later realized that we have an upstream GBK,
which means each retry will see the same batch of files. We can just check for
a single overlapping file instead of doing a full search.
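
   A rough sketch of the single-overlap idea, not the code in this PR. It uses plain file-path strings in place of Iceberg `DataFile` objects, and the class and method names (`RetryDedupSketch`, `hasOverlap`, `filesToCommit`) are hypothetical. It assumes, as described above, that a retried bundle carries exactly the same files as the original attempt, so one match is enough to treat the whole bundle as already committed.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, for illustration only. Paths are plain strings here;
// in the real transform they would come from the snapshot's added data files.
final class RetryDedupSketch {

  private RetryDedupSketch() {}

  // Returns true as soon as one committed path is also present in the bundle.
  // The scan over committedPaths stops at the first hit.
  public static boolean hasOverlap(
      Iterable<String> committedPaths, Collection<String> bundlePaths) {
    Set<String> bundle = new HashSet<>(bundlePaths);
    for (String path : committedPaths) {
      if (bundle.contains(path)) {
        return true;
      }
    }
    return false;
  }

  // Assumption: the upstream GBK makes a retried bundle identical to the
  // original one, so any overlap means the entire bundle was committed.
  public static List<String> filesToCommit(
      Iterable<String> committedPaths, List<String> bundlePaths) {
    return hasOverlap(committedPaths, bundlePaths) ? List.of() : bundlePaths;
  }
}
```

   If the same-batch assumption does not hold (for example, with concurrent writers, as the TODO in the diff notes), a single match would not justify skipping the whole bundle, and the per-file filter would still be needed.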



