stevenzwu commented on code in PR #13714: URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353906851
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
I am looking at the linked PR https://github.com/apache/flink/pull/26433/files

It seems to me that it is incorrect for the `CommitterOperator` to validate the checkpoint ID in the `endInput` method.

<img width="671" height="188" alt="image" src="https://github.com/user-attachments/assets/973a6732-7729-4dc5-a839-56ce1d93e15d" />

As the code comment says, all committables should be committed here.
```
// There will be no final checkpoint, all committables should be committed here
```

It should have called
```
commitAndEmitCheckpoints(Long.MAX_VALUE)
```

If `IcebergWriteAggregator.lastCheckpointId` has an initial value larger than 0 (like 1) for batch execution, this PR would fail, because `finish()` would then pass `lastCheckpointId + 1` (2 in that example). With its current logic after Flink PR 26433, the Flink `CommitterOperator` would only allow checkpoint ID `1` to commit.
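To make the concern concrete, here is a minimal toy model of the interaction, not Flink's actual code. `ToyCommitter`, `commitUpTo`, and `commitAtEndOfInput` are hypothetical names introduced only for illustration, and the commit bound of `1` is an assumption taken from the description above of how `CommitterOperator` behaves at end of input after Flink PR 26433.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model only. None of these types are Flink or Iceberg classes. */
public class EndInputCommitSketch {

    /** Hypothetical stand-in for a committer that tracks pending committables by checkpoint ID. */
    static final class ToyCommitter {
        private final TreeMap<Long, List<String>> pending = new TreeMap<>();

        void add(long checkpointId, String committable) {
            pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
        }

        /** Commits (removes and returns) everything registered at or below the given ID. */
        List<String> commitUpTo(long checkpointId) {
            List<String> committed = new ArrayList<>();
            NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
            ready.values().forEach(committed::addAll);
            ready.clear(); // the view is backed by `pending`, so this drops the committed entries
            return committed;
        }
    }

    /**
     * Models finish(): the aggregator emits its result under lastCheckpointId + 1,
     * then the committer commits up to commitBound at end of input.
     */
    public static List<String> commitAtEndOfInput(long aggregatorLastCheckpointId, long commitBound) {
        ToyCommitter committer = new ToyCommitter();
        committer.add(aggregatorLastCheckpointId + 1, "batch-result");
        return committer.commitUpTo(commitBound);
    }

    public static void main(String[] args) {
        // Initial lastCheckpointId of 0: result is tagged 1 and a bound of 1 commits it.
        System.out.println(commitAtEndOfInput(0L, 1L));             // [batch-result]
        // Initial lastCheckpointId of 1: result is tagged 2 and a bound of 1 misses it.
        System.out.println(commitAtEndOfInput(1L, 1L));             // []
        // A bound of Long.MAX_VALUE commits everything regardless of the initial value.
        System.out.println(commitAtEndOfInput(1L, Long.MAX_VALUE)); // [batch-result]
    }
}
```

In this model, only the `Long.MAX_VALUE` bound is independent of how the aggregator initializes `lastCheckpointId`, which is the reason for suggesting it above.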