zjureel commented on a change in pull request #18303:
URL: https://github.com/apache/flink/pull/18303#discussion_r812608394



##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -401,38 +451,90 @@ public void validateRunsInMainThread() {
 
     /** Executor which executes runnables in the main thread context. */
     protected static class MainThreadExecutor implements 
ComponentMainThreadExecutor {
+        private static final Logger log = 
LoggerFactory.getLogger(MainThreadExecutor.class);
 
         private final MainThreadExecutable gateway;
         private final Runnable mainThreadCheck;
-
-        MainThreadExecutor(MainThreadExecutable gateway, Runnable 
mainThreadCheck) {
+        /**
+         * The main scheduled executor manages the scheduled tasks and send 
them to gateway when
+         * they should be executed.
+         */
+        private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+        MainThreadExecutor(
+                MainThreadExecutable gateway, Runnable mainThreadCheck, String 
endpointId) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-        }
-
-        private void scheduleRunAsync(Runnable runnable, long delayMillis) {
-            gateway.scheduleRunAsync(runnable, delayMillis);
+            this.mainScheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory(endpointId + 
"-main-scheduler"));
+            this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);
         }
 
         @Override
         public void execute(@Nonnull Runnable command) {
             gateway.runAsync(command);
         }
 
+        /**
+         * The mainScheduledExecutor manages the task and sends it to the 
gateway after the given
+         * delay.
+         *
+         * @param command the task to execute in the future
+         * @param delay the time from now to delay the execution
+         * @param unit the time unit of the delay parameter
+         * @return a ScheduledFuture representing the completion of the 
scheduled task
+         */
         @Override
         public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
-            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
-            FutureTask<Void> ft = new FutureTask<>(command, null);
-            scheduleRunAsync(ft, delayMillis);
-            return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
+            // If the scheduled executor service is shutdown, the command 
won't be executed.
+            if (mainScheduledExecutor.isShutdown()) {
+                log.warn(
+                        "The scheduled executor service is shutdown and return 
throwing scheduled future for command {}",
+                        command);
+                return ThrowingScheduledFuture.getInstance();
+            } else {
+                final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
+                FutureTask<Void> ft = new FutureTask<>(command, null);
+                ScheduledFuture<?> scheduledFuture =
+                        mainScheduledExecutor.schedule(
+                                () -> gateway.scheduleRunAsync(ft, 0L),
+                                delayMillis,
+                                TimeUnit.MILLISECONDS);
+                return new ScheduledFutureAdapter<>(
+                        scheduledFuture, ft, delayMillis, 
TimeUnit.MILLISECONDS);

Review comment:
       Cancelling the `scheduledFuture` removes the `command` from the thread 
pool's queue. As mentioned above, if the `command` won't be executed even when 
`gateway.scheduleRunAsync(ft, 0L)` is called after `ft` has been cancelled, 
there may be no need to cancel the `scheduledFuture`. The thread pool's queue 
will also be cleared in `MainThreadExecutor.close`. I will check and fix it.
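
A minimal, standalone sketch of the two JDK behaviours this reasoning relies on. It uses only `java.util.concurrent`, not the Flink classes. The class name `CancelSketch` and both method names are hypothetical, and the direct `ft.run()` call only stands in for the gateway running `ft` later:

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of the PR.
class CancelSketch {

    /** Returns true if the command body still runs after its FutureTask was cancelled. */
    static boolean runsAfterCancel() {
        AtomicBoolean ran = new AtomicBoolean(false);
        FutureTask<Void> ft = new FutureTask<>(() -> ran.set(true), null);
        ft.cancel(false);
        // Stand-in for the gateway executing ft later: run() on a cancelled
        // FutureTask returns without invoking the wrapped command.
        ft.run();
        return ran.get();
    }

    /** Returns the queue size after cancelling a scheduled task with remove-on-cancel enabled. */
    static int queueSizeAfterCancel() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(true);
        try {
            // Long delay so the task is still queued when it is cancelled.
            ScheduledFuture<?> scheduled = executor.schedule(() -> {}, 1, TimeUnit.HOURS);
            scheduled.cancel(false);
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("command ran after cancel: " + runsAfterCancel());
        System.out.println("queue size after cancel: " + queueSizeAfterCancel());
    }
}
```

If both hold in the real code path, cancelling `ft` alone is enough to keep the `command` from running, and cancelling `scheduledFuture` only affects how soon the entry leaves the queue.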




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]