zjureel commented on a change in pull request #18303:
URL: https://github.com/apache/flink/pull/18303#discussion_r812608394
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -401,38 +451,90 @@ public void validateRunsInMainThread() {
/** Executor which executes runnables in the main thread context. */
protected static class MainThreadExecutor implements ComponentMainThreadExecutor {
+ private static final Logger log = LoggerFactory.getLogger(MainThreadExecutor.class);
private final MainThreadExecutable gateway;
private final Runnable mainThreadCheck;
-
- MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck) {
+ /**
+ * The main scheduled executor manages the scheduled tasks and send them to gateway when
+ * they should be executed.
+ */
+ private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+ MainThreadExecutor(
+ MainThreadExecutable gateway, Runnable mainThreadCheck, String endpointId) {
this.gateway = Preconditions.checkNotNull(gateway);
this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
- }
-
- private void scheduleRunAsync(Runnable runnable, long delayMillis) {
- gateway.scheduleRunAsync(runnable, delayMillis);
+ this.mainScheduledExecutor =
+ new ScheduledThreadPoolExecutor(
+ 1, new ExecutorThreadFactory(endpointId + "-main-scheduler"));
+ this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);
}
@Override
public void execute(@Nonnull Runnable command) {
gateway.runAsync(command);
}
+ /**
+ * The mainScheduledExecutor manages the task and sends it to the gateway after the given
+ * delay.
+ *
+ * @param command the task to execute in the future
+ * @param delay the time from now to delay the execution
+ * @param unit the time unit of the delay parameter
+ * @return a ScheduledFuture representing the completion of the scheduled task
+ */
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
- FutureTask<Void> ft = new FutureTask<>(command, null);
- scheduleRunAsync(ft, delayMillis);
- return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS);
+ // If the scheduled executor service is shutdown, the command won't be executed.
+ if (mainScheduledExecutor.isShutdown()) {
+ log.warn(
+ "The scheduled executor service is shutdown and return throwing scheduled future for command {}",
+ command);
+ return ThrowingScheduledFuture.getInstance();
+ } else {
+ final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
+ FutureTask<Void> ft = new FutureTask<>(command, null);
+ ScheduledFuture<?> scheduledFuture =
+ mainScheduledExecutor.schedule(
+ () -> gateway.scheduleRunAsync(ft, 0L),
+ delayMillis,
+ TimeUnit.MILLISECONDS);
+ return new ScheduledFutureAdapter<>(
+ scheduledFuture, ft, delayMillis, TimeUnit.MILLISECONDS);
Review comment:
Cancelling the `scheduledFuture` removes the `command` from the thread pool's
queue. As mentioned above, if the `command` is not executed even when
`gateway.scheduleRunAsync(ft, 0L)` is called after `ft` has been cancelled, it
may not be necessary to cancel the `scheduledFuture`. The thread pool's queue
will also be cleared in `MainThreadExecutor.close`. I will check and fix it.
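
For reference, the two JDK behaviours this reasoning relies on can be checked in isolation. The following is only a minimal sketch using plain `java.util.concurrent` classes, not Flink code; the class and method names (`CancelSemanticsSketch`, `commandRunsAfterCancel`, `queueSizeAfterCancel`) are made up for illustration. It checks that a cancelled `FutureTask` does not run its command when `run()` is invoked later, and that `setRemoveOnCancelPolicy(true)` drops a cancelled task from the scheduler's queue immediately.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; it only exercises JDK classes, not Flink's MainThreadExecutor.
class CancelSemanticsSketch {

    /** Returns true if the command ran although its FutureTask was cancelled first. */
    static boolean commandRunsAfterCancel() {
        AtomicBoolean ran = new AtomicBoolean(false);
        FutureTask<Void> ft = new FutureTask<>(() -> ran.set(true), null);
        ft.cancel(false);
        // Stands in for a late gateway.scheduleRunAsync(ft, 0L): run() is a no-op once cancelled.
        ft.run();
        return ran.get();
    }

    /** Returns the scheduler's queue size right after cancelling its only pending task. */
    static int queueSizeAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        scheduler.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            // A long delay keeps the task in the queue until it is cancelled.
            ScheduledFuture<?> pending = scheduler.schedule(() -> {}, 1, TimeUnit.HOURS);
            pending.cancel(false);
            return scheduler.getQueue().size();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("command ran after cancel: " + commandRunsAfterCancel());
        System.out.println("queue size, removeOnCancel=true: " + queueSizeAfterCancel(true));
        System.out.println("queue size, removeOnCancel=false: " + queueSizeAfterCancel(false));
    }
}
```

If this holds, cancelling only `ft` would already prevent the `command` from running, and cancelling the `scheduledFuture` mainly serves to release the queue entry early rather than to guarantee correctness.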