elkhand edited a comment on issue #2033:
URL: https://github.com/apache/iceberg/issues/2033#issuecomment-765864038
The root cause of the issue is this part of the code in the
`IcebergFilesCommitter` class:
```
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  super.notifyCheckpointComplete(checkpointId);
  // It's possible that we have the following events:
  //   1. snapshotState(ckpId);
  //   2. snapshotState(ckpId+1);
  //   3. notifyCheckpointComplete(ckpId+1);
  //   4. notifyCheckpointComplete(ckpId);
  // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files.
  // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
  if (checkpointId > maxCommittedCheckpointId) {
    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
    this.maxCommittedCheckpointId = checkpointId;
  }
}
```
When a **savepoint** is created, `flink.max-committed-checkpoint-id` in the
`metadata.json` file is set to `Long.MAX_VALUE` (`9223372036854775807`):
```
...
{
  "snapshot-id" : 1466502797494274716,
  "parent-snapshot-id" : 7039191958488319023,
  "timestamp-ms" : 1611375333522,
  "summary" : {
    ...
    "flink.max-committed-checkpoint-id" : "9223372036854775807",
    ...
...
```
As a result, the condition `if (checkpointId > maxCommittedCheckpointId)` always
evaluates to false, so the transaction is never committed and no Iceberg-specific
files are created.
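For reference, my understanding is that after a restore the committer takes `maxCommittedCheckpointId` from the Iceberg table itself rather than from Flink's checkpoint counter. A minimal sketch of that lookup (the method name and fallback value are my own, not the committer's exact code):
```
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Sketch: parse the max committed checkpoint id back out of the snapshot summary
// shown in metadata.json above. After stop-with-savepoint this returns Long.MAX_VALUE.
static long restoreMaxCommittedCheckpointId(Table table) {
  Snapshot current = table.currentSnapshot();
  if (current == null) {
    return -1L; // hypothetical fallback: nothing committed yet
  }
  String value = current.summary().get("flink.max-committed-checkpoint-id");
  return value == null ? -1L : Long.parseLong(value);
}
```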
When the Flink job is started from the saved savepoint, it gets the checkpoint ID
correctly (I created the savepoint after checkpoint 4):
```
# Triggering savepoint
2021-01-22 20:15:32,092 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering stop-with-savepoint for job 5a5b5ce39afbdfaf3afe58b3fbecae5e.
2021-01-22 20:15:32,096 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 4 (type=SYNC_SAVEPOINT) @ 1611375332094 for job 5a5b5ce39afbdfaf3afe58b3fbecae5e.
2021-01-22 20:15:33,064 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 4 for job 5a5b5ce39afbdfaf3afe58b3fbecae5e (8754 bytes in 969 ms).
...
# After starting job from savepoint
2021-01-22 20:33:35,600 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Starting job 6ed8c20ab75edf9a741b266f145de214 from savepoint /tmp/flink-savepoints/savepoint-5a5b5c-1a65b8fad3ea ()
2021-01-22 20:33:35,605 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Reset the checkpoint ID of job 6ed8c20ab75edf9a741b266f145de214 to 5.
2021-01-22 20:33:35,605 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 6ed8c20ab75edf9a741b266f145de214 from latest valid checkpoint: Checkpoint 4 @ 0 for 6ed8c20ab75edf9a741b266f145de214.
```
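Tying the log output to the guard above: even though Flink correctly resets the checkpoint counter to 5, the comparison in `notifyCheckpointComplete` still fails. A standalone illustration (not the committer code):
```
public class GuardIllustration {
  public static void main(String[] args) {
    long maxCommittedCheckpointId = Long.MAX_VALUE; // restored from the savepoint's snapshot summary
    long checkpointId = 5;                          // first checkpoint id of the restored job
    if (checkpointId > maxCommittedCheckpointId) {
      System.out.println("commit");                 // never reached: 5 > Long.MAX_VALUE is false
    } else {
      System.out.println("skip commit");            // always taken, so no new Iceberg snapshot is created
    }
  }
}
```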
@openinx can you please share the reasoning behind the `Flush the buffered data files
into 'dataFilesPerCheckpoint' firstly.` case in the `IcebergFilesCommitter` class:
why is `currentCheckpointId` set to `Long.MAX_VALUE` in `endInput()`?
```
@Override
public void endInput() throws IOException {
  // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
  long currentCheckpointId = Long.MAX_VALUE;
  dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
  dataFilesOfCurrentCheckpoint.clear();
  commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
}
```
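For context on how that `Long.MAX_VALUE` ends up in the table metadata: my assumption is that `commitUpToCheckpoint` records the checkpoint id it is called with as a snapshot summary property, so the value written by `endInput()` is exactly what the restored job reads back. A rough sketch under that assumption (not the actual implementation; the `flink.job-id` property name is also assumed):
```
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

// Sketch: append the pending data files and stamp the snapshot summary with the
// checkpoint id. When called from endInput(), checkpointId is Long.MAX_VALUE,
// which is the "9223372036854775807" seen in metadata.json.
static void commitSketch(Table table, List<DataFile> pendingFiles, String flinkJobId, long checkpointId) {
  AppendFiles append = table.newAppend();
  pendingFiles.forEach(append::appendFile);
  append.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
  append.set("flink.job-id", flinkJobId); // assumed property name
  append.commit();
}
```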