openinx commented on a change in pull request #2042:
URL: https://github.com/apache/iceberg/pull/2042#discussion_r606102023



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -154,10 +159,13 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
     super.snapshotState(context);
     long checkpointId = context.getCheckpointId();
     LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
+    long current = System.currentTimeMillis();
 
-    // Update the checkpoint state.
-    dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
-
+    if (checkNeedCommit(current)) {

Review comment:
   Suppose `maxCommitIdleTimeMs` is set to 6 min and the checkpoints 
produce the following `WriteResult`s: 
   
   ```
   ckpt1:   {data-file1, data-file2},
   ckpt2:   {},
   ckpt3:   {data-file3, data-file4},
   ckpt4:   {},
   ```
   
   If the checkpoint interval is 3 min, will we still commit a dummy txn 
with no data files to the iceberg table every 6 min? I mean, currently we 
periodically check whether the `WriteResult` is empty, and if it is, we 
commit a dummy iceberg txn to refresh the latest max-committed-txn-id. But 
if some txn has already committed a few files to iceberg within the 
`maxCommitIdleTimeMs` interval, then we don't have to commit those empty 
txns to iceberg, right?
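
   To make the question concrete, here is a minimal sketch of the behaviour I 
have in mind. It assumes the idle timer is measured from the last successful 
commit, whether that commit carried data or not. `IdleCommitPolicy` and 
`shouldCommit` are hypothetical names for illustration only; they are not part 
of this PR or of the Iceberg API.

```java
// Hypothetical helper, not part of the PR: decides whether a checkpoint
// should trigger an Iceberg commit.
final class IdleCommitPolicy {
  private final long maxCommitIdleTimeMs;
  // Time of the most recent commit (data or dummy); assumed to start at job start.
  private long lastCommitTimeMs;

  IdleCommitPolicy(long maxCommitIdleTimeMs, long startTimeMs) {
    this.maxCommitIdleTimeMs = maxCommitIdleTimeMs;
    this.lastCommitTimeMs = startTimeMs;
  }

  // Returns true when the checkpoint should be committed:
  //  - always when it carries data files, or
  //  - when it is empty but nothing has been committed for maxCommitIdleTimeMs.
  // Any commit resets the idle timer, so a recent data commit suppresses dummy commits.
  boolean shouldCommit(boolean hasDataFiles, long nowMs) {
    boolean idleTooLong = nowMs - lastCommitTimeMs >= maxCommitIdleTimeMs;
    if (hasDataFiles || idleTooLong) {
      lastCommitTimeMs = nowMs;
      return true;
    }
    return false;
  }
}
```

   With a 3 min checkpoint interval and a 6 min idle limit, the four 
checkpoints above would give: ckpt1 commits, ckpt2 is skipped, ckpt3 commits, 
ckpt4 is skipped. A dummy txn would only be committed after 6 min with no 
commit at all.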




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


