damccorm commented on code in PR #34264: URL: https://github.com/apache/beam/pull/34264#discussion_r1993617860
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java: ########## @@ -211,5 +213,31 @@ private ManifestWriter<DataFile> createManifestWriter( tableLocation, manifestFilePrefix, uuid, spec.specId())); return ManifestFiles.write(spec, io.newOutputFile(location)); } + + // If bundle fails following a successful commit and gets retried, it may attempt to re-commit + // the same data. + // To mitigate, we check the files in this bundle and remove anything that was already + // committed in the last successful snapshot. + // + // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the + // "last successful snapshot" might reflect commits from other sources. Ideally, we would make + // this stateful, but that is update incompatible. + // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing + private Iterable<FileWriteResult> removeAlreadyCommittedFiles( + Table table, Iterable<FileWriteResult> fileWriteResults) { + if (table.currentSnapshot() == null) { + return fileWriteResults; + } + + List<String> committedFiles = + Streams.stream(table.currentSnapshot().addedDataFiles(table.io())) Review Comment: How many data files could you have per snapshot? Is there any chance this list gets large enough that it causes problems/is there any way to do filtering at the source instead of later in this process? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org