echauchot edited a comment on pull request #13040:
URL: https://github.com/apache/flink/pull/13040#issuecomment-688845131


   > Thanks for updating your PR @echauchot
   > There is no cyclic dependency now and `CompletedCheckpointStore`s are not
   > affected!
   > 
   > I'm not sure I understood you about having discard logic in
   > `Pending/CompletedCheckpoint` classes. I'd prefer not to put execution (or any)
   > logic there as their concern IMO is to just represent checkpoints.
   > 
   > I think something like this should be possible:
   > 
   > ```
   > public class CheckpointsCleaner {
   >     public void clean(Runnable cleanAction, Runnable postCleanAction) {
   >         counter.incrementAndGet();
   >         executor.execute(() -> {
   >             try { cleanAction.run(); }
   >             finally {
   >                 counter.decrementAndGet();
   >                 postCleanAction.run();
   >             }
   >         });
   >     }
   > }
   > ```
   > 
   > Then in `CheckpointCoordinator`:
   > 
   > ```
   > pendingCheckpoint.finalizeCheckpoint(checkpointsCleaner::clean)
   > // CompletedCheckpoint saves this callback in a field
   > ```
   > 
   > And in `CompletedCheckpoint`:
   > 
   > ```
   > void asyncDiscardCheckpointAndCountCheckpoint(
   >         Consumer<CompletedCheckpoint> discardAction) {
   >     discardCallbackRunner.accept(
   >         () -> discardAction.accept(this),
   >         checkpointCleaningFinishedCallback);
   > }
   > ```
   > 
   > WDYT?
   
   Yes, I think this refactoring will work for _CompletedCheckpoint_, and it is 
indeed better because it keeps the cleaning logic inside 
_CheckpointsCleaner_. 
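
   To check that we mean the same thing, here is a minimal, self-contained 
sketch of that wiring. It is only an illustration under assumptions: 
`CompletedCheckpointSketch` is a hypothetical stand-in for 
_CompletedCheckpoint_, `getNumberOfCheckpointsToClean()` is a name I made up 
for reading the counter, and none of the signatures are taken from the actual 
Flink code.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Sketch only: mirrors the snippet quoted above, with the fields it needs to compile.
class CheckpointsCleaner {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final Executor executor;

    CheckpointsCleaner(Executor executor) {
        this.executor = executor;
    }

    // Counts the task before submitting it, and un-counts it once the clean action ends,
    // whether it succeeded or threw.
    void clean(Runnable cleanAction, Runnable postCleanAction) {
        counter.incrementAndGet();
        executor.execute(() -> {
            try {
                cleanAction.run();
            } finally {
                counter.decrementAndGet();
                postCleanAction.run();
            }
        });
    }

    // Hypothetical accessor, so that callers can see how many cleanups are in flight.
    int getNumberOfCheckpointsToClean() {
        return counter.get();
    }
}

// Hypothetical stand-in for CompletedCheckpoint: it only stores the callbacks,
// it has no reference to CheckpointsCleaner itself.
class CompletedCheckpointSketch {
    private final long checkpointId;
    private final BiConsumer<Runnable, Runnable> discardCallbackRunner;
    private final Runnable checkpointCleaningFinishedCallback;

    CompletedCheckpointSketch(
            long checkpointId,
            BiConsumer<Runnable, Runnable> discardCallbackRunner,
            Runnable checkpointCleaningFinishedCallback) {
        this.checkpointId = checkpointId;
        this.discardCallbackRunner = discardCallbackRunner;
        this.checkpointCleaningFinishedCallback = checkpointCleaningFinishedCallback;
    }

    long getCheckpointId() {
        return checkpointId;
    }

    // The store decides what "discard" means; the runner decides how it is executed and counted.
    void asyncDiscardCheckpointAndCountCheckpoint(Consumer<CompletedCheckpointSketch> discardAction) {
        discardCallbackRunner.accept(
                () -> discardAction.accept(this),
                checkpointCleaningFinishedCallback);
    }
}
```

   With this shape, the coordinator would pass `checkpointsCleaner::clean` as 
the `discardCallbackRunner`, so the checkpoint class stays a plain data holder 
and the counter is only ever touched inside the cleaner.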
   But for _PendingCheckpoint_, there was already some cleaning logic inside 
_PendingCheckpoint.dispose()_ before this PR, and we agreed that the tasks it 
submits to the executor need to be counted as well. Do you want us to move 
this logic to _CheckpointsCleaner_ so that all counter updates stay inside 
_CheckpointsCleaner_? 
   If yes, that would mean creating a dependency from _PendingCheckpoint_ to 
_CheckpointsCleaner_ to allow cleaning the _PendingCheckpoint_ state, because 
passing targetLocation, operatorstates, etc. to a new cleaning callback would 
not be practical.
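
   A rough sketch of what that dependency could look like, again under 
assumptions: `PendingCheckpointSketch` and `CountingCleaner` are hypothetical, 
reduced stand-ins, the fields are simplified placeholders (plain strings 
instead of the real state types), and the real `dispose()` does more than 
this.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, reduced cleaner: only the counting around the executor matters here.
class CountingCleaner {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final Executor executor;

    CountingCleaner(Executor executor) {
        this.executor = executor;
    }

    void clean(Runnable cleanAction) {
        counter.incrementAndGet();
        executor.execute(() -> {
            try {
                cleanAction.run();
            } finally {
                counter.decrementAndGet();
            }
        });
    }

    int pendingCleanups() {
        return counter.get();
    }
}

// Hypothetical stand-in for PendingCheckpoint.
class PendingCheckpointSketch {
    private final Map<String, String> operatorStates = new ConcurrentHashMap<>();
    private final CountingCleaner cleaner; // the new dependency under discussion
    private volatile String targetLocation;
    private volatile boolean disposed;

    PendingCheckpointSketch(String targetLocation, CountingCleaner cleaner) {
        this.targetLocation = targetLocation;
        this.cleaner = cleaner;
    }

    void addOperatorState(String operatorId, String stateHandle) {
        operatorStates.put(operatorId, stateHandle);
    }

    // The checkpoint keeps its own state private and only hands a Runnable to the cleaner,
    // so nothing like targetLocation or operatorStates has to be passed around.
    void dispose() {
        cleaner.clean(() -> {
            operatorStates.clear();  // stands in for discarding the operator states
            targetLocation = null;   // stands in for disposing the storage location
            disposed = true;
        });
    }

    boolean isDisposed() {
        return disposed;
    }

    int numberOfOperatorStates() {
        return operatorStates.size();
    }
}
```

   The trade-off in this variant is that the counter logic stays in one 
place, at the price of _PendingCheckpoint_ knowing about the cleaner.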
   

