stevenzwu commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353906851


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
 
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   I am looking at the linked Flink PR:
   https://github.com/apache/flink/pull/26433/files
   
   It seems to me that it is incorrect for the `CommitterOperator` to validate the checkpoint ID in its `endInput` method.
   
   <img width="671" height="188" alt="image" src="https://github.com/user-attachments/assets/973a6732-7729-4dc5-a839-56ce1d93e15d" />
   
   As the code comment says, all committables should be committed at this point:
   ```
   // There will be no final checkpoint, all committables should be committed here
   ```
   
   Instead, it should have called:
   ```
   commitAndEmitCheckpoints(Long.MAX_VALUE)
   ```
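To illustrate why an upper bound of `Long.MAX_VALUE` matters, here is a minimal toy model. It assumes committables are buffered per checkpoint ID and that a commit call drains every checkpoint up to and including the given bound; that is an assumption about how `commitAndEmitCheckpoints` treats its argument. `PendingCommittables` and `commitUpTo` are hypothetical names, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy model, not Flink's CommitterOperator: committables are buffered
// per checkpoint ID, and a commit call drains every checkpoint up to a bound.
final class CommitUpToSketch {

  static final class PendingCommittables {
    private final NavigableMap<Long, List<String>> byCheckpoint = new TreeMap<>();

    void add(long checkpointId, String committable) {
      byCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    // "Commits" (returns and removes) all committables with checkpoint ID <= upTo.
    List<String> commitUpTo(long upTo) {
      NavigableMap<Long, List<String>> ready = byCheckpoint.headMap(upTo, true);
      List<String> committed = new ArrayList<>();
      for (List<String> batch : ready.values()) {
        committed.addAll(batch);
      }
      ready.clear(); // clearing the head-map view removes these entries from byCheckpoint
      return committed;
    }

    int pendingCount() {
      int count = 0;
      for (List<String> batch : byCheckpoint.values()) {
        count += batch.size();
      }
      return count;
    }
  }

  public static void main(String[] args) {
    PendingCommittables pending = new PendingCommittables();
    pending.add(1L, "data-file-a");
    pending.add(2L, "data-file-b");

    // A fixed bound of 1 leaves the checkpoint-2 committable behind.
    System.out.println(pending.commitUpTo(1L)); // prints [data-file-a]
    System.out.println(pending.pendingCount()); // prints 1

    // Long.MAX_VALUE drains everything that is still pending.
    System.out.println(pending.commitUpTo(Long.MAX_VALUE)); // prints [data-file-b]
    System.out.println(pending.pendingCount()); // prints 0
  }
}
```

In this model, any committable tagged with a checkpoint ID above a fixed bound stays pending, while `Long.MAX_VALUE` commits everything regardless of which ID the upstream operator assigned.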
   
   If `IcebergWriteAggregator.lastCheckpointId` had an initial value greater than 0 (e.g., 1) in batch execution, this PR would fail: `finish()` would emit the final committables for checkpoint `lastCheckpointId + 1` (e.g., `2`), but with its current logic after Flink PR 26433, the Flink `CommitterOperator` only allows checkpoint ID `1` to be committed.
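The failure scenario can be sketched as plain arithmetic. This assumes, per the comment above, that after Flink PR 26433 the committer's batch end-of-input path commits only committables whose checkpoint ID is at most `1`; `aggregatorFinishCheckpointId` and `isCommitted` are hypothetical helpers, not Iceberg or Flink APIs.

```java
// Hypothetical model of the batch end-of-input path; not Iceberg or Flink code.
final class BatchCheckpointIdSketch {

  // Mirrors the diff: finish() calls prepareSnapshotPreBarrier(lastCheckpointId + 1),
  // so the final committables are tagged with this checkpoint ID.
  static long aggregatorFinishCheckpointId(long initialLastCheckpointId) {
    return initialLastCheckpointId + 1;
  }

  // Assumption: at end of input the committer only commits committables whose
  // checkpoint ID is <= committerUpTo (1 after Flink PR 26433, per the review comment).
  static boolean isCommitted(long initialLastCheckpointId, long committerUpTo) {
    return aggregatorFinishCheckpointId(initialLastCheckpointId) <= committerUpTo;
  }

  public static void main(String[] args) {
    long[] initialIds = {0L, 1L};
    long[] committerBounds = {1L, Long.MAX_VALUE};
    for (long initial : initialIds) {
      for (long upTo : committerBounds) {
        System.out.printf(
            "initial lastCheckpointId=%d, committer upTo=%d -> committed=%b%n",
            initial, upTo, isCommitted(initial, upTo));
      }
    }
  }
}
```

With an initial `lastCheckpointId` of 1 and a committer bound of 1, `isCommitted` returns false; with a bound of `Long.MAX_VALUE` it returns true for either initial value.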
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

