stevenzwu commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353201142


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
 
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   I agree that the change in the `finish` method fixes the checkpoint id for 
streaming execution. However, checkpoint id `1` is arguably incorrect for 
batch execution.
   
   I tried the `TestFlinkTableSinkExtended#testHashDistributeMode` test as you 
suggested, with the following change:
   ```
     @Override
     public void finish() throws IOException {
       if (getRuntimeContext().getJobType() == JobType.BATCH) {
         prepareSnapshotPreBarrier(Long.MAX_VALUE);
       } else {
         prepareSnapshotPreBarrier(lastCheckpointId + 1);
       }
     }
   ```
   
   It works correctly with the V1 sink, because 
`IcebergFilesCommitter#endInput()` is called in batch execution mode and 
commits the flushed data with checkpoint id `Long.MAX_VALUE`. 
   
   With the V2 sink, however, `IcebergCommitter` has no similar callback for 
end of input. Is this a limitation of the V2 sink that can be improved or 
fixed? I would expect the V2 sink framework to call 
`Committer#commit(Collection<CommitRequest<CommT>>)` at end of input in batch 
execution mode, regardless of the checkpoint id passed down. Semantically, it 
seems more correct to pass down `Long.MAX_VALUE` than `1` as the checkpoint id 
for batch execution mode.
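   
   To make the proposed rule concrete, here is a minimal standalone sketch of 
the checkpoint id selection only. It uses no Flink or Iceberg classes: 
`FinishCheckpointIdSketch`, its `Mode` enum, and `finishCheckpointId` are 
hypothetical names for illustration, and the `0` passed for batch assumes that 
`lastCheckpointId` is still at its initial value when no checkpoint has run.

   ```java
   // Hypothetical illustration only; not the actual IcebergWriteAggregator code.
   final class FinishCheckpointIdSketch {
     // Stand-in for the job type check; an assumption for this sketch.
     enum Mode { BATCH, STREAMING }

     // Returns the checkpoint id that finish() would pass down.
     static long finishCheckpointId(Mode mode, long lastCheckpointId) {
       if (mode == Mode.BATCH) {
         // Batch: no real checkpoints, so mark end of input with the max id.
         return Long.MAX_VALUE;
       }
       // Streaming: use the id right after the last completed checkpoint.
       return lastCheckpointId + 1;
     }

     public static void main(String[] args) {
       System.out.println(finishCheckpointId(Mode.BATCH, 0L));
       System.out.println(finishCheckpointId(Mode.STREAMING, 7L));
     }
   }
   ```

   With the rule from the diff alone (`lastCheckpointId + 1`), the batch case 
would yield `1` under the same assumption, which is the value questioned above.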
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

