dawidwys commented on a change in pull request #14857:
URL: https://github.com/apache/flink/pull/14857#discussion_r665316276



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -527,20 +552,22 @@ private void finishAndReportAsync(
             CheckpointMetaData metadata,
             CheckpointMetricsBuilder metrics,
             Supplier<Boolean> isRunning) {
-        // we are transferring ownership over snapshotInProgressList for cleanup to the thread,
-        // active on submit
-        asyncOperationsThreadPool.execute(
+        AsyncCheckpointRunnable asyncCheckpointRunnable =
                 new AsyncCheckpointRunnable(
                         snapshotFutures,
                         metadata,
                         metrics,
                         System.nanoTime(),
                         taskName,
-                        registerConsumer(),
                         unregisterConsumer(),
                         env,
                         asyncExceptionHandler,
-                        isRunning));
+                        isRunning);
+        registerConsumer().accept(asyncCheckpointRunnable);

Review comment:
   BTW, sorry, I missed one comment in the previous review (not sure how). Let's inline `registerConsumer()` and remove the method. There is no need to go through a `Consumer` here.
   
   I believe we can replace it with:
   ```
   registerAsyncCheckpointRunnable(
           asyncCheckpointRunnable.getCheckpointId(), asyncCheckpointRunnable);
   ```
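
To illustrate why the indirection is unnecessary, here is a minimal self-contained sketch. It does not use the real Flink classes: `FakeCoordinator` and `FakeCheckpointRunnable` are hypothetical stand-ins, and the body of `registerConsumer()` is an assumption about what the helper does. Only the names `registerAsyncCheckpointRunnable`, `getCheckpointId`, and `registerConsumer` come from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class InlineConsumerSketch {

    /** Hypothetical stand-in for AsyncCheckpointRunnable; only getCheckpointId() mirrors the PR. */
    static final class FakeCheckpointRunnable {
        private final long checkpointId;

        FakeCheckpointRunnable(long checkpointId) {
            this.checkpointId = checkpointId;
        }

        long getCheckpointId() {
            return checkpointId;
        }
    }

    /** Hypothetical stand-in for the coordinator that tracks running async checkpoints. */
    static final class FakeCoordinator {
        final Map<Long, FakeCheckpointRunnable> checkpoints = new HashMap<>();

        void registerAsyncCheckpointRunnable(long checkpointId, FakeCheckpointRunnable runnable) {
            checkpoints.put(checkpointId, runnable);
        }

        // Assumed shape of the helper being removed: it only wraps the call above.
        Consumer<FakeCheckpointRunnable> registerConsumer() {
            return runnable -> registerAsyncCheckpointRunnable(runnable.getCheckpointId(), runnable);
        }

        // Version in the diff: registration goes through the Consumer.
        void registerViaConsumer(FakeCheckpointRunnable runnable) {
            registerConsumer().accept(runnable);
        }

        // Suggested version: call the registration method directly.
        void registerDirectly(FakeCheckpointRunnable runnable) {
            registerAsyncCheckpointRunnable(runnable.getCheckpointId(), runnable);
        }
    }

    public static void main(String[] args) {
        FakeCoordinator coordinator = new FakeCoordinator();
        coordinator.registerViaConsumer(new FakeCheckpointRunnable(1L));
        coordinator.registerDirectly(new FakeCheckpointRunnable(2L));
        // Both paths leave the runnable registered under its own checkpoint id.
        System.out.println(coordinator.checkpoints.keySet());
    }
}
```

Under these assumptions both paths have the same effect, so once the runnable is constructed in the same method, the direct call says the same thing with one less layer to read through.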




