xintongsong commented on a change in pull request #18303:
URL: https://github.com/apache/flink/pull/18303#discussion_r812525319
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutor.java
##########
@@ -30,11 +31,15 @@
* Interface for an executor that runs tasks in the main thread of an {@link
* org.apache.flink.runtime.rpc.RpcEndpoint}.
*/
-public interface ComponentMainThreadExecutor extends ScheduledExecutor {
+public interface ComponentMainThreadExecutor extends ScheduledExecutor,
Closeable {
Review comment:
Instead of making `ComponentMainThreadExecutor` extend `Closeable`, we
can make `RpcEndpoint#MainThreadExecutor` implement both
`ComponentMainThreadExecutor` and `Closeable`.
- We won't need all the no-op implementations of `close()`.
- Notice that `RpcEndpoint#mainThreadExecutor` and
`FencedRpcEndpoint#fencedMainThreadExecutor` are exposed via
`RpcEndpoint/FencedRpcEndpoint#getMainThreadExecutor()`. By making the
executors `Closeable`, we would also publicly expose the ability to close
them, which is risky. A side benefit of only making
`MainThreadExecutor` implement `Closeable` is that we can avoid exposing
this ability by changing the return type of
`getMainThreadExecutor()` to `ComponentMainThreadExecutor`.
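A minimal sketch of this idea, using simplified stand-in types rather than the real Flink classes. `ComponentExecutor` and `Endpoint` are hypothetical names introduced only for illustration; the point is that only the concrete executor is `Closeable`, while the getter returns the narrower interface.

```java
import java.io.Closeable;
import java.util.concurrent.Executor;

// Hypothetical stand-in for ComponentMainThreadExecutor. Note: it is NOT Closeable.
interface ComponentExecutor extends Executor {
    void assertRunningInMainThread();
}

// Hypothetical stand-in for RpcEndpoint.
class Endpoint {
    // Only the concrete executor class implements Closeable.
    static final class MainThreadExecutor implements ComponentExecutor, Closeable {
        private volatile boolean closed;

        @Override
        public void execute(Runnable command) {
            // Simplified: the real executor would hand the command to the RPC gateway.
            command.run();
        }

        @Override
        public void assertRunningInMainThread() {}

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    private final MainThreadExecutor mainThreadExecutor = new MainThreadExecutor();

    // Callers only see the narrow interface, so close() is not part of what they get.
    ComponentExecutor getMainThreadExecutor() {
        return mainThreadExecutor;
    }

    // The endpoint itself keeps the ability to close its executor.
    void stop() {
        mainThreadExecutor.close();
    }

    boolean isExecutorClosed() {
        return mainThreadExecutor.isClosed();
    }
}
```

With this shape, no other `ComponentExecutor` implementation needs a no-op `close()`, and closing stays an internal concern of the endpoint.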
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -401,38 +451,90 @@ public void validateRunsInMainThread() {
/** Executor which executes runnables in the main thread context. */
protected static class MainThreadExecutor implements
ComponentMainThreadExecutor {
+ private static final Logger log =
LoggerFactory.getLogger(MainThreadExecutor.class);
private final MainThreadExecutable gateway;
private final Runnable mainThreadCheck;
-
- MainThreadExecutor(MainThreadExecutable gateway, Runnable
mainThreadCheck) {
+ /**
+ * The main scheduled executor manages the scheduled tasks and send
them to gateway when
+ * they should be executed.
+ */
+ private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+ MainThreadExecutor(
+ MainThreadExecutable gateway, Runnable mainThreadCheck, String
endpointId) {
this.gateway = Preconditions.checkNotNull(gateway);
this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
- }
-
- private void scheduleRunAsync(Runnable runnable, long delayMillis) {
- gateway.scheduleRunAsync(runnable, delayMillis);
+ this.mainScheduledExecutor =
+ new ScheduledThreadPoolExecutor(
+ 1, new ExecutorThreadFactory(endpointId +
"-main-scheduler"));
+ this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);
}
@Override
public void execute(@Nonnull Runnable command) {
gateway.runAsync(command);
}
+ /**
+ * The mainScheduledExecutor manages the task and sends it to the
gateway after the given
+ * delay.
+ *
+ * @param command the task to execute in the future
+ * @param delay the time from now to delay the execution
+ * @param unit the time unit of the delay parameter
+ * @return a ScheduledFuture representing the completion of the
scheduled task
+ */
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
- final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
- FutureTask<Void> ft = new FutureTask<>(command, null);
- scheduleRunAsync(ft, delayMillis);
- return new ScheduledFutureAdapter<>(ft, delayMillis,
TimeUnit.MILLISECONDS);
+ // If the scheduled executor service is shutdown, the command
won't be executed.
+ if (mainScheduledExecutor.isShutdown()) {
+ log.warn(
+ "The scheduled executor service is shutdown and return
throwing scheduled future for command {}",
+ command);
+ return ThrowingScheduledFuture.getInstance();
+ } else {
+ final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
+ FutureTask<Void> ft = new FutureTask<>(command, null);
+ ScheduledFuture<?> scheduledFuture =
+ mainScheduledExecutor.schedule(
+ () -> gateway.scheduleRunAsync(ft, 0L),
+ delayMillis,
+ TimeUnit.MILLISECONDS);
+ return new ScheduledFutureAdapter<>(
+ scheduledFuture, ft, delayMillis,
TimeUnit.MILLISECONDS);
Review comment:
IIUC, it isn't necessary to cancel the `scheduledFuture` in order to
cancel the scheduled execution. As long as `ft` is canceled, `command` won't be
executed even if `gateway.scheduleRunAsync(ft, 0L)` is called later.
Admittedly, it would be nicer to also cancel the `scheduledFuture`. But
given the complexity that canceling the `scheduledFuture` introduces, I'd
suggest leaving it as is.
Is there any harm in not canceling the `scheduledFuture` that I overlooked?
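To illustrate the claim about `ft`, here is a small standalone sketch using only the JDK. The class and method names are made up for the example. It relies on the documented behavior that `FutureTask#run()` does nothing once the task has been cancelled.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelledFutureTaskDemo {
    // Returns true if the command ran even though the FutureTask was cancelled first.
    static boolean commandRunsAfterCancel() {
        AtomicBoolean ran = new AtomicBoolean(false);
        FutureTask<Void> ft = new FutureTask<>(() -> ran.set(true), null);

        // The caller cancels the future before the delay has expired.
        ft.cancel(false);

        // Later, the scheduler still hands the task over for execution.
        // Because the task is already cancelled, run() is a no-op.
        ft.run();

        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("command ran: " + commandRunsAfterCancel()); // prints "command ran: false"
    }
}
```

The cost of not canceling the outer `scheduledFuture` would then be limited to the wrapper task staying queued until its delay expires.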
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -211,11 +227,40 @@ protected final void stop() {
*/
public final CompletableFuture<Void> internalCallOnStop() {
validateRunsInMainThread();
+ try {
+ resourceRegistry.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Close resource registry fail", e);
+ }
Review comment:
Instead of throwing a `RuntimeException`, I think the error should be
reflected by completing the returned future exceptionally.
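A rough sketch of what that could look like. `StopSketch` and its `internalCallOnStop(Closeable)` signature are hypothetical simplifications, not the actual `RpcEndpoint` method; only the error-handling shape matters here.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

class StopSketch {
    // Hypothetical stand-in for internalCallOnStop(): a failure to close the
    // resources is reported through the returned future instead of being thrown.
    static CompletableFuture<Void> internalCallOnStop(Closeable resources) {
        try {
            resources.close();
        } catch (IOException e) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new RuntimeException("Failed to close the resource registry", e));
            return failed;
        }
        // Simplified: any further stop logic would go here.
        return CompletableFuture.completedFuture(null);
    }
}
```

Callers that already handle a failed stop future would then see the close failure through the same path.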
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -303,6 +348,11 @@ protected MainThreadExecutor getMainThreadExecutor() {
return mainThreadExecutor;
}
+ @VisibleForTesting
+ CloseableRegistry getResourceRegistry() {
+ return resourceRegistry;
+ }
Review comment:
I think the tests should not require exposing the closeable registry.
Instead, they should validate that the executors are properly closed.
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/ScheduledFutureAdapter.java
##########
@@ -44,6 +45,12 @@
/** The uid sequence generator. */
private static final AtomicLong SEQUENCE_GEN = new AtomicLong();
+ /**
+ * The encapsulated scheduled future to which task scheduled by {@link
+ * ScheduledThreadPoolExecutor}.
+ */
+ private final ScheduledFuture<?> scheduledFuture;
Review comment:
Not sure about this change.
`ScheduledFutureAdapter` is, as defined by its JavaDoc, a `Future` enriched
with scheduling information. It makes no assumption about how it is scheduled
externally, i.e., with a `ScheduledThreadPoolExecutor` or not.
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -401,38 +451,90 @@ public void validateRunsInMainThread() {
/** Executor which executes runnables in the main thread context. */
protected static class MainThreadExecutor implements
ComponentMainThreadExecutor {
+ private static final Logger log =
LoggerFactory.getLogger(MainThreadExecutor.class);
private final MainThreadExecutable gateway;
private final Runnable mainThreadCheck;
-
- MainThreadExecutor(MainThreadExecutable gateway, Runnable
mainThreadCheck) {
+ /**
+ * The main scheduled executor manages the scheduled tasks and send
them to gateway when
+ * they should be executed.
+ */
+ private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+ MainThreadExecutor(
+ MainThreadExecutable gateway, Runnable mainThreadCheck, String
endpointId) {
this.gateway = Preconditions.checkNotNull(gateway);
this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
- }
-
- private void scheduleRunAsync(Runnable runnable, long delayMillis) {
- gateway.scheduleRunAsync(runnable, delayMillis);
+ this.mainScheduledExecutor =
+ new ScheduledThreadPoolExecutor(
+ 1, new ExecutorThreadFactory(endpointId +
"-main-scheduler"));
+ this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);
}
@Override
public void execute(@Nonnull Runnable command) {
gateway.runAsync(command);
}
+ /**
+ * The mainScheduledExecutor manages the task and sends it to the
gateway after the given
+ * delay.
+ *
+ * @param command the task to execute in the future
+ * @param delay the time from now to delay the execution
+ * @param unit the time unit of the delay parameter
+ * @return a ScheduledFuture representing the completion of the
scheduled task
+ */
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
- final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
- FutureTask<Void> ft = new FutureTask<>(command, null);
- scheduleRunAsync(ft, delayMillis);
- return new ScheduledFutureAdapter<>(ft, delayMillis,
TimeUnit.MILLISECONDS);
+ // If the scheduled executor service is shutdown, the command
won't be executed.
+ if (mainScheduledExecutor.isShutdown()) {
+ log.warn(
+ "The scheduled executor service is shutdown and return
throwing scheduled future for command {}",
+ command);
+ return ThrowingScheduledFuture.getInstance();
+ } else {
+ final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
+ FutureTask<Void> ft = new FutureTask<>(command, null);
+ ScheduledFuture<?> scheduledFuture =
+ mainScheduledExecutor.schedule(
+ () -> gateway.scheduleRunAsync(ft, 0L),
+ delayMillis,
+ TimeUnit.MILLISECONDS);
+ return new ScheduledFutureAdapter<>(
+ scheduledFuture, ft, delayMillis,
TimeUnit.MILLISECONDS);
+ }
}
+ /**
+ * The mainScheduledExecutor manages the given callable and sends it
to the gateway after
+ * the given delay. The result of the callable is returned as a {@link
ScheduledFuture}.
+ *
+ * @param callable the callable to execute
+ * @param delay the time from now to delay the execution
+ * @param unit the time unit of the delay parameter
+ * @param <V> result type of the callable
+ * @return a ScheduledFuture which holds the future value of the given
callable
+ */
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long
delay, TimeUnit unit) {
- final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
- FutureTask<V> ft = new FutureTask<>(callable);
- scheduleRunAsync(ft, delayMillis);
- return new ScheduledFutureAdapter<>(ft, delayMillis,
TimeUnit.MILLISECONDS);
+ // If the scheduled executor service is shutdown, the callable
won't be executed.
+ if (mainScheduledExecutor.isShutdown()) {
+ log.warn(
+ "The scheduled executor service is shutdown and return
throwing scheduled future for callable {}",
+ callable);
+ return ThrowingScheduledFuture.getInstance();
Review comment:
What was the previous behavior? If `schedule` was called on a closed
endpoint, would the call be ignored, or would a `RuntimeException` be thrown?
I'm asking because my gut feeling is that `schedule` is not expected to be
called after `mainScheduledExecutor` has been closed. If that happens, we might
want to throw a `RuntimeException`. We can either assert that
`mainScheduledExecutor` is not shut down, or skip the check and rely on
`mainScheduledExecutor.schedule` to throw a `RejectedExecutionException`.
The problem with `ThrowingScheduledFuture` is that it can get ignored if the
caller does not check the returned future.
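For reference, a small JDK-only sketch of the second option. The class name is made up for the example. With the default rejection policy, a `ScheduledThreadPoolExecutor` that has been shut down rejects new tasks with a `RejectedExecutionException`, so no explicit `isShutdown()` check is needed to surface the problem.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ScheduleAfterShutdownDemo {
    // Returns true if scheduling on a shut-down executor is rejected with an exception.
    static boolean scheduleIsRejectedAfterShutdown() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.shutdownNow();
        try {
            executor.schedule(() -> {}, 10, TimeUnit.MILLISECONDS);
            return false;
        } catch (RejectedExecutionException e) {
            // The caller cannot silently miss this, unlike an unchecked returned future.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + scheduleIsRejectedAfterShutdown()); // prints "rejected: true"
    }
}
```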
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
##########
@@ -56,10 +56,13 @@ protected FencedRpcEndpoint(
this.fencingToken = fencingToken;
this.unfencedMainThreadExecutor =
new UnfencedMainThreadExecutor((FencedMainThreadExecutable)
rpcServer);
+
+ MainThreadExecutable mainThreadExecutable =
+ getRpcService().fenceRpcServer(rpcServer, fencingToken);
this.fencedMainThreadExecutor =
new MainThreadExecutor(
- getRpcService().fenceRpcServer(rpcServer,
fencingToken),
- this::validateRunsInMainThread);
+ mainThreadExecutable, this::validateRunsInMainThread,
endpointId);
+ registerResource(this.fencedMainThreadExecutor);
Review comment:
We may add a method `setFencedMainThreadExecutor` to make sure the
following happens whenever a new fenced executor is set.
- new executor is registered
- previous executor, if any, is closed and unregistered
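Roughly what I have in mind, as a simplified sketch. `FencedEndpointSketch` and its plain `Set` are hypothetical stand-ins for `FencedRpcEndpoint` and the closeable registry, and the executor is reduced to a `Closeable`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

class FencedEndpointSketch {
    // Hypothetical stand-in for the endpoint's closeable registry.
    private final Set<Closeable> registeredResources = new HashSet<>();
    private Closeable fencedMainThreadExecutor;

    // Single place where the fenced executor is swapped, so both steps always happen.
    void setFencedMainThreadExecutor(Closeable newExecutor) throws IOException {
        Closeable previous = fencedMainThreadExecutor;
        if (previous != null) {
            // The previous executor, if any, is unregistered and closed.
            registeredResources.remove(previous);
            previous.close();
        }
        // The new executor is registered.
        fencedMainThreadExecutor = newExecutor;
        registeredResources.add(newExecutor);
    }

    boolean isRegistered(Closeable resource) {
        return registeredResources.contains(resource);
    }
}
```

The constructor and any later fencing-token change would both go through this method.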
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -401,38 +451,90 @@ public void validateRunsInMainThread() {
/** Executor which executes runnables in the main thread context. */
protected static class MainThreadExecutor implements
ComponentMainThreadExecutor {
+ private static final Logger log =
LoggerFactory.getLogger(MainThreadExecutor.class);
private final MainThreadExecutable gateway;
private final Runnable mainThreadCheck;
-
- MainThreadExecutor(MainThreadExecutable gateway, Runnable
mainThreadCheck) {
+ /**
+ * The main scheduled executor manages the scheduled tasks and send
them to gateway when
+ * they should be executed.
+ */
+ private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+ MainThreadExecutor(
+ MainThreadExecutable gateway, Runnable mainThreadCheck, String
endpointId) {
this.gateway = Preconditions.checkNotNull(gateway);
this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
- }
-
- private void scheduleRunAsync(Runnable runnable, long delayMillis) {
- gateway.scheduleRunAsync(runnable, delayMillis);
+ this.mainScheduledExecutor =
+ new ScheduledThreadPoolExecutor(
+ 1, new ExecutorThreadFactory(endpointId +
"-main-scheduler"));
+ this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);
}
@Override
public void execute(@Nonnull Runnable command) {
gateway.runAsync(command);
}
+ /**
+ * The mainScheduledExecutor manages the task and sends it to the
gateway after the given
+ * delay.
+ *
+ * @param command the task to execute in the future
+ * @param delay the time from now to delay the execution
+ * @param unit the time unit of the delay parameter
+ * @return a ScheduledFuture representing the completion of the
scheduled task
+ */
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
- final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
- FutureTask<Void> ft = new FutureTask<>(command, null);
- scheduleRunAsync(ft, delayMillis);
- return new ScheduledFutureAdapter<>(ft, delayMillis,
TimeUnit.MILLISECONDS);
+ // If the scheduled executor service is shutdown, the command
won't be executed.
+ if (mainScheduledExecutor.isShutdown()) {
+ log.warn(
+ "The scheduled executor service is shutdown and return
throwing scheduled future for command {}",
+ command);
+ return ThrowingScheduledFuture.getInstance();
+ } else {
+ final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
+ FutureTask<Void> ft = new FutureTask<>(command, null);
+ ScheduledFuture<?> scheduledFuture =
+ mainScheduledExecutor.schedule(
+ () -> gateway.scheduleRunAsync(ft, 0L),
Review comment:
```suggestion
() -> gateway.runAsync(ft),
```