mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353581979


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
 
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   >agree that the change in the finish method fixes the checkpoint id for 
streaming execution. But it is arguable that checkpoint id (1) is incorrect for 
the batch execution.
   
   I agree that it might confuse users if they do not see `Long.MAX_VALUE` 
anymore for batch pipelines. Apart from that, a checkpoint id of 1 for batch 
can make sense because there is technically only one checkpoint in batch 
pipelines when they finish.
   
   As you observed, this is specifically a limitation of the V2 commit 
framework.
   
   The reason why this no longer works is here: 
https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154
 
   
   Flink's CommitterOperator keeps track of the last completed checkpoint id 
and filters out any committables with a higher checkpoint id: 
https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L167
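
   To illustrate the filtering described above, here is a minimal, 
self-contained sketch. It is not Flink's code: `CommittableFilterSketch`, 
`add`, `drainUpTo`, and `pendingCount` are hypothetical names, and 
committables are modeled as plain strings. It only shows why a committable 
tagged with `Long.MAX_VALUE` is never picked up when the last completed 
checkpoint id is lower.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified model (an assumption, not Flink's implementation) of a committer
// that only commits committables up to the last completed checkpoint id.
class CommittableFilterSketch {
  // Pending committables keyed by the checkpoint id they were emitted for.
  private final NavigableMap<Long, List<String>> pending = new TreeMap<>();

  void add(long checkpointId, String committable) {
    pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
  }

  // Returns and removes every committable whose checkpoint id is
  // <= lastCompletedCheckpointId; anything with a higher id stays pending.
  List<String> drainUpTo(long lastCompletedCheckpointId) {
    NavigableMap<Long, List<String>> ready =
        pending.headMap(lastCompletedCheckpointId, true);
    List<String> result = new ArrayList<>();
    ready.values().forEach(result::addAll);
    ready.clear(); // headMap is a view, so this also removes from 'pending'
    return result;
  }

  int pendingCount() {
    return pending.values().stream().mapToInt(List::size).sum();
  }

  public static void main(String[] args) {
    CommittableFilterSketch committer = new CommittableFilterSketch();
    committer.add(1L, "manifest-for-checkpoint-1");
    committer.add(Long.MAX_VALUE, "manifest-for-max-value");

    // Only the committable for checkpoint 1 is drained; the one tagged with
    // Long.MAX_VALUE remains pending because its id is higher.
    System.out.println("committed: " + committer.drainUpTo(1L));
    System.out.println("still pending: " + committer.pendingCount());
  }
}
```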
   
   I think what we can do is write `Long.MAX_VALUE` to the manifest for 
batch pipelines, which keeps the current behavior. The runtime can still 
use a different checkpoint id for batch.
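
   As a rough sketch of that idea (not the code in this PR), the id recorded 
in the manifest can be decoupled from the id used at runtime. The class and 
method names and the `batch` flag are hypothetical, and the sketch assumes 
`lastCheckpointId` is 0 when a batch job finishes. Only the 
`lastCheckpointId + 1` expression is taken from the diff above.

```java
// Hypothetical helper that separates the runtime checkpoint id from the
// checkpoint id written to the manifest. Names are illustrative only.
class ManifestCheckpointIdSketch {
  // Runtime id used on finish(), mirroring `lastCheckpointId + 1` from the diff.
  static long finishCheckpointId(long lastCheckpointId) {
    return lastCheckpointId + 1;
  }

  // Id recorded in the manifest: batch keeps Long.MAX_VALUE so users see the
  // same value as before; streaming records the runtime checkpoint id.
  static long manifestCheckpointId(boolean batch, long runtimeCheckpointId) {
    return batch ? Long.MAX_VALUE : runtimeCheckpointId;
  }

  public static void main(String[] args) {
    // Assumption: no checkpoint has completed in batch, so lastCheckpointId is 0.
    long runtimeId = finishCheckpointId(0L);
    System.out.println("runtime id: " + runtimeId);
    System.out.println("manifest id (batch): " + manifestCheckpointId(true, runtimeId));
    System.out.println("manifest id (streaming): " + manifestCheckpointId(false, runtimeId));
  }
}
```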
   
   edit: I pushed the above suggestion. See 
https://github.com/apache/iceberg/compare/d9605b89f853038499d10e699621644d7cda528b..ae9e95277ea980e24a52db46d0c6e05ffa4eb0d9



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

