rkhachatryan commented on pull request #13040:
URL: https://github.com/apache/flink/pull/13040#issuecomment-686639977


   Thanks for updating your PR @echauchot 
   There is no cyclic dependency now and `CompletedCheckpointStore`s are not 
affected!
   
   I'm not sure I understood your point about putting the discard logic in 
the `PendingCheckpoint`/`CompletedCheckpoint` classes. I'd prefer not to put 
execution logic (or any logic) there, since IMO their only concern is to 
represent checkpoints.
   
   I think something like this should be possible:
   ```
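   import java.util.concurrent.Executor;
   import java.util.concurrent.atomic.AtomicInteger;

   public class CheckpointsCleaner {
       // Assumed fields; the counter tracks checkpoints still waiting to be cleaned.
       private final AtomicInteger counter = new AtomicInteger();
       private final Executor executor;

       public CheckpointsCleaner(Executor executor) { this.executor = executor; }

       public void clean(Runnable cleanAction, Runnable postCleanAction) {
           counter.incrementAndGet();
           executor.execute(() -> {
               try {
                   cleanAction.run();
               } finally {
                   // Runs even if cleanAction throws.
                   counter.decrementAndGet();
                   postCleanAction.run();
               }
           });
       }
   }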
   ```
   Then in `CheckpointCoordinator`:
   ```
   pendingCheckpoint.finalizeCheckpoint(checkpointsCleaner::clean);
   // The resulting CompletedCheckpoint saves this callback in a field
   ```
   
   And in `CompletedCheckpoint`:
   ```
   void asyncDiscardCheckpointAndCountCheckpoint(
           Consumer<CompletedCheckpoint> discardAction) {
       discardCallbackRunner.accept(
           () -> discardAction.accept(this),
           checkpointCleaningFinishedCallback);
   }
   ```
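
To show how the three pieces could fit together, here is a minimal end-to-end sketch. It is an illustration under assumptions, not the Flink implementation: the `*Sketch` classes, `getNumberOfCheckpointsToClean()`, `getCheckpointId()`, and passing `checkpointCleaningFinishedCallback` through the constructor are all hypothetical. The callback is modelled as a `BiConsumer<Runnable, Runnable>`, which is the shape `checkpointsCleaner::clean` would have.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical stand-in for CheckpointsCleaner; fields and constructor are assumptions.
class CheckpointsCleanerSketch {
    private final AtomicInteger counter = new AtomicInteger();
    private final Executor executor;

    CheckpointsCleanerSketch(Executor executor) {
        this.executor = executor;
    }

    void clean(Runnable cleanAction, Runnable postCleanAction) {
        counter.incrementAndGet();
        executor.execute(() -> {
            try {
                cleanAction.run();
            } finally {
                // Runs even if cleanAction throws.
                counter.decrementAndGet();
                postCleanAction.run();
            }
        });
    }

    // Hypothetical accessor, added only so the demo can observe the counter.
    int getNumberOfCheckpointsToClean() {
        return counter.get();
    }
}

// Hypothetical stand-in for CompletedCheckpoint: it only stores the callbacks
// and hands the discard action to the runner; it executes nothing itself.
class CompletedCheckpointSketch {
    private final long checkpointId;
    private final BiConsumer<Runnable, Runnable> discardCallbackRunner;
    private final Runnable checkpointCleaningFinishedCallback;

    CompletedCheckpointSketch(
            long checkpointId,
            BiConsumer<Runnable, Runnable> discardCallbackRunner,
            Runnable checkpointCleaningFinishedCallback) {
        this.checkpointId = checkpointId;
        this.discardCallbackRunner = discardCallbackRunner;
        this.checkpointCleaningFinishedCallback = checkpointCleaningFinishedCallback;
    }

    long getCheckpointId() {
        return checkpointId;
    }

    void asyncDiscardCheckpointAndCountCheckpoint(
            Consumer<CompletedCheckpointSketch> discardAction) {
        discardCallbackRunner.accept(
            () -> discardAction.accept(this),
            checkpointCleaningFinishedCallback);
    }
}

class DiscardWiringDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // A direct executor keeps the demo deterministic; a real setup would
        // presumably pass an asynchronous executor instead.
        CheckpointsCleanerSketch cleaner = new CheckpointsCleanerSketch(Runnable::run);
        CompletedCheckpointSketch checkpoint = new CompletedCheckpointSketch(
            42L, cleaner::clean, () -> log.add("cleaning finished"));

        checkpoint.asyncDiscardCheckpointAndCountCheckpoint(
            c -> log.add("discarded checkpoint " + c.getCheckpointId()));

        System.out.println(log); // [discarded checkpoint 42, cleaning finished]
        System.out.println(cleaner.getNumberOfCheckpointsToClean()); // 0
    }
}
```

With this shape the checkpoint class stays a plain holder of state plus two callbacks, and the counting and executor logic lives only in the cleaner.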
   
   WDYT?

