elkhand edited a comment on issue #2033:
URL: https://github.com/apache/iceberg/issues/2033#issuecomment-765864038


   The root cause of the issue is this part of the `IcebergFilesCommitter` 
class:
   
   ```
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
       super.notifyCheckpointComplete(checkpointId);
       // It's possible that we have the following events:
       //   1. snapshotState(ckpId);
       //   2. snapshotState(ckpId+1);
       //   3. notifyCheckpointComplete(ckpId+1);
       //   4. notifyCheckpointComplete(ckpId);
       // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
       // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
       if (checkpointId > maxCommittedCheckpointId) {
         commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
         this.maxCommittedCheckpointId = checkpointId;
       }
     }
   ```
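
To make the out-of-order case from the comment concrete, here is a minimal sketch of the guard in isolation. `OutOfOrderNotifySketch` is a hypothetical stand-in, not the real `IcebergFilesCommitter`; the commit is modeled as appending to a list, and the starting value `-1` is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: models only the guard from notifyCheckpointComplete.
final class OutOfOrderNotifySketch {
  private long maxCommittedCheckpointId;
  // Records which checkpoint IDs triggered a (simulated) commit.
  private final List<Long> commits = new ArrayList<>();

  OutOfOrderNotifySketch(long initialMaxCommittedCheckpointId) {
    this.maxCommittedCheckpointId = initialMaxCommittedCheckpointId;
  }

  void notifyCheckpointComplete(long checkpointId) {
    // Same comparison as in the snippet above: commit only for a strictly newer ID.
    if (checkpointId > maxCommittedCheckpointId) {
      commits.add(checkpointId);
      maxCommittedCheckpointId = checkpointId;
    }
  }

  List<Long> commits() {
    return commits;
  }

  long maxCommittedCheckpointId() {
    return maxCommittedCheckpointId;
  }

  public static void main(String[] args) {
    // Assumed initial value of -1, i.e. nothing committed yet.
    OutOfOrderNotifySketch committer = new OutOfOrderNotifySketch(-1L);
    committer.notifyCheckpointComplete(2L); // step 3: ckpId+1 arrives first
    committer.notifyCheckpointComplete(1L); // step 4: ckpId arrives late and is skipped
    System.out.println(committer.commits()); // prints [2]
  }
}
```

The late notification for the older checkpoint is ignored, which is the intended behavior as long as the stored maximum is a real checkpoint ID.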
   
   When a **savepoint** is created, `flink.max-committed-checkpoint-id` in the 
`metadata.json` file is set to `9223372036854775807`, which is 
`Long.MAX_VALUE`:
   ```
   ...
   {
       "snapshot-id" : 1466502797494274716,
       "parent-snapshot-id" : 7039191958488319023,
       "timestamp-ms" : 1611375333522,
       "summary" : {
            ...
            "flink.max-committed-checkpoint-id" : "9223372036854775807",
            ...
   ...
   ```
   
   With that value, the condition `if (checkpointId > maxCommittedCheckpointId)` 
always evaluates to false, so the transaction is not committed and the 
Iceberg-specific files are not created.
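
A small sketch of why the comparison can never succeed once the stored value is `9223372036854775807`. `MaxValueGuardSketch` and `shouldCommit` are hypothetical names used only for illustration; the checkpoint IDs 5 to 7 are examples of IDs a restored job might use.

```java
// Hypothetical helper: reproduces only the comparison from notifyCheckpointComplete.
final class MaxValueGuardSketch {
  static boolean shouldCommit(long checkpointId, long maxCommittedCheckpointId) {
    return checkpointId > maxCommittedCheckpointId;
  }

  public static void main(String[] args) {
    // The value found in the snapshot summary, parsed from its string form.
    long restored = Long.parseLong("9223372036854775807");
    System.out.println(restored == Long.MAX_VALUE); // prints true

    // No long can be greater than Long.MAX_VALUE, so every check fails.
    for (long id = 5; id <= 7; id++) {
      System.out.println(id + " -> " + shouldCommit(id, restored)); // each line ends in false
    }
  }
}
```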
   
   When the Flink job is started from the savepoint, it gets the checkpoint ID 
correctly (I created the savepoint after checkpoint 4):
   ```
   # Triggering savepoint
   
   2021-01-22 20:15:32,092 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering stop-with-savepoint for job 5a5b5ce39afbdfaf3afe58b3fbecae5e.
   2021-01-22 20:15:32,096 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 4 (type=SYNC_SAVEPOINT) @ 1611375332094 for job 5a5b5ce39afbdfaf3afe58b3fbecae5e.
   2021-01-22 20:15:33,064 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 4 for job 5a5b5ce39afbdfaf3afe58b3fbecae5e (8754 bytes in 969 ms).
   ...
   # After starting job from savepoint
   
   2021-01-22 20:33:35,600 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Starting job 6ed8c20ab75edf9a741b266f145de214 from savepoint /tmp/flink-savepoints/savepoint-5a5b5c-1a65b8fad3ea ()
   2021-01-22 20:33:35,605 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Reset the checkpoint ID of job 6ed8c20ab75edf9a741b266f145de214 to 5.
   2021-01-22 20:33:35,605 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 6ed8c20ab75edf9a741b266f145de214 from latest valid checkpoint: Checkpoint 4 @ 0 for 6ed8c20ab75edf9a741b266f145de214.
   ```

