mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353581979

##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }

   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   > agree that the change in the finish method fixes the checkpoint id for streaming execution. But it is arguable that checkpoint id (1) is incorrect for the batch execution.

   I agree that it might confuse users if they no longer see `Long.MAX_VALUE` for batch pipelines. Apart from that, a checkpoint id of 1 for batch can make sense, because a batch pipeline technically has only one checkpoint, when it finishes.

   As you observed, this is specifically a limitation of the V2 commit framework. The reason why this no longer works is here: https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154

   Flink's `CommitterOperator` keeps track of the last completed checkpoint id and filters out any committables that have a higher checkpoint id: https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L167

   I think what we can do is write `Long.MAX_VALUE` to the manifest for batch pipelines and thereby keep the current behavior. The runtime can still use a different checkpoint id for batch.

   edit: I pushed the above suggestion. See https://github.com/apache/iceberg/compare/d9605b89f853038499d10e699621644d7cda528b..ae9e95277ea980e24a52db46d0c6e05ffa4eb0d9

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
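
The filtering described in the review comment can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `CommittableFilterSketch`, `commitUpTo`, `pendingCount`, and `manifestCheckpointId` are hypothetical names made up for illustration, and the code is neither Flink's actual `CommitterOperator` implementation nor the change pushed to the PR. It assumes that committables are keyed by checkpoint id and that only those at or below the last completed checkpoint id are committed, which is why a committable tagged with `Long.MAX_VALUE` would stay pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of the filtering described above; NOT Flink's real code. */
public class CommittableFilterSketch {

  // Pending committables, keyed by the checkpoint id they were emitted with.
  private final TreeMap<Long, List<String>> pending = new TreeMap<>();

  // Models the "last completed checkpoint id" that the comment says is tracked.
  private long lastCompletedCheckpointId = 0L;

  public void add(long checkpointId, String committable) {
    pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
  }

  /** Commits everything up to checkpointId (inclusive); higher ids stay pending. */
  public List<String> commitUpTo(long checkpointId) {
    lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
    List<String> committed = new ArrayList<>();
    // headMap(..., true) is a live view of all entries with key <= the given id.
    Map<Long, List<String>> eligible = pending.headMap(lastCompletedCheckpointId, true);
    for (List<String> group : eligible.values()) {
      committed.addAll(group);
    }
    eligible.clear(); // clearing the view removes those entries from `pending`
    return committed;
  }

  public int pendingCount() {
    int count = 0;
    for (List<String> group : pending.values()) {
      count += group.size();
    }
    return count;
  }

  /**
   * Hypothetical helper for the suggestion in the comment: the id recorded in
   * the manifest may differ from the id the runtime uses for batch.
   */
  public static long manifestCheckpointId(long runtimeCheckpointId, boolean batch) {
    return batch ? Long.MAX_VALUE : runtimeCheckpointId;
  }

  public static void main(String[] args) {
    CommittableFilterSketch op = new CommittableFilterSketch();
    op.add(Long.MAX_VALUE, "tagged-with-max-value"); // old batch behavior
    op.add(1L, "tagged-with-last-plus-one");         // lastCheckpointId + 1

    // Batch end of input: commit up to lastCompletedCheckpointId + 1.
    System.out.println("committed: " + op.commitUpTo(1L));
    System.out.println("still pending: " + op.pendingCount());
    System.out.println("manifest id for batch: " + manifestCheckpointId(1L, true));
  }
}
```

In this model the committable tagged with `Long.MAX_VALUE` is never selected, while the one tagged with id 1 is committed; the separate `manifestCheckpointId` helper shows how the recorded value could remain `Long.MAX_VALUE` for batch without affecting the id used at runtime.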