noorall commented on code in PR #27211:
URL: https://github.com/apache/flink/pull/27211#discussion_r2508091155
##########
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java:
##########
@@ -526,14 +553,21 @@ public void execute(@Nonnull Runnable command) {
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
FutureTask<Void> ft = new FutureTask<>(command, null);
- if (mainScheduledExecutor.isShutdown()) {
- log.warn(
- "The scheduled executor service is shutdown and
ignores the command {}",
- command);
- } else {
- mainScheduledExecutor.schedule(
- () -> gateway.runAsync(ft), delayMillis,
TimeUnit.MILLISECONDS);
- }
+ getRunningFuture
+ .get()
+ .thenAccept(
+ ignore -> {
+ if (mainScheduledExecutor.isShutdown()) {
+ log.warn(
+ "The scheduled executor service is
shutdown and ignores the command {}",
+ command);
+ } else {
+ mainScheduledExecutor.schedule(
Review Comment:
Could we add an `isRunning()` method to the gateway and logs a waring if
it's not running, instead of introducing `getRunningFuture`? I'm concerned that
`getRunningFuture` would break some expected behaviors.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]