[FLINK-8749] [flip6] Release slots when scheduling operation is canceled Release slots when the scheduling operation is canceled in the ExecutionGraph.
This closes #5562. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3969170f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3969170f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3969170f Branch: refs/heads/master Commit: 3969170f5c2dba5ab76bed617648531b1e9aa435 Parents: 529b512 Author: Till Rohrmann <[email protected]> Authored: Thu Feb 22 17:01:05 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sat Feb 24 15:05:14 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/concurrent/FutureUtils.java | 70 +++++++++++++++----- .../flink/runtime/executiongraph/Execution.java | 66 +++++++++++------- .../runtime/executiongraph/ExecutionGraph.java | 24 +++++-- .../runtime/jobmanager/scheduler/Scheduler.java | 9 +++ .../runtime/jobmaster/slotpool/SlotPool.java | 14 +++- .../jobmaster/slotpool/SlotProvider.java | 45 ++++++++++++- .../runtime/concurrent/FutureUtilsTest.java | 34 ++++++++++ .../ExecutionGraphMetricsTest.java | 7 +- .../runtime/executiongraph/ExecutionTest.java | 68 +++++++++++++++++-- .../ExecutionVertexSchedulingTest.java | 8 +-- .../executiongraph/ProgrammedSlotProvider.java | 28 ++++++++ .../utils/SimpleSlotProvider.java | 47 ++++++++++--- .../jobmanager/scheduler/SchedulerTestBase.java | 28 ++++++-- 13 files changed, 366 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 4a253b0..da77bdc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -354,18 +354,7 @@ public class FutureUtils { public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends CompletableFuture<? extends T>> futures) { checkNotNull(futures, "futures"); - final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<>(futures.size()); - - if (futures.isEmpty()) { - conjunct.complete(Collections.emptyList()); - } - else { - for (CompletableFuture<? extends T> future : futures) { - future.whenComplete(conjunct::handleCompletedFuture); - } - } - - return conjunct; + return new ResultConjunctFuture<>(futures); } /** @@ -407,6 +396,22 @@ public class FutureUtils { * @return The number of Futures in the conjunction that are already complete */ public abstract int getNumFuturesCompleted(); + + /** + * Gets the individual futures which make up the {@link ConjunctFuture}. + * + * @return Collection of futures which make up the {@link ConjunctFuture} + */ + protected abstract Collection<? extends CompletableFuture<?>> getConjunctFutures(); + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + for (CompletableFuture<?> completableFuture : getConjunctFutures()) { + completableFuture.cancel(mayInterruptIfRunning); + } + + return super.cancel(mayInterruptIfRunning); + } } /** @@ -414,6 +419,8 @@ public class FutureUtils { */ private static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T>> { + private final Collection<? extends CompletableFuture<? extends T>> resultFutures; + /** The total number of futures in the conjunction. */ private final int numTotal; @@ -429,7 +436,7 @@ public class FutureUtils { /** The function that is attached to all futures in the conjunction. Once a future * is complete, this function tracks the completion or fails the conjunct. */ - final void handleCompletedFuture(T value, Throwable throwable) { + private void handleCompletedFuture(T value, Throwable throwable) { if (throwable != null) { completeExceptionally(throwable); } else { @@ -444,9 +451,19 @@ public class FutureUtils { } @SuppressWarnings("unchecked") - ResultConjunctFuture(int numTotal) { - this.numTotal = numTotal; + ResultConjunctFuture(Collection<? extends CompletableFuture<? extends T>> resultFutures) { + this.resultFutures = checkNotNull(resultFutures); + this.numTotal = resultFutures.size(); results = (T[]) new Object[numTotal]; + + if (resultFutures.isEmpty()) { + complete(Collections.emptyList()); + } + else { + for (CompletableFuture<? extends T> future : resultFutures) { + future.whenComplete(this::handleCompletedFuture); + } + } } @Override @@ -458,6 +475,11 @@ public class FutureUtils { public int getNumFuturesCompleted() { return numCompleted.get(); } + + @Override + protected Collection<? extends CompletableFuture<?>> getConjunctFutures() { + return resultFutures; + } } /** @@ -466,6 +488,8 @@ public class FutureUtils { */ private static final class WaitingConjunctFuture extends ConjunctFuture<Void> { + private final Collection<? extends CompletableFuture<?>> futures; + /** Number of completed futures. */ private final AtomicInteger numCompleted = new AtomicInteger(0); @@ -484,8 +508,7 @@ public class FutureUtils { } private WaitingConjunctFuture(Collection<? extends CompletableFuture<?>> futures) { - Preconditions.checkNotNull(futures, "Futures must not be null."); - + this.futures = checkNotNull(futures); this.numTotal = futures.size(); if (futures.isEmpty()) { @@ -506,6 +529,11 @@ public class FutureUtils { public int getNumFuturesCompleted() { return numCompleted.get(); } + + @Override + protected Collection<? extends CompletableFuture<?>> getConjunctFutures() { + return futures; + } } /** @@ -533,11 +561,14 @@ public class FutureUtils { private final int numFuturesTotal; + private final Collection<? extends CompletableFuture<?>> futuresToComplete; + private int futuresCompleted; private Throwable globalThrowable; private CompletionConjunctFuture(Collection<? extends CompletableFuture<?>> futuresToComplete) { + this.futuresToComplete = checkNotNull(futuresToComplete); numFuturesTotal = futuresToComplete.size(); futuresCompleted = 0; @@ -582,6 +613,11 @@ public class FutureUtils { return futuresCompleted; } } + + @Override + protected Collection<? extends CompletableFuture<?>> getConjunctFutures() { + return futuresToComplete; + } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 3e77d3e..a504799 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -34,8 +34,6 @@ import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.SlotSharingGroupId; -import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; @@ -43,6 +41,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstrain import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -81,16 +82,16 @@ import static org.apache.flink.util.Preconditions.checkState; * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times * (for recovery, re-computation, re-configuration), this class tracks the state of a single execution * of that vertex and the resources. - * + * * <h2>Lock free state transitions</h2> - * - * In several points of the code, we need to deal with possible concurrent state changes and actions. + * + * <p>In several points of the code, we need to deal with possible concurrent state changes and actions. * For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled. - * + * * <p>We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that * it is guaranteed that any "cancel command" will only pick up after deployment is done and that the "cancel * command" call will never overtake the deploying call. - * + * * <p>This blocks the threads big time, because the remote calls may take long. Depending of their locking behavior, it * may even result in distributed deadlocks (unless carefully avoided). We therefore use atomic state updates and * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting @@ -117,10 +118,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution /** The executor which is used to execute futures. */ private final Executor executor; - /** The execution vertex whose task this execution executes */ + /** The execution vertex whose task this execution executes. */ private final ExecutionVertex vertex; - /** The unique ID marking the specific execution instant of the task */ + /** The unique ID marking the specific execution instant of the task. */ private final ExecutionAttemptID attemptId; /** Gets the global modification version of the execution graph when this execution was created. @@ -128,7 +129,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution * to resolve conflicts between concurrent modification by global and local failover actions. */ private final long globalModVersion; - /** The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()} */ + /** The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()}. */ private final long[] stateTimestamps; private final int attemptNumber; @@ -137,7 +138,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors; - /** A future that completes once the Execution reaches a terminal ExecutionState */ + /** A future that completes once the Execution reaches a terminal ExecutionState. */ private final CompletableFuture<ExecutionState> terminalStateFuture; private final CompletableFuture<?> releaseFuture; @@ -150,14 +151,14 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution private volatile Throwable failureCause; // once assigned, never changes - /** Information to restore the task on recovery, such as checkpoint id and task state snapshot */ + /** Information to restore the task on recovery, such as checkpoint id and task state snapshot. */ @Nullable private volatile JobManagerTaskRestore taskRestore; // ------------------------ Accumulators & Metrics ------------------------ /** Lock for updating the accumulators atomically. - * Prevents final accumulators to be overwritten by partial accumulators on a late heartbeat */ + * Prevents final accumulators to be overwritten by partial accumulators on a late heartbeat. */ private final Object accumulatorLock = new Object(); /* Continuously updated map of user-defined accumulators */ @@ -460,25 +461,40 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution // calculate the preferred locations final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = calculatePreferredLocations(locationPreferenceConstraint); - return preferredLocationsFuture + final SlotRequestId slotRequestId = new SlotRequestId(); + + final CompletableFuture<LogicalSlot> logicalSlotFuture = preferredLocationsFuture .thenCompose( (Collection<TaskManagerLocation> preferredLocations) -> slotProvider.allocateSlot( + slotRequestId, toSchedule, queued, preferredLocations, - allocationTimeout)) - .thenApply( - (LogicalSlot logicalSlot) -> { - if (tryAssignResource(logicalSlot)) { - return this; - } else { - // release the slot - logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.')); + allocationTimeout)); - throw new CompletionException(new FlinkException("Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned ")); - } - }); + // register call back to cancel slot request in case that the execution gets canceled + releaseFuture.whenComplete( + (Object ignored, Throwable throwable) -> { + if (logicalSlotFuture.cancel(false)) { + slotProvider.cancelSlotRequest( + slotRequestId, + slotSharingGroupId, + new FlinkException("Execution " + this + " was released.")); + } + }); + + return logicalSlotFuture.thenApply( + (LogicalSlot logicalSlot) -> { + if (tryAssignResource(logicalSlot)) { + return this; + } else { + // release the slot + logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.')); + + throw new CompletionException(new FlinkException("Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned ")); + } + }); } else { // call race, already deployed, or already done http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 3d69d71..4e8b972 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -91,6 +91,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; @@ -891,9 +892,12 @@ public class ExecutionGraph implements AccessExecutionGraph { } if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) { - schedulingFuture = newSchedulingFuture.whenCompleteAsync( + schedulingFuture = newSchedulingFuture; + + newSchedulingFuture.whenCompleteAsync( (Void ignored, Throwable throwable) -> { - if (throwable != null) { + if (throwable != null && !(throwable instanceof CancellationException)) { + // only fail if the scheduling future was not canceled failGlobal(ExceptionUtils.stripCompletionException(throwable)); } }, @@ -963,7 +967,7 @@ public class ExecutionGraph implements AccessExecutionGraph { // the future fails once one slot future fails. final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures); - return allAllocationsFuture + final CompletableFuture<Void> currentSchedulingFuture = allAllocationsFuture .thenAccept( (Collection<Execution> executionsToDeploy) -> { for (Execution execution : executionsToDeploy) { @@ -976,7 +980,7 @@ public class ExecutionGraph implements AccessExecutionGraph { t)); } } - }) + }) // Generate a more specific failure message for the eager scheduling .exceptionally( (Throwable throwable) -> { @@ -993,9 +997,19 @@ public class ExecutionGraph implements AccessExecutionGraph { } else { resultThrowable = strippedThrowable; } - + throw new CompletionException(resultThrowable); }); + + currentSchedulingFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable instanceof CancellationException) { + // cancel the individual allocation futures + allAllocationsFuture.cancel(false); + } + }); + + return currentSchedulingFuture; } public void cancel() { http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 77f89b7..fc79d40 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -26,12 +26,15 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceDiedException; import org.apache.flink.runtime.instance.InstanceListener; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.instance.SharedSlot; import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.instance.SlotSharingGroupAssignment; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -142,6 +145,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl @Override public CompletableFuture<LogicalSlot> allocateSlot( + SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations, @@ -167,6 +171,11 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl } } + @Override + public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } + /** * Returns either a {@link SimpleSlot}, or a {@link CompletableFuture}. */ http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 409f8f7..a49e6ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -1571,14 +1571,14 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS @Override public CompletableFuture<LogicalSlot> allocateSlot( + SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations, Time timeout) { - final SlotRequestId requestId = new SlotRequestId(); CompletableFuture<LogicalSlot> slotFuture = gateway.allocateSlot( - requestId, + slotRequestId, task, ResourceProfile.UNKNOWN, preferredLocations, @@ -1589,7 +1589,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS (LogicalSlot slot, Throwable failure) -> { if (failure != null) { gateway.releaseSlot( - requestId, + slotRequestId, task.getSlotSharingGroupId(), failure); } @@ -1597,6 +1597,14 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS return slotFuture; } + + @Override + public CompletableFuture<Acknowledge> cancelSlotRequest( + SlotRequestId slotRequestId, + @Nullable SlotSharingGroupId slotSharingGroupId, + Throwable cause) { + return gateway.releaseSlot(slotRequestId, slotSharingGroupId, cause); + } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java index 4f82688..80b2689 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java @@ -19,16 +19,21 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.concurrent.CompletableFuture; /** * The slot provider is responsible for preparing slots for ready-to-run tasks. - * + * * <p>It supports two allocating modes: * <ul> * <li>Immediate allocating: A request for a task slot immediately gets satisfied, we can call @@ -49,8 +54,44 @@ public interface SlotProvider { * @return The future of the allocation */ CompletableFuture<LogicalSlot> allocateSlot( + SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations, Time timeout); + + /** + * Allocating slot with specific requirement. + * + * @param task The task to allocate the slot for + * @param allowQueued Whether allow the task be queued if we do not have enough resource + * @param preferredLocations preferred locations for the slot allocation + * @param timeout after which the allocation fails with a timeout exception + * @return The future of the allocation + */ + default CompletableFuture<LogicalSlot> allocateSlot( + ScheduledUnit task, + boolean allowQueued, + Collection<TaskManagerLocation> preferredLocations, + Time timeout) { + return allocateSlot( + new SlotRequestId(), + task, + allowQueued, + preferredLocations, + timeout); + } + + /** + * Cancels the slot request with the given {@link SlotRequestId} and {@link SlotSharingGroupId}. + * + * @param slotRequestId identifying the slot request to cancel + * @param slotSharingGroupId identifying the slot request to cancel + * @param cause of the cancellation + * @return Future which is completed once the slot request has been cancelled + */ + CompletableFuture<Acknowledge> cancelSlotRequest( + SlotRequestId slotRequestId, + @Nullable SlotSharingGroupId slotSharingGroupId, + Throwable cause); } http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java index 7ad1638..57f6bd0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java @@ -27,6 +27,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -40,6 +41,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.Supplier; import static org.hamcrest.CoreMatchers.containsString; @@ -424,4 +426,36 @@ public class FutureUtilsTest extends TestLogger { assertThat(suppressed, arrayContaining(suppressedException)); } } + + @Test + public void testCancelWaitingConjunctFuture() { + cancelConjunctFuture(inputFutures -> FutureUtils.waitForAll(inputFutures)); + } + + @Test + public void testCancelResultConjunctFuture() { + cancelConjunctFuture(inputFutures -> FutureUtils.combineAll(inputFutures)); + } + + @Test + public void testCancelCompleteConjunctFuture() { + cancelConjunctFuture(inputFutures -> FutureUtils.completeAll(inputFutures)); + } + + private void cancelConjunctFuture(Function<Collection<? extends CompletableFuture<?>>, FutureUtils.ConjunctFuture<?>> conjunctFutureFactory) { + final int numInputFutures = 10; + final Collection<CompletableFuture<Void>> inputFutures = new ArrayList<>(numInputFutures); + + for (int i = 0; i < numInputFutures; i++) { + inputFutures.add(new CompletableFuture<>()); + } + + final FutureUtils.ConjunctFuture<?> conjunctFuture = conjunctFutureFactory.apply(inputFutures); + + conjunctFuture.cancel(false); + + for (CompletableFuture<Void> inputFuture : inputFutures) { + assertThat(inputFuture.isCancelled(), is(true)); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index 18476a5..1b835ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -27,13 +27,14 @@ import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge; import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; -import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.SerializedValue; @@ -79,7 +80,7 @@ public class ExecutionGraphMetricsTest extends TestLogger { CompletableFuture<LogicalSlot> slotFuture1 = CompletableFuture.completedFuture(new TestingLogicalSlot()); CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlot()); - when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(slotFuture1, slotFuture2); + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(slotFuture1, slotFuture2); TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy(); http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index 38518d6..99879c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -22,13 +22,16 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; +import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -39,10 +42,14 @@ import org.junit.Test; import java.util.Arrays; import java.util.Collection; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -55,7 +62,7 @@ public class ExecutionTest extends TestLogger { /** * Tests that slots are released if we cannot assign the allocated resource to the - * Execution. In this case, a concurrent cancellation precedes the assignment. + * Execution. */ @Test public void testSlotReleaseOnFailedResourceAssignment() throws Exception { @@ -85,6 +92,8 @@ public class ExecutionTest extends TestLogger { 0, new SimpleAckingTaskManagerGateway()); + final LogicalSlot otherSlot = new TestingLogicalSlot(); + CompletableFuture<Execution> allocationFuture = execution.allocateAndAssignSlotForExecution( slotProvider, false, @@ -95,11 +104,8 @@ public class ExecutionTest extends TestLogger { assertEquals(ExecutionState.SCHEDULED, execution.getState()); - // cancelling the execution should move it into state CANCELED; this happens before - // the slot future has been completed - execution.cancel(); - - assertEquals(ExecutionState.CANCELED, execution.getState()); + // assign a different resource to the execution + assertTrue(execution.tryAssignResource(otherSlot)); // completing now the future should cause the slot to be released slotFuture.complete(slot); @@ -214,6 +220,54 @@ public class ExecutionTest extends TestLogger { } /** + * Tests that a slot allocation from a {@link SlotProvider} is cancelled if the + * {@link Execution} is cancelled. + */ + @Test + public void testSlotAllocationCancellationWhenExecutionCancelled() throws Exception { + final JobVertexID jobVertexId = new JobVertexID(); + final JobVertex jobVertex = new JobVertex("test vertex", jobVertexId); + jobVertex.setInvokableClass(NoOpInvokable.class); + + final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); + final CompletableFuture<LogicalSlot> slotFuture = new CompletableFuture<>(); + slotProvider.addSlot(jobVertexId, 0, slotFuture); + + final ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + jobVertex); + + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + final Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt(); + + final CompletableFuture<Execution> allocationFuture = currentExecutionAttempt.allocateAndAssignSlotForExecution( + slotProvider, + false, + LocationPreferenceConstraint.ALL, + TestingUtils.infiniteTime()); + + assertThat(allocationFuture.isDone(), is(false)); + + assertThat(slotProvider.getSlotRequestedFuture(jobVertexId, 0).get(), is(true)); + + final Set<SlotRequestId> slotRequests = slotProvider.getSlotRequests(); + assertThat(slotRequests, hasSize(1)); + + assertThat(currentExecutionAttempt.getState(), is(ExecutionState.SCHEDULED)); + + currentExecutionAttempt.cancel(); + assertThat(currentExecutionAttempt.getState(), is(ExecutionState.CANCELED)); + + assertThat(allocationFuture.isCompletedExceptionally(), is(true)); + + final Set<SlotRequestId> canceledSlotRequests = slotProvider.getCanceledSlotRequests(); + assertThat(canceledSlotRequests, equalTo(slotRequests)); + } + + /** * Tests that all preferred locations are calculated. */ @Test http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 2c49573..c0d5dc0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -30,10 +30,10 @@ import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; -import org.mockito.Matchers; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -67,7 +67,7 @@ public class ExecutionVertexSchedulingTest { Scheduler scheduler = mock(Scheduler.class); CompletableFuture<LogicalSlot> future = new CompletableFuture<>(); future.complete(slot); - when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future); + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot @@ -99,7 +99,7 @@ public class ExecutionVertexSchedulingTest { final CompletableFuture<LogicalSlot> future = new CompletableFuture<>(); Scheduler scheduler = mock(Scheduler.class); - when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future); + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot @@ -133,7 +133,7 @@ public class ExecutionVertexSchedulingTest { Scheduler scheduler = mock(Scheduler.class); CompletableFuture<LogicalSlot> future = new CompletableFuture<>(); future.complete(slot); - when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future); + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java index d93693e..2e7ebc9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java @@ -19,15 +19,23 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import javax.annotation.Nullable; + import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkArgument; @@ -43,6 +51,10 @@ class ProgrammedSlotProvider implements SlotProvider { private final Map<JobVertexID, CompletableFuture<Boolean>[]> slotFutureRequested = new HashMap<>(); + private final Set<SlotRequestId> slotRequests = new HashSet<>(); + + private final Set<SlotRequestId> canceledSlotRequests = new HashSet<>(); + private final int parallelism; public ProgrammedSlotProvider(int parallelism) { @@ -92,8 +104,17 @@ class ProgrammedSlotProvider implements SlotProvider { return slotFutureRequested.get(jobVertexId)[subtaskIndex]; } + public Set<SlotRequestId> getSlotRequests() { + return Collections.unmodifiableSet(slotRequests); + } + + public Set<SlotRequestId> getCanceledSlotRequests() { + return Collections.unmodifiableSet(canceledSlotRequests); + } + @Override public CompletableFuture<LogicalSlot> allocateSlot( + SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations, @@ -106,6 +127,7 @@ class ProgrammedSlotProvider implements SlotProvider { CompletableFuture<LogicalSlot> future = forTask[subtask]; if (future != null) { slotFutureRequested.get(vertexId)[subtask].complete(true); + slotRequests.add(slotRequestId); return future; } @@ -113,4 +135,10 @@ class ProgrammedSlotProvider implements SlotProvider { throw new IllegalArgumentException("No registered slot future for task " + vertexId + " (" + subtask + ')'); } + + @Override + public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { + canceledSlotRequests.add(slotRequestId); + return CompletableFuture.completedFuture(Acknowledge.get()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java index 776ce16..c082e9a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java @@ -23,9 +23,11 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; @@ -33,12 +35,17 @@ import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.jobmaster.SlotContext; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.net.InetAddress; import java.util.ArrayDeque; import java.util.Collection; +import java.util.HashMap; import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkArgument; @@ -49,8 +56,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class SimpleSlotProvider implements SlotProvider, SlotOwner { + private final Object lock = new Object(); + private final ArrayDeque<SlotContext> slots; + private final HashMap<SlotRequestId, SlotContext> allocatedSlots; + public SimpleSlotProvider(JobID jobId, int numSlots) { this(jobId, numSlots, new SimpleAckingTaskManagerGateway()); } @@ -69,30 +80,47 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner { taskManagerGateway); slots.add(as); } + + allocatedSlots = new HashMap<>(slots.size()); } @Override public CompletableFuture<LogicalSlot> allocateSlot( + SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations, Time allocationTimeout) { final SlotContext slot; - synchronized (slots) { + synchronized (lock) { if (slots.isEmpty()) { slot = null; } else { slot = slots.removeFirst(); } + if (slot != null) { + SimpleSlot result = new SimpleSlot(slot, this, 0); + allocatedSlots.put(slotRequestId, slot); + return CompletableFuture.completedFuture(result); + } + else { + return FutureUtils.completedExceptionally(new NoResourceAvailableException()); + } } + } - if (slot != null) { - SimpleSlot result = new SimpleSlot(slot, this, 0); - return CompletableFuture.completedFuture(result); - } - else { - return FutureUtils.completedExceptionally(new NoResourceAvailableException()); + @Override + public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { + synchronized (lock) { + final SlotContext slotContext = allocatedSlots.remove(slotRequestId); + + if (slotContext != null) { + slots.add(slotContext); + return CompletableFuture.completedFuture(Acknowledge.get()); + } else { + return FutureUtils.completedExceptionally(new FlinkException("Unknown slot request id " + slotRequestId + '.')); + } } } @@ -102,14 +130,15 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner { final Slot slot = ((Slot) logicalSlot); - synchronized (slots) { + synchronized (lock) { slots.add(slot.getSlotContext()); + allocatedSlots.remove(logicalSlot.getSlotRequestId()); } return CompletableFuture.completedFuture(true); } public int getNumberOfAvailableSlots() { - synchronized (slots) { + synchronized (lock) { return slots.size(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3969170f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java index 76a5642..9cb9cff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java @@ -25,15 +25,17 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager; -import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; -import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; @@ -49,6 +51,8 @@ import org.junit.After; import org.junit.Before; import org.junit.runners.Parameterized; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -152,11 +156,16 @@ public class SchedulerTestBase extends TestLogger { } @Override - public CompletableFuture<LogicalSlot> allocateSlot(ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations, Time allocationTimeout) { + public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations, Time allocationTimeout) { return scheduler.allocateSlot(task, allowQueued, preferredLocations, allocationTimeout); } @Override + public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } + + @Override public TaskManagerLocation addTaskManager(int numberSlots) { final Instance instance = getRandomInstance(numberSlots); scheduler.newInstanceAvailable(instance); @@ -340,7 +349,7 @@ public class SchedulerTestBase extends TestLogger { } @Override - public CompletableFuture<LogicalSlot> allocateSlot(ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations, Time allocationTimeout) { + public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations, Time allocationTimeout) { return slotProvider.allocateSlot(task, allowQueued, preferredLocations, allocationTimeout).thenApply( (LogicalSlot logicalSlot) -> { switch (logicalSlot.getLocality()) { @@ -363,6 +372,11 @@ public class SchedulerTestBase extends TestLogger { return logicalSlot; }); } + + @Override + public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } } private static final class TestingSlotPool extends SlotPool {
