fredia commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1140928508


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java:
##########
@@ -87,8 +88,8 @@ public void discardUpToCheckpoint(long upTo) {
     public void prune(long checkpointID) {
         Set<StreamStateHandle> handles =
                 handleToLastUsedCheckpointID.values().stream()
-                        .filter(tuple -> tuple.f1 == checkpointID)
-                        .map(tuple -> tuple.f0)
+                        .filter(entry -> entry.createdCheckpointId == checkpointID)

Review Comment:
   Agreed: pre-emptively uploaded files are not re-uploaded, so files that 
belong to an aborted checkpoint may still be referenced by subsequent 
checkpoints. The downside is that with many consecutive aborted checkpoints, 
local state accumulates, because none of the aborted local state is deleted 
until the next completed materialization. 
   
   Since correctness takes priority over space amplification, I have deleted 
this method for now.
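
To make the correctness concern concrete, here is a minimal sketch of the failure mode. It is a simplified model, not the actual `LocalChangelogRegistryImpl`: the class `AbortPruneSketch`, the string handle names, and the `register`/`contains` helpers are all hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of abort-time pruning; not Flink code. */
class AbortPruneSketch {
    // handle name -> ID of the checkpoint during which the file was first uploaded
    private final Map<String, Long> createdCheckpointId = new HashMap<>();

    void register(String handle, long checkpointId) {
        // Keep the first registration: a pre-emptively uploaded file is not
        // re-uploaded, so a later checkpoint reuses the existing entry.
        createdCheckpointId.putIfAbsent(handle, checkpointId);
    }

    /** Prune everything created by the aborted checkpoint (the behavior under discussion). */
    void prune(long abortedCheckpointId) {
        createdCheckpointId.values().removeIf(id -> id == abortedCheckpointId);
    }

    boolean contains(String handle) {
        return createdCheckpointId.containsKey(handle);
    }

    /** Returns whether a file reused by checkpoint 2 survives the abort of checkpoint 1. */
    static boolean handleSurvivesAbort() {
        AbortPruneSketch registry = new AbortPruneSketch();
        registry.register("changelog-file-1", 1L); // uploaded for checkpoint 1
        registry.register("changelog-file-1", 2L); // checkpoint 2 reuses the same file
        registry.prune(1L);                        // checkpoint 1 is aborted
        return registry.contains("changelog-file-1");
    }

    public static void main(String[] args) {
        System.out.println("still registered: " + handleSurvivesAbort());
    }
}
```

In this model the entry is gone after the abort even though checkpoint 2 still refers to the file, which is why dropping the method trades some extra local space for safety.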
   
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java:
##########
@@ -51,9 +58,9 @@ public void prune(long checkpointID) {}
      * are unregistered when the checkpoint completes, because only one checkpoint is kept for local
      * recovery.
      *
-     * @param upTo lowest CheckpointID which is still valid.
+     * @param latestSubsumedId latest subsumed checkpointId.
      */
-    void discardUpToCheckpoint(long upTo);
+    void discardUpToCheckpoint(long latestSubsumedId);

Review Comment:
   Previously, `discardUpToCheckpoint` was called in both `confirm()` and 
`subsume()`, which was redundant. In the reviewed revision it was called only 
in `subsume()`, and the comparison in `discardUpToCheckpoint` also changed 
(`<` -> `<=`).
   If `state.checkpoints.num-retained` > 1, this retains more local files than 
needed, because local recovery only needs one previous checkpoint. 
   
   I fixed this by removing `subsume` and calling `discardUpToCheckpoint` only 
in `confirm()`.
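
The effect of `state.checkpoints.num-retained` can be sketched with a small simulation. It is a toy model under stated assumptions, not Flink code: `RetentionSketch` and both method names are hypothetical, each checkpoint is assumed to own exactly one piece of local state, a checkpoint is assumed to be subsumed once `numRetained` newer checkpoints have completed, and discarding on confirm is assumed to drop everything older than the confirmed checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical simulation of which checkpoints keep local state; not Flink code. */
class RetentionSketch {

    /** Discard local state only when a checkpoint is subsumed. */
    static List<Long> keptWhenDiscardingOnSubsume(long lastConfirmed, int numRetained) {
        TreeSet<Long> local = new TreeSet<>();
        for (long id = 1; id <= lastConfirmed; id++) {
            local.add(id);
            // Assumption: this checkpoint falls out of the retained window now.
            long latestSubsumedId = id - numRetained;
            if (latestSubsumedId >= 1) {
                // Inclusive bound, i.e. the `<=` comparison.
                local.headSet(latestSubsumedId, true).clear();
            }
        }
        return new ArrayList<>(local);
    }

    /** Discard everything older than the checkpoint that was just confirmed. */
    static List<Long> keptWhenDiscardingOnConfirm(long lastConfirmed) {
        TreeSet<Long> local = new TreeSet<>();
        for (long id = 1; id <= lastConfirmed; id++) {
            local.add(id);
            local.headSet(id, false).clear(); // exclusive bound: keep only `id`
        }
        return new ArrayList<>(local);
    }

    public static void main(String[] args) {
        System.out.println(keptWhenDiscardingOnSubsume(5, 3));
        System.out.println(keptWhenDiscardingOnConfirm(5));
    }
}
```

With five confirmed checkpoints and three retained, the subsume-only variant in this model still holds local state for checkpoints 3, 4, and 5, while the confirm-based variant holds it only for checkpoint 5.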



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
