xintongsong commented on a change in pull request #18303:
URL: https://github.com/apache/flink/pull/18303#discussion_r812525319



##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutor.java
##########
@@ -30,11 +31,15 @@
  * Interface for an executor that runs tasks in the main thread of an {@link
  * org.apache.flink.runtime.rpc.RpcEndpoint}.
  */
-public interface ComponentMainThreadExecutor extends ScheduledExecutor {
+public interface ComponentMainThreadExecutor extends ScheduledExecutor, 
Closeable {

Review comment:
       Instead of making `ComponentMainThreadExecutor` extend `Closeable`, we 
can make `RpcEndpoint#MainThreadExecutor` implements both 
`ComponentMainThreadExecutor` and `Closeable`.
   - We won't need all the no-op implementations of the `close()`.
   - Notice that `RpcEndpoint#mainThreadExecutor` and 
`FencedRpcEndpoint#fencedMainThreadExecutor` are exposed via 
`RpcEndpoint/FencedRpcEndpoint#getMainThreadExecutor()`. By making the 
executors `Closeable`, we are also exposing the ability of closing the 
executors publicly, which is risky. A side benefit of only making 
`MainThreadExecutor` implement `Closeable` is that, we can avoid exposing the 
ability of closing the executors by changing the return type of 
`getMainThreadExecutor()` to `ComponentMainThreadExecutor`.

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -401,38 +451,90 @@ public void validateRunsInMainThread() {
 
     /** Executor which executes runnables in the main thread context. */
     protected static class MainThreadExecutor implements 
ComponentMainThreadExecutor {
+        private static final Logger log = 
LoggerFactory.getLogger(MainThreadExecutor.class);
 
         private final MainThreadExecutable gateway;
         private final Runnable mainThreadCheck;
-
-        MainThreadExecutor(MainThreadExecutable gateway, Runnable 
mainThreadCheck) {
+        /**
+         * The main scheduled executor manages the scheduled tasks and send 
them to gateway when
+         * they should be executed.
+         */
+        private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+        MainThreadExecutor(
+                MainThreadExecutable gateway, Runnable mainThreadCheck, String 
endpointId) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-        }
-
-        private void scheduleRunAsync(Runnable runnable, long delayMillis) {
-            gateway.scheduleRunAsync(runnable, delayMillis);
+            this.mainScheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory(endpointId + 
"-main-scheduler"));
+            this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);
         }
 
         @Override
         public void execute(@Nonnull Runnable command) {
             gateway.runAsync(command);
         }
 
+        /**
+         * The mainScheduledExecutor manages the task and sends it to the 
gateway after the given
+         * delay.
+         *
+         * @param command the task to execute in the future
+         * @param delay the time from now to delay the execution
+         * @param unit the time unit of the delay parameter
+         * @return a ScheduledFuture representing the completion of the 
scheduled task
+         */
         @Override
         public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
-            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
-            FutureTask<Void> ft = new FutureTask<>(command, null);
-            scheduleRunAsync(ft, delayMillis);
-            return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
+            // If the scheduled executor service is shutdown, the command 
won't be executed.
+            if (mainScheduledExecutor.isShutdown()) {
+                log.warn(
+                        "The scheduled executor service is shutdown and return 
throwing scheduled future for command {}",
+                        command);
+                return ThrowingScheduledFuture.getInstance();
+            } else {
+                final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
+                FutureTask<Void> ft = new FutureTask<>(command, null);
+                ScheduledFuture<?> scheduledFuture =
+                        mainScheduledExecutor.schedule(
+                                () -> gateway.scheduleRunAsync(ft, 0L),
+                                delayMillis,
+                                TimeUnit.MILLISECONDS);
+                return new ScheduledFutureAdapter<>(
+                        scheduledFuture, ft, delayMillis, 
TimeUnit.MILLISECONDS);

Review comment:
       IIUC, it won't be necessary to cancel the `scheduledFuture` in order to 
cancel the scheduled execution. As long as `ft` is canceled, `command` won't be 
executed even later `gateway.scheduleRunAsync(ft, 0L)` is called.
   
   Admittedly, it would be nicer to also cancel the `scheduledFuture`. But 
given the complexity that trying to cancel the `scheduledFuture` creates, I'd 
suggest to leave it.
   
   Is there any damage of not canceling `scheduledFuture` that I overlooked?

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -211,11 +227,40 @@ protected final void stop() {
      */
     public final CompletableFuture<Void> internalCallOnStop() {
         validateRunsInMainThread();
+        try {
+            resourceRegistry.close();
+        } catch (IOException e) {
+            throw new RuntimeException("Close resource registry fail", e);
+        }

Review comment:
       Instead of throwing a `RuntimeException`, I think the error should be 
reflected by complete the returning future exceptionally.

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -303,6 +348,11 @@ protected MainThreadExecutor getMainThreadExecutor() {
         return mainThreadExecutor;
     }
 
+    @VisibleForTesting
+    CloseableRegistry getResourceRegistry() {
+        return resourceRegistry;
+    }

Review comment:
       I think the tests should not require exposing the closeable registry. 
Instead, they should validate that the executors are properly closed.

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/ScheduledFutureAdapter.java
##########
@@ -44,6 +45,12 @@
     /** The uid sequence generator. */
     private static final AtomicLong SEQUENCE_GEN = new AtomicLong();
 
+    /**
+     * The encapsulated scheduled future to which task scheduled by {@link
+     * ScheduledThreadPoolExecutor}.
+     */
+    private final ScheduledFuture<?> scheduledFuture;

Review comment:
       Not sure about this change.
   
   `ScheduledFutureAdapter` is, as defined by its JavaDoc, a`Future` enriched 
with scheduling information. There's no assumption how it is scheduled 
externally, i.e. with a `ScheduledThreadPoolExecutor` or not.

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -401,38 +451,90 @@ public void validateRunsInMainThread() {
 
     /** Executor which executes runnables in the main thread context. */
     protected static class MainThreadExecutor implements 
ComponentMainThreadExecutor {
+        private static final Logger log = 
LoggerFactory.getLogger(MainThreadExecutor.class);
 
         private final MainThreadExecutable gateway;
         private final Runnable mainThreadCheck;
-
-        MainThreadExecutor(MainThreadExecutable gateway, Runnable 
mainThreadCheck) {
+        /**
+         * The main scheduled executor manages the scheduled tasks and send 
them to gateway when
+         * they should be executed.
+         */
+        private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+        MainThreadExecutor(
+                MainThreadExecutable gateway, Runnable mainThreadCheck, String 
endpointId) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-        }
-
-        private void scheduleRunAsync(Runnable runnable, long delayMillis) {
-            gateway.scheduleRunAsync(runnable, delayMillis);
+            this.mainScheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory(endpointId + 
"-main-scheduler"));
+            this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);
         }
 
         @Override
         public void execute(@Nonnull Runnable command) {
             gateway.runAsync(command);
         }
 
+        /**
+         * The mainScheduledExecutor manages the task and sends it to the 
gateway after the given
+         * delay.
+         *
+         * @param command the task to execute in the future
+         * @param delay the time from now to delay the execution
+         * @param unit the time unit of the delay parameter
+         * @return a ScheduledFuture representing the completion of the 
scheduled task
+         */
         @Override
         public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
-            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
-            FutureTask<Void> ft = new FutureTask<>(command, null);
-            scheduleRunAsync(ft, delayMillis);
-            return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
+            // If the scheduled executor service is shutdown, the command 
won't be executed.
+            if (mainScheduledExecutor.isShutdown()) {
+                log.warn(
+                        "The scheduled executor service is shutdown and return 
throwing scheduled future for command {}",
+                        command);
+                return ThrowingScheduledFuture.getInstance();
+            } else {
+                final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
+                FutureTask<Void> ft = new FutureTask<>(command, null);
+                ScheduledFuture<?> scheduledFuture =
+                        mainScheduledExecutor.schedule(
+                                () -> gateway.scheduleRunAsync(ft, 0L),
+                                delayMillis,
+                                TimeUnit.MILLISECONDS);
+                return new ScheduledFutureAdapter<>(
+                        scheduledFuture, ft, delayMillis, 
TimeUnit.MILLISECONDS);
+            }
         }
 
+        /**
+         * The mainScheduledExecutor manages the given callable and sends it 
to the gateway after
+         * the given delay. The result of the callable is returned as a {@link 
ScheduledFuture}.
+         *
+         * @param callable the callable to execute
+         * @param delay the time from now to delay the execution
+         * @param unit the time unit of the delay parameter
+         * @param <V> result type of the callable
+         * @return a ScheduledFuture which holds the future value of the given 
callable
+         */
         @Override
         public <V> ScheduledFuture<V> schedule(Callable<V> callable, long 
delay, TimeUnit unit) {
-            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
-            FutureTask<V> ft = new FutureTask<>(callable);
-            scheduleRunAsync(ft, delayMillis);
-            return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
+            // If the scheduled executor service is shutdown, the callable 
won't be executed.
+            if (mainScheduledExecutor.isShutdown()) {
+                log.warn(
+                        "The scheduled executor service is shutdown and return 
throwing scheduled future for callable {}",
+                        callable);
+                return ThrowingScheduledFuture.getInstance();

Review comment:
       What was the previous behavior? If `schedule` is called on a closed 
endpoint, would it be ignored or a `RuntimeException` is thrown?
   
   I'm asking because, my gut feeling it is not expected that `schedule` is 
called after `mainScheduledExecutor` being closed. If that happens, we might 
want to throw an `RuntimeException`. We can either assert that 
`mainScheduledExecutor` is not shutdown, or don't check it and relies on 
`mainScheduledExecutor.schedule` to throw `RejectedExecutionException`.
   
   The problem of `ThrowingScheduledFuture` is that, it can get ignored if the 
caller does not check the returned future.

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
##########
@@ -56,10 +56,13 @@ protected FencedRpcEndpoint(
         this.fencingToken = fencingToken;
         this.unfencedMainThreadExecutor =
                 new UnfencedMainThreadExecutor((FencedMainThreadExecutable) 
rpcServer);
+
+        MainThreadExecutable mainThreadExecutable =
+                getRpcService().fenceRpcServer(rpcServer, fencingToken);
         this.fencedMainThreadExecutor =
                 new MainThreadExecutor(
-                        getRpcService().fenceRpcServer(rpcServer, 
fencingToken),
-                        this::validateRunsInMainThread);
+                        mainThreadExecutable, this::validateRunsInMainThread, 
endpointId);
+        registerResource(this.fencedMainThreadExecutor);

Review comment:
       We may add a method `setFencedMainThreadExecutor` to make sure the 
following happens whenever a new fenced executor  is set.
   - new executor is registered
   - previous executor, if any, is closed and unregistered

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -401,38 +451,90 @@ public void validateRunsInMainThread() {
 
     /** Executor which executes runnables in the main thread context. */
     protected static class MainThreadExecutor implements 
ComponentMainThreadExecutor {
+        private static final Logger log = 
LoggerFactory.getLogger(MainThreadExecutor.class);
 
         private final MainThreadExecutable gateway;
         private final Runnable mainThreadCheck;
-
-        MainThreadExecutor(MainThreadExecutable gateway, Runnable 
mainThreadCheck) {
+        /**
+         * The main scheduled executor manages the scheduled tasks and send 
them to gateway when
+         * they should be executed.
+         */
+        private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+        MainThreadExecutor(
+                MainThreadExecutable gateway, Runnable mainThreadCheck, String 
endpointId) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-        }
-
-        private void scheduleRunAsync(Runnable runnable, long delayMillis) {
-            gateway.scheduleRunAsync(runnable, delayMillis);
+            this.mainScheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory(endpointId + 
"-main-scheduler"));
+            this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);
         }
 
         @Override
         public void execute(@Nonnull Runnable command) {
             gateway.runAsync(command);
         }
 
+        /**
+         * The mainScheduledExecutor manages the task and sends it to the 
gateway after the given
+         * delay.
+         *
+         * @param command the task to execute in the future
+         * @param delay the time from now to delay the execution
+         * @param unit the time unit of the delay parameter
+         * @return a ScheduledFuture representing the completion of the 
scheduled task
+         */
         @Override
         public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
-            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
-            FutureTask<Void> ft = new FutureTask<>(command, null);
-            scheduleRunAsync(ft, delayMillis);
-            return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
+            // If the scheduled executor service is shutdown, the command 
won't be executed.
+            if (mainScheduledExecutor.isShutdown()) {
+                log.warn(
+                        "The scheduled executor service is shutdown and return 
throwing scheduled future for command {}",
+                        command);
+                return ThrowingScheduledFuture.getInstance();
+            } else {
+                final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
+                FutureTask<Void> ft = new FutureTask<>(command, null);
+                ScheduledFuture<?> scheduledFuture =
+                        mainScheduledExecutor.schedule(
+                                () -> gateway.scheduleRunAsync(ft, 0L),

Review comment:
       ```suggestion
                                   () -> gateway.runAsync(ft),
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to