Repository: flink
Updated Branches:
  refs/heads/master 1e315f0dd -> 519639c64


[FLINK-8732] [flip6] Cancel ongoing scheduling operation

Keeps track of ongoing scheduling operations in the ExecutionGraph and cancels
them in case of a concurrent cancel, suspend or fail call. This makes sure that
the original cause for termination is maintained.

This closes #5548.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/519639c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/519639c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/519639c6

Branch: refs/heads/master
Commit: 519639c64039563ac4f2a875a8cfa630b25e4e8b
Parents: 1e315f0
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Feb 21 15:57:50 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Feb 22 17:22:09 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  20 ++--
 .../runtime/executiongraph/ExecutionGraph.java  | 101 +++++++++++++++----
 .../executiongraph/ExecutionJobVertex.java      |  10 +-
 .../runtime/executiongraph/ExecutionVertex.java |   5 +-
 .../ExecutionGraphSchedulingTest.java           |  53 ++++++++++
 .../runtime/executiongraph/ExecutionTest.java   |   2 +-
 .../runtime/jobmaster/TestingLogicalSlot.java   |  28 +++--
 7 files changed, 176 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 14d88c3..3e77d3e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -358,7 +358,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        //  Actions
        // 
--------------------------------------------------------------------------------------------
 
-       public boolean scheduleForExecution() {
+       public CompletableFuture<Void> scheduleForExecution() {
                final ExecutionGraph executionGraph = 
getVertex().getExecutionGraph();
                final SlotProvider resourceProvider = 
executionGraph.getSlotProvider();
                final boolean allowQueued = 
executionGraph.isQueuedSchedulingAllowed();
@@ -377,14 +377,14 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * @param queued Flag to indicate whether the scheduler may queue this 
task if it cannot
         *               immediately deploy it.
         * @param locationPreferenceConstraint constraint for the location 
preferences
-        * @throws IllegalStateException Thrown, if the vertex is not in 
CREATED state, which is the only state that permits scheduling.
+        * @return Future which is completed once the Execution has been 
deployed
         */
-       public boolean scheduleForExecution(
+       public CompletableFuture<Void> scheduleForExecution(
                        SlotProvider slotProvider,
                        boolean queued,
                        LocationPreferenceConstraint 
locationPreferenceConstraint) {
+               final Time allocationTimeout = 
vertex.getExecutionGraph().getAllocationTimeout();
                try {
-                       final Time allocationTimeout = 
vertex.getExecutionGraph().getAllocationTimeout();
                        final CompletableFuture<Execution> allocationFuture = 
allocateAndAssignSlotForExecution(
                                slotProvider,
                                queued,
@@ -395,11 +395,10 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        // that we directly deploy the tasks if the slot 
allocation future is completed. This is
                        // necessary for immediate deployment.
                        final CompletableFuture<Void> deploymentFuture = 
allocationFuture.handle(
-                               (Execution ignored, Throwable throwable) ->  {
+                               (Execution ignored, Throwable throwable) -> {
                                        if (throwable != null) {
                                                
markFailed(ExceptionUtils.stripCompletionException(throwable));
-                                       }
-                                       else {
+                                       } else {
                                                try {
                                                        deploy();
                                                } catch (Throwable t) {
@@ -415,10 +414,9 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                allocationFuture.completeExceptionally(new 
IllegalArgumentException("The slot allocation future has not been completed 
yet."));
                        }
 
-                       return true;
-               }
-               catch (IllegalExecutionStateException e) {
-                       return false;
+                       return deploymentFuture;
+               } catch (IllegalExecutionStateException e) {
+                       return FutureUtils.completedExceptionally(e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9331b2e..22e5c92 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -77,6 +77,8 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -90,6 +92,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
@@ -270,6 +273,12 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
         * strong reference to any user-defined classes.*/
        private volatile ErrorInfo failureInfo;
 
+       /**
+        * Future for an ongoing or completed scheduling action.
+        */
+       @Nullable
+       private volatile CompletableFuture<Void> schedulingFuture;
+
        // ------ Fields that are relevant to the execution and need to be 
cleared before archiving  -------
 
        /** The coordinator for checkpoints, if snapshot checkpoints are 
enabled */
@@ -409,6 +418,8 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                // the failover strategy must be instantiated last, so that the 
execution graph
                // is ready by the time the failover strategy sees it
                this.failoverStrategy = 
checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
+
+               this.schedulingFuture = null;
                LOG.info("Job recovers via failover strategy: {}", 
failoverStrategy.getStrategyName());
        }
 
@@ -857,37 +868,60 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
 
        public void scheduleForExecution() throws JobException {
 
+               final long currentGlobalModVersion = globalModVersion;
+
                if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
 
+                       final CompletableFuture<Void> newSchedulingFuture;
+
                        switch (scheduleMode) {
 
                                case LAZY_FROM_SOURCES:
-                                       scheduleLazy(slotProvider);
+                                       newSchedulingFuture = 
scheduleLazy(slotProvider);
                                        break;
 
                                case EAGER:
-                                       scheduleEager(slotProvider, 
allocationTimeout);
+                                       newSchedulingFuture = 
scheduleEager(slotProvider, allocationTimeout);
                                        break;
 
                                default:
                                        throw new JobException("Schedule mode 
is invalid.");
                        }
+
+                       if (state == JobStatus.RUNNING && 
currentGlobalModVersion == globalModVersion) {
+                               schedulingFuture = 
newSchedulingFuture.whenCompleteAsync(
+                                       (Void ignored, Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       
failGlobal(ExceptionUtils.stripCompletionException(throwable));
+                                               }
+                                       },
+                                       futureExecutor);
+                       } else {
+                               newSchedulingFuture.cancel(false);
+                       }
                }
                else {
                        throw new IllegalStateException("Job may only be 
scheduled from state " + JobStatus.CREATED);
                }
        }
 
-       private void scheduleLazy(SlotProvider slotProvider) {
+       private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) 
{
+
+               final ArrayList<CompletableFuture<Void>> schedulingFutures = 
new ArrayList<>(numVerticesTotal);
+
                // simply take the vertices without inputs.
                for (ExecutionJobVertex ejv : verticesInCreationOrder) {
                        if (ejv.getJobVertex().isInputVertex()) {
-                               ejv.scheduleAll(
+                               final CompletableFuture<Void> 
schedulingJobVertexFuture = ejv.scheduleAll(
                                        slotProvider,
                                        allowQueuedScheduling,
                                        LocationPreferenceConstraint.ALL); // 
since it is an input vertex, the input based location preferences should be 
empty
+
+                               
schedulingFutures.add(schedulingJobVertexFuture);
                        }
                }
+
+               return FutureUtils.waitForAll(schedulingFutures);
        }
 
        /**
@@ -896,8 +930,10 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
         * @param slotProvider  The resource provider from which the slots are 
allocated
         * @param timeout       The maximum time that the deployment may take, 
before a
         *                      TimeoutException is thrown.
+        * @return Future which is completed once the {@link ExecutionGraph} 
has been scheduled.
+        * The future can also be completed exceptionally if an error happened.
         */
-       private void scheduleEager(SlotProvider slotProvider, final Time 
timeout) {
+       private CompletableFuture<Void> scheduleEager(SlotProvider 
slotProvider, final Time timeout) {
                checkState(state == JobStatus.RUNNING, "job is not running 
currently");
 
                // Important: reserve all the space we need up front.
@@ -925,9 +961,23 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                // the future fails once one slot future fails.
                final ConjunctFuture<Collection<Execution>> 
allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);
 
-               allAllocationsFuture.whenCompleteAsync(
-                       (Collection<Execution> allAllocations, Throwable 
throwable) -> {
-                               if (throwable != null) {
+               return allAllocationsFuture
+                       .thenAccept(
+                               (Collection<Execution> executionsToDeploy) -> {
+                                       for (Execution execution : 
executionsToDeploy) {
+                                               try {
+                                                       execution.deploy();
+                                               } catch (Throwable t) {
+                                                       throw new 
CompletionException(
+                                                               new 
FlinkException(
+                                                                       
String.format("Could not deploy execution %s.", execution),
+                                                                       t));
+                                               }
+                                       }
+                       })
+                       // Generate a more specific failure message for the 
eager scheduling
+                       .exceptionally(
+                               (Throwable throwable) -> {
                                        final Throwable strippedThrowable = 
ExceptionUtils.stripCompletionException(throwable);
                                        final Throwable resultThrowable;
 
@@ -942,18 +992,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                                                resultThrowable = 
strippedThrowable;
                                        }
 
-                                       failGlobal(resultThrowable);
-                               } else {
-                                       try {
-                                               // successfully obtained all 
slots, now deploy
-                                               for (Execution execution : 
allAllocations) {
-                                                       execution.deploy();
-                                               }
-                                       } catch (Throwable t) {
-                                               failGlobal(new 
FlinkException("Could not deploy executions.", t));
-                                       }
-                               }
-                       }, futureExecutor);
+                                       throw new 
CompletionException(resultThrowable);
+                               });
        }
 
        public void cancel() {
@@ -966,6 +1006,13 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                                        // make sure no concurrent local 
actions interfere with the cancellation
                                        final long globalVersionForRestart = 
incrementGlobalModVersion();
 
+                                       final CompletableFuture<Void> 
ongoingSchedulingFuture = schedulingFuture;
+
+                                       // cancel ongoing scheduling action
+                                       if (ongoingSchedulingFuture != null) {
+                                               
ongoingSchedulingFuture.cancel(false);
+                                       }
+
                                        final ArrayList<CompletableFuture<?>> 
futures = new ArrayList<>(verticesInCreationOrder.size());
 
                                        // cancel all tasks (that still need 
cancelling)
@@ -1057,6 +1104,13 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                                // make sure no concurrent local actions 
interfere with the cancellation
                                incrementGlobalModVersion();
 
+                               final CompletableFuture<Void> 
ongoingSchedulingFuture = schedulingFuture;
+
+                               // cancel ongoing scheduling action
+                               if (ongoingSchedulingFuture != null) {
+                                       ongoingSchedulingFuture.cancel(false);
+                               }
+
                                for (ExecutionJobVertex ejv: 
verticesInCreationOrder) {
                                        ejv.cancel();
                                }
@@ -1108,6 +1162,13 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                                // make sure no concurrent local or global 
actions interfere with the failover
                                final long globalVersionForRestart = 
incrementGlobalModVersion();
 
+                               final CompletableFuture<Void> 
ongoingSchedulingFuture = schedulingFuture;
+
+                               // cancel ongoing scheduling action
+                               if (ongoingSchedulingFuture != null) {
+                                       ongoingSchedulingFuture.cancel(false);
+                               }
+
                                // we build a future that is complete once all 
vertices have reached a terminal state
                                final ArrayList<CompletableFuture<?>> futures = 
new ArrayList<>(verticesInCreationOrder.size());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index db336f5..6e578fa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -472,18 +473,23 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
         * @param slotProvider to allocate the slots from
         * @param queued if the allocations can be queued
         * @param locationPreferenceConstraint constraint for the location 
preferences
+        * @return Future which is completed once all {@link Execution} could 
be deployed
         */
-       public void scheduleAll(
+       public CompletableFuture<Void> scheduleAll(
                        SlotProvider slotProvider,
                        boolean queued,
                        LocationPreferenceConstraint 
locationPreferenceConstraint) {
                
                final ExecutionVertex[] vertices = this.taskVertices;
 
+               final ArrayList<CompletableFuture<Void>> scheduleFutures = new 
ArrayList<>(vertices.length);
+
                // kick off the tasks
                for (ExecutionVertex ev : vertices) {
-                       ev.scheduleForExecution(slotProvider, queued, 
locationPreferenceConstraint);
+                       
scheduleFutures.add(ev.scheduleForExecution(slotProvider, queued, 
locationPreferenceConstraint));
                }
+
+               return FutureUtils.waitForAll(scheduleFutures);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 88923fb..f13e42c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -607,9 +607,10 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
         * @param slotProvider to allocate the slots from
         * @param queued if the allocation can be queued
         * @param locationPreferenceConstraint constraint for the location 
preferences
-        * @return
+        * @return Future which is completed once the execution is deployed. 
The future
+        * can also be completed exceptionally.
         */
-       public boolean scheduleForExecution(
+       public CompletableFuture<Void> scheduleForExecution(
                        SlotProvider slotProvider,
                        boolean queued,
                        LocationPreferenceConstraint 
locationPreferenceConstraint) {

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index b16aa96..88ba446 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -28,8 +28,10 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -40,8 +42,11 @@ import 
org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
@@ -412,6 +417,54 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                verify(taskManager, 
times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
        }
 
+       /**
+        * Tests that an ongoing scheduling operation does not fail the {@link 
ExecutionGraph}
+        * if it gets concurrently cancelled.
+        */
+       @Test
+       public void testSchedulingOperationCancellationWhenCancel() throws 
Exception {
+               final JobVertex jobVertex = new JobVertex("NoOp JobVertex");
+               jobVertex.setInvokableClass(NoOpInvokable.class);
+               jobVertex.setParallelism(2);
+               final JobGraph jobGraph = new JobGraph(jobVertex);
+               jobGraph.setScheduleMode(ScheduleMode.EAGER);
+               jobGraph.setAllowQueuedScheduling(true);
+
+               final CompletableFuture<LogicalSlot> slotFuture1 = new 
CompletableFuture<>();
+               final CompletableFuture<LogicalSlot> slotFuture2 = new 
CompletableFuture<>();
+               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(2);
+               slotProvider.addSlots(jobVertex.getID(), new 
CompletableFuture[]{slotFuture1, slotFuture2});
+               final ExecutionGraph executionGraph = 
createExecutionGraph(jobGraph, slotProvider);
+
+               executionGraph.scheduleForExecution();
+
+               final CompletableFuture<?> releaseFuture = new 
CompletableFuture<>();
+
+               final TestingLogicalSlot slot = new TestingLogicalSlot(
+                       new LocalTaskManagerLocation(),
+                       new SimpleAckingTaskManagerGateway(),
+                       0,
+                       new AllocationID(),
+                       new SlotRequestId(),
+                       new SlotSharingGroupId(),
+                       releaseFuture);
+               slotFuture1.complete(slot);
+
+               // cancel should change the state of all executions to CANCELLED
+               executionGraph.cancel();
+
+               // complete the now CANCELLED execution --> this should cause a 
failure
+               slotFuture2.complete(new TestingLogicalSlot());
+
+               Thread.sleep(1L);
+               // release the first slot to finish the cancellation
+               releaseFuture.complete(null);
+
+               // NOTE: This test will only occasionally fail without the fix 
since there is
+               // a race between the releaseFuture and the slotFuture2
+               assertThat(executionGraph.getTerminationFuture().get(), 
is(JobStatus.CANCELED));
+       }
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index edae2c7..38518d6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -304,7 +304,7 @@ public class ExecutionTest extends TestLogger {
 
                ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
 
-               assertTrue(executionVertex.scheduleForExecution(slotProvider, 
false, LocationPreferenceConstraint.ANY));
+               executionVertex.scheduleForExecution(slotProvider, false, 
LocationPreferenceConstraint.ANY).get();
 
                Execution currentExecutionAttempt = 
executionVertex.getCurrentExecutionAttempt();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
index e20700e..25f296d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
@@ -45,7 +45,10 @@ public class TestingLogicalSlot implements LogicalSlot {
 
        private final int slotNumber;
 
-       private final CompletableFuture<?> releaseFuture = new 
CompletableFuture<>();
+       private final CompletableFuture<?> releaseFuture;
+
+       @Nullable
+       private final CompletableFuture<?> customReleaseFuture;
        
        private final AllocationID allocationId;
 
@@ -60,7 +63,8 @@ public class TestingLogicalSlot implements LogicalSlot {
                        0,
                        new AllocationID(),
                        new SlotRequestId(),
-                       new SlotSharingGroupId());
+                       new SlotSharingGroupId(),
+                       null);
        }
 
        public TestingLogicalSlot(
@@ -69,7 +73,8 @@ public class TestingLogicalSlot implements LogicalSlot {
                        int slotNumber,
                        AllocationID allocationId,
                        SlotRequestId slotRequestId,
-                       SlotSharingGroupId slotSharingGroupId) {
+                       SlotSharingGroupId slotSharingGroupId,
+                       @Nullable CompletableFuture<?> customReleaseFuture) {
                this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
                this.taskManagerGateway = 
Preconditions.checkNotNull(taskManagerGateway);
                this.payloadReference = new AtomicReference<>();
@@ -77,6 +82,8 @@ public class TestingLogicalSlot implements LogicalSlot {
                this.allocationId = Preconditions.checkNotNull(allocationId);
                this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
                this.slotSharingGroupId = 
Preconditions.checkNotNull(slotSharingGroupId);
+               this.releaseFuture = new CompletableFuture<>();
+               this.customReleaseFuture = customReleaseFuture;
        }
 
        @Override
@@ -96,7 +103,11 @@ public class TestingLogicalSlot implements LogicalSlot {
 
        @Override
        public boolean isAlive() {
-               return !releaseFuture.isDone();
+               if (customReleaseFuture != null) {
+                       return !customReleaseFuture.isDone();
+               } else {
+                       return !releaseFuture.isDone();
+               }
        }
 
        @Override
@@ -112,9 +123,12 @@ public class TestingLogicalSlot implements LogicalSlot {
 
        @Override
        public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
-               releaseFuture.complete(null);
-
-               return releaseFuture;
+               if (customReleaseFuture != null) {
+                       return customReleaseFuture;
+               } else {
+                       releaseFuture.complete(null);
+                       return releaseFuture;
+               }
        }
 
        @Override

Reply via email to