openinx commented on a change in pull request #2042:
URL: https://github.com/apache/iceberg/pull/2042#discussion_r606102569



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -154,10 +159,13 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
     super.snapshotState(context);
     long checkpointId = context.getCheckpointId();
     LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
+    long current = System.currentTimeMillis();
 
-    // Update the checkpoint state.
-    dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
-
+    if (checkNeedCommit(current)) {

Review comment:
       So I think we could count how many consecutive empty checkpoints we've
encountered; if the count has reached the given threshold, we could just commit a
dummy txn to Iceberg:
   
   ```diff
   diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
   index c1d3440d..00845ccd 100644
   --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
   +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
   @@ -95,6 +95,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
      private transient Table table;
      private transient ManifestOutputFileFactory manifestOutputFileFactory;
      private transient long maxCommittedCheckpointId;
   +  private transient int continuousEmptyCheckpoints;
    
      // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
      // same flink job; another case is restoring from snapshot created by another different job. For the second case, we
   @@ -125,6 +126,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
        int attemptId = getRuntimeContext().getAttemptNumber();
        this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
        this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
   +    this.continuousEmptyCheckpoints = 0;
    
        this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
        this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
   @@ -204,10 +206,18 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
          manifests.addAll(deltaManifests.manifests());
        }
    
   -    if (replacePartitions) {
   -      replacePartitions(pendingResults, newFlinkJobId, checkpointId);
   -    } else {
   -      commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
   +    int totalFiles = pendingResults.values().stream()
   +        .mapToInt(r -> r.dataFiles().length + r.deleteFiles().length).sum();
   +    continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
   +
   +    if (totalFiles != 0 || continuousEmptyCheckpoints % 10 == 0) {
   +      if (replacePartitions) {
   +        replacePartitions(pendingResults, newFlinkJobId, checkpointId);
   +      } else {
   +        commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
   +      }
   +
   +      continuousEmptyCheckpoints = 0;
        }
    
        pendingMap.clear();
   ```
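
To make the intended behaviour easier to check in isolation, here is a minimal standalone sketch of the same counter logic. It is not part of `IcebergFilesCommitter`: the class name `EmptyCheckpointGate`, the method `shouldCommit`, and the configurable `threshold` (the diff above hardcodes `10`) are all hypothetical names introduced only for illustration.

```java
/**
 * Hypothetical stand-in for the commit decision sketched in the diff above.
 * It only models the counter; it does not touch Flink or Iceberg APIs.
 */
public class EmptyCheckpointGate {
  // Assumed to be configurable here; the suggested diff uses a fixed value of 10.
  private final int threshold;
  private int continuousEmptyCheckpoints = 0;

  public EmptyCheckpointGate(int threshold) {
    if (threshold <= 0) {
      throw new IllegalArgumentException("threshold must be positive");
    }
    this.threshold = threshold;
  }

  /**
   * Returns true when a commit should be issued for this checkpoint:
   * either there are files to commit, or the number of consecutive
   * empty checkpoints has reached the threshold.
   */
  public boolean shouldCommit(int totalFiles) {
    continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
    if (totalFiles != 0 || continuousEmptyCheckpoints % threshold == 0) {
      // Reset after every commit, as in the diff.
      continuousEmptyCheckpoints = 0;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    EmptyCheckpointGate gate = new EmptyCheckpointGate(3);
    int[] filesPerCheckpoint = {0, 0, 0, 5, 0, 0, 2};
    for (int files : filesPerCheckpoint) {
      System.out.println("files=" + files + " commit=" + gate.shouldCommit(files));
    }
  }
}
```

With a threshold of 3, two empty checkpoints are skipped and the third triggers a dummy commit; any non-empty checkpoint commits immediately and restarts the count.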






