stevenzwu commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353866953


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
 
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   > I think what we can do, is to write Long.MAX_VALUE to the manifest for 
batch pipelines and thereby keep the current behavior. The runtime can still 
use a different checkpoint id for batch.
   
   It is les about the temp/staging manifest file that the aggregator wrote. It 
is more about the checkpointId put in the snapshot summary, which will still be 
`1` for batch execution.
   
   
https://github.com/apache/iceberg/blob/main/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java#L292



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to