fredia commented on code in PR #19907:
URL: https://github.com/apache/flink/pull/19907#discussion_r895423197


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -307,14 +340,23 @@ public void abortCheckpoint(long abortedCheckpointId) {
                 jobVertexID,
                 subtaskIndex);
 
+        // delete the referenced checkpoint meantime
         pruneCheckpoints(
-                snapshotCheckpointId -> snapshotCheckpointId == abortedCheckpointId, false);
+                snapshotCheckpointId ->
+                        (snapshotCheckpointId == abortedCheckpointId
+                                || referredByCheckpointID.getOrDefault(snapshotCheckpointId, -1L)
+                                        == abortedCheckpointId),

Review Comment:
   > 1. discard does nothing for jm part, because of the empty discardState() in ChangelogStateBackendHandleImpl
   
   Partially agree:
   - For local state: discard does nothing for the JM part, because `discardState()` in `ChangelogStateBackendHandleImpl` is empty and **no local handle is registered in** `registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID)`.
   - For remote state: the `SharedStateRegistry` on the JM would discard `changelogStateHandle`.
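   To make the pruning condition from the diff concrete, here is a minimal, self-contained sketch. It assumes, as the diff suggests, that `referredByCheckpointID` maps a snapshot's checkpoint ID to the ID of the checkpoint that refers to it; the class `PruneSketch` and the helpers `abortPredicate` and `toPrune` are hypothetical and are not Flink code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

class PruneSketch {

    // Hypothetical helper mirroring the diff: a stored snapshot is pruned when it
    // belongs to the aborted checkpoint itself, or when the aborted checkpoint
    // refers to it according to referredByCheckpointID (-1L means "no referrer").
    static LongPredicate abortPredicate(long abortedCheckpointId, Map<Long, Long> referredByCheckpointID) {
        return snapshotCheckpointId ->
                snapshotCheckpointId == abortedCheckpointId
                        || referredByCheckpointID.getOrDefault(snapshotCheckpointId, -1L) == abortedCheckpointId;
    }

    // Returns the IDs of stored snapshots that the predicate selects for pruning.
    static List<Long> toPrune(List<Long> storedCheckpointIds, LongPredicate predicate) {
        List<Long> result = new ArrayList<>();
        for (long id : storedCheckpointIds) {
            if (predicate.test(id)) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Long> referredBy = new HashMap<>();
        referredBy.put(1L, 3L); // the local snapshot of checkpoint 1 is referenced by checkpoint 3
        List<Long> stored = List.of(1L, 2L, 3L);
        System.out.println(toPrune(stored, abortPredicate(3L, referredBy))); // prints [1, 3]
    }
}
```

   Aborting checkpoint 3 therefore also removes the local snapshot of checkpoint 1 that it referenced, while checkpoint 2 is kept.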
   
   > 2. the local part is discarded, although it could be re-used; but it won't hurt correctness (on recovery, non-local state will be used)
   
   Yes, local recovery is nice to have but not a must: if the local state is not available, `BackendRestorerProcedure#createAndRestore()` would try to restore from the remote state instead.
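   The fallback described above can be pictured as trying restore alternatives in priority order (local first, then remote) and keeping the first one that succeeds. The sketch below is a simplified, hypothetical model of that idea, not the actual `BackendRestorerProcedure` implementation; `RestoreFallbackSketch`, `restoreFirstSuccessful`, and the string-valued "state sources" are illustrative assumptions.

```java
import java.util.List;
import java.util.function.Function;

class RestoreFallbackSketch {

    // Hypothetical model: each alternative is a state source in priority order; the
    // restore function throws if that source cannot be used (e.g. local state was pruned).
    static <S, B> B restoreFirstSuccessful(List<S> alternativesByPriority, Function<S, B> restore) {
        RuntimeException lastFailure = null;
        for (S alternative : alternativesByPriority) {
            try {
                return restore.apply(alternative);
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try the next alternative
            }
        }
        throw new IllegalStateException("Could not restore from any alternative", lastFailure);
    }

    public static void main(String[] args) {
        List<String> alternatives = List.of("local", "remote");
        String backend = restoreFirstSuccessful(alternatives, source -> {
            if (source.equals("local")) {
                throw new IllegalStateException("local state was discarded");
            }
            return "backend restored from " + source;
        });
        System.out.println(backend); // prints "backend restored from remote"
    }
}
```

   Discarding reusable local state in this model only costs the faster path; correctness is preserved as long as a remote alternative remains.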



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.