This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae42e745e9deee3c564ffe6b52e6d6c8abab46ce
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Mar 17 09:00:47 2021 +0100

    [hotfix] Rename SchedulerNG.getTerminationFuture into 
getJobTerminationFuture
    
    This is a preparational step to better distinguish the job termination 
future from
    the Scheduler termination future. Moreover, the return type was changed to
    CompletableFuture<JobStatus> to be more expressive.
---
 .../apache/flink/runtime/scheduler/SchedulerBase.java  |  7 +++----
 .../apache/flink/runtime/scheduler/SchedulerNG.java    |  2 +-
 .../runtime/scheduler/adaptive/AdaptiveScheduler.java  |  8 ++++----
 .../flink/runtime/scheduler/DefaultSchedulerTest.java  |  2 +-
 .../flink/runtime/scheduler/TestingSchedulerNG.java    | 18 +++++++++---------
 5 files changed, 18 insertions(+), 19 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 3ddb55f..56c06ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -83,7 +83,6 @@ import org.apache.flink.runtime.util.IntArrayList;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.IterableUtils;
-import org.apache.flink.util.function.FunctionUtils;
 
 import org.slf4j.Logger;
 
@@ -407,7 +406,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
     protected void failJob(Throwable cause) {
         incrementVersionsOfAllVertices();
         executionGraph.failJob(cause);
-        getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));
+        getJobTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));
     }
 
     protected final SchedulingTopology getSchedulingTopology() {
@@ -523,8 +522,8 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
     }
 
     @Override
-    public CompletableFuture<Void> getTerminationFuture() {
-        return 
executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
+    public CompletableFuture<JobStatus> getJobTerminationFuture() {
+        return executionGraph.getTerminationFuture();
     }
 
     protected final void archiveGlobalFailure(@Nullable Throwable failure) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 9896f2a..505714d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -72,7 +72,7 @@ public interface SchedulerNG {
 
     void cancel();
 
-    CompletableFuture<Void> getTerminationFuture();
+    CompletableFuture<JobStatus> getJobTerminationFuture();
 
     void handleGlobalFailure(Throwable cause);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index a7783db..db88e05 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -160,7 +160,7 @@ public class AdaptiveScheduler
     private final CheckpointIDCounter checkpointIdCounter;
     private final CheckpointsCleaner checkpointsCleaner;
 
-    private final CompletableFuture<Void> terminationFuture = new 
CompletableFuture<>();
+    private final CompletableFuture<JobStatus> jobTerminationFuture = new 
CompletableFuture<>();
 
     private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
 
@@ -292,8 +292,8 @@ public class AdaptiveScheduler
     }
 
     @Override
-    public CompletableFuture<Void> getTerminationFuture() {
-        return terminationFuture;
+    public CompletableFuture<JobStatus> getJobTerminationFuture() {
+        return jobTerminationFuture;
     }
 
     @Override
@@ -829,7 +829,7 @@ public class AdaptiveScheduler
                             : null);
         }
 
-        terminationFuture.complete(null);
+        jobTerminationFuture.complete(archivedExecutionGraph.getState());
     }
 
     private void stopCheckpointServicesSafely(JobStatus terminalState) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index e924538..dc2a1ee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -1155,7 +1155,7 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     private void waitForTermination(final DefaultScheduler scheduler) throws 
Exception {
-        scheduler.getTerminationFuture().get(TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+        scheduler.getJobTerminationFuture().get(TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
     }
 
     private static JobGraph singleNonParallelJobVertexJobGraph() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 2527e20..c87ac20 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -51,19 +51,19 @@ import java.util.function.Consumer;
 
 /** Testing implementation of the {@link SchedulerNG}. */
 public class TestingSchedulerNG implements SchedulerNG {
-    private final CompletableFuture<Void> terminationFuture;
+    private final CompletableFuture<JobStatus> jobTerminationFuture;
     private final Runnable startSchedulingRunnable;
     private final Consumer<Throwable> suspendConsumer;
     private final BiFunction<String, Boolean, CompletableFuture<String>> 
triggerSavepointFunction;
     private final Consumer<Throwable> handleGlobalFailureConsumer;
 
     private TestingSchedulerNG(
-            CompletableFuture<Void> terminationFuture,
+            CompletableFuture<JobStatus> jobTerminationFuture,
             Runnable startSchedulingRunnable,
             Consumer<Throwable> suspendConsumer,
             BiFunction<String, Boolean, CompletableFuture<String>> 
triggerSavepointFunction,
             Consumer<Throwable> handleGlobalFailureConsumer) {
-        this.terminationFuture = terminationFuture;
+        this.jobTerminationFuture = jobTerminationFuture;
         this.startSchedulingRunnable = startSchedulingRunnable;
         this.suspendConsumer = suspendConsumer;
         this.triggerSavepointFunction = triggerSavepointFunction;
@@ -88,8 +88,8 @@ public class TestingSchedulerNG implements SchedulerNG {
     public void cancel() {}
 
     @Override
-    public CompletableFuture<Void> getTerminationFuture() {
-        return terminationFuture;
+    public CompletableFuture<JobStatus> getJobTerminationFuture() {
+        return jobTerminationFuture;
     }
 
     @Override
@@ -224,15 +224,15 @@ public class TestingSchedulerNG implements SchedulerNG {
 
     /** Builder for the TestingSchedulerNG. */
     public static final class Builder {
-        private CompletableFuture<Void> terminationFuture = new 
CompletableFuture<>();
+        private CompletableFuture<JobStatus> jobTerminationFuture = new 
CompletableFuture<>();
         private Runnable startSchedulingRunnable = () -> {};
         private Consumer<Throwable> suspendConsumer = ignored -> {};
         private BiFunction<String, Boolean, CompletableFuture<String>> 
triggerSavepointFunction =
                 (ignoredA, ignoredB) -> new CompletableFuture<>();
         private Consumer<Throwable> handleGlobalFailureConsumer = (ignored) -> 
{};
 
-        public Builder setTerminationFuture(CompletableFuture<Void> 
terminationFuture) {
-            this.terminationFuture = terminationFuture;
+        public Builder setJobTerminationFuture(CompletableFuture<JobStatus> 
jobTerminationFuture) {
+            this.jobTerminationFuture = jobTerminationFuture;
             return this;
         }
 
@@ -260,7 +260,7 @@ public class TestingSchedulerNG implements SchedulerNG {
 
         public TestingSchedulerNG build() {
             return new TestingSchedulerNG(
-                    terminationFuture,
+                    jobTerminationFuture,
                     startSchedulingRunnable,
                     suspendConsumer,
                     triggerSavepointFunction,

Reply via email to