echauchot commented on a change in pull request #13040:
URL: https://github.com/apache/flink/pull/13040#discussion_r469284951



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -519,24 +522,8 @@ private void dispose(boolean releaseState) {
                        try {
                                numAcknowledgedTasks = -1;
                                if (!discarded && releaseState) {
-                                       executor.execute(new Runnable() {
-                                               @Override
-                                               public void run() {
-
-                                                       // discard the private states.
-                                                       // unregistered shared states are still considered private at this point.
-                                                       try {
-                                                               StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
-                                                               targetLocation.disposeOnFailure();
-                                                       } catch (Throwable t) {
-                                                               LOG.warn("Could not properly dispose the private states in the pending checkpoint {} of job {}.",
-                                                                       checkpointId, jobId, t);
-                                                       } finally {
-                                                               operatorStates.clear();
-                                                       }
-                                               }
-                                       });
-
+                                       checkpointsCleaner.asyncDiscardPrivateStatesAndCountCheckpoints(operatorStates,

Review comment:
       `CheckpointsCleaner` is just a delegate that submits tasks to the 
executor, as before. Do you mean that the synchronisation is lost because the 
`CheckpointsCleaner` object is created on the `buildGraph()` thread? Sorry, I 
don't know Flink's threading model very well yet.
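
To make the "delegate" point concrete, here is a minimal sketch of the pattern as I understand it. `CleanerSketch`, `asyncDiscard` and `lastCleanupThread` are hypothetical names for illustration only; this is not the actual `CheckpointsCleaner` code, and the state map is simplified to plain `Runnable` discard actions.

```java
import java.util.Map;
import java.util.concurrent.Executor;

// Hypothetical stand-in for illustration; NOT the actual Flink CheckpointsCleaner.
final class CleanerSketch {

    // Name of the thread that ran the most recent cleanup (illustration only).
    volatile String lastCleanupThread;

    // Submits the discard work to the given executor instead of running it inline.
    // Each map value is an assumed "discard this state" action.
    void asyncDiscard(Map<Long, Runnable> states, Executor executor) {
        executor.execute(() -> {
            lastCleanupThread = Thread.currentThread().getName();
            try {
                // Best-effort discard of every state object.
                for (Runnable discard : states.values()) {
                    discard.run();
                }
            } catch (Throwable t) {
                System.err.println("Could not properly dispose the states: " + t);
            } finally {
                // Always drop the references, as the removed inline code did.
                states.clear();
            }
        });
    }
}
```

In this sketch the cleanup body runs on whichever thread the executor chooses, regardless of which thread constructed the `CleanerSketch` object. Whether the same holds for the synchronisation in the real class is exactly what I am asking.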





