stevenzwu commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353201142

##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {

   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }

   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   Agreed that the change in the `finish` method fixes the checkpoint id for streaming execution. But one could argue that a checkpoint id of `1` is incorrect for batch execution.

   I tried the `TestFlinkTableSinkExtended#testHashDistributeMode` test, as you suggested, with the following change:
   ```
   @Override
   public void finish() throws IOException {
     if (getRuntimeContext().getJobType() == JobType.BATCH) {
       prepareSnapshotPreBarrier(Long.MAX_VALUE);
     } else {
       prepareSnapshotPreBarrier(lastCheckpointId + 1);
     }
   }
   ```

   It works correctly with the V1 sink, because the `IcebergFileCommitter#endInput()` method is called in batch execution mode and commits the flushed data with checkpoint id `Long.MAX_VALUE`. With the V2 sink, however, the `IcebergCommitter` has no similar callback for end of input.

   Is this a limitation of the V2 sink that can be improved or fixed? I would expect the V2 sink framework to call `Committer#commit(Collection<CommitRequest<CommT>>)` at end of input in batch execution mode, regardless of the checkpoint id passed down.

   Semantically, it seems more correct to pass down `Long.MAX_VALUE` than `1` as the checkpoint id in batch execution mode.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org