KarmaGYZ commented on a change in pull request #18007:
URL: https://github.com/apache/flink/pull/18007#discussion_r769199031



##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
##########
@@ -56,10 +56,12 @@ protected FencedRpcEndpoint(
         this.fencingToken = fencingToken;
         this.unfencedMainThreadExecutor =
                 new UnfencedMainThreadExecutor((FencedMainThreadExecutable) 
rpcServer);
+
+        MainThreadExecutable mainThreadExecutable =
+                getRpcService().fenceRpcServer(rpcServer, fencingToken);

Review comment:
       nit: Maybe there is no need to separate out this line.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -309,22 +322,57 @@ public void testScheduleCallableWithDelayInSeconds() 
throws Exception {
                                 () -> 1, expectedDelay.toMillis() / 1000, 
TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testScheduleCallableAfterClose() throws Exception {
+        testScheduleAfterClose(
+                (mainThreadExecutor, expectedDelay) ->
+                        mainThreadExecutor.schedule(
+                                () -> 1, expectedDelay.toMillis() / 1000, 
TimeUnit.SECONDS));
+    }
+
     private static void testScheduleWithDelay(
             BiConsumer<RpcEndpoint.MainThreadExecutor, Duration> scheduler) 
throws Exception {
         final CompletableFuture<Long> actualDelayMsFuture = new 
CompletableFuture<>();
+        final String endpointId = "foobar";
 
+        final Duration expectedDelay = Duration.ofSeconds(1);
         final MainThreadExecutable mainThreadExecutable =
                 new TestMainThreadExecutable(
-                        (runnable, delay) -> 
actualDelayMsFuture.complete(delay));
+                        (runnable, delay) -> 
actualDelayMsFuture.complete(delay),
+                        expectedDelay.toMillis());
 
         final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
-                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> 
{});
-
-        final Duration expectedDelay = Duration.ofSeconds(1);
+                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> 
{}, endpointId);
 
         scheduler.accept(mainThreadExecutor, expectedDelay);
 
         assertThat(actualDelayMsFuture.get(), is(expectedDelay.toMillis()));
+        mainThreadExecutor.close();
+    }
+
+    private static void testScheduleAfterClose(
+            BiFunction<RpcEndpoint.MainThreadExecutor, Duration, 
ScheduledFuture<?>> scheduler) {
+        final CompletableFuture<Long> actualDelayMsFuture = new 
CompletableFuture<>();
+        final String endpointId = "foobar";
+
+        final Duration expectedDelay = Duration.ofSeconds(1);
+        final MainThreadExecutable mainThreadExecutable =
+                new TestMainThreadExecutable(
+                        (runnable, delay) -> 
actualDelayMsFuture.complete(delay),
+                        expectedDelay.toMillis());

Review comment:
       ```suggestion
           final CompletableFuture<Void> actualDelayMsFuture = new 
CompletableFuture<>();
           final String endpointId = "foobar";
   
           final Duration expectedDelay = Duration.ofSeconds(1);
           final MainThreadExecutable mainThreadExecutable =
                   new TestMainThreadExecutable(
                           runnable -> actualDelayMsFuture.complete(null));
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -386,14 +434,17 @@ public void testCallAsyncTimeout()
     private static class TestMainThreadExecutable implements 
MainThreadExecutable {
 
         private final BiConsumer<Runnable, Long> scheduleRunAsyncConsumer;
+        private final long delay;
 
-        private TestMainThreadExecutable(BiConsumer<Runnable, Long> 
scheduleRunAsyncConsumer) {
+        private TestMainThreadExecutable(
+                BiConsumer<Runnable, Long> scheduleRunAsyncConsumer, long 
delay) {

Review comment:
       I don't think we need `BiConsumer` here. Previously, it was used to 
verify the `delay` parameter of `scheduleRunAsync`.

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -399,40 +405,59 @@ public void validateRunsInMainThread() {
     //  Utilities
     // ------------------------------------------------------------------------
 
-    /** Executor which executes runnables in the main thread context. */
+    /**
+     * Executor which executes runnables in the main thread context. The 
executor will holds all the
+     * periodic tasks, sends them to akka thread pool via gateway when they 
should be executed.
+     */
     protected static class MainThreadExecutor implements 
ComponentMainThreadExecutor {
 
         private final MainThreadExecutable gateway;
         private final Runnable mainThreadCheck;
+        private final ScheduledExecutorService scheduledExecutorService;
 
-        MainThreadExecutor(MainThreadExecutable gateway, Runnable 
mainThreadCheck) {
+        MainThreadExecutor(
+                MainThreadExecutable gateway, Runnable mainThreadCheck, String 
endpointId) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-        }
-
-        private void scheduleRunAsync(Runnable runnable, long delayMillis) {
-            gateway.scheduleRunAsync(runnable, delayMillis);
+            this.scheduledExecutorService =
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ExecutorThreadFactory(endpointId + 
"-scheduled-main-executor"));
         }
 
         @Override
         public void execute(@Nonnull Runnable command) {
-            gateway.runAsync(command);
+            if (!scheduledExecutorService.isShutdown()) {
+                gateway.runAsync(command);
+            }
         }
 
+        /**
+         * The result future will be used to cancel the runnable, we return a 
throwing scheduled
+         * future when the scheduled executor service is shutdown.
+         *
+         * @param command the task to execute in the future
+         * @param delay the time from now to delay the execution
+         * @param unit the time unit of the delay parameter
+         * @return the result future
+         */
         @Override
         public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
-            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
-            FutureTask<Void> ft = new FutureTask<>(command, null);
-            scheduleRunAsync(ft, delayMillis);
-            return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
+            if (scheduledExecutorService.isShutdown()) {
+                return ThrowingScheduledFuture.create();
+            } else {
+                return scheduledExecutorService.schedule(() -> 
execute(command), delay, unit);
+            }
         }
 
         @Override
         public <V> ScheduledFuture<V> schedule(Callable<V> callable, long 
delay, TimeUnit unit) {
-            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
-            FutureTask<V> ft = new FutureTask<>(callable);
-            scheduleRunAsync(ft, delayMillis);
-            return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
+            if (scheduledExecutorService.isShutdown()) {

Review comment:
       nit: Also add javadoc here.

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -453,5 +478,12 @@ public void execute(@Nonnull Runnable command) {
         public void assertRunningInMainThread() {
             mainThreadCheck.run();
         }
+
+        @Override
+        public void close() {

Review comment:
       nit: add javadoc here.

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -399,40 +405,59 @@ public void validateRunsInMainThread() {
     //  Utilities
     // ------------------------------------------------------------------------
 
-    /** Executor which executes runnables in the main thread context. */
+    /**
+     * Executor which executes runnables in the main thread context. The 
executor will holds all the
+     * periodic tasks, sends them to akka thread pool via gateway when they 
should be executed.
+     */
     protected static class MainThreadExecutor implements 
ComponentMainThreadExecutor {
 
         private final MainThreadExecutable gateway;
         private final Runnable mainThreadCheck;
+        private final ScheduledExecutorService scheduledExecutorService;
 
-        MainThreadExecutor(MainThreadExecutable gateway, Runnable 
mainThreadCheck) {
+        MainThreadExecutor(
+                MainThreadExecutable gateway, Runnable mainThreadCheck, String 
endpointId) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-        }
-
-        private void scheduleRunAsync(Runnable runnable, long delayMillis) {
-            gateway.scheduleRunAsync(runnable, delayMillis);
+            this.scheduledExecutorService =
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ExecutorThreadFactory(endpointId + 
"-scheduled-main-executor"));
         }
 
         @Override
         public void execute(@Nonnull Runnable command) {
-            gateway.runAsync(command);
+            if (!scheduledExecutorService.isShutdown()) {
+                gateway.runAsync(command);

Review comment:
       I think we could add a log message here, since this changes the behavior.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to