fredia commented on code in PR #24181:
URL: https://github.com/apache/flink/pull/24181#discussion_r1475523078


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java:
##########
@@ -50,6 +50,43 @@ public WithinCheckpointFileMergingSnapshotManager(String id, Executor ioExecutor
         writablePhysicalFilePool = new HashMap<>();
     }
 
+    // ------------------------------------------------------------------------
+    //  CheckpointListener
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId)

Review Comment:
   Why isn't `notifyCheckpointSubsumed` overridden?
   
   IIUC, checkpoint subsumption and completion are triggered at the same time ([code](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java#L1082-L1085)). Although the effect of the two methods is the same, it feels more straightforward to close the physical file in `notifyCheckpointSubsumed`. WDYT?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -345,6 +371,79 @@ protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(
     protected abstract void returnPhysicalFileForNextReuse(
             SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException;
 
+    /**
+     * The callback which will be triggered when all subtasks discarded (aborted or subsumed).
+     *
+     * @param checkpointId the discarded checkpoint id.
+     * @throws IOException if anything goes wrong with file system.
+     */
+    protected abstract void discardCheckpoint(long checkpointId) throws IOException;
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint Listener
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId)
+            throws Exception {
+        // does nothing
+    }
+
+    @Override
+    public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) throws Exception {
+        synchronized (lock) {
+            Set<LogicalFile> logicalFilesForCurrentCp = uploadedStates.get(checkpointId);
+            if (logicalFilesForCurrentCp == null) {
+                return;
+            }
+            Iterator<LogicalFile> logicalFileIterator = logicalFilesForCurrentCp.iterator();
+            while (logicalFileIterator.hasNext()) {
+                LogicalFile logicalFile = logicalFileIterator.next();
+                if (logicalFile.getSubtaskKey().equals(subtaskKey)
+                        && logicalFile.getLastUsedCheckpointID() <= checkpointId) {
+                    logicalFile.discardWithCheckpointId(checkpointId);
+                    logicalFileIterator.remove();
+                }
+            }
+
+            if (logicalFilesForCurrentCp.isEmpty()) {
+                uploadedStates.remove(checkpointId);
+                discardCheckpoint(checkpointId);
+            }

Review Comment:
   nit: This overlaps with the code in `notifyCheckpointSubsumed`; maybe the shared logic can be extracted into a helper method.
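
For illustration, the extraction could look roughly like the sketch below. It is not the PR's code: it uses simplified stand-in types (a `String` subtask key and a minimal `LogicalFile`), and `discardLogicalFiles`, `register`, and `discardedCheckpoints` are hypothetical names. The `TreeMap`/`headMap` loop in `notifyCheckpointSubsumed` is also an assumption, since that method is not shown in this hunk.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Simplified stand-in for the snapshot manager; all types here are hypothetical, not Flink's. */
class DiscardSketch {

    /** Minimal stand-in for LogicalFile: owning subtask, last checkpoint using it, discard flag. */
    static final class LogicalFile {
        final String subtaskKey;
        final long lastUsedCheckpointId;
        boolean discarded;

        LogicalFile(String subtaskKey, long lastUsedCheckpointId) {
            this.subtaskKey = subtaskKey;
            this.lastUsedCheckpointId = lastUsedCheckpointId;
        }
    }

    final Object lock = new Object();
    // Assumption: a sorted map, so that "all checkpoints up to N" is a cheap view.
    final TreeMap<Long, Set<LogicalFile>> uploadedStates = new TreeMap<>();
    // Records which checkpoints were fully discarded (stands in for discardCheckpoint(...)).
    final Set<Long> discardedCheckpoints = new HashSet<>();

    void register(long checkpointId, LogicalFile file) {
        synchronized (lock) {
            uploadedStates.computeIfAbsent(checkpointId, k -> new HashSet<>()).add(file);
        }
    }

    /**
     * Shared helper: discards the given subtask's files that were last used at or before
     * checkpointId. Returns true once the set is empty. Callers must hold the lock.
     */
    private boolean discardLogicalFiles(
            String subtaskKey, long checkpointId, Set<LogicalFile> files) {
        Iterator<LogicalFile> it = files.iterator();
        while (it.hasNext()) {
            LogicalFile file = it.next();
            if (file.subtaskKey.equals(subtaskKey) && file.lastUsedCheckpointId <= checkpointId) {
                file.discarded = true;
                it.remove();
            }
        }
        return files.isEmpty();
    }

    void notifyCheckpointAborted(String subtaskKey, long checkpointId) {
        synchronized (lock) {
            Set<LogicalFile> files = uploadedStates.get(checkpointId);
            if (files != null && discardLogicalFiles(subtaskKey, checkpointId, files)) {
                uploadedStates.remove(checkpointId);
                discardedCheckpoints.add(checkpointId);
            }
        }
    }

    void notifyCheckpointSubsumed(String subtaskKey, long checkpointId) {
        synchronized (lock) {
            // Visit every tracked checkpoint up to and including the subsumed one.
            Iterator<Map.Entry<Long, Set<LogicalFile>>> it =
                    uploadedStates.headMap(checkpointId, true).entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Set<LogicalFile>> entry = it.next();
                if (discardLogicalFiles(subtaskKey, checkpointId, entry.getValue())) {
                    // Read the key before removing; the entry is not reliable afterwards.
                    discardedCheckpoints.add(entry.getKey());
                    it.remove();
                }
            }
        }
    }

    public static void main(String[] args) {
        DiscardSketch manager = new DiscardSketch();
        manager.register(1L, new LogicalFile("subtask-0", 1L));
        manager.notifyCheckpointAborted("subtask-0", 1L);
        System.out.println("remaining checkpoints: " + manager.uploadedStates.keySet());
    }
}
```

With a helper like this, both notification paths share the per-file condition and differ only in which checkpoints they visit.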


