Repository: flink Updated Branches: refs/heads/master 8afadd459 -> 3ff91be1d
[FLINK-7153] Re-introduce preferred locations for scheduling Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c73b2fe1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c73b2fe1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c73b2fe1 Branch: refs/heads/master Commit: c73b2fe1f93f9ff2f05eb9130051729320634448 Parents: 8afadd4 Author: Till Rohrmann <[email protected]> Authored: Mon Oct 16 14:04:13 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Nov 2 17:04:44 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 162 ++++++++++++++----- .../ExecutionAndAllocationFuture.java | 45 ++++++ .../executiongraph/ExecutionAndSlot.java | 47 ------ .../runtime/executiongraph/ExecutionGraph.java | 146 ++++++----------- .../executiongraph/ExecutionGraphUtils.java | 106 ------------ .../executiongraph/ExecutionJobVertex.java | 32 +--- .../runtime/executiongraph/ExecutionVertex.java | 51 +++--- .../apache/flink/runtime/instance/SlotPool.java | 9 +- .../instance/SlotSharingGroupAssignment.java | 20 +-- .../runtime/jobmanager/scheduler/Scheduler.java | 50 +++--- .../ExecutionGraphSchedulingTest.java | 151 +---------------- .../executiongraph/ExecutionGraphStopTest.java | 15 +- .../executiongraph/ExecutionGraphTestUtils.java | 23 +-- .../executiongraph/ExecutionGraphUtilsTest.java | 124 -------------- .../ExecutionVertexCancelTest.java | 8 +- .../ExecutionVertexLocalityTest.java | 19 +-- .../ScheduleWithCoLocationHintTest.java | 3 +- .../scheduler/SchedulerSlotSharingTest.java | 3 +- .../scheduler/SchedulerTestUtils.java | 21 ++- 19 files changed, 352 insertions(+), 683 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index c1f423b..9d3e128 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -44,6 +44,8 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -52,9 +54,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.flink.runtime.execution.ExecutionState.CANCELED; @@ -126,9 +130,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution /** A future that completes once the Execution reaches a terminal ExecutionState */ private final CompletableFuture<ExecutionState> terminationFuture; + private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture; + private volatile ExecutionState state = CREATED; - private volatile SimpleSlot assignedResource; // once assigned, never changes until the execution is archived + private final AtomicReference<SimpleSlot> assignedResource; private volatile Throwable failureCause; // once assigned, never changes @@ -185,6 +191,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>(); this.terminationFuture = new CompletableFuture<>(); + this.taskManagerLocationFuture = new CompletableFuture<>(); + + this.assignedResource = new AtomicReference<>(); } // -------------------------------------------------------------------------------------------- @@ -220,14 +229,53 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution return globalModVersion; } + public CompletableFuture<TaskManagerLocation> getTaskManagerLocationFuture() { + return taskManagerLocationFuture; + } + public SimpleSlot getAssignedResource() { - return assignedResource; + return assignedResource.get(); + } + + /** + * Tries to assign the given slot to the execution. The assignment works only if the + * Execution is in state SCHEDULED. Returns true, if the resource could be assigned. + * + * @param slot to assign to this execution + * @return true if the slot could be assigned to the execution, otherwise false + */ + boolean tryAssignResource(final SimpleSlot slot) { + Preconditions.checkNotNull(slot); + + // only allow to set the assigned resource in state SCHEDULED or CREATED + // note: we also accept resource assignment when being in state CREATED for testing purposes + if (state == SCHEDULED || state == CREATED) { + if (assignedResource.compareAndSet(null, slot)) { + // check for concurrent modification (e.g. cancelling call) + if (state == SCHEDULED || state == CREATED) { + Preconditions.checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet."); + taskManagerLocationFuture.complete(slot.getTaskManagerLocation()); + + return true; + } else { + // free assigned resource and return false + assignedResource.set(null); + return false; + } + } else { + // the slot already has another slot assigned + return false; + } + } else { + // do not allow resource assignment if we are not in state SCHEDULED + return false; + } } @Override public TaskManagerLocation getAssignedResourceLocation() { // returns non-null only when a location is already assigned - return assignedResource != null ? assignedResource.getTaskManagerLocation() : null; + return assignedResource.get() != null ? assignedResource.get().getTaskManagerLocation() : null; } public Throwable getFailureCause() { @@ -301,27 +349,23 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution */ public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) { try { - final CompletableFuture<SimpleSlot> slotAllocationFuture = allocateSlotForExecution(slotProvider, queued); + final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(slotProvider, queued); // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so // that we directly deploy the tasks if the slot allocation future is completed. This is // necessary for immediate deployment. - final CompletableFuture<Void> deploymentFuture = slotAllocationFuture.handle( - (simpleSlot, throwable) -> { - if (simpleSlot != null) { + final CompletableFuture<Void> deploymentFuture = allocationFuture.handle( + (Execution ignored, Throwable throwable) -> { + if (throwable != null) { + markFailed(ExceptionUtils.stripCompletionException(throwable)); + } + else { try { - deployToSlot(simpleSlot); + deploy(); } catch (Throwable t) { - try { - simpleSlot.releaseSlot(); - } finally { - markFailed(t); - } + markFailed(ExceptionUtils.stripCompletionException(t)); } } - else { - markFailed(throwable); - } return null; } ); @@ -338,8 +382,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } } - public CompletableFuture<SimpleSlot> allocateSlotForExecution(SlotProvider slotProvider, boolean queued) - throws IllegalExecutionStateException { + /** + * Allocates and assigns a slot obtained from the slot provider to the execution. + * + * @param slotProvider to obtain a new slot from + * @param queued if the allocation can be queued + * @return Future which is completed with this execution once the slot has been assigned + * or with an exception if an error occurred. + * @throws IllegalExecutionStateException if this method has been called while not being in the CREATED state + */ + public CompletableFuture<Execution> allocateAndAssignSlotForExecution(SlotProvider slotProvider, boolean queued) throws IllegalExecutionStateException { checkNotNull(slotProvider); @@ -359,7 +411,18 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution new ScheduledUnit(this, sharingGroup) : new ScheduledUnit(this, sharingGroup, locationConstraint); - return slotProvider.allocateSlot(toSchedule, queued); + CompletableFuture<SimpleSlot> slotFuture = slotProvider.allocateSlot(toSchedule, queued); + + return slotFuture.thenApply(slot -> { + if (tryAssignResource(slot)) { + return this; + } else { + // release the slot + slot.releaseSlot(); + + throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned ")); + } + }); } else { // call race, already deployed, or already done @@ -367,8 +430,15 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } } - public void deployToSlot(final SimpleSlot slot) throws JobException { - checkNotNull(slot); + /** + * Deploys the execution to the previously assigned resource. + * + * @throws JobException if the execution cannot be deployed to the assigned resource + */ + public void deploy() throws JobException { + final SimpleSlot slot = assignedResource.get(); + + checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource."); // Check if the TaskManager died in the meantime // This only speeds up the response to TaskManagers failing concurrently to deployments. @@ -397,7 +467,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution if (!slot.setExecutedVertex(this)) { throw new JobException("Could not assign the ExecutionVertex to the slot " + slot); } - this.assignedResource = slot; // race double check, did we fail/cancel and do we need to release the slot? if (this.state != DEPLOYING) { @@ -447,7 +516,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution * Sends stop RPC call. */ public void stop() { - final SimpleSlot slot = assignedResource; + final SimpleSlot slot = assignedResource.get(); if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -504,10 +573,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution // we skip the canceling state. set the timestamp, for a consistent appearance markTimestamp(CANCELING, getStateTimestamp(CANCELED)); + // cancel the future in order to fail depending scheduling operations + taskManagerLocationFuture.cancel(false); + try { vertex.getExecutionGraph().deregisterExecution(this); - if (assignedResource != null) { - assignedResource.releaseSlot(); + + final SimpleSlot slot = assignedResource.get(); + + if (slot != null) { + slot.releaseSlot(); } } finally { @@ -673,7 +748,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution int maxStrackTraceDepth, Time timeout) { - final SimpleSlot slot = assignedResource; + final SimpleSlot slot = assignedResource.get(); if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -697,7 +772,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution * @param timestamp of the completed checkpoint */ public void notifyCheckpointComplete(long checkpointId, long timestamp) { - final SimpleSlot slot = assignedResource; + final SimpleSlot slot = assignedResource.get(); if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -717,7 +792,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution * @param checkpointOptions of the checkpoint to trigger */ public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) { - final SimpleSlot slot = assignedResource; + final SimpleSlot slot = assignedResource.get(); if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -775,7 +850,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution updateAccumulatorsAndMetrics(userAccumulators, metrics); - assignedResource.releaseSlot(); + final SimpleSlot slot = assignedResource.get(); + + if (slot != null) { + slot.releaseSlot(); + } + vertex.getExecutionGraph().deregisterExecution(this); } finally { @@ -828,7 +908,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution if (transitionState(current, CANCELED)) { try { - assignedResource.releaseSlot(); + final SimpleSlot slot = assignedResource.get(); + + if (slot != null) { + slot.releaseSlot(); + } + vertex.getExecutionGraph().deregisterExecution(this); } finally { @@ -920,8 +1005,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution updateAccumulatorsAndMetrics(userAccumulators, metrics); try { - if (assignedResource != null) { - assignedResource.releaseSlot(); + final SimpleSlot slot = assignedResource.get(); + if (slot != null) { + slot.releaseSlot(); } vertex.getExecutionGraph().deregisterExecution(this); } @@ -936,7 +1022,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } try { - if (assignedResource != null) { + if (assignedResource.get() != null) { sendCancelRpcCall(); } } catch (Throwable tt) { @@ -1003,7 +1089,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution * The sending is tried up to NUM_CANCEL_CALL_TRIES times. */ private void sendCancelRpcCall() { - final SimpleSlot slot = assignedResource; + final SimpleSlot slot = assignedResource.get(); if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -1024,7 +1110,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } private void sendFailIntermediateResultPartitionsRpcCall() { - final SimpleSlot slot = assignedResource; + final SimpleSlot slot = assignedResource.get(); if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -1042,7 +1128,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution private void sendUpdatePartitionInfoRpcCall( final Iterable<PartitionInfo> partitionInfos) { - final SimpleSlot slot = assignedResource; + final SimpleSlot slot = assignedResource.get(); if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -1162,8 +1248,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution @Override public String toString() { + final SimpleSlot slot = assignedResource.get(); + return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(), - (assignedResource == null ? "(unassigned)" : assignedResource.toString()), state); + (slot == null ? "(unassigned)" : slot), state); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java new file mode 100644 index 0000000..1022dbc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A pair of an {@link Execution} together with an allocation future. + */ +public class ExecutionAndAllocationFuture { + + public final Execution executionAttempt; + + public final CompletableFuture<Void> allocationFuture; + + public ExecutionAndAllocationFuture(Execution executionAttempt, CompletableFuture<Void> allocationFuture) { + this.executionAttempt = checkNotNull(executionAttempt); + this.allocationFuture = checkNotNull(allocationFuture); + } + + // ----------------------------------------------------------------------- + + @Override + public String toString() { + return super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java deleted file mode 100644 index 123ff0c..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import org.apache.flink.runtime.instance.SimpleSlot; - -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A pair of an {@link Execution} together with a slot future. - */ -public class ExecutionAndSlot { - - public final Execution executionAttempt; - - public final CompletableFuture<SimpleSlot> slotFuture; - - public ExecutionAndSlot(Execution executionAttempt, CompletableFuture<SimpleSlot> slotFuture) { - this.executionAttempt = checkNotNull(executionAttempt); - this.slotFuture = checkNotNull(slotFuture); - } - - // ----------------------------------------------------------------------- - - @Override - public String toString() { - return super.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index dca6c44..62c6e99 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -50,7 +50,6 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; -import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -90,7 +89,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; @@ -878,113 +876,67 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive // that way we do not have any operation that can fail between allocating the slots // and adding them to the list. If we had a failure in between there, that would // cause the slots to get lost - final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices()); final boolean queued = allowQueuedScheduling; - // we use this flag to handle failures in a 'finally' clause - // that allows us to not go through clumsy cast-and-rethrow logic - boolean successful = false; + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); - try { - // collecting all the slots may resize and fail in that operation without slots getting lost - final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // allocate the slots (obtain all their futures + for (ExecutionJobVertex ejv : getVerticesTopologically()) { + // these calls are not blocking, they only return futures + Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(slotProvider, queued); - // allocate the slots (obtain all their futures - for (ExecutionJobVertex ejv : getVerticesTopologically()) { - // these calls are not blocking, they only return futures - ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued); + allAllocationFutures.addAll(allocationFutures); + } - // we need to first add the slots to this list, to be safe on release - resources.add(slots); + // this future is complete once all slot futures are complete. + // the future fails once one slot future fails. + final ConjunctFuture<Collection<Execution>> allAllocationsComplete = FutureUtils.combineAll(allAllocationFutures); - for (ExecutionAndSlot ens : slots) { - slotFutures.add(ens.slotFuture); - } + // make sure that we fail if the allocation timeout was exceeded + final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() { + @Override + public void run() { + // When the timeout triggers, we try to complete the conjunct future with an exception. + // Note that this is a no-op if the future is already completed + int numTotal = allAllocationsComplete.getNumFuturesTotal(); + int numComplete = allAllocationsComplete.getNumFuturesCompleted(); + String message = "Could not allocate all requires slots within timeout of " + + timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete; + + allAllocationsComplete.completeExceptionally(new NoResourceAvailableException(message)); } + }, timeout.getSize(), timeout.getUnit()); - // this future is complete once all slot futures are complete. - // the future fails once one slot future fails. - final ConjunctFuture<Void> allAllocationsComplete = FutureUtils.waitForAll(slotFutures); - - // make sure that we fail if the allocation timeout was exceeded - final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() { - @Override - public void run() { - // When the timeout triggers, we try to complete the conjunct future with an exception. - // Note that this is a no-op if the future is already completed - int numTotal = allAllocationsComplete.getNumFuturesTotal(); - int numComplete = allAllocationsComplete.getNumFuturesCompleted(); - String message = "Could not allocate all requires slots within timeout of " + - timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete; - - allAllocationsComplete.completeExceptionally(new NoResourceAvailableException(message)); - } - }, timeout.getSize(), timeout.getUnit()); - - - allAllocationsComplete.handleAsync( - (Void slots, Throwable throwable) -> { - try { - // we do not need the cancellation timeout any more - timeoutCancelHandle.cancel(false); - - if (throwable == null) { - // successfully obtained all slots, now deploy - - for (ExecutionAndSlot[] jobVertexTasks : resources) { - for (ExecutionAndSlot execAndSlot : jobVertexTasks) { - - // the futures must all be ready - this is simply a sanity check - final SimpleSlot slot; - try { - slot = execAndSlot.slotFuture.getNow(null); - checkNotNull(slot); - } - catch (CompletionException | NullPointerException e) { - throw new IllegalStateException("SlotFuture is incomplete " + - "or erroneous even though all futures completed", e); - } - - // actual deployment - execAndSlot.executionAttempt.deployToSlot(slot); - } - } - } - else { - // let the exception handler deal with this - throw throwable; - } - } - catch (Throwable t) { - // we catch everything here to make sure cleanup happens and the - // ExecutionGraph notices the error - // we need to to release all slots before going into recovery! - try { - ExecutionGraphUtils.releaseAllSlotsSilently(resources); - } - finally { - failGlobal(t); + allAllocationsComplete.handleAsync( + (Collection<Execution> executions, Throwable throwable) -> { + try { + // we do not need the cancellation timeout any more + timeoutCancelHandle.cancel(false); + + if (throwable == null) { + // successfully obtained all slots, now deploy + for (Execution execution : executions) { + execution.deploy(); } } + else { + // let the exception handler deal with this + throw throwable; + } + } + catch (Throwable t) { + // we catch everything here to make sure cleanup happens and the + // ExecutionGraph notices the error + failGlobal(ExceptionUtils.stripCompletionException(t)); + } - // Wouldn't it be nice if we could return an actual Void object? - // return (Void) Unsafe.getUnsafe().allocateInstance(Void.class); - return null; - }, - futureExecutor); - - // from now on, slots will be rescued by the futures and their completion, or by the timeout - successful = true; - } - finally { - if (!successful) { - // we come here only if the 'try' block finished with an exception - // we release the slots (possibly failing some executions on the way) and - // let the exception bubble up - ExecutionGraphUtils.releaseAllSlotsSilently(resources); - } - } + // Wouldn't it be nice if we could return an actual Void object? + // return (Void) Unsafe.getUnsafe().allocateInstance(Void.class); + return null; + }, + futureExecutor); } public void cancel() { http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java deleted file mode 100644 index f1d793d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.util.ExceptionUtils; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; - -/** - * Utilities for dealing with the execution graphs and scheduling. - */ -public class ExecutionGraphUtils { - - /** - * Releases the slot represented by the given future. If the future is complete, the - * slot is immediately released. Otherwise, the slot is released as soon as the future - * is completed. - * - * <p>Note that releasing the slot means cancelling any task execution currently - * associated with that slot. - * - * @param slotFuture The future for the slot to release. - */ - public static void releaseSlotFuture(CompletableFuture<SimpleSlot> slotFuture) { - slotFuture.handle(ReleaseSlotFunction.INSTANCE::apply); - } - - /** - * Releases the all the slots in the list of arrays of {@code ExecutionAndSlot}. - * For each future in that collection holds: If the future is complete, its slot is - * immediately released. Otherwise, the slot is released as soon as the future - * is completed. - * - * <p>This methods never throws any exceptions (except for fatal exceptions) and continues - * to release the remaining slots if one slot release failed. - * - * <p>Note that releasing the slot means cancelling any task execution currently - * associated with that slot. - * - * @param resources The collection of ExecutionAndSlot whose slots should be released. - */ - public static void releaseAllSlotsSilently(List<ExecutionAndSlot[]> resources) { - try { - for (ExecutionAndSlot[] jobVertexResources : resources) { - if (jobVertexResources != null) { - for (ExecutionAndSlot execAndSlot : jobVertexResources) { - if (execAndSlot != null) { - try { - releaseSlotFuture(execAndSlot.slotFuture); - } - catch (Throwable t) { - ExceptionUtils.rethrowIfFatalError(t); - } - } - } - } - } - } - catch (Throwable t) { - ExceptionUtils.rethrowIfFatalError(t); - } - } - - // ------------------------------------------------------------------------ - - /** - * A function to be applied into a future, releasing the slot immediately upon completion. - * Completion here refers to both the successful and exceptional completion. - */ - private static final class ReleaseSlotFunction implements BiFunction<SimpleSlot, Throwable, Void> { - - static final ReleaseSlotFunction INSTANCE = new ReleaseSlotFunction(); - - @Override - public Void apply(SimpleSlot simpleSlot, Throwable throwable) { - if (simpleSlot != null) { - simpleSlot.releaseSlot(); - } - return null; - } - } - - // ------------------------------------------------------------------------ - - /** Utility class is not meant to be instantiated */ - private ExecutionGraphUtils() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 90224b0..98191d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -57,6 +56,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -475,37 +475,21 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable * * @param resourceProvider The resource provider from whom the slots are requested. */ - public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) { + public Collection<CompletableFuture<Execution>> allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) { final ExecutionVertex[] vertices = this.taskVertices; - final ExecutionAndSlot[] slots = new ExecutionAndSlot[vertices.length]; + final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length]; // try to acquire a slot future for each execution. // we store the execution with the future just to be on the safe side for (int i = 0; i < vertices.length; i++) { - - // we use this flag to handle failures in a 'finally' clause - // that allows us to not go through clumsy cast-and-rethrow logic - boolean successful = false; - - try { - // allocate the next slot (future) - final Execution exec = vertices[i].getCurrentExecutionAttempt(); - final CompletableFuture<SimpleSlot> future = exec.allocateSlotForExecution(resourceProvider, queued); - slots[i] = new ExecutionAndSlot(exec, future); - successful = true; - } - finally { - if (!successful) { - // this is the case if an exception was thrown - for (int k = 0; k < i; k++) { - ExecutionGraphUtils.releaseSlotFuture(slots[k].slotFuture); - } - } - } + // allocate the next slot (future) + final Execution exec = vertices[i].getCurrentExecutionAttempt(); + final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(resourceProvider, queued); + slots[i] = allocationFuture; } // all good, we acquired all slots - return slots; + return Arrays.asList(slots); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 6b9d481..e87a5a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -55,6 +55,7 @@ import org.slf4j.Logger; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; @@ -266,6 +267,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi return currentExecution.getFailureCause(); } + public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() { + return currentExecution.getTaskManagerLocationFuture(); + } + public SimpleSlot getCurrentAssignedResource() { return currentExecution.getAssignedResource(); } @@ -445,8 +450,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi * @see #getPreferredLocationsBasedOnState() * @see #getPreferredLocationsBasedOnInputs() */ - public Iterable<TaskManagerLocation> getPreferredLocations() { - Iterable<TaskManagerLocation> basedOnState = getPreferredLocationsBasedOnState(); + public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() { + Collection<CompletableFuture<TaskManagerLocation>> basedOnState = getPreferredLocationsBasedOnState(); return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs(); } @@ -454,13 +459,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi * Gets the preferred location to execute the current task execution attempt, based on the state * that the execution attempt will resume. * - * @return A size-one iterable with the location preference, or null, if there is no + * @return A size-one collection with the location preference, or null, if there is no * location preference based on the state. */ - public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnState() { + public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnState() { TaskManagerLocation priorLocation; if (currentExecution.getTaskStateSnapshot() != null && (priorLocation = getLatestPriorLocation()) != null) { - return Collections.singleton(priorLocation); + return Collections.singleton(CompletableFuture.completedFuture(priorLocation)); } else { return null; @@ -476,14 +481,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi * @return The preferred locations based in input streams, or an empty iterable, * if there is no input-based preference. */ - public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() { + public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() { // otherwise, base the preferred locations on the input connections if (inputEdges == null) { return Collections.emptySet(); } else { - Set<TaskManagerLocation> locations = new HashSet<>(); - Set<TaskManagerLocation> inputLocations = new HashSet<>(); + Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4); // go over all inputs for (int i = 0; i < inputEdges.length; i++) { @@ -493,28 +497,17 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi // go over all input sources for (int k = 0; k < sources.length; k++) { // look-up assigned slot of input source - SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource(); - if (sourceSlot != null) { - // add input location - inputLocations.add(sourceSlot.getTaskManagerLocation()); - // inputs which have too many distinct sources are not considered - if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { - inputLocations.clear(); - break; - } + CompletableFuture<TaskManagerLocation> taskManagerLocationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture(); + inputLocations.add(taskManagerLocationFuture); + + if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { + return Collections.emptyList(); } } } - // keep the locations of the input with the least preferred locations - if (locations.isEmpty() || // nothing assigned yet - (!inputLocations.isEmpty() && inputLocations.size() < locations.size())) { - // current input has fewer preferred locations - locations.clear(); - locations.addAll(inputLocations); - } } - return locations.isEmpty() ? Collections.<TaskManagerLocation>emptyList() : locations; + return inputLocations.isEmpty() ? Collections.emptyList() : inputLocations; } } @@ -598,8 +591,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi return this.currentExecution.scheduleForExecution(slotProvider, queued); } + @VisibleForTesting public void deployToSlot(SimpleSlot slot) throws JobException { - this.currentExecution.deployToSlot(slot); + if (this.currentExecution.tryAssignResource(slot)) { + this.currentExecution.deploy(); + } else { + throw new IllegalStateException("Could not assign resource " + slot + " to current execution " + + currentExecution + '.'); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 6397043..fcf7d40 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -1003,10 +1003,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { @Override public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) { - Iterable<TaskManagerLocation> locationPreferences = - task.getTaskToExecute().getVertex().getPreferredLocations(); + Collection<CompletableFuture<TaskManagerLocation>> locationPreferenceFutures = + task.getTaskToExecute().getVertex().getPreferredLocations(); - return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout); + CompletableFuture<Collection<TaskManagerLocation>> locationPreferencesFuture = FutureUtils.combineAll(locationPreferenceFutures); + + return locationPreferencesFuture.thenCompose( + locationPreferences -> gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java index 88fbc10..a071e50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java @@ -266,18 +266,12 @@ public class SlotSharingGroupAssignment { * slots if no local slot is available. The method returns null, when this sharing group has * no slot is available for the given JobVertexID. * - * @param vertex The vertex to allocate a slot for. + * @param vertexID the vertex id + * @param locationPreferences location preferences * * @return A slot to execute the given ExecutionVertex in, or null, if none is available. */ - public SimpleSlot getSlotForTask(ExecutionVertex vertex) { - return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocationsBasedOnInputs()); - } - - /** - * - */ - SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) { + public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) { synchronized (lock) { Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false); @@ -306,17 +300,13 @@ public class SlotSharingGroupAssignment { * shared slot and returns it. If no suitable shared slot could be found, this method * returns null.</p> * - * @param vertex The execution vertex to find a slot for. * @param constraint The co-location constraint for the placement of the execution vertex. + * @param locationPreferences location preferences * * @return A simple slot allocate within a suitable shared slot, or {@code null}, if no suitable * shared slot is available. */ - public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) { - return getSlotForTask(constraint, vertex.getPreferredLocationsBasedOnInputs()); - } - - SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) { + public SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) { synchronized (lock) { if (constraint.isAssignedAndAlive()) { // the shared slot of the co-location group is initialized and set we allocate a sub-slot http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 5a7e819..9b1ffbe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.scheduler; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -31,6 +32,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; @@ -134,31 +136,36 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl @Override public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) { - try { - final Object ret = scheduleTask(task, allowQueued); + Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = task.getTaskToExecute().getVertex().getPreferredLocationsBasedOnInputs(); - if (ret instanceof SimpleSlot) { - return CompletableFuture.completedFuture((SimpleSlot) ret); - } - else if (ret instanceof CompletableFuture) { - @SuppressWarnings("unchecked") - CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret; - return typed; - } - else { - // this should never happen, simply guard this case with an exception - throw new RuntimeException(); + CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures); + + return preferredLocationsFuture.thenCompose( + preferredLocations -> { + try { + final Object ret = scheduleTask(task, allowQueued, preferredLocations); + + if (ret instanceof SimpleSlot) { + return CompletableFuture.completedFuture((SimpleSlot) ret); + } else if (ret instanceof CompletableFuture) { + @SuppressWarnings("unchecked") + CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret; + return typed; + } else { + // this should never happen, simply guard this case with an exception + throw new RuntimeException(); + } + } catch (NoResourceAvailableException e) { + throw new CompletionException(e); + } } - } - catch (NoResourceAvailableException e) { - return FutureUtils.completedExceptionally(e); - } + ); } /** * Returns either a {@link SimpleSlot}, or a {@link CompletableFuture}. */ - private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException { + private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource, Iterable<TaskManagerLocation> preferredLocations) throws NoResourceAvailableException { if (task == null) { throw new NullPointerException(); } @@ -168,7 +175,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl final ExecutionVertex vertex = task.getTaskToExecute().getVertex(); - final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocationsBasedOnInputs(); final boolean forceExternalLocation = false && preferredLocations != null && preferredLocations.iterator().hasNext(); @@ -197,10 +203,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl // get a slot from the group, if the group has one for us (and can fulfill the constraint) final SimpleSlot slotFromGroup; if (constraint == null) { - slotFromGroup = assignment.getSlotForTask(vertex); + slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), preferredLocations); } else { - slotFromGroup = assignment.getSlotForTask(vertex, constraint); + slotFromGroup = assignment.getSlotForTask(constraint, preferredLocations); } SimpleSlot newSlot = null; @@ -234,7 +240,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl localOnly = true; } else { - locations = vertex.getPreferredLocationsBasedOnInputs(); + locations = preferredLocations; localOnly = forceExternalLocation; } http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index b90c306..69a679a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -38,7 +38,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.ScheduleMode; -import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; @@ -49,21 +48,22 @@ import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Test; - -import org.mockito.invocation.InvocationOnMock; import org.mockito.verification.Timeout; import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for the scheduling of the execution graph. This tests that @@ -399,141 +399,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger { } } - /** - * Tests that the {@link ExecutionJobVertex#allocateResourcesForAll(SlotProvider, boolean)} method - * releases partially acquired resources upon exception. - */ - @Test - public void testExecutionJobVertexAllocateResourcesReleasesOnException() throws Exception { - final int parallelism = 8; - - final JobVertex vertex = new JobVertex("vertex"); - vertex.setParallelism(parallelism); - vertex.setInvokableClass(NoOpInvokable.class); - - final JobID jobId = new JobID(); - final JobGraph jobGraph = new JobGraph(jobId, "test", vertex); - - // set up some available slots and some slot owner that accepts released slots back - final List<SimpleSlot> returnedSlots = new ArrayList<>(); - final SlotOwner recycler = new SlotOwner() { - @Override - public boolean returnAllocatedSlot(Slot slot) { - returnedSlots.add((SimpleSlot) slot); - return true; - } - }; - - // slot provider that hand out parallelism / 3 slots, then throws an exception - final SlotProvider slotProvider = mock(SlotProvider.class); - - final TaskManagerGateway taskManager = mock(TaskManagerGateway.class); - final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList( - createSlot(taskManager, jobId, recycler), - createSlot(taskManager, jobId, recycler), - createSlot(taskManager, jobId, recycler))); - - when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then( - (InvocationOnMock invocation) -> { - if (availableSlots.isEmpty()) { - throw new TestRuntimeException(); - } else { - return CompletableFuture.completedFuture(availableSlots.remove(0)); - } - }); - - final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider); - final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID()); - - // acquire resources and check that all are back after the failure - - final int numSlotsToExpectBack = availableSlots.size(); - - try { - ejv.allocateResourcesForAll(slotProvider, false); - fail("should have failed with an exception"); - } - catch (TestRuntimeException e) { - // expected - } - - assertEquals(numSlotsToExpectBack, returnedSlots.size()); - } - - /** - * Tests that the {@link ExecutionGraph#scheduleForExecution()} method - * releases partially acquired resources upon exception. - */ - @Test - public void testExecutionGraphScheduleReleasesResourcesOnException() throws Exception { - - // [pipelined] - // we construct a simple graph (source) ----------------> (target) - - final int parallelism = 3; - - final JobVertex sourceVertex = new JobVertex("source"); - sourceVertex.setParallelism(parallelism); - sourceVertex.setInvokableClass(NoOpInvokable.class); - - final JobVertex targetVertex = new JobVertex("target"); - targetVertex.setParallelism(parallelism); - targetVertex.setInvokableClass(NoOpInvokable.class); - - targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); - - final JobID jobId = new JobID(); - final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex); - - // set up some available slots and some slot owner that accepts released slots back - final List<SimpleSlot> returnedSlots = new ArrayList<>(); - final SlotOwner recycler = new SlotOwner() { - @Override - public boolean returnAllocatedSlot(Slot slot) { - returnedSlots.add((SimpleSlot) slot); - return true; - } - }; - - final TaskManagerGateway taskManager = mock(TaskManagerGateway.class); - final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList( - createSlot(taskManager, jobId, recycler), - createSlot(taskManager, jobId, recycler), - createSlot(taskManager, jobId, recycler), - createSlot(taskManager, jobId, recycler), - createSlot(taskManager, jobId, recycler))); - - - // slot provider that hand out parallelism / 3 slots, then throws an exception - final SlotProvider slotProvider = mock(SlotProvider.class); - - when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then( - (InvocationOnMock invocation) -> { - if (availableSlots.isEmpty()) { - throw new TestRuntimeException(); - } else { - return CompletableFuture.completedFuture(availableSlots.remove(0)); - } - }); - - final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider); - - // acquire resources and check that all are back after the failure - - final int numSlotsToExpectBack = availableSlots.size(); - - try { - eg.setScheduleMode(ScheduleMode.EAGER); - eg.scheduleForExecution(); - fail("should have failed with an exception"); - } - catch (TestRuntimeException e) { - // expected - } - - assertEquals(numSlotsToExpectBack, returnedSlots.size()); - } - // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java index de9081b..4ce3f9d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java @@ -112,25 +112,29 @@ public class ExecutionGraphStopTest extends TestLogger { // deploy source 1 for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) { SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway); - ev.getCurrentExecutionAttempt().deployToSlot(slot); + ev.getCurrentExecutionAttempt().tryAssignResource(slot); + ev.getCurrentExecutionAttempt().deploy(); } // deploy source 2 for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) { SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway); - ev.getCurrentExecutionAttempt().deployToSlot(slot); + ev.getCurrentExecutionAttempt().tryAssignResource(slot); + ev.getCurrentExecutionAttempt().deploy(); } // deploy non-source 1 for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) { SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway); - ev.getCurrentExecutionAttempt().deployToSlot(slot); + ev.getCurrentExecutionAttempt().tryAssignResource(slot); + ev.getCurrentExecutionAttempt().deploy(); } // deploy non-source 2 for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) { SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway); - ev.getCurrentExecutionAttempt().deployToSlot(slot); + ev.getCurrentExecutionAttempt().tryAssignResource(slot); + ev.getCurrentExecutionAttempt().deploy(); } eg.stop(); @@ -162,7 +166,8 @@ public class ExecutionGraphStopTest extends TestLogger { final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway); - exec.deployToSlot(slot); + exec.tryAssignResource(slot); + exec.deploy(); exec.switchToRunning(); assertEquals(ExecutionState.RUNNING, exec.getState()); http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index b534ade..5feeabc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.executiongraph; -import akka.actor.Status; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; @@ -63,20 +61,20 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.SerializedValue; +import akka.actor.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.ExecutionContext; -import scala.concurrent.ExecutionContext$; - import java.lang.reflect.Field; import java.net.InetAddress; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; +import scala.concurrent.ExecutionContext; +import scala.concurrent.ExecutionContext$; + import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; - import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -231,15 +229,10 @@ public class ExecutionGraphTestUtils { } public static void setVertexResource(ExecutionVertex vertex, SimpleSlot slot) { - try { - Execution exec = vertex.getCurrentExecutionAttempt(); - - Field f = Execution.class.getDeclaredField("assignedResource"); - f.setAccessible(true); - f.set(exec, slot); - } - catch (Exception e) { - throw new RuntimeException("Modifying the slot failed", e); + Execution exec = vertex.getCurrentExecutionAttempt(); + + if(!exec.tryAssignResource(slot)) { + throw new RuntimeException("Could not assign resource."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java deleted file mode 100644 index c616501..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; -import org.apache.flink.runtime.jobmanager.slots.SlotOwner; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; - -import org.junit.Test; - -import java.net.InetAddress; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import static org.mockito.Mockito.*; - -/** - * Tests for the utility methods in the class {@link ExecutionGraphUtils}. - */ -public class ExecutionGraphUtilsTest { - - @Test - public void testReleaseSlots() { - final JobID jid = new JobID(); - final SlotOwner owner = mock(SlotOwner.class); - - final SimpleSlot slot1 = new SimpleSlot(createAllocatedSlot(jid, 0), owner, 0); - final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1); - final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2); - - final CompletableFuture<SimpleSlot> incompleteFuture = new CompletableFuture<>(); - - final CompletableFuture<SimpleSlot> completeFuture = new CompletableFuture<>(); - completeFuture.complete(slot2); - - final CompletableFuture<SimpleSlot> disposedSlotFuture = new CompletableFuture<>(); - slot3.releaseSlot(); - disposedSlotFuture.complete(slot3); - - // release all futures - ExecutionGraphUtils.releaseSlotFuture(incompleteFuture); - ExecutionGraphUtils.releaseSlotFuture(completeFuture); - ExecutionGraphUtils.releaseSlotFuture(disposedSlotFuture); - - // only now complete the incomplete future - incompleteFuture.complete(slot1); - - // verify that each slot was returned once to the owner - verify(owner, times(1)).returnAllocatedSlot(eq(slot1)); - verify(owner, times(1)).returnAllocatedSlot(eq(slot2)); - verify(owner, times(1)).returnAllocatedSlot(eq(slot3)); - } - - @Test - public void testReleaseSlotsWithNulls() { - final JobID jid = new JobID(); - final SlotOwner owner = mock(SlotOwner.class); - - final Execution mockExecution = mock(Execution.class); - - final SimpleSlot slot1 = new SimpleSlot(createAllocatedSlot(jid, 0), owner, 0); - final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1); - final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2); - final SimpleSlot slot4 = new SimpleSlot(createAllocatedSlot(jid, 3), owner, 3); - final SimpleSlot slot5 = new SimpleSlot(createAllocatedSlot(jid, 4), owner, 4); - - ExecutionAndSlot[] slots1 = new ExecutionAndSlot[] { - null, - new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot1)), - null, - new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot2)), - null - }; - - ExecutionAndSlot[] slots2 = new ExecutionAndSlot[] { - new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot3)), - new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot4)), - new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot5)) - }; - - List<ExecutionAndSlot[]> resources = Arrays.asList(null, slots1, new ExecutionAndSlot[0], null, slots2); - - ExecutionGraphUtils.releaseAllSlotsSilently(resources); - - verify(owner, times(1)).returnAllocatedSlot(eq(slot1)); - verify(owner, times(1)).returnAllocatedSlot(eq(slot2)); - verify(owner, times(1)).returnAllocatedSlot(eq(slot3)); - verify(owner, times(1)).returnAllocatedSlot(eq(slot4)); - verify(owner, times(1)).returnAllocatedSlot(eq(slot5)); - } - - // ------------------------------------------------------------------------ - - private static AllocatedSlot createAllocatedSlot(JobID jid, int num) { - TaskManagerLocation loc = new TaskManagerLocation( - ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + num); - - return new AllocatedSlot(new AllocationID(), jid, loc, num, - ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index 44e1794..9908dae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -263,8 +263,8 @@ public class ExecutionVertexCancelTest extends TestLogger { Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway)); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); + setVertexState(vertex, ExecutionState.RUNNING); assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); @@ -303,8 +303,8 @@ public class ExecutionVertexCancelTest extends TestLogger { Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway)); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); + setVertexState(vertex, ExecutionState.RUNNING); assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); @@ -351,8 +351,8 @@ public class ExecutionVertexCancelTest extends TestLogger { Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway)); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); + setVertexState(vertex, ExecutionState.RUNNING); assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); @@ -384,8 +384,8 @@ public class ExecutionVertexCancelTest extends TestLogger { Instance instance = getInstance(new ActorTaskManagerGateway(gateway)); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); + setVertexState(vertex, ExecutionState.RUNNING); assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java index 5f12646..15d021a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -50,6 +51,7 @@ import org.junit.Test; import java.lang.reflect.Field; import java.net.InetAddress; import java.util.Iterator; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -92,10 +94,10 @@ public class ExecutionVertexLocalityTest extends TestLogger { // validate that the target vertices have no location preference for (int i = 0; i < parallelism; i++) { ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; - Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator(); + Iterator<CompletableFuture<TaskManagerLocation>> preference = target.getPreferredLocations().iterator(); assertTrue(preference.hasNext()); - assertEquals(locations[i], preference.next()); + assertEquals(locations[i], preference.next().get()); assertFalse(preference.hasNext()); } } @@ -122,7 +124,7 @@ public class ExecutionVertexLocalityTest extends TestLogger { for (int i = 0; i < parallelism; i++) { ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; - Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator(); + Iterator<CompletableFuture<TaskManagerLocation>> preference = target.getPreferredLocations().iterator(); assertFalse(preference.hasNext()); } } @@ -178,10 +180,10 @@ public class ExecutionVertexLocalityTest extends TestLogger { // validate that the target vertices have the state's location as the location preference for (int i = 0; i < parallelism; i++) { ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; - Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator(); + Iterator<CompletableFuture<TaskManagerLocation>> preference = target.getPreferredLocations().iterator(); assertTrue(preference.hasNext()); - assertEquals(locations[i], preference.next()); + assertEquals(locations[i], preference.next().get()); assertFalse(preference.hasNext()); } } @@ -236,10 +238,9 @@ public class ExecutionVertexLocalityTest extends TestLogger { SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0); - final Field locationField = Execution.class.getDeclaredField("assignedResource"); - locationField.setAccessible(true); - - locationField.set(vertex.getCurrentExecutionAttempt(), simpleSlot); + if (!vertex.getCurrentExecutionAttempt().tryAssignResource(simpleSlot)) { + throw new FlinkException("Could not assign resource."); + } } private void setState(Execution execution, ExecutionState state) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java index afb9dac..9f4a675 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java @@ -33,12 +33,13 @@ import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; import org.junit.Test; import java.util.concurrent.ExecutionException; -public class ScheduleWithCoLocationHintTest { +public class ScheduleWithCoLocationHintTest extends TestLogger { @Test public void scheduleAllSharedAndCoLocated() { http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java index c049593..1f88dd8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -47,7 +48,7 @@ import static org.junit.Assert.fail; /** * Tests for the scheduler when scheduling tasks in slot sharing groups. */ -public class SchedulerSlotSharingTest { +public class SchedulerSlotSharingTest extends TestLogger { @Test public void scheduleSingleVertexType() { http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java index 4312b0f..c7d0f09 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java @@ -25,9 +25,11 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -105,8 +107,13 @@ public class SchedulerTestUtils { public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) { ExecutionVertex vertex = mock(ExecutionVertex.class); - - when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocations); + + Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = new ArrayList<>(4); + + for (TaskManagerLocation preferredLocation : preferredLocations) { + preferredLocationFutures.add(CompletableFuture.completedFuture(preferredLocation)); + } + when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures); when(vertex.getJobId()).thenReturn(new JobID()); when(vertex.toString()).thenReturn("TEST-VERTEX"); @@ -119,7 +126,7 @@ public class SchedulerTestUtils { public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks) { ExecutionVertex vertex = mock(ExecutionVertex.class); - when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(null); + when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Collections.emptyList()); when(vertex.getJobId()).thenReturn(new JobID()); when(vertex.getJobvertexId()).thenReturn(jid); when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex); @@ -139,7 +146,13 @@ public class SchedulerTestUtils { ExecutionVertex vertex = mock(ExecutionVertex.class); - when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Arrays.asList(locations)); + Collection<CompletableFuture<TaskManagerLocation>> preferrecLocationFutures = new ArrayList<>(locations.length); + + for (TaskManagerLocation location : locations) { + preferrecLocationFutures.add(CompletableFuture.completedFuture(location)); + } + + when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferrecLocationFutures); when(vertex.getJobId()).thenReturn(new JobID()); when(vertex.getJobvertexId()).thenReturn(jid); when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
