PeterZh6 commented on code in PR #11130:
URL: https://github.com/apache/inlong/pull/11130#discussion_r1771308552
##########
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java:
##########
@@ -496,6 +524,12 @@ public void notifyCheckpointComplete(long checkpointId) {
schema.flushAudit();
schema.updateLastCheckpointId(checkpointId);
}
+ // get the start time of the currently completed checkpoint
+ Long snapShotStartTimeById =
checkpointStartTimeMap.remove(checkpointId);
Review Comment:
Thank you for the insightful question! Upon reviewing the code, to the best
of my knowledge, it seems that the put method is actually executed first in the
`snapshotState` method, which records the checkpoint start time as soon as a
checkpoint is initiated. By the time `notifyCheckpointComplete` is called, the
`checkpointStartTimeMap` should already contain the start time for that
checkpoint.
The `remove` method is then used in `notifyCheckpointComplete` to clean up
the entry after the checkpoint has been successfully completed while also
getting the start time of the corresponding checkpoint. This approach ensures
that we track the start time for each checkpoint and prevent stale entries from
accumulating in the map.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]