This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 53ac701e22d27fe537bd9fb790ac67e1033f4842 Author: Till Rohrmann <[email protected]> AuthorDate: Sun Mar 14 13:28:53 2021 +0100 [hotfix] Let delayed AdaptiveScheduler.runIfState method return ScheduledFuture. By letting the AdaptiveScheduler.runIfState method return a ScheduledFuture, it is now possible to cancel scheduled actions. --- .../scheduler/adaptive/AdaptiveScheduler.java | 5 +- .../scheduler/adaptive/CreatingExecutionGraph.java | 4 +- .../runtime/scheduler/adaptive/Restarting.java | 4 +- .../scheduler/adaptive/WaitingForResources.java | 4 +- .../adaptive/CreatingExecutionGraphTest.java | 6 +- .../runtime/scheduler/adaptive/RestartingTest.java | 5 +- .../adaptive/WaitingForResourcesTest.java | 66 +++++------- .../core/testutils/CompletedScheduledFuture.java | 79 ++++++++++++++ .../ManuallyTriggeredScheduledExecutorService.java | 85 --------------- .../apache/flink/core/testutils/ScheduledTask.java | 115 +++++++++++++++++++++ 10 files changed, 241 insertions(+), 132 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index f03e162..be7b068 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -127,6 +127,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -962,8 +963,8 @@ public class AdaptiveScheduler } @Override - public void runIfState(State expectedState, Runnable action, Duration delay) { - componentMainThreadExecutor.schedule( + public ScheduledFuture<?> runIfState(State expectedState, Runnable action, 
Duration delay) { + return componentMainThreadExecutor.schedule( () -> runIfState(expectedState, action), delay.toMillis(), TimeUnit.MILLISECONDS); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java index b6f8df4..5e21ba6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; /** * State which waits for the creation of the {@link ExecutionGraph}. If the creation fails, then the @@ -149,8 +150,9 @@ public class CreatingExecutionGraph implements State { * the action * @param action action to run if the expected state equals the actual state * @param delay delay after which to run the action + * @return a ScheduledFuture representing pending completion of the task */ - void runIfState(State expectedState, Runnable action, Duration delay); + ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay); /** * Try to assign slots to the created {@link ExecutionGraph}. 
If it is possible, then this diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java index 5a49c48..307dfa02 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java @@ -28,6 +28,7 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import java.time.Duration; +import java.util.concurrent.ScheduledFuture; /** State which describes a job which is currently being restarted. */ class Restarting extends StateWithExecutionGraph { @@ -105,8 +106,9 @@ class Restarting extends StateWithExecutionGraph { * the delay * @param action action to run if the state equals the expected state * @param delay delay after which the action should be executed + * @return a ScheduledFuture representing pending completion of the task */ - void runIfState(State expectedState, Runnable action, Duration delay); + ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay); } static class Factory implements StateFactory<Restarting> { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java index 848d729..9489ddd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import javax.annotation.Nullable; import java.time.Duration; +import java.util.concurrent.ScheduledFuture; /** * State which describes that the scheduler is waiting for resources in order to execute the job. 
@@ -142,8 +143,9 @@ class WaitingForResources implements State, ResourceConsumer { * the action * @param action action to run if the expected state equals the actual state * @param delay delay after which to run the action + * @return a ScheduledFuture representing pending completion of the task */ - void runIfState(State expectedState, Runnable action, Duration delay); + ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay); } static class Factory implements StateFactory<WaitingForResources> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java index 3d467fb..a352734 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.testutils.CompletedScheduledFuture; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.util.FlinkException; @@ -31,6 +32,7 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; import java.util.function.Consumer; import java.util.function.Function; @@ -187,10 +189,12 @@ public class CreatingExecutionGraphTest extends TestLogger { } @Override - public void runIfState(State expectedState, Runnable action, Duration delay) { + public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) { if (!hadStateTransitionHappened) { action.run(); } + + return 
CompletedScheduledFuture.create(null); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java index dd53263..2358709 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.testutils.CompletedScheduledFuture; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -29,6 +30,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; import java.time.Duration; +import java.util.concurrent.ScheduledFuture; import java.util.function.Consumer; import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; @@ -172,10 +174,11 @@ public class RestartingTest extends TestLogger { } @Override - public void runIfState(State expectedState, Runnable action, Duration delay) { + public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) { if (!hadStateTransition) { action.run(); } + return CompletedScheduledFuture.create(null); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java index 8586551..c70dc2e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java @@ -19,6 +19,7 @@ package 
org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.testutils.ScheduledTask; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; @@ -33,12 +34,12 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ScheduledFuture; import java.util.function.Consumer; import java.util.function.Supplier; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -57,12 +58,7 @@ public class WaitingForResourcesTest extends TestLogger { new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO); // run delayed actions - for (ScheduledRunnable scheduledRunnable : ctx.getScheduledRunnables()) { - scheduledRunnable.runAction(); - if (ctx.hasStateTransition()) { - break; - } - } + ctx.runScheduledTasks(); } } @@ -100,12 +96,7 @@ public class WaitingForResourcesTest extends TestLogger { ctx.setExpectCreatingExecutionGraph(); // immediately execute all scheduled runnables - assertThat(ctx.getScheduledRunnables().size(), greaterThan(0)); - for (ScheduledRunnable scheduledRunnable : ctx.getScheduledRunnables()) { - if (scheduledRunnable.getExpectedState() == wfr) { - scheduledRunnable.runAction(); - } - } + ctx.runScheduledTasks(); } } @@ -172,13 +163,9 @@ public class WaitingForResourcesTest extends TestLogger { new StateValidator<>("finished"); private Supplier<Boolean> hasEnoughResourcesSupplier = () -> false; - private final List<ScheduledRunnable> scheduledRunnables = new ArrayList<>(); + private final List<ScheduledTask<Void>> scheduledTasks = new ArrayList<>(); private boolean hasStateTransition = 
false; - public List<ScheduledRunnable> getScheduledRunnables() { - return scheduledRunnables; - } - public void setHasEnoughResources(Supplier<Boolean> sup) { hasEnoughResourcesSupplier = sup; } @@ -191,6 +178,12 @@ public class WaitingForResourcesTest extends TestLogger { creatingExecutionGraphStateValidator.expectInput(none -> {}); } + void runScheduledTasks() { + for (ScheduledTask<Void> scheduledTask : scheduledTasks) { + scheduledTask.execute(); + } + } + @Override public void close() throws Exception { creatingExecutionGraphStateValidator.close(); @@ -212,8 +205,21 @@ public class WaitingForResourcesTest extends TestLogger { } @Override - public void runIfState(State expectedState, Runnable action, Duration delay) { - scheduledRunnables.add(new ScheduledRunnable(expectedState, action, delay)); + public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) { + final ScheduledTask<Void> scheduledTask = + new ScheduledTask<>( + () -> { + if (!hasStateTransition) { + action.run(); + } + + return null; + }, + delay.toMillis()); + + scheduledTasks.add(scheduledTask); + + return scheduledTask; } @Override @@ -233,26 +239,6 @@ public class WaitingForResourcesTest extends TestLogger { } } - private static final class ScheduledRunnable { - private final Runnable action; - private final State expectedState; - private final Duration delay; - - private ScheduledRunnable(State expectedState, Runnable action, Duration delay) { - this.expectedState = expectedState; - this.action = action; - this.delay = delay; - } - - public void runAction() { - action.run(); - } - - public State getExpectedState() { - return expectedState; - } - } - static <T> Consumer<T> assertNonNull() { return (item) -> assertThat(item, notNullValue()); } diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CompletedScheduledFuture.java 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CompletedScheduledFuture.java new file mode 100644 index 0000000..6667612 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CompletedScheduledFuture.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.testutils; + +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Completed {@link ScheduledFuture} implementation. 
+ * + * @param <T> type of the {@link ScheduledFuture} + */ +public final class CompletedScheduledFuture<T> implements ScheduledFuture<T> { + + private final T value; + + private CompletedScheduledFuture(T value) { + this.value = value; + } + + @Override + public long getDelay(TimeUnit unit) { + return 0; + } + + @Override + public int compareTo(Delayed o) { + return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return value; + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return value; + } + + public static <T> CompletedScheduledFuture<T> create(T value) { + return new CompletedScheduledFuture<>(value); + } +} diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java index 1752dc6..7683804 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java @@ -27,17 +27,12 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; -import java.util.Objects; import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; -import 
java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; /** @@ -317,84 +312,4 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu return scheduledTask; } - - private static final class ScheduledTask<T> implements ScheduledFuture<T> { - - private final Callable<T> callable; - - private final long delay; - - private final long period; - - private final CompletableFuture<T> result; - - private ScheduledTask(Callable<T> callable, long delay) { - this(callable, delay, 0); - } - - private ScheduledTask(Callable<T> callable, long delay, long period) { - this.callable = Objects.requireNonNull(callable); - this.result = new CompletableFuture<>(); - this.delay = delay; - this.period = period; - } - - private boolean isPeriodic() { - return period > 0; - } - - public void execute() { - if (!result.isDone()) { - if (!isPeriodic()) { - try { - result.complete(callable.call()); - } catch (Exception e) { - result.completeExceptionally(e); - } - } else { - try { - callable.call(); - } catch (Exception e) { - result.completeExceptionally(e); - } - } - } - } - - @Override - public long getDelay(TimeUnit unit) { - return unit.convert(delay, TimeUnit.MILLISECONDS); - } - - @Override - public int compareTo(Delayed o) { - return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return result.cancel(mayInterruptIfRunning); - } - - @Override - public boolean isCancelled() { - return result.isCancelled(); - } - - @Override - public boolean isDone() { - return result.isDone(); - } - - @Override - public T get() throws InterruptedException, ExecutionException { - return 
result.get(); - } - - @Override - public T get(long timeout, @Nonnull TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return result.get(timeout, unit); - } - } } diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ScheduledTask.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ScheduledTask.java new file mode 100644 index 0000000..899b6e1 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ScheduledTask.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.testutils; + +import javax.annotation.Nonnull; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * ScheduledTask represents a task which is executed at a later point in time. 
+ * + * @param <T> type of the result + */ +public final class ScheduledTask<T> implements ScheduledFuture<T> { + + private final Callable<T> callable; + + private final long delay; + + private final long period; + + private final CompletableFuture<T> result; + + public ScheduledTask(Callable<T> callable, long delay) { + this(callable, delay, 0); + } + + public ScheduledTask(Callable<T> callable, long delay, long period) { + this.callable = Objects.requireNonNull(callable); + this.result = new CompletableFuture<>(); + this.delay = delay; + this.period = period; + } + + private boolean isPeriodic() { + return period > 0; + } + + public void execute() { + if (!result.isDone()) { + if (!isPeriodic()) { + try { + result.complete(callable.call()); + } catch (Exception e) { + result.completeExceptionally(e); + } + } else { + try { + callable.call(); + } catch (Exception e) { + result.completeExceptionally(e); + } + } + } + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(delay, TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed o) { + return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return result.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return result.isCancelled(); + } + + @Override + public boolean isDone() { + return result.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return result.get(); + } + + @Override + public T get(long timeout, @Nonnull TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return result.get(timeout, unit); + } +}
