[FLINK-8749] [flip6] Release slots when scheduling operation is canceled

Release slots when the scheduling operation is canceled in the ExecutionGraph.

This closes #5562.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3969170f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3969170f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3969170f

Branch: refs/heads/master
Commit: 3969170f5c2dba5ab76bed617648531b1e9aa435
Parents: 529b512
Author: Till Rohrmann <[email protected]>
Authored: Thu Feb 22 17:01:05 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sat Feb 24 15:05:14 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 70 +++++++++++++++-----
 .../flink/runtime/executiongraph/Execution.java | 66 +++++++++++-------
 .../runtime/executiongraph/ExecutionGraph.java  | 24 +++++--
 .../runtime/jobmanager/scheduler/Scheduler.java |  9 +++
 .../runtime/jobmaster/slotpool/SlotPool.java    | 14 +++-
 .../jobmaster/slotpool/SlotProvider.java        | 45 ++++++++++++-
 .../runtime/concurrent/FutureUtilsTest.java     | 34 ++++++++++
 .../ExecutionGraphMetricsTest.java              |  7 +-
 .../runtime/executiongraph/ExecutionTest.java   | 68 +++++++++++++++++--
 .../ExecutionVertexSchedulingTest.java          |  8 +--
 .../executiongraph/ProgrammedSlotProvider.java  | 28 ++++++++
 .../utils/SimpleSlotProvider.java               | 47 ++++++++++---
 .../jobmanager/scheduler/SchedulerTestBase.java | 28 ++++++--
 13 files changed, 366 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 4a253b0..da77bdc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -354,18 +354,7 @@ public class FutureUtils {
        public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? 
extends CompletableFuture<? extends T>> futures) {
                checkNotNull(futures, "futures");
 
-               final ResultConjunctFuture<T> conjunct = new 
ResultConjunctFuture<>(futures.size());
-
-               if (futures.isEmpty()) {
-                       conjunct.complete(Collections.emptyList());
-               }
-               else {
-                       for (CompletableFuture<? extends T> future : futures) {
-                               
future.whenComplete(conjunct::handleCompletedFuture);
-                       }
-               }
-
-               return conjunct;
+               return new ResultConjunctFuture<>(futures);
        }
 
        /**
@@ -407,6 +396,22 @@ public class FutureUtils {
                 * @return The number of Futures in the conjunction that are 
already complete
                 */
                public abstract int getNumFuturesCompleted();
+
+               /**
+                * Gets the individual futures which make up the {@link 
ConjunctFuture}.
+                *
+                * @return Collection of futures which make up the {@link 
ConjunctFuture}
+                */
+               protected abstract Collection<? extends CompletableFuture<?>> 
getConjunctFutures();
+
+               @Override
+               public boolean cancel(boolean mayInterruptIfRunning) {
+                       for (CompletableFuture<?> completableFuture : 
getConjunctFutures()) {
+                               completableFuture.cancel(mayInterruptIfRunning);
+                       }
+
+                       return super.cancel(mayInterruptIfRunning);
+               }
        }
 
        /**
@@ -414,6 +419,8 @@ public class FutureUtils {
         */
        private static class ResultConjunctFuture<T> extends 
ConjunctFuture<Collection<T>> {
 
+               private final Collection<? extends CompletableFuture<? extends 
T>> resultFutures;
+
                /** The total number of futures in the conjunction. */
                private final int numTotal;
 
@@ -429,7 +436,7 @@ public class FutureUtils {
                /** The function that is attached to all futures in the 
conjunction. Once a future
                 * is complete, this function tracks the completion or fails 
the conjunct.
                 */
-               final void handleCompletedFuture(T value, Throwable throwable) {
+               private void handleCompletedFuture(T value, Throwable 
throwable) {
                        if (throwable != null) {
                                completeExceptionally(throwable);
                        } else {
@@ -444,9 +451,19 @@ public class FutureUtils {
                }
 
                @SuppressWarnings("unchecked")
-               ResultConjunctFuture(int numTotal) {
-                       this.numTotal = numTotal;
+               ResultConjunctFuture(Collection<? extends CompletableFuture<? 
extends T>> resultFutures) {
+                       this.resultFutures = checkNotNull(resultFutures);
+                       this.numTotal = resultFutures.size();
                        results = (T[]) new Object[numTotal];
+
+                       if (resultFutures.isEmpty()) {
+                               complete(Collections.emptyList());
+                       }
+                       else {
+                               for (CompletableFuture<? extends T> future : 
resultFutures) {
+                                       
future.whenComplete(this::handleCompletedFuture);
+                               }
+                       }
                }
 
                @Override
@@ -458,6 +475,11 @@ public class FutureUtils {
                public int getNumFuturesCompleted() {
                        return numCompleted.get();
                }
+
+               @Override
+               protected Collection<? extends CompletableFuture<?>> 
getConjunctFutures() {
+                       return resultFutures;
+               }
        }
 
        /**
@@ -466,6 +488,8 @@ public class FutureUtils {
         */
        private static final class WaitingConjunctFuture extends 
ConjunctFuture<Void> {
 
+               private final Collection<? extends CompletableFuture<?>> 
futures;
+
                /** Number of completed futures. */
                private final AtomicInteger numCompleted = new AtomicInteger(0);
 
@@ -484,8 +508,7 @@ public class FutureUtils {
                }
 
                private WaitingConjunctFuture(Collection<? extends 
CompletableFuture<?>> futures) {
-                       Preconditions.checkNotNull(futures, "Futures must not 
be null.");
-
+                       this.futures = checkNotNull(futures);
                        this.numTotal = futures.size();
 
                        if (futures.isEmpty()) {
@@ -506,6 +529,11 @@ public class FutureUtils {
                public int getNumFuturesCompleted() {
                        return numCompleted.get();
                }
+
+               @Override
+               protected Collection<? extends CompletableFuture<?>> 
getConjunctFutures() {
+                       return futures;
+               }
        }
 
        /**
@@ -533,11 +561,14 @@ public class FutureUtils {
 
                private final int numFuturesTotal;
 
+               private final Collection<? extends CompletableFuture<?>> 
futuresToComplete;
+
                private int futuresCompleted;
 
                private Throwable globalThrowable;
 
                private CompletionConjunctFuture(Collection<? extends 
CompletableFuture<?>> futuresToComplete) {
+                       this.futuresToComplete = 
checkNotNull(futuresToComplete);
                        numFuturesTotal = futuresToComplete.size();
 
                        futuresCompleted = 0;
@@ -582,6 +613,11 @@ public class FutureUtils {
                                return futuresCompleted;
                        }
                }
+
+               @Override
+               protected Collection<? extends CompletableFuture<?>> 
getConjunctFutures() {
+                       return futuresToComplete;
+               }
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 3e77d3e..a504799 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -34,8 +34,6 @@ import 
org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
@@ -43,6 +41,9 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstrain
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -81,16 +82,16 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * A single execution of a vertex. While an {@link ExecutionVertex} can be 
executed multiple times
  * (for recovery, re-computation, re-configuration), this class tracks the 
state of a single execution
  * of that vertex and the resources.
- * 
+ *
  * <h2>Lock free state transitions</h2>
- * 
- * In several points of the code, we need to deal with possible concurrent 
state changes and actions.
+ *
+ * <p>In several points of the code, we need to deal with possible concurrent 
state changes and actions.
  * For example, while the call to deploy a task (send it to the TaskManager) 
happens, the task gets cancelled.
- * 
+ *
  * <p>We could lock the entire portion of the code (decision to deploy, 
deploy, set state to running) such that
  * it is guaranteed that any "cancel command" will only pick up after 
deployment is done and that the "cancel
  * command" call will never overtake the deploying call.
- * 
+ *
  * <p>This blocks the threads big time, because the remote calls may take 
long. Depending of their locking behavior, it
  * may even result in distributed deadlocks (unless carefully avoided). We 
therefore use atomic state updates and
  * occasional double-checking to ensure that the state after a completed call 
is as expected, and trigger correcting
@@ -117,10 +118,10 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        /** The executor which is used to execute futures. */
        private final Executor executor;
 
-       /** The execution vertex whose task this execution executes */ 
+       /** The execution vertex whose task this execution executes. */
        private final ExecutionVertex vertex;
 
-       /** The unique ID marking the specific execution instant of the task */
+       /** The unique ID marking the specific execution instant of the task. */
        private final ExecutionAttemptID attemptId;
 
        /** Gets the global modification version of the execution graph when 
this execution was created.
@@ -128,7 +129,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * to resolve conflicts between concurrent modification by global and 
local failover actions. */
        private final long globalModVersion;
 
-       /** The timestamps when state transitions occurred, indexed by {@link 
ExecutionState#ordinal()} */ 
+       /** The timestamps when state transitions occurred, indexed by {@link 
ExecutionState#ordinal()}. */
        private final long[] stateTimestamps;
 
        private final int attemptNumber;
@@ -137,7 +138,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
        private final 
ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> 
partialInputChannelDeploymentDescriptors;
 
-       /** A future that completes once the Execution reaches a terminal 
ExecutionState */
+       /** A future that completes once the Execution reaches a terminal 
ExecutionState. */
        private final CompletableFuture<ExecutionState> terminalStateFuture;
 
        private final CompletableFuture<?> releaseFuture;
@@ -150,14 +151,14 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
        private volatile Throwable failureCause;          // once assigned, 
never changes
 
-       /** Information to restore the task on recovery, such as checkpoint id 
and task state snapshot */
+       /** Information to restore the task on recovery, such as checkpoint id 
and task state snapshot. */
        @Nullable
        private volatile JobManagerTaskRestore taskRestore;
 
        // ------------------------ Accumulators & Metrics 
------------------------
 
        /** Lock for updating the accumulators atomically.
-        * Prevents final accumulators to be overwritten by partial 
accumulators on a late heartbeat */
+        * Prevents final accumulators to be overwritten by partial 
accumulators on a late heartbeat. */
        private final Object accumulatorLock = new Object();
 
        /* Continuously updated map of user-defined accumulators */
@@ -460,25 +461,40 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        // calculate the preferred locations
                        final 
CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = 
calculatePreferredLocations(locationPreferenceConstraint);
 
-                       return preferredLocationsFuture
+                       final SlotRequestId slotRequestId = new SlotRequestId();
+
+                       final CompletableFuture<LogicalSlot> logicalSlotFuture 
= preferredLocationsFuture
                                .thenCompose(
                                        (Collection<TaskManagerLocation> 
preferredLocations) ->
                                                slotProvider.allocateSlot(
+                                                       slotRequestId,
                                                        toSchedule,
                                                        queued,
                                                        preferredLocations,
-                                                       allocationTimeout))
-                               .thenApply(
-                                       (LogicalSlot logicalSlot) -> {
-                                               if 
(tryAssignResource(logicalSlot)) {
-                                                       return this;
-                                               } else {
-                                                       // release the slot
-                                                       
logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to 
execution " + this + '.'));
+                                                       allocationTimeout));
 
-                                                       throw new 
CompletionException(new FlinkException("Could not assign slot " + logicalSlot + 
" to execution " + this + " because it has already been assigned "));
-                                               }
-                                       });
+                       // register a callback to cancel the slot request in case the execution gets canceled
+                       releaseFuture.whenComplete(
+                               (Object ignored, Throwable throwable) -> {
+                                       if (logicalSlotFuture.cancel(false)) {
+                                               slotProvider.cancelSlotRequest(
+                                                       slotRequestId,
+                                                       slotSharingGroupId,
+                                                       new 
FlinkException("Execution " + this + " was released."));
+                                       }
+                               });
+
+                       return logicalSlotFuture.thenApply(
+                               (LogicalSlot logicalSlot) -> {
+                                       if (tryAssignResource(logicalSlot)) {
+                                               return this;
+                                       } else {
+                                               // release the slot
+                                               logicalSlot.releaseSlot(new 
FlinkException("Could not assign logical slot to execution " + this + '.'));
+
+                                               throw new 
CompletionException(new FlinkException("Could not assign slot " + logicalSlot + 
" to execution " + this + " because it has already been assigned "));
+                                       }
+                               });
                }
                else {
                        // call race, already deployed, or already done

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3d69d71..4e8b972 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -91,6 +91,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -891,9 +892,12 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                        }
 
                        if (state == JobStatus.RUNNING && 
currentGlobalModVersion == globalModVersion) {
-                               schedulingFuture = 
newSchedulingFuture.whenCompleteAsync(
+                               schedulingFuture = newSchedulingFuture;
+
+                               newSchedulingFuture.whenCompleteAsync(
                                        (Void ignored, Throwable throwable) -> {
-                                               if (throwable != null) {
+                                               if (throwable != null && 
!(throwable instanceof CancellationException)) {
+                                                       // only fail if the 
scheduling future was not canceled
                                                        
failGlobal(ExceptionUtils.stripCompletionException(throwable));
                                                }
                                        },
@@ -963,7 +967,7 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                // the future fails once one slot future fails.
                final ConjunctFuture<Collection<Execution>> 
allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);
 
-               return allAllocationsFuture
+               final CompletableFuture<Void> currentSchedulingFuture = 
allAllocationsFuture
                        .thenAccept(
                                (Collection<Execution> executionsToDeploy) -> {
                                        for (Execution execution : 
executionsToDeploy) {
@@ -976,7 +980,7 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                                                                        t));
                                                }
                                        }
-                       })
+                               })
                        // Generate a more specific failure message for the 
eager scheduling
                        .exceptionally(
                                (Throwable throwable) -> {
@@ -993,9 +997,19 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                                        } else {
                                                resultThrowable = 
strippedThrowable;
                                        }
-                                       
+
                                        throw new 
CompletionException(resultThrowable);
                                });
+
+               currentSchedulingFuture.whenComplete(
+                       (Void ignored, Throwable throwable) -> {
+                               if (throwable instanceof CancellationException) 
{
+                                       // cancel the individual allocation 
futures
+                                       allAllocationsFuture.cancel(false);
+                               }
+                       });
+
+               return currentSchedulingFuture;
        }
 
        public void cancel() {

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 77f89b7..fc79d40 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -26,12 +26,15 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SharedSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -142,6 +145,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
 
        @Override
        public CompletableFuture<LogicalSlot> allocateSlot(
+                       SlotRequestId slotRequestId,
                        ScheduledUnit task,
                        boolean allowQueued,
                        Collection<TaskManagerLocation> preferredLocations,
@@ -167,6 +171,11 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                }
        }
 
+       @Override
+       public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId 
slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable 
cause) {
+               return CompletableFuture.completedFuture(Acknowledge.get());
+       }
+
        /**
         * Returns either a {@link SimpleSlot}, or a {@link CompletableFuture}.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 409f8f7..a49e6ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -1571,14 +1571,14 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
 
                @Override
                public CompletableFuture<LogicalSlot> allocateSlot(
+                               SlotRequestId slotRequestId,
                                ScheduledUnit task,
                                boolean allowQueued,
                                Collection<TaskManagerLocation> 
preferredLocations,
                                Time timeout) {
 
-                       final SlotRequestId requestId = new SlotRequestId();
                        CompletableFuture<LogicalSlot> slotFuture = 
gateway.allocateSlot(
-                               requestId,
+                               slotRequestId,
                                task,
                                ResourceProfile.UNKNOWN,
                                preferredLocations,
@@ -1589,7 +1589,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                (LogicalSlot slot, Throwable failure) -> {
                                        if (failure != null) {
                                                gateway.releaseSlot(
-                                                       requestId,
+                                                       slotRequestId,
                                                        
task.getSlotSharingGroupId(),
                                                        failure);
                                        }
@@ -1597,6 +1597,14 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
 
                        return slotFuture;
                }
+
+               @Override
+               public CompletableFuture<Acknowledge> cancelSlotRequest(
+                               SlotRequestId slotRequestId,
+                               @Nullable SlotSharingGroupId slotSharingGroupId,
+                               Throwable cause) {
+                       return gateway.releaseSlot(slotRequestId, 
slotSharingGroupId, cause);
+               }
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
index 4f82688..80b2689 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
@@ -19,16 +19,21 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * The slot provider is responsible for preparing slots for ready-to-run tasks.
- * 
+ *
  * <p>It supports two allocating modes:
  * <ul>
  *     <li>Immediate allocating: A request for a task slot immediately gets 
satisfied, we can call
@@ -49,8 +54,44 @@ public interface SlotProvider {
         * @return The future of the allocation
         */
        CompletableFuture<LogicalSlot> allocateSlot(
+               SlotRequestId slotRequestId,
                ScheduledUnit task,
                boolean allowQueued,
                Collection<TaskManagerLocation> preferredLocations,
                Time timeout);
+
+       /**
+        * Allocates a slot with the given requirements, using a newly created {@link SlotRequestId}.
+        *
+        * @param task The task to allocate the slot for
+        * @param allowQueued Whether the task may be queued if not enough resources are available
+        * @param preferredLocations preferred locations for the slot allocation
+        * @param timeout after which the allocation fails with a timeout 
exception
+        * @return The future of the allocation
+        */
+       default CompletableFuture<LogicalSlot> allocateSlot(
+               ScheduledUnit task,
+               boolean allowQueued,
+               Collection<TaskManagerLocation> preferredLocations,
+               Time timeout) {
+               return allocateSlot(
+                       new SlotRequestId(),
+                       task,
+                       allowQueued,
+                       preferredLocations,
+                       timeout);
+       }
+
+       /**
+        * Cancels the slot request with the given {@link SlotRequestId} and 
{@link SlotSharingGroupId}.
+        *
+        * @param slotRequestId identifying the slot request to cancel
+        * @param slotSharingGroupId slot sharing group id of the slot request to cancel, may be {@code null}
+        * @param cause of the cancellation
+        * @return Future which is completed once the slot request has been 
cancelled
+        */
+       CompletableFuture<Acknowledge> cancelSlotRequest(
+               SlotRequestId slotRequestId,
+               @Nullable SlotSharingGroupId slotSharingGroupId,
+               Throwable cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index 7ad1638..57f6bd0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -40,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.hamcrest.CoreMatchers.containsString;
@@ -424,4 +426,36 @@ public class FutureUtilsTest extends TestLogger {
                        assertThat(suppressed, 
arrayContaining(suppressedException));
                }
        }
+
+       @Test
+       public void testCancelWaitingConjunctFuture() {
+               cancelConjunctFuture(inputFutures -> 
FutureUtils.waitForAll(inputFutures));
+       }
+
+       @Test
+       public void testCancelResultConjunctFuture() {
+               cancelConjunctFuture(inputFutures -> 
FutureUtils.combineAll(inputFutures));
+       }
+
+       @Test
+       public void testCancelCompleteConjunctFuture() {
+               cancelConjunctFuture(inputFutures -> 
FutureUtils.completeAll(inputFutures));
+       }
+
+       private void cancelConjunctFuture(Function<Collection<? extends 
CompletableFuture<?>>, FutureUtils.ConjunctFuture<?>> conjunctFutureFactory) {
+               final int numInputFutures = 10;
+               final Collection<CompletableFuture<Void>> inputFutures = new 
ArrayList<>(numInputFutures);
+
+               for (int i = 0; i < numInputFutures; i++) {
+                       inputFutures.add(new CompletableFuture<>());
+               }
+
+               final FutureUtils.ConjunctFuture<?> conjunctFuture = 
conjunctFutureFactory.apply(inputFutures);
+
+               conjunctFuture.cancel(false);
+
+               for (CompletableFuture<Void> inputFuture : inputFutures) {
+                       assertThat(inputFuture.isCancelled(), is(true));
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 18476a5..1b835ad 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -27,13 +27,14 @@ import 
org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
@@ -79,7 +80,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
                        CompletableFuture<LogicalSlot> slotFuture1 = 
CompletableFuture.completedFuture(new TestingLogicalSlot());
                        CompletableFuture<LogicalSlot> slotFuture2 = 
CompletableFuture.completedFuture(new TestingLogicalSlot());
-                       when(scheduler.allocateSlot(any(ScheduledUnit.class), 
anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(slotFuture1, 
slotFuture2);
+                       when(scheduler.allocateSlot(any(SlotRequestId.class), 
any(ScheduledUnit.class), anyBoolean(), any(Collection.class), 
any(Time.class))).thenReturn(slotFuture1, slotFuture2);
 
                        TestingRestartStrategy testingRestartStrategy = new 
TestingRestartStrategy();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 38518d6..99879c0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -22,13 +22,16 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -39,10 +42,14 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -55,7 +62,7 @@ public class ExecutionTest extends TestLogger {
 
        /**
         * Tests that slots are released if we cannot assign the allocated 
resource to the
-        * Execution. In this case, a concurrent cancellation precedes the 
assignment.
+        * Execution.
         */
        @Test
        public void testSlotReleaseOnFailedResourceAssignment() throws 
Exception {
@@ -85,6 +92,8 @@ public class ExecutionTest extends TestLogger {
                        0,
                        new SimpleAckingTaskManagerGateway());
 
+               final LogicalSlot otherSlot = new TestingLogicalSlot();
+
                CompletableFuture<Execution> allocationFuture = 
execution.allocateAndAssignSlotForExecution(
                        slotProvider,
                        false,
@@ -95,11 +104,8 @@ public class ExecutionTest extends TestLogger {
 
                assertEquals(ExecutionState.SCHEDULED, execution.getState());
 
-               // cancelling the execution should move it into state CANCELED; 
this happens before
-               // the slot future has been completed
-               execution.cancel();
-
-               assertEquals(ExecutionState.CANCELED, execution.getState());
+               // assign a different resource to the execution
+               assertTrue(execution.tryAssignResource(otherSlot));
 
                // completing now the future should cause the slot to be 
released
                slotFuture.complete(slot);
@@ -214,6 +220,54 @@ public class ExecutionTest extends TestLogger {
        }
 
        /**
+        * Tests that a slot allocation from a {@link SlotProvider} is 
cancelled if the
+        * {@link Execution} is cancelled.
+        */
+       @Test
+       public void testSlotAllocationCancellationWhenExecutionCancelled() 
throws Exception {
+               final JobVertexID jobVertexId = new JobVertexID();
+               final JobVertex jobVertex = new JobVertex("test vertex", 
jobVertexId);
+               jobVertex.setInvokableClass(NoOpInvokable.class);
+
+               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
+               final CompletableFuture<LogicalSlot> slotFuture = new 
CompletableFuture<>();
+               slotProvider.addSlot(jobVertexId, 0, slotFuture);
+
+               final ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
+                       new JobID(),
+                       slotProvider,
+                       new NoRestartStrategy(),
+                       jobVertex);
+
+               final ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+               final Execution currentExecutionAttempt = 
executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+
+               final CompletableFuture<Execution> allocationFuture = 
currentExecutionAttempt.allocateAndAssignSlotForExecution(
+                       slotProvider,
+                       false,
+                       LocationPreferenceConstraint.ALL,
+                       TestingUtils.infiniteTime());
+
+               assertThat(allocationFuture.isDone(), is(false));
+
+               assertThat(slotProvider.getSlotRequestedFuture(jobVertexId, 
0).get(), is(true));
+
+               final Set<SlotRequestId> slotRequests = 
slotProvider.getSlotRequests();
+               assertThat(slotRequests, hasSize(1));
+
+               assertThat(currentExecutionAttempt.getState(), 
is(ExecutionState.SCHEDULED));
+
+               currentExecutionAttempt.cancel();
+               assertThat(currentExecutionAttempt.getState(), 
is(ExecutionState.CANCELED));
+
+               assertThat(allocationFuture.isCompletedExceptionally(), 
is(true));
+
+               final Set<SlotRequestId> canceledSlotRequests = 
slotProvider.getCanceledSlotRequests();
+               assertThat(canceledSlotRequests, equalTo(slotRequests));
+       }
+
+       /**
         * Tests that all preferred locations are calculated.
         */
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 2c49573..c0d5dc0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -30,10 +30,10 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
-import org.mockito.Matchers;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -67,7 +67,7 @@ public class ExecutionVertexSchedulingTest {
                        Scheduler scheduler = mock(Scheduler.class);
                        CompletableFuture<LogicalSlot> future = new 
CompletableFuture<>();
                        future.complete(slot);
-                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), 
any(Collection.class), any(Time.class))).thenReturn(future);
+                       when(scheduler.allocateSlot(any(SlotRequestId.class), 
any(ScheduledUnit.class), anyBoolean(), any(Collection.class), 
any(Time.class))).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        // try to deploy to the slot
@@ -99,7 +99,7 @@ public class ExecutionVertexSchedulingTest {
                        final CompletableFuture<LogicalSlot> future = new 
CompletableFuture<>();
 
                        Scheduler scheduler = mock(Scheduler.class);
-                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), 
any(Collection.class), any(Time.class))).thenReturn(future);
+                       when(scheduler.allocateSlot(any(SlotRequestId.class), 
any(ScheduledUnit.class), anyBoolean(), any(Collection.class), 
any(Time.class))).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        // try to deploy to the slot
@@ -133,7 +133,7 @@ public class ExecutionVertexSchedulingTest {
                        Scheduler scheduler = mock(Scheduler.class);
                        CompletableFuture<LogicalSlot> future = new 
CompletableFuture<>();
                        future.complete(slot);
-                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), 
any(Collection.class), any(Time.class))).thenReturn(future);
+                       when(scheduler.allocateSlot(any(SlotRequestId.class), 
any(ScheduledUnit.class), anyBoolean(), any(Collection.class), 
any(Time.class))).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
index d93693e..2e7ebc9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -19,15 +19,23 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -43,6 +51,10 @@ class ProgrammedSlotProvider implements SlotProvider {
 
        private final Map<JobVertexID, CompletableFuture<Boolean>[]> 
slotFutureRequested = new HashMap<>();
 
+       private final Set<SlotRequestId> slotRequests = new HashSet<>();
+
+       private final Set<SlotRequestId> canceledSlotRequests = new HashSet<>();
+
        private final int parallelism;
 
        public ProgrammedSlotProvider(int parallelism) {
@@ -92,8 +104,17 @@ class ProgrammedSlotProvider implements SlotProvider {
                return slotFutureRequested.get(jobVertexId)[subtaskIndex];
        }
 
+       public Set<SlotRequestId> getSlotRequests() {
+               return Collections.unmodifiableSet(slotRequests);
+       }
+
+       public Set<SlotRequestId> getCanceledSlotRequests() {
+               return Collections.unmodifiableSet(canceledSlotRequests);
+       }
+
        @Override
        public CompletableFuture<LogicalSlot> allocateSlot(
+                       SlotRequestId slotRequestId,
                        ScheduledUnit task,
                        boolean allowQueued,
                        Collection<TaskManagerLocation> preferredLocations,
@@ -106,6 +127,7 @@ class ProgrammedSlotProvider implements SlotProvider {
                        CompletableFuture<LogicalSlot> future = 
forTask[subtask];
                        if (future != null) {
                                
slotFutureRequested.get(vertexId)[subtask].complete(true);
+                               slotRequests.add(slotRequestId);
 
                                return future;
                        }
@@ -113,4 +135,10 @@ class ProgrammedSlotProvider implements SlotProvider {
 
                throw new IllegalArgumentException("No registered slot future 
for task " + vertexId + " (" + subtask + ')');
        }
+
+       @Override
+       public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId 
slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable 
cause) {
+               canceledSlotRequests.add(slotRequestId);
+               return CompletableFuture.completedFuture(Acknowledge.get());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 776ce16..c082e9a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -23,9 +23,11 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -33,12 +35,17 @@ import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.net.InetAddress;
 import java.util.ArrayDeque;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -49,8 +56,12 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 
+       private final Object lock = new Object();
+
        private final ArrayDeque<SlotContext> slots;
 
+       private final HashMap<SlotRequestId, SlotContext> allocatedSlots;
+
        public SimpleSlotProvider(JobID jobId, int numSlots) {
                this(jobId, numSlots, new SimpleAckingTaskManagerGateway());
        }
@@ -69,30 +80,47 @@ public class SimpleSlotProvider implements SlotProvider, 
SlotOwner {
                                taskManagerGateway);
                        slots.add(as);
                }
+
+               allocatedSlots = new HashMap<>(slots.size());
        }
 
        @Override
        public CompletableFuture<LogicalSlot> allocateSlot(
+                       SlotRequestId slotRequestId,
                        ScheduledUnit task,
                        boolean allowQueued,
                        Collection<TaskManagerLocation> preferredLocations,
                        Time allocationTimeout) {
                final SlotContext slot;
 
-               synchronized (slots) {
+               synchronized (lock) {
                        if (slots.isEmpty()) {
                                slot = null;
                        } else {
                                slot = slots.removeFirst();
                        }
+                       if (slot != null) {
+                               SimpleSlot result = new SimpleSlot(slot, this, 
0);
+                               allocatedSlots.put(slotRequestId, slot);
+                               return 
CompletableFuture.completedFuture(result);
+                       }
+                       else {
+                               return FutureUtils.completedExceptionally(new 
NoResourceAvailableException());
+                       }
                }
+       }
 
-               if (slot != null) {
-                       SimpleSlot result = new SimpleSlot(slot, this, 0);
-                       return CompletableFuture.completedFuture(result);
-               }
-               else {
-                       return FutureUtils.completedExceptionally(new 
NoResourceAvailableException());
+       @Override
+       public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId 
slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable 
cause) {
+               synchronized (lock) {
+                       final SlotContext slotContext = 
allocatedSlots.remove(slotRequestId);
+
+                       if (slotContext != null) {
+                               slots.add(slotContext);
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
+                       } else {
+                               return FutureUtils.completedExceptionally(new 
FlinkException("Unknown slot request id " + slotRequestId + '.'));
+                       }
                }
        }
 
@@ -102,14 +130,15 @@ public class SimpleSlotProvider implements SlotProvider, 
SlotOwner {
 
                final Slot slot = ((Slot) logicalSlot);
 
-               synchronized (slots) {
+               synchronized (lock) {
                        slots.add(slot.getSlotContext());
+                       allocatedSlots.remove(logicalSlot.getSlotRequestId());
                }
                return CompletableFuture.completedFuture(true);
        }
 
        public int getNumberOfAvailableSlots() {
-               synchronized (slots) {
+               synchronized (lock) {
                        return slots.size();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 76a5642..9cb9cff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -25,15 +25,17 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -49,6 +51,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.runners.Parameterized;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -152,11 +156,16 @@ public class SchedulerTestBase extends TestLogger {
                }
 
                @Override
-               public CompletableFuture<LogicalSlot> 
allocateSlot(ScheduledUnit task, boolean allowQueued, 
Collection<TaskManagerLocation> preferredLocations, Time allocationTimeout) {
+               public CompletableFuture<LogicalSlot> 
allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, boolean 
allowQueued, Collection<TaskManagerLocation> preferredLocations, Time 
allocationTimeout) {
                        return scheduler.allocateSlot(task, allowQueued, 
preferredLocations, allocationTimeout);
                }
 
                @Override
+               public CompletableFuture<Acknowledge> 
cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId 
slotSharingGroupId, Throwable cause) {
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               }
+
+               @Override
                public TaskManagerLocation addTaskManager(int numberSlots) {
                        final Instance instance = 
getRandomInstance(numberSlots);
                        scheduler.newInstanceAvailable(instance);
@@ -340,7 +349,7 @@ public class SchedulerTestBase extends TestLogger {
                }
 
                @Override
-               public CompletableFuture<LogicalSlot> 
allocateSlot(ScheduledUnit task, boolean allowQueued, 
Collection<TaskManagerLocation> preferredLocations, Time allocationTimeout) {
+               public CompletableFuture<LogicalSlot> 
allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, boolean 
allowQueued, Collection<TaskManagerLocation> preferredLocations, Time 
allocationTimeout) {
                        return slotProvider.allocateSlot(task, allowQueued, 
preferredLocations, allocationTimeout).thenApply(
                                (LogicalSlot logicalSlot) -> {
                                        switch (logicalSlot.getLocality()) {
@@ -363,6 +372,11 @@ public class SchedulerTestBase extends TestLogger {
                                        return logicalSlot;
                                });
                }
+
+               @Override
+               public CompletableFuture<Acknowledge> 
cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId 
slotSharingGroupId, Throwable cause) {
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               }
        }
 
        private static final class TestingSlotPool extends SlotPool {

Reply via email to