pnowojski commented on code in PR #21503:
URL: https://github.com/apache/flink/pull/21503#discussion_r1100231851


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -177,6 +182,14 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
         this.checkpoints = new HashMap<>();
         this.lock = new Object();
         this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool);
+        this.asyncDisposeThreadPool =
+                new ThreadPoolExecutor(
+                        0,
+                        4,
+                        60L,
+                        TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<>(),
+                        new ExecutorThreadFactory("AsyncDispose"));

Review Comment:
   I don't think we should be creating yet another thread pool; we should just reuse an existing one. For example, `asyncOperationsThreadPool` seems appropriate.
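   A minimal sketch of what reusing an existing pool could look like. It assumes the coordinator already holds a plain `ExecutorService` (named `asyncOperationsThreadPool` to match the field in the diff; the exact type in Flink is an assumption). `SharedPoolDisposer` and `disposeAsync` are hypothetical names, not Flink API. The difference from the diff is that this component neither creates nor shuts down the pool. It only illustrates the thread-pool point; the next comment questions whether disposal should be asynchronous at all.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

/**
 * Hypothetical helper (not Flink API): schedules disposal on an executor it is
 * given instead of building its own ThreadPoolExecutor. The executor's owner,
 * not this class, decides when to shut it down.
 */
class SharedPoolDisposer {
    private final ExecutorService asyncOperationsThreadPool;

    SharedPoolDisposer(ExecutorService asyncOperationsThreadPool) {
        this.asyncOperationsThreadPool = Objects.requireNonNull(asyncOperationsThreadPool);
    }

    /** Closes the given resources on the shared pool, ignoring IOExceptions. */
    void disposeAsync(List<? extends Closeable> resources) {
        if (resources.isEmpty()) {
            return; // nothing to close, so don't occupy a pool thread
        }
        // Defensive copy: the caller may clear or reuse its list after this returns.
        final List<Closeable> toClose = new ArrayList<>(resources);
        asyncOperationsThreadPool.execute(
                () -> {
                    for (Closeable resource : toClose) {
                        try {
                            resource.close();
                        } catch (IOException ignored) {
                            // Swallowed, similar in spirit to closing "quietly".
                        }
                    }
                });
        // No shutdown() here: shutting down a shared pool would break its other users.
    }
}
```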



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -529,15 +542,19 @@ public void close() throws IOException {
     }
 
     public void cancel() throws IOException {
-        List<AsyncCheckpointRunnable> asyncCheckpointRunnables = null;
         synchronized (lock) {
             if (!closed) {
                 closed = true;
-                asyncCheckpointRunnables = new ArrayList<>(checkpoints.values());
+                final List<AsyncCheckpointRunnable> asyncCheckpointRunnables =
+                        new ArrayList<>(checkpoints.values());
                 checkpoints.clear();
+                if (!asyncCheckpointRunnables.isEmpty()) {
+                    asyncDisposeThreadPool.execute(
+                            () -> IOUtils.closeAllQuietly(asyncCheckpointRunnables));
+                }
+                asyncDisposeThreadPool.shutdown();

Review Comment:
   What's the point of this construct? If cleanup is taking a long time, I don't think we should hide this fact when shutting down the subtask; otherwise we run the risk of a resource leak. If the subtask is not able to shut down within a certain timeout, the TM should be killed and restarted (this is the responsibility of the `TaskCancelerWatchDog`).
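   A minimal sketch of the alternative this comment points toward: `cancel()` closes the runnables synchronously in the cancelling thread, so a slow `close()` makes cancellation itself slow instead of hiding it on a background pool. `SyncCancelCoordinator`, `register`, and `cancelsWithin` are hypothetical names; `cancelsWithin` is only a crude stand-in for a supervising watchdog and does not model what `TaskCancelerWatchDog` actually does.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified coordinator; not Flink's SubtaskCheckpointCoordinatorImpl. */
class SyncCancelCoordinator {
    private final Object lock = new Object();
    private final List<Closeable> checkpoints = new ArrayList<>();
    private boolean closed;

    /** Registers a resource; rejects registration after cancel(). */
    void register(Closeable resource) {
        synchronized (lock) {
            if (closed) {
                throw new IllegalStateException("already cancelled");
            }
            checkpoints.add(resource);
        }
    }

    /**
     * Snapshots and clears the registry under the lock, then closes the resources in
     * the calling thread. An IOException from one close() does not stop the others;
     * the first one is rethrown with later ones attached as suppressed exceptions.
     */
    void cancel() throws IOException {
        final List<Closeable> toClose;
        synchronized (lock) {
            if (closed) {
                return; // idempotent
            }
            closed = true;
            toClose = new ArrayList<>(checkpoints);
            checkpoints.clear();
        }
        IOException first = null;
        for (Closeable resource : toClose) {
            try {
                resource.close(); // runs in the caller's thread, so its duration is visible
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    /**
     * Crude watchdog stand-in: runs cancel() on a daemon thread and reports whether it
     * finished within the timeout. A real watchdog would escalate instead of returning.
     */
    static boolean cancelsWithin(SyncCancelCoordinator coordinator, long timeoutMillis)
            throws InterruptedException {
        Thread canceller =
                new Thread(
                        () -> {
                            try {
                                coordinator.cancel();
                            } catch (IOException ignored) {
                                // A failed close is not a timeout; only duration is measured.
                            }
                        });
        canceller.setDaemon(true);
        canceller.start();
        canceller.join(timeoutMillis);
        return !canceller.isAlive();
    }
}
```

   Closing outside `lock` keeps a slow `close()` from blocking other threads that need the lock. Rethrowing the first `IOException` is a choice made for this sketch; the review does not settle whether the real code should close quietly or propagate failures.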



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.