xintongsong commented on a change in pull request #18303:
URL: https://github.com/apache/flink/pull/18303#discussion_r824450130
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
##########
@@ -80,9 +82,25 @@ protected void setFencingToken(@Nullable F newFencingToken) {
// which is bound to the new fencing token
MainThreadExecutable mainThreadExecutable =
getRpcService().fenceRpcServer(rpcServer, newFencingToken);
+ setFencedMainThreadExecutor(
+ new MainThreadExecutor(
+ mainThreadExecutable, this::validateRunsInMainThread,
getEndpointId()));
+ }
- this.fencedMainThreadExecutor =
- new MainThreadExecutor(mainThreadExecutable,
this::validateRunsInMainThread);
+ /**
+ * Set fenced main thread executor and register it to closeable register.
+ *
+ * @param fencedMainThreadExecutor the given fenced main thread executor
+ */
+ private void setFencedMainThreadExecutor(MainThreadExecutor
fencedMainThreadExecutor) {
+ if (this.fencedMainThreadExecutor != null) {
+ this.fencedMainThreadExecutor.close();
+ if (!unregisterResource(this.fencedMainThreadExecutor)) {
+ throw new RuntimeException("Unregister resource failed");
Review comment:
Throwing a `RuntimeException` is probably not necessary. It looks like
`unregisterResource` can return `false` only if 1) the resource passed in is
`null` or 2) the resource has already been unregistered.
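To illustrate the two `false` cases, here is a minimal sketch. `TinyRegistry` is a hypothetical stand-in, not Flink's actual registry class, and I am assuming `unregisterResource` behaves like its `unregister` method:

```java
import java.io.Closeable;
import java.util.HashSet;
import java.util.Set;

public class TolerantUnregisterSketch {

    /** Hypothetical stand-in for the endpoint's resource registry. */
    static final class TinyRegistry {
        private final Set<Closeable> resources = new HashSet<>();

        void register(Closeable resource) {
            resources.add(resource);
        }

        /** Returns false for null or for a resource that is not (or no longer) registered. */
        boolean unregister(Closeable resource) {
            return resource != null && resources.remove(resource);
        }
    }

    public static void main(String[] args) {
        TinyRegistry registry = new TinyRegistry();
        Closeable executor = () -> {};
        registry.register(executor);

        System.out.println(registry.unregister(executor)); // registered resource
        System.out.println(registry.unregister(executor)); // already unregistered
        System.out.println(registry.unregister(null)); // null resource
    }
}
```

Under that assumption, neither `false` case leaves anything registered, so logging (or ignoring) the result should be enough.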
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -453,5 +563,18 @@ public void execute(@Nonnull Runnable command) {
public void assertRunningInMainThread() {
mainThreadCheck.run();
}
+
+ /** Shutdown the {@link ScheduledThreadPoolExecutor} and remove all
the pending tasks. */
+ @Override
+ public void close() {
+ if (!mainScheduledExecutor.isShutdown()) {
+ mainScheduledExecutor.shutdownNow();
+ }
+ }
+
+ @VisibleForTesting
+ ScheduledThreadPoolExecutor getMainScheduledExecutor() {
+ return mainScheduledExecutor;
+ }
Review comment:
I'd avoid exposing internals for tests whenever possible, especially
mutable ones.
I noticed this getter is only used in `RpcEndpointTest#testCancelScheduledTask()`.
See my other comment there.
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -400,39 +454,95 @@ public void validateRunsInMainThread() {
// ------------------------------------------------------------------------
/** Executor which executes runnables in the main thread context. */
- protected static class MainThreadExecutor implements
ComponentMainThreadExecutor {
+ protected static class MainThreadExecutor implements
ComponentMainThreadExecutor, Closeable {
+ private static final Logger log =
LoggerFactory.getLogger(MainThreadExecutor.class);
private final MainThreadExecutable gateway;
private final Runnable mainThreadCheck;
-
- MainThreadExecutor(MainThreadExecutable gateway, Runnable
mainThreadCheck) {
+ /**
+ * The main scheduled executor manages the scheduled tasks and send
them to gateway when
+ * they should be executed.
+ */
+ private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+ MainThreadExecutor(
+ MainThreadExecutable gateway, Runnable mainThreadCheck, String
endpointId) {
this.gateway = Preconditions.checkNotNull(gateway);
this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
- }
-
- private void scheduleRunAsync(Runnable runnable, long delayMillis) {
- gateway.scheduleRunAsync(runnable, delayMillis);
+ this.mainScheduledExecutor =
+ new ScheduledThreadPoolExecutor(
+ 1, new ExecutorThreadFactory(endpointId +
"-main-scheduler"));
+ this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);
Review comment:
```suggestion
this.mainScheduledExecutor =
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory(endpointId +
"-main-scheduler"));
```
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -211,11 +227,44 @@ protected final void stop() {
*/
public final CompletableFuture<Void> internalCallOnStop() {
validateRunsInMainThread();
- CompletableFuture<Void> stopFuture = onStop();
+ CompletableFuture<Void> stopFuture = new CompletableFuture<>();
+ try {
+ resourceRegistry.close();
+ stopFuture.complete(null);
+ } catch (IOException e) {
+ stopFuture.completeExceptionally(
+ new RuntimeException("Close resource registry fail", e));
+ return stopFuture;
+ }
+ stopFuture = stopFuture.thenCompose(v -> onStop());
Review comment:
```suggestion
stopFuture = CompletableFuture.allOf(stopFuture, onStop());
```
This makes sure both `resourceRegistry.close()` and `onStop()` are always
invoked.
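A minimal sketch of the difference, using only `java.util.concurrent`. The `onStop` and `failedClose` helpers are hypothetical stand-ins for the endpoint code. Note that this only holds if the early `return stopFuture;` in the `catch` block is dropped as well:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class StopOrderingSketch {

    /** Chains onStop after the close result; the function is skipped if closing failed. */
    static CompletableFuture<Void> stopWithThenCompose(
            CompletableFuture<Void> closeResult, AtomicBoolean onStopCalled) {
        return closeResult.thenCompose(v -> onStop(onStopCalled));
    }

    /** Calls onStop eagerly and combines both results; onStop runs even if closing failed. */
    static CompletableFuture<Void> stopWithAllOf(
            CompletableFuture<Void> closeResult, AtomicBoolean onStopCalled) {
        return CompletableFuture.allOf(closeResult, onStop(onStopCalled));
    }

    // Hypothetical stand-in for RpcEndpoint#onStop(); records that it was invoked.
    static CompletableFuture<Void> onStop(AtomicBoolean called) {
        called.set(true);
        return CompletableFuture.completedFuture(null);
    }

    // Hypothetical stand-in for a resourceRegistry.close() that threw.
    static CompletableFuture<Void> failedClose() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException("Close resource registry fail"));
        return future;
    }

    public static void main(String[] args) {
        AtomicBoolean composed = new AtomicBoolean();
        AtomicBoolean combined = new AtomicBoolean();
        CompletableFuture<Void> a = stopWithThenCompose(failedClose(), composed);
        CompletableFuture<Void> b = stopWithAllOf(failedClose(), combined);
        System.out.println("thenCompose: onStop called = " + composed.get()
                + ", failed = " + a.isCompletedExceptionally());
        System.out.println("allOf: onStop called = " + combined.get()
                + ", failed = " + b.isCompletedExceptionally());
    }
}
```

In both variants the returned future still fails when closing fails; the only difference is whether `onStop()` gets a chance to run.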
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -400,39 +454,95 @@ public void validateRunsInMainThread() {
// ------------------------------------------------------------------------
/** Executor which executes runnables in the main thread context. */
- protected static class MainThreadExecutor implements
ComponentMainThreadExecutor {
+ protected static class MainThreadExecutor implements
ComponentMainThreadExecutor, Closeable {
+ private static final Logger log =
LoggerFactory.getLogger(MainThreadExecutor.class);
private final MainThreadExecutable gateway;
private final Runnable mainThreadCheck;
-
- MainThreadExecutor(MainThreadExecutable gateway, Runnable
mainThreadCheck) {
+ /**
+ * The main scheduled executor manages the scheduled tasks and send
them to gateway when
+ * they should be executed.
+ */
+ private final ScheduledThreadPoolExecutor mainScheduledExecutor;
Review comment:
Since this is always single-threaded, we don't need a pool.
```suggestion
private final ScheduledExecutorService mainScheduledExecutor;
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -309,22 +354,83 @@ public void testScheduleCallableWithDelayInSeconds()
throws Exception {
() -> 1, expectedDelay.toMillis() / 1000,
TimeUnit.SECONDS));
}
+ @Test
+ public void testScheduleCallableAfterClose() throws Exception {
+ testScheduleAfterClose(
+ (mainThreadExecutor, expectedDelay) ->
+ mainThreadExecutor.schedule(
+ () -> 1, expectedDelay.toMillis() / 1000,
TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testCancelScheduledCallable() {
+ testCancelScheduledTask(
+ (mainThreadExecutor, timeDelay) ->
+ mainThreadExecutor.schedule(
+ () -> 1, timeDelay.toMillis(),
TimeUnit.MILLISECONDS));
+ }
+
private static void testScheduleWithDelay(
BiConsumer<RpcEndpoint.MainThreadExecutor, Duration> scheduler)
throws Exception {
- final CompletableFuture<Long> actualDelayMsFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> taskCompletedFuture = new
CompletableFuture<>();
+ final String endpointId = "foobar";
final MainThreadExecutable mainThreadExecutable =
- new TestMainThreadExecutable(
- (runnable, delay) ->
actualDelayMsFuture.complete(delay));
+ new TestMainThreadExecutable((runnable) ->
taskCompletedFuture.complete(null));
final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
- new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () ->
{});
+ new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () ->
{}, endpointId);
final Duration expectedDelay = Duration.ofSeconds(1);
scheduler.accept(mainThreadExecutor, expectedDelay);
- assertThat(actualDelayMsFuture.get(), is(expectedDelay.toMillis()));
+ taskCompletedFuture.get();
+ mainThreadExecutor.close();
+ }
+
+ private static void testScheduleAfterClose(
+ BiFunction<RpcEndpoint.MainThreadExecutor, Duration,
ScheduledFuture<?>> scheduler) {
+ final CompletableFuture<Void> taskCompletedFuture = new
CompletableFuture<>();
+ final String endpointId = "foobar";
+
+ final Duration expectedDelay = Duration.ofSeconds(1);
+ final MainThreadExecutable mainThreadExecutable =
+ new TestMainThreadExecutable((runnable) ->
taskCompletedFuture.complete(null));
+
+ final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
+ new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () ->
{}, endpointId);
+
+ mainThreadExecutor.close();
+ ScheduledFuture<?> future = scheduler.apply(mainThreadExecutor,
expectedDelay);
+
+ assertTrue(future.isDone());
+ assertTrue(future instanceof ThrowingScheduledFuture);
+ assertTrue(future.cancel(true));
+ assertTrue(future.cancel(false));
+ assertFalse(taskCompletedFuture.isDone());
+ }
+
+ private static void testCancelScheduledTask(
+ BiFunction<RpcEndpoint.MainThreadExecutor, Duration,
ScheduledFuture<?>> scheduler) {
+ final CompletableFuture<Void> actualDelayMsFuture = new
CompletableFuture<>();
+ final String endpointId = "foobar";
+
+ final MainThreadExecutable mainThreadExecutable =
+ new TestMainThreadExecutable((runnable) ->
actualDelayMsFuture.complete(null));
+
+ final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
+ new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () ->
{}, endpointId);
+ final Duration timeDelay = Duration.ofSeconds(10);
+ ScheduledFuture<?> scheduledFuture =
scheduler.apply(mainThreadExecutor, timeDelay);
+
+ assertEquals(1,
mainThreadExecutor.getMainScheduledExecutor().getQueue().size());
+ scheduledFuture.cancel(true);
+ mainThreadExecutor.close();
+
+ assertTrue(scheduledFuture.isCancelled());
+
assertTrue(mainThreadExecutor.getMainScheduledExecutor().getQueue().isEmpty());
+ assertFalse(actualDelayMsFuture.isDone());
Review comment:
I see two problems in this test:
1. We should verify that `(runnable) -> actualDelayMsFuture.complete(null)`
will not be executed, rather than verifying it's not passed to
`mainThreadExecutor.runAsync()`. Actually, canceling `scheduledFuture` will not
stop it from being scheduled to `mainThreadExecutor`. It is `FutureTask` itself
that guarantees it won't be executed once canceled.
Currently, the test will fail if you remove `mainThreadExecutor.close()`,
because it is a `() -> {}` or `() -> 1` that is actually wrapped in the
`scheduledFuture`, thus canceling it will not stop `actualDelayMsFuture` from
being completed.
2. Another challenge here is to make sure the task is not executed before being
canceled.
- If we schedule with a short delay, the test might be unstable because the
task can finish before it is canceled.
- If we schedule with a large delay, we would need to wait longer to see whether
the task is performed, unless we look into the internal queue of the executor.
I would suggest using a short delay, and using `assumeTrue` to skip the
test if the task occasionally finishes before being canceled. You can find out
whether the task had already finished from the return value of
`cancel()`.
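Both `FutureTask` properties mentioned above can be checked in isolation. This is a minimal sketch with plain `java.util.concurrent` and no Flink classes. The class and method names are made up for illustration:

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelledFutureTaskSketch {

    /** Cancels a task, then runs it anyway; reports whether the wrapped command executed. */
    static boolean ranAfterCancel() {
        AtomicBoolean executed = new AtomicBoolean(false);
        FutureTask<Void> task = new FutureTask<>(() -> executed.set(true), null);

        task.cancel(true);
        // Handing a cancelled FutureTask to an executor ends up calling run(),
        // which returns immediately because the task is no longer in its initial state.
        task.run();
        return executed.get();
    }

    /** Runs a task to completion first; reports what cancel() returns afterwards. */
    static boolean cancelAfterCompletion() {
        FutureTask<Void> task = new FutureTask<>(() -> {}, null);
        task.run();
        return task.cancel(true);
    }

    public static void main(String[] args) {
        System.out.println("command ran after cancel: " + ranAfterCancel());
        System.out.println("cancel() after completion: " + cancelAfterCompletion());
    }
}
```

In the test, the return value of `cancel()` could be passed to `assumeTrue` so that a run in which the task already finished is skipped rather than reported as a failure.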
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -401,38 +451,90 @@ public void validateRunsInMainThread() {
/** Executor which executes runnables in the main thread context. */
protected static class MainThreadExecutor implements
ComponentMainThreadExecutor {
+ private static final Logger log =
LoggerFactory.getLogger(MainThreadExecutor.class);
private final MainThreadExecutable gateway;
private final Runnable mainThreadCheck;
-
- MainThreadExecutor(MainThreadExecutable gateway, Runnable
mainThreadCheck) {
+ /**
+ * The main scheduled executor manages the scheduled tasks and send
them to gateway when
+ * they should be executed.
+ */
+ private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+ MainThreadExecutor(
+ MainThreadExecutable gateway, Runnable mainThreadCheck, String
endpointId) {
this.gateway = Preconditions.checkNotNull(gateway);
this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
- }
-
- private void scheduleRunAsync(Runnable runnable, long delayMillis) {
- gateway.scheduleRunAsync(runnable, delayMillis);
+ this.mainScheduledExecutor =
+ new ScheduledThreadPoolExecutor(
+ 1, new ExecutorThreadFactory(endpointId +
"-main-scheduler"));
+ this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);
}
@Override
public void execute(@Nonnull Runnable command) {
gateway.runAsync(command);
}
+ /**
+ * The mainScheduledExecutor manages the task and sends it to the
gateway after the given
+ * delay.
+ *
+ * @param command the task to execute in the future
+ * @param delay the time from now to delay the execution
+ * @param unit the time unit of the delay parameter
+ * @return a ScheduledFuture representing the completion of the
scheduled task
+ */
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
- final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
- FutureTask<Void> ft = new FutureTask<>(command, null);
- scheduleRunAsync(ft, delayMillis);
- return new ScheduledFutureAdapter<>(ft, delayMillis,
TimeUnit.MILLISECONDS);
+ // If the scheduled executor service is shutdown, the command
won't be executed.
+ if (mainScheduledExecutor.isShutdown()) {
+ log.warn(
+ "The scheduled executor service is shutdown and return
throwing scheduled future for command {}",
+ command);
+ return ThrowingScheduledFuture.getInstance();
+ } else {
+ final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
+ FutureTask<Void> ft = new FutureTask<>(command, null);
+ ScheduledFuture<?> scheduledFuture =
+ mainScheduledExecutor.schedule(
+ () -> gateway.scheduleRunAsync(ft, 0L),
+ delayMillis,
+ TimeUnit.MILLISECONDS);
+ return new ScheduledFutureAdapter<>(
+ scheduledFuture, ft, delayMillis,
TimeUnit.MILLISECONDS);
+ }
}
+ /**
+ * The mainScheduledExecutor manages the given callable and sends it
to the gateway after
+ * the given delay. The result of the callable is returned as a {@link
ScheduledFuture}.
+ *
+ * @param callable the callable to execute
+ * @param delay the time from now to delay the execution
+ * @param unit the time unit of the delay parameter
+ * @param <V> result type of the callable
+ * @return a ScheduledFuture which holds the future value of the given
callable
+ */
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long
delay, TimeUnit unit) {
- final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
- FutureTask<V> ft = new FutureTask<>(callable);
- scheduleRunAsync(ft, delayMillis);
- return new ScheduledFutureAdapter<>(ft, delayMillis,
TimeUnit.MILLISECONDS);
+ // If the scheduled executor service is shutdown, the callable
won't be executed.
+ if (mainScheduledExecutor.isShutdown()) {
+ log.warn(
+ "The scheduled executor service is shutdown and return
throwing scheduled future for callable {}",
+ callable);
+ return ThrowingScheduledFuture.getInstance();
Review comment:
@zjureel,
I think my major point was to figure out what the previous behavior was
(when `schedule` is called on a closed endpoint) and to align with it. Looking
at my previous comment now, I realize I didn't make that clear.
- If it is expected to fail explicitly, we should not rely on the returned
future, which can easily get ignored.
- If it is expected to be ignored, we don't need to return a special future.

I looked a bit more into how `schedule` behaved previously. It seems I was
wrong about throwing a `RuntimeException`. In fact, commands scheduled on a
closed endpoint are simply ignored (with some info logs in `AkkaRpcActor`).
Therefore, I'd suggest not introducing the special class
`ThrowingScheduledFuture`, and simply logging and ignoring the command if
`mainScheduledExecutor` is shut down.
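Roughly what I have in mind, as a self-contained sketch rather than the actual change. `IgnoreAfterCloseSketch` is a hypothetical class, it uses `java.util.logging` instead of the project's logger, and it returns the plain `FutureTask` where the real code would wrap it in a `ScheduledFuture`:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

public class IgnoreAfterCloseSketch {
    private static final Logger LOG =
            Logger.getLogger(IgnoreAfterCloseSketch.class.getName());

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Schedules the command, or logs and ignores it if the scheduler is shut down. */
    FutureTask<Void> schedule(Runnable command, long delay, TimeUnit unit) {
        FutureTask<Void> task = new FutureTask<>(command, null);
        if (scheduler.isShutdown()) {
            // No special future type: the task is simply never handed to the scheduler.
            LOG.warning("Scheduler is shut down, ignoring command " + command);
        } else {
            scheduler.schedule(task, delay, unit);
        }
        return task;
    }

    void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        AtomicBoolean ran = new AtomicBoolean(false);
        IgnoreAfterCloseSketch executor = new IgnoreAfterCloseSketch();
        executor.close();
        FutureTask<Void> task =
                executor.schedule(() -> ran.set(true), 1, TimeUnit.MILLISECONDS);
        System.out.println("done = " + task.isDone() + ", ran = " + ran.get());
    }
}
```

The caller gets an ordinary future that never completes, which matches the "ignored" semantics without needing a dedicated class.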
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.