This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ae42e745e9deee3c564ffe6b52e6d6c8abab46ce Author: Till Rohrmann <[email protected]> AuthorDate: Wed Mar 17 09:00:47 2021 +0100 [hotfix] Rename SchedulerNG.getTerminationFuture into getJobTerminationFuture This is a preparatory step to better distinguish the job termination future from the Scheduler termination future. Moreover, the return type was changed to CompletableFuture<JobStatus> to be more expressive. --- .../apache/flink/runtime/scheduler/SchedulerBase.java | 7 +++---- .../apache/flink/runtime/scheduler/SchedulerNG.java | 2 +- .../runtime/scheduler/adaptive/AdaptiveScheduler.java | 8 ++++---- .../flink/runtime/scheduler/DefaultSchedulerTest.java | 2 +- .../flink/runtime/scheduler/TestingSchedulerNG.java | 18 +++++++++--------- 5 files changed, 18 insertions(+), 19 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 3ddb55f..56c06ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -83,7 +83,6 @@ import org.apache.flink.runtime.util.IntArrayList; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.IterableUtils; -import org.apache.flink.util.function.FunctionUtils; import org.slf4j.Logger; @@ -407,7 +406,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling protected void failJob(Throwable cause) { incrementVersionsOfAllVertices(); executionGraph.failJob(cause); - getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause)); + getJobTerminationFuture().thenRun(() -> archiveGlobalFailure(cause)); } protected final SchedulingTopology getSchedulingTopology() { @@ -523,8 +522,8 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling } @Override - 
public CompletableFuture<Void> getTerminationFuture() { - return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn()); + public CompletableFuture<JobStatus> getJobTerminationFuture() { + return executionGraph.getTerminationFuture(); } protected final void archiveGlobalFailure(@Nullable Throwable failure) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java index 9896f2a..505714d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java @@ -72,7 +72,7 @@ public interface SchedulerNG { void cancel(); - CompletableFuture<Void> getTerminationFuture(); + CompletableFuture<JobStatus> getJobTerminationFuture(); void handleGlobalFailure(Throwable cause); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index a7783db..db88e05 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -160,7 +160,7 @@ public class AdaptiveScheduler private final CheckpointIDCounter checkpointIdCounter; private final CheckpointsCleaner checkpointsCleaner; - private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + private final CompletableFuture<JobStatus> jobTerminationFuture = new CompletableFuture<>(); private final RestartBackoffTimeStrategy restartBackoffTimeStrategy; @@ -292,8 +292,8 @@ public class AdaptiveScheduler } @Override - public CompletableFuture<Void> getTerminationFuture() { - return terminationFuture; + public CompletableFuture<JobStatus> getJobTerminationFuture() { + return 
jobTerminationFuture; } @Override @@ -829,7 +829,7 @@ public class AdaptiveScheduler : null); } - terminationFuture.complete(null); + jobTerminationFuture.complete(archivedExecutionGraph.getState()); } private void stopCheckpointServicesSafely(JobStatus terminalState) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index e924538..dc2a1ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -1155,7 +1155,7 @@ public class DefaultSchedulerTest extends TestLogger { } private void waitForTermination(final DefaultScheduler scheduler) throws Exception { - scheduler.getTerminationFuture().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + scheduler.getJobTerminationFuture().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); } private static JobGraph singleNonParallelJobVertexJobGraph() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java index 2527e20..c87ac20 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java @@ -51,19 +51,19 @@ import java.util.function.Consumer; /** Testing implementation of the {@link SchedulerNG}. 
*/ public class TestingSchedulerNG implements SchedulerNG { - private final CompletableFuture<Void> terminationFuture; + private final CompletableFuture<JobStatus> jobTerminationFuture; private final Runnable startSchedulingRunnable; private final Consumer<Throwable> suspendConsumer; private final BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction; private final Consumer<Throwable> handleGlobalFailureConsumer; private TestingSchedulerNG( - CompletableFuture<Void> terminationFuture, + CompletableFuture<JobStatus> jobTerminationFuture, Runnable startSchedulingRunnable, Consumer<Throwable> suspendConsumer, BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction, Consumer<Throwable> handleGlobalFailureConsumer) { - this.terminationFuture = terminationFuture; + this.jobTerminationFuture = jobTerminationFuture; this.startSchedulingRunnable = startSchedulingRunnable; this.suspendConsumer = suspendConsumer; this.triggerSavepointFunction = triggerSavepointFunction; @@ -88,8 +88,8 @@ public class TestingSchedulerNG implements SchedulerNG { public void cancel() {} @Override - public CompletableFuture<Void> getTerminationFuture() { - return terminationFuture; + public CompletableFuture<JobStatus> getJobTerminationFuture() { + return jobTerminationFuture; } @Override @@ -224,15 +224,15 @@ public class TestingSchedulerNG implements SchedulerNG { /** Builder for the TestingSchedulerNG. 
*/ public static final class Builder { - private CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + private CompletableFuture<JobStatus> jobTerminationFuture = new CompletableFuture<>(); private Runnable startSchedulingRunnable = () -> {}; private Consumer<Throwable> suspendConsumer = ignored -> {}; private BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction = (ignoredA, ignoredB) -> new CompletableFuture<>(); private Consumer<Throwable> handleGlobalFailureConsumer = (ignored) -> {}; - public Builder setTerminationFuture(CompletableFuture<Void> terminationFuture) { - this.terminationFuture = terminationFuture; + public Builder setJobTerminationFuture(CompletableFuture<JobStatus> jobTerminationFuture) { + this.jobTerminationFuture = jobTerminationFuture; return this; } @@ -260,7 +260,7 @@ public class TestingSchedulerNG implements SchedulerNG { public TestingSchedulerNG build() { return new TestingSchedulerNG( - terminationFuture, + jobTerminationFuture, startSchedulingRunnable, suspendConsumer, triggerSavepointFunction,
