fredia commented on code in PR #24181:
URL: https://github.com/apache/flink/pull/24181#discussion_r1475523078

##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java:
##########
@@ -50,6 +50,43 @@ public WithinCheckpointFileMergingSnapshotManager(String id, Executor ioExecutor
         writablePhysicalFilePool = new HashMap<>();
     }
+    // ------------------------------------------------------------------------
+    // CheckpointListener
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId)

Review Comment:
   Why is `notifyCheckpointSubsumed` not overridden? IIUC, checkpoint subsumption and completion are triggered at the same time ([code](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java#L1082-L1085)). Although the two methods would have the same effect, it feels more straightforward to close the physical file in `notifyCheckpointSubsumed`. WDYT?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -345,6 +371,79 @@ protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(
     protected abstract void returnPhysicalFileForNextReuse(
             SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile)
             throws IOException;
+    /**
+     * The callback which will be triggered when all subtasks discarded (aborted or subsumed).
+     *
+     * @param checkpointId the discarded checkpoint id.
+     * @throws IOException if anything goes wrong with file system.
+     */
+    protected abstract void discardCheckpoint(long checkpointId) throws IOException;
+
+    // ------------------------------------------------------------------------
+    // Checkpoint Listener
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId)
+            throws Exception {
+        // does nothing
+    }
+
+    @Override
+    public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) throws Exception {
+        synchronized (lock) {
+            Set<LogicalFile> logicalFilesForCurrentCp = uploadedStates.get(checkpointId);
+            if (logicalFilesForCurrentCp == null) {
+                return;
+            }
+            Iterator<LogicalFile> logicalFileIterator = logicalFilesForCurrentCp.iterator();
+            while (logicalFileIterator.hasNext()) {
+                LogicalFile logicalFile = logicalFileIterator.next();
+                if (logicalFile.getSubtaskKey().equals(subtaskKey)
+                        && logicalFile.getLastUsedCheckpointID() <= checkpointId) {
+                    logicalFile.discardWithCheckpointId(checkpointId);
+                    logicalFileIterator.remove();
+                }
+            }
+
+            if (logicalFilesForCurrentCp.isEmpty()) {
+                uploadedStates.remove(checkpointId);
+                discardCheckpoint(checkpointId);
+            }

Review Comment:
   nit: This overlaps with the code in `notifyCheckpointSubsumed`; maybe the shared part can be extracted into a helper method.
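
   A minimal sketch of what that extraction could look like, not the PR's actual code. It mirrors only the `notifyCheckpointAborted` body quoted in the hunk. The body of `notifyCheckpointSubsumed` is not shown here, so the way it delegates to the helper is an assumption. `DiscardSketch`, `discardLogicalFiles`, `track` and `discardedCheckpoints` are hypothetical names, `SubtaskKey` is replaced by a plain `String`, and `LogicalFile` is a trimmed-down stand-in.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for FileMergingSnapshotManagerBase; NOT the Flink class.
class DiscardSketch {

    // Hypothetical, trimmed-down LogicalFile with only the fields the loop reads.
    static final class LogicalFile {
        final String subtaskKey; // stands in for SubtaskKey
        final long lastUsedCheckpointId;
        boolean discarded;

        LogicalFile(String subtaskKey, long lastUsedCheckpointId) {
            this.subtaskKey = subtaskKey;
            this.lastUsedCheckpointId = lastUsedCheckpointId;
        }
    }

    private final Object lock = new Object();
    final Map<Long, Set<LogicalFile>> uploadedStates = new HashMap<>();
    // Records the checkpoints for which discardCheckpoint(...) would have been called.
    final Set<Long> discardedCheckpoints = new HashSet<>();

    // Hypothetical helper that registers a logical file under a checkpoint.
    void track(long checkpointId, LogicalFile file) {
        synchronized (lock) {
            uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(file);
        }
    }

    public void notifyCheckpointAborted(String subtaskKey, long checkpointId) {
        discardLogicalFiles(subtaskKey, checkpointId);
    }

    // Assumption: the subsumed path can delegate to the same per-checkpoint helper.
    public void notifyCheckpointSubsumed(String subtaskKey, long checkpointId) {
        discardLogicalFiles(subtaskKey, checkpointId);
    }

    // Extracted helper (hypothetical name): same logic as the quoted aborted path.
    private void discardLogicalFiles(String subtaskKey, long checkpointId) {
        synchronized (lock) {
            Set<LogicalFile> files = uploadedStates.get(checkpointId);
            if (files == null) {
                return;
            }
            Iterator<LogicalFile> it = files.iterator();
            while (it.hasNext()) {
                LogicalFile file = it.next();
                if (file.subtaskKey.equals(subtaskKey)
                        && file.lastUsedCheckpointId <= checkpointId) {
                    file.discarded = true; // stands in for discardWithCheckpointId(...)
                    it.remove();
                }
            }
            if (files.isEmpty()) {
                uploadedStates.remove(checkpointId);
                discardedCheckpoints.add(checkpointId); // stands in for discardCheckpoint(...)
            }
        }
    }
}
```

   With the shared loop in one place, each listener method is reduced to a single delegating call, and any difference between the aborted and subsumed paths (for example, which checkpoint ids are visited) would stay in the caller.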