stevenzwu commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353866953
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {

   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }

   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   > I think what we can do, is to write Long.MAX_VALUE to the manifest for batch pipelines and thereby keep the current behavior. The runtime can still use a different checkpoint id for batch.

   It is less about the temp/staging manifest file that the aggregator wrote. It is more about the checkpointId put in the snapshot summary, which will still be `1` for batch execution.
   https://github.com/apache/iceberg/blob/main/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java#L292
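   To make the concern concrete, here is a minimal, self-contained sketch of how the id would evolve under the change in this diff. It is a toy model, not the real `IcebergWriteAggregator`: the class name `CheckpointIdSketch`, the initial `lastCheckpointId` of `0`, the early return inside the guard (the hunk is cut off there), and the `emittedCheckpointIds` list are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the checkpoint-id handling in the diff; hypothetical, not Iceberg code. */
public class CheckpointIdSketch {
  // Assumption: before any checkpoint is seen, the last id is modelled as 0.
  private long lastCheckpointId = 0L;
  // Hypothetical stand-in for "what id gets passed downstream".
  private final List<Long> emittedCheckpointIds = new ArrayList<>();

  /** Guard from the diff; treating a repeated id as a no-op is an assumption. */
  public void prepareSnapshotPreBarrier(long checkpointId) {
    if (checkpointId == lastCheckpointId) {
      return;
    }
    lastCheckpointId = checkpointId;
    emittedCheckpointIds.add(checkpointId);
  }

  /** From the diff: end of input uses lastCheckpointId + 1 instead of Long.MAX_VALUE. */
  public void finish() {
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }

  public List<Long> emittedCheckpointIds() {
    return emittedCheckpointIds;
  }

  public static void main(String[] args) {
    // Batch-like run: no checkpoints happen before finish().
    CheckpointIdSketch batch = new CheckpointIdSketch();
    batch.finish();
    System.out.println("batch: " + batch.emittedCheckpointIds()); // prints "batch: [1]"

    // Streaming-like run: two checkpoints, then end of input.
    CheckpointIdSketch streaming = new CheckpointIdSketch();
    streaming.prepareSnapshotPreBarrier(1);
    streaming.prepareSnapshotPreBarrier(2);
    streaming.finish();
    System.out.println("streaming: " + streaming.emittedCheckpointIds()); // prints "streaming: [1, 2, 3]"
  }
}
```

   Under these assumptions the batch run ends with id `1` rather than `Long.MAX_VALUE`, regardless of which value is written into the staging manifest.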