liming30 commented on code in PR #21503:
URL: https://github.com/apache/flink/pull/21503#discussion_r1102335317
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -177,6 +182,14 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
this.checkpoints = new HashMap<>();
this.lock = new Object();
this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool);
+ this.asyncDisposeThreadPool =
+ new ThreadPoolExecutor(
+ 0,
+ 4,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ExecutorThreadFactory("AsyncDispose"));
Review Comment:
In @gaoyunhaii's comment in
[FLINK-30251](https://issues.apache.org/jira/browse/FLINK-30251):
> There is also one concern:
>
> 1. The current `asyncOperationsThreadPool` is a cached thread pool, which
> do not have an upper limit of the number of threads, and it will create a new
> thread whenever there is not free thread when submitting tasks. Then if we have
> a large number of file to close, we might end up with a lot of threads, which
> might further cause a large number of memory consumption (1MB for each thread
> RSS region).
>
> 2. Thus we might change it to a thread pool with a limited maximum number
> of thread and one unbounded Blocking Queue. Also since the thread in this pool
> might be blocked, we might need to use a separate thread pool.

The main concern is that too many threads may be created if asynchronous
cleanup blocks. From your point of view, which is better: reusing
`asyncOperationsThreadPool` or creating a new pool?
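
One detail that may be worth checking for the new-pool option: according to the documented queuing rules of `java.util.concurrent.ThreadPoolExecutor`, threads beyond `corePoolSize` are only started when the work queue rejects a task, and an unbounded `LinkedBlockingQueue` never does. The sketch below is a standalone illustration, not the PR's code: the class name `BoundedPoolSketch` and its helper methods are hypothetical, and it uses the default thread factory instead of Flink's `ExecutorThreadFactory`. It compares the configuration from the diff (core 0, max 4) with an assumed alternative (core 4, max 4, core threads allowed to time out) while every task blocks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolSketch {

    /** Submits {@code tasks} blocking tasks and returns the largest pool size reached. */
    static int largestPoolSize(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // stands in for a close() call that blocks
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Workers are added on the submitting thread, so this value is stable here.
        int largest = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return largest;
    }

    /** Same sizing as in the diff: core 0, max 4, unbounded queue. */
    static ThreadPoolExecutor zeroCorePool() {
        return new ThreadPoolExecutor(
                0, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    /** Assumed alternative: core == max, with idle core threads allowed to exit. */
    static ThreadPoolExecutor fixedCorePool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) {
        System.out.println("core=0, max=4: " + largestPoolSize(zeroCorePool(), 8));
        System.out.println("core=4, max=4: " + largestPoolSize(fixedCorePool(), 8));
    }
}
```

With eight blocking tasks, the first configuration should stay at a single worker thread and the second should grow to four. If the intent is up to four concurrent dispose threads, the second form is one way to get that while still releasing idle threads after the keep-alive time.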
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -529,15 +542,19 @@ public void close() throws IOException {
}
public void cancel() throws IOException {
- List<AsyncCheckpointRunnable> asyncCheckpointRunnables = null;
synchronized (lock) {
if (!closed) {
closed = true;
- asyncCheckpointRunnables = new ArrayList<>(checkpoints.values());
+ final List<AsyncCheckpointRunnable> asyncCheckpointRunnables =
+ new ArrayList<>(checkpoints.values());
checkpoints.clear();
+ if (!asyncCheckpointRunnables.isEmpty()) {
+ asyncDisposeThreadPool.execute(
+ () -> IOUtils.closeAllQuietly(asyncCheckpointRunnables));
+ }
+ asyncDisposeThreadPool.shutdown();
Review Comment:
Yes, you are right. I originally hoped that no async operation would block
the process, but from the perspective of preventing resource leaks, cancel
should not be asynchronous. I'll change this part of the logic back.
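
For reference, a minimal sketch of the synchronous shape being restored, under stated assumptions: the class `SyncCancelSketch`, its `register` method, and the `int` return value are hypothetical and exist only for illustration, and plain `java.io.Closeable` stands in for `AsyncCheckpointRunnable`. The pending entries are snapshotted under the lock and closed on the calling thread after the lock is released, so all resources are released by the time `cancel()` returns.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SyncCancelSketch {
    private final Object lock = new Object();
    private final Map<Long, Closeable> checkpoints = new HashMap<>();
    private boolean closed;

    /** Hypothetical helper: tracks a pending closeable under a checkpoint id. */
    void register(long checkpointId, Closeable pending) {
        synchronized (lock) {
            checkpoints.put(checkpointId, pending);
        }
    }

    /** Closes everything that is pending and returns how many entries were closed. */
    int cancel() {
        List<Closeable> toClose = new ArrayList<>();
        synchronized (lock) {
            if (!closed) {
                closed = true;
                toClose.addAll(checkpoints.values());
                checkpoints.clear();
            }
        }
        // Close on the calling thread, outside the lock, so no cleanup is
        // left queued on a pool when this method returns.
        for (Closeable c : toClose) {
            try {
                c.close();
            } catch (IOException | RuntimeException e) {
                // Swallowed on purpose, similar in spirit to a "close quietly" helper.
            }
        }
        return toClose.size();
    }
}
```

A second call to `cancel()` finds `closed` already set and closes nothing.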