fredia commented on code in PR #24181:
URL: https://github.com/apache/flink/pull/24181#discussion_r1475523078
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java:
##########
@@ -50,6 +50,43 @@ public WithinCheckpointFileMergingSnapshotManager(String id, Executor ioExecutor
writablePhysicalFilePool = new HashMap<>();
}
+ // ------------------------------------------------------------------------
+ // CheckpointListener
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId)
Review Comment:
Why is `notifyCheckpointSubsumed` not overridden?
IIUC, the checkpoint subsume and complete notifications are triggered at the same time ([code](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java#L1082-L1085)).
Although the two methods have the same effect here, it feels more straightforward to close the physical file in `notifyCheckpointSubsumed`. WDYT?
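A minimal sketch of what such an override could look like. It models the writable-file pool with simplified, hypothetical types (`SketchPhysicalFile`, `PoolKey`, `WithinCheckpointPoolSketch`, and a `String` in place of `SubtaskKey`), not Flink's real classes. It also assumes that once a checkpoint is subsumed, the subtask never writes again to a file created for that checkpoint or any earlier one, so those files can be closed and dropped from the pool.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for Flink's PhysicalFile; it only records whether it was closed. */
final class SketchPhysicalFile {
    private boolean closed;

    boolean isClosed() {
        return closed;
    }

    void close() {
        closed = true;
    }
}

/** Hypothetical pool key (subtask, checkpoint); a String replaces Flink's SubtaskKey. */
final class PoolKey {
    final String subtaskKey;
    final long checkpointId;

    PoolKey(String subtaskKey, long checkpointId) {
        this.subtaskKey = subtaskKey;
        this.checkpointId = checkpointId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PoolKey)) {
            return false;
        }
        PoolKey other = (PoolKey) o;
        return checkpointId == other.checkpointId && subtaskKey.equals(other.subtaskKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(subtaskKey, checkpointId);
    }
}

/** Simplified model of a per-checkpoint writable-file pool with a subsume hook. */
final class WithinCheckpointPoolSketch {
    private final Object lock = new Object();
    private final Map<PoolKey, SketchPhysicalFile> writablePhysicalFilePool = new HashMap<>();

    /** Reuses the open file of (subtask, checkpoint), or creates one if none is pooled. */
    SketchPhysicalFile getOrCreate(String subtaskKey, long checkpointId) {
        synchronized (lock) {
            return writablePhysicalFilePool.computeIfAbsent(
                    new PoolKey(subtaskKey, checkpointId), k -> new SketchPhysicalFile());
        }
    }

    /** Assumed semantics: files of this subtask for checkpoints <= checkpointId are done. */
    void notifyCheckpointSubsumed(String subtaskKey, long checkpointId) {
        synchronized (lock) {
            Iterator<Map.Entry<PoolKey, SketchPhysicalFile>> it =
                    writablePhysicalFilePool.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<PoolKey, SketchPhysicalFile> entry = it.next();
                PoolKey key = entry.getKey();
                if (key.subtaskKey.equals(subtaskKey) && key.checkpointId <= checkpointId) {
                    entry.getValue().close(); // no more writes expected, so close it
                    it.remove(); // and stop offering it for reuse
                }
            }
        }
    }

    int pooledFileCount() {
        synchronized (lock) {
            return writablePhysicalFilePool.size();
        }
    }
}
```

Closing on subsume rather than on complete keeps the "this file will never be written again" decision tied to the notification that actually states it. The two hooks fire together today, so the observable behavior should not change.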
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -345,6 +371,79 @@ protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(
protected abstract void returnPhysicalFileForNextReuse(
SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException;
+ /**
+ * The callback which will be triggered when all subtasks discarded (aborted or subsumed).
+ *
+ * @param checkpointId the discarded checkpoint id.
+ * @throws IOException if anything goes wrong with file system.
+ */
+ protected abstract void discardCheckpoint(long checkpointId) throws IOException;
+
+ // ------------------------------------------------------------------------
+ // Checkpoint Listener
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId)
+ throws Exception {
+ // does nothing
+ }
+
+ @Override
+ public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) throws Exception {
+ synchronized (lock) {
+ Set<LogicalFile> logicalFilesForCurrentCp = uploadedStates.get(checkpointId);
+ if (logicalFilesForCurrentCp == null) {
+ return;
+ }
+ Iterator<LogicalFile> logicalFileIterator = logicalFilesForCurrentCp.iterator();
+ while (logicalFileIterator.hasNext()) {
+ LogicalFile logicalFile = logicalFileIterator.next();
+ if (logicalFile.getSubtaskKey().equals(subtaskKey)
+ && logicalFile.getLastUsedCheckpointID() <= checkpointId) {
+ logicalFile.discardWithCheckpointId(checkpointId);
+ logicalFileIterator.remove();
+ }
+ }
+
+ if (logicalFilesForCurrentCp.isEmpty()) {
+ uploadedStates.remove(checkpointId);
+ discardCheckpoint(checkpointId);
+ }
Review Comment:
nit: This overlaps with the code in `notifyCheckpointSubsumed`; maybe it can be extracted into a shared method.
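One possible shape for that extraction, sketched with simplified, hypothetical types (`SketchLogicalFile`, `SharedDiscardSketch`, and a `String` in place of `SubtaskKey`) rather than Flink's real classes. It assumes the subsumed path filters logical files exactly as the abort path in the diff does; if it differs (for example, by walking every checkpoint up to the subsumed id), the helper would need that range as an extra parameter. The helper name `discardLogicalFiles` is made up for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified stand-in for Flink's LogicalFile. */
final class SketchLogicalFile {
    final String subtaskKey;
    final long lastUsedCheckpointId;
    boolean discarded;

    SketchLogicalFile(String subtaskKey, long lastUsedCheckpointId) {
        this.subtaskKey = subtaskKey;
        this.lastUsedCheckpointId = lastUsedCheckpointId;
    }
}

final class SharedDiscardSketch {
    private final Object lock = new Object();
    // checkpoint id -> logical files uploaded for it (mirrors `uploadedStates` in the diff)
    private final Map<Long, Set<SketchLogicalFile>> uploadedStates = new HashMap<>();
    // checkpoints that reached the discardCheckpoint callback (for illustration only)
    final Set<Long> discardedCheckpoints = new HashSet<>();

    void register(long checkpointId, SketchLogicalFile file) {
        synchronized (lock) {
            uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(file);
        }
    }

    void notifyCheckpointAborted(String subtaskKey, long checkpointId) {
        synchronized (lock) {
            discardLogicalFiles(subtaskKey, checkpointId);
        }
    }

    void notifyCheckpointSubsumed(String subtaskKey, long checkpointId) {
        synchronized (lock) {
            discardLogicalFiles(subtaskKey, checkpointId);
        }
    }

    /** Shared body of both callbacks; the caller must hold {@code lock}. */
    private void discardLogicalFiles(String subtaskKey, long checkpointId) {
        Set<SketchLogicalFile> files = uploadedStates.get(checkpointId);
        if (files == null) {
            return;
        }
        Iterator<SketchLogicalFile> it = files.iterator();
        while (it.hasNext()) {
            SketchLogicalFile file = it.next();
            if (file.subtaskKey.equals(subtaskKey) && file.lastUsedCheckpointId <= checkpointId) {
                file.discarded = true;
                it.remove();
            }
        }
        // Once every subtask has released its files, the checkpoint itself can go.
        if (files.isEmpty()) {
            uploadedStates.remove(checkpointId);
            discardCheckpoint(checkpointId);
        }
    }

    private void discardCheckpoint(long checkpointId) {
        discardedCheckpoints.add(checkpointId);
    }
}
```

Keeping the `synchronized (lock)` in each public callback and documenting the helper as "caller holds the lock" keeps the locking visible at the entry points.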