fredia commented on code in PR #20217:
URL: https://github.com/apache/flink/pull/20217#discussion_r939841395


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java:
##########
@@ -97,6 +101,47 @@ private void updateReference(long checkpointId, TaskStateSnapshot localState) {
         }
     }
 
+    public static Path getLocalTaskOwnedDirectory(LocalRecoveryDirectoryProvider provider) {
+        File outDir = provider.selectAllocationBaseDirectory(0);
+        if (!outDir.exists() && !outDir.mkdirs()) {
+            LOG.error(
+                    "Local state base directory does not exist and could not be created: "
+                            + outDir);
+        }
+        return new Path(outDir.toURI().toString(), CHECKPOINT_TASK_OWNED_STATE_DIR);
+    }
+
+    @Override
+    public void abortCheckpoint(long abortedCheckpointId) {
+
+        LOG.debug(
+                "Received abort information for checkpoint {} in subtask ({} - {} - {}). Starting to prune history.",
+                abortedCheckpointId,
+                jobID,
+                jobVertexID,
+                subtaskIndex);
+
+        pruneCheckpoints(
+                snapshotCheckpointId -> snapshotCheckpointId == abortedCheckpointId, false);
+
+        // Local store only keeps one checkpoint, discard all changelog handle in taskowned
+        // directory.
+        // Scenarios:
+        //   cp1: m1
+        //   confirm cp1, do nothing
+        //   cp2: m1, c1
+        //   abort cp2, delete m1, c1
+        //   cp3: m1, c1, c2
+        //   confirm cp3, do nothing
+        //   -> if failover, restore from local cp3 will fail, because m1 does not exist, c1 may not
+        // exist either(depend on BatchingStateChangeUploadScheduler).
+        File[] fileInTaskOwned =
+                new File(getLocalTaskOwnedDirectory(getLocalRecoveryDirectoryProvider()).toUri())
+                        .listFiles();
+        syncDiscardFileForCollection(
+                fileInTaskOwned == null ? Collections.emptyList() : Arrays.asList(fileInTaskOwned));

Review Comment:
   You're right. I moved the deletion to `LocalStateRegistry#prune` to handle the case where the last checkpoint(s) are aborted and then the job terminates.
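
   To illustrate why deferring deletion to a prune step avoids the cp1/cp2/cp3 failure described in the diff comment, here is a minimal sketch. It assumes a registry that records, for each local task-owned file, the highest checkpoint ID that references it, deletes nothing on abort, discards a file only once a confirmed checkpoint no longer needs it, and discards everything on job termination. The class `LocalStateRegistrySketch` and its `register`, `prune`, and `pruneAll` methods are hypothetical and do not reflect Flink's actual `LocalStateRegistry` API; file deletion is simulated by recording names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink's LocalStateRegistry): tracks the highest
 * checkpoint ID referencing each local task-owned file and discards a file
 * only when pruning shows no retained checkpoint can still need it.
 */
class LocalStateRegistrySketch {
    // file name -> highest checkpoint ID that referenced the file
    private final Map<String, Long> lastUsedBy = new HashMap<>();
    // files "deleted" so far; stands in for real file deletion
    private final List<String> discarded = new ArrayList<>();

    /** Records that checkpoint {@code checkpointId} references {@code file}. */
    void register(String file, long checkpointId) {
        lastUsedBy.merge(file, checkpointId, Long::max);
    }

    /** Discards every file whose last referencing checkpoint is older than {@code checkpointId}. */
    void prune(long checkpointId) {
        Iterator<Map.Entry<String, Long>> it = lastUsedBy.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < checkpointId) {
                discarded.add(entry.getKey());
                it.remove();
            }
        }
    }

    /** On job termination no local checkpoint is retained, so discard everything. */
    void pruneAll() {
        prune(Long.MAX_VALUE);
    }

    List<String> discarded() {
        return discarded;
    }

    public static void main(String[] args) {
        LocalStateRegistrySketch registry = new LocalStateRegistrySketch();
        registry.register("m1", 1); // cp1: m1
        registry.prune(1);          // confirm cp1: nothing discarded
        registry.register("m1", 2); // cp2: m1, c1
        registry.register("c1", 2);
        // abort cp2: nothing is deleted, unlike the eager deletion in abortCheckpoint
        registry.register("m1", 3); // cp3: m1, c1, c2
        registry.register("c1", 3);
        registry.register("c2", 3);
        registry.prune(3);          // confirm cp3: every file is still referenced
        System.out.println("discarded after cp3: " + registry.discarded());
        registry.register("c3", 4); // cp4 references c3 and is then aborted
        registry.pruneAll();        // job terminates: leftovers are cleaned up
        System.out.println("discarded after termination: " + registry.discarded());
    }
}
```

   In this sketch, restoring from cp3 after a failover still finds m1, c1, and c2, and a file referenced only by an aborted last checkpoint (c3) is still removed once the job terminates.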



-- 
This is an automated message from the Apache Git Service.