becketqin commented on a change in pull request #15161:
URL: https://github.com/apache/flink/pull/15161#discussion_r600612782
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
##########
@@ -74,7 +74,8 @@ public void start() throws Exception {
@Override
public void close() throws Exception {
closed = true;
- coordinator.closeAsync(closingTimeoutMs);
+ // Wait for coordinator close before destructing any user class loader.
+ coordinator.closeAsync(closingTimeoutMs).get();
Review comment:
I do not remember the exact original issue I saw there. It could be that
the `SplitEnumerator` was still doing something (e.g. partition discovery)
while it was being closed.
I am not sure we need to worry about this here, for two reasons:
1. In Flink, closing components has always been synchronous, so closing the
`OperatorCoordinator` synchronously here seems fine. This call can only block
if the `SplitEnumerator` itself blocks on closure. We cannot rule that out when
user code is involved, but in practice the synchronous closure is very likely
to return without blocking. Other components already behave this way; for
example, `CheckpointCoordinator.shutdown()` may block on the closure of master
hooks.
2. It is the JM's responsibility to ensure that the components are closed
cleanly to avoid potential resource leaks. Theoretically, we could make the
closure of every component asynchronous so that the JM main thread never
blocks, but chances are it would still have to wait until all the components
are closed before moving on to the next step. So, to some extent, blocking
until closure completes is a correctness requirement.
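A minimal sketch of both points, assuming a hypothetical `AsyncClosable` component whose `closeAsync(timeoutMs)` returns a `CompletableFuture<Void>`, loosely mirroring the `coordinator.closeAsync(closingTimeoutMs).get()` call in the diff. This is not Flink's actual coordinator API: the class, its sleep-based "close work", and the timeout via `CompletableFuture.orTimeout` (Java 9+) are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CloseSketch {

    /** Hypothetical component whose close runs on its own thread, standing in for a coordinator. */
    static final class AsyncClosable {
        final String name;
        private final long closeWorkMs;
        private final ExecutorService closer = Executors.newSingleThreadExecutor();
        volatile boolean closed = false;

        AsyncClosable(String name, long closeWorkMs) {
            this.name = name;
            this.closeWorkMs = closeWorkMs;
        }

        /** Starts closing; the returned future fails with a TimeoutException after timeoutMs. */
        CompletableFuture<Void> closeAsync(long timeoutMs) {
            CompletableFuture<Void> done = CompletableFuture.runAsync(() -> {
                // Simulates user code still running at close time, e.g. in-flight partition discovery.
                sleepQuietly(closeWorkMs);
                closed = true;
            }, closer);
            closer.shutdown(); // accept no new work; the close task itself still runs to completion
            return done.orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        // Reason 1: synchronous close. The caller blocks only as long as the component's own close takes.
        AsyncClosable coordinator = new AsyncClosable("coordinator", 50);
        coordinator.closeAsync(1_000).get();
        System.out.println(coordinator.name + " closed: " + coordinator.closed);

        // Reason 2: even if every close is asynchronous, the caller still has to wait for all
        // of them before moving on (e.g. before releasing a user class loader).
        List<AsyncClosable> components =
                List.of(new AsyncClosable("a", 20), new AsyncClosable("b", 40));
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (AsyncClosable c : components) {
            pending.add(c.closeAsync(1_000));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).get();
        for (AsyncClosable c : components) {
            System.out.println(c.name + " closed: " + c.closed);
        }
    }
}
```

Whether the caller blocks on a single `get()` or on `allOf(...).get()`, it ends up in the same place: it cannot proceed until every component has finished closing or the timeout has fired.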