[FLINK-4459] [distributed runtime] Introduce SlotProvider for Scheduler This closes #2424
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e40f590 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e40f590 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e40f590 Branch: refs/heads/flip-6 Commit: 6e40f59015dcdb8529691318e8f5a33e831252b8 Parents: 502a79d Author: Kurt Young <[email protected]> Authored: Fri Aug 26 17:51:40 2016 +0800 Committer: Stephan Ewen <[email protected]> Committed: Sun Sep 4 23:09:59 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 15 +- .../runtime/executiongraph/ExecutionGraph.java | 32 +- .../executiongraph/ExecutionJobVertex.java | 7 +- .../runtime/executiongraph/ExecutionVertex.java | 8 +- .../flink/runtime/instance/SlotProvider.java | 48 +++ .../runtime/jobmanager/scheduler/Scheduler.java | 27 +- .../ExecutionGraphMetricsTest.java | 8 +- .../ExecutionVertexSchedulingTest.java | 28 +- .../TerminalStateDeadlockTest.java | 27 +- .../ScheduleWithCoLocationHintTest.java | 303 +++++++++++-------- .../scheduler/SchedulerIsolatedTasksTest.java | 45 ++- .../scheduler/SchedulerSlotSharingTest.java | 230 +++++++------- 12 files changed, 430 insertions(+), 348 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 846df49..6826365 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -34,13 +34,13 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; @@ -271,15 +271,15 @@ public class Execution { * to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any * error sets the vertex state to failed and triggers the recovery logic. * - * @param scheduler The scheduler to use to schedule this execution attempt. + * @param slotProvider The slot provider to use to allocate slot for this execution attempt. * @param queued Flag to indicate whether the scheduler may queue this task if it cannot * immediately deploy it. * * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling. * @throws NoResourceAvailableException Thrown is no queued scheduling is allowed and no resources are currently available. */ - public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException { - if (scheduler == null) { + public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException { + if (slotProvider == null) { throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution."); } @@ -299,9 +299,8 @@ public class Execution { // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned // in all cases where the deployment failed. we use many try {} finally {} clauses to assure that + final SlotAllocationFuture future = slotProvider.allocateSlot(toSchedule, queued); if (queued) { - SlotAllocationFuture future = scheduler.scheduleQueued(toSchedule); - future.setFutureAction(new SlotAllocationFutureAction() { @Override public void slotAllocated(SimpleSlot slot) { @@ -319,7 +318,7 @@ public class Execution { }); } else { - SimpleSlot slot = scheduler.scheduleImmediately(toSchedule); + SimpleSlot slot = future.get(); try { deployToSlot(slot); } @@ -560,7 +559,7 @@ public class Execution { public Boolean call() throws Exception { try { consumerVertex.scheduleForExecution( - consumerVertex.getExecutionGraph().getScheduler(), + consumerVertex.getExecutionGraph().getSlotProvider(), consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed()); } catch (Throwable t) { consumerVertex.fail(new IllegalStateException("Could not schedule consumer " + http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 92cab41..585e9f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -47,7 +48,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializableObject; @@ -197,8 +197,8 @@ public class ExecutionGraph { // ------ Fields that are relevant to the execution and need to be cleared before archiving ------- - /** The scheduler to use for scheduling new tasks as they are needed */ - private Scheduler scheduler; + /** The slot provider to use for allocating slots for tasks as they are needed */ + private SlotProvider slotProvider; /** Strategy to use for restarts */ private RestartStrategy restartStrategy; @@ -470,8 +470,8 @@ public class ExecutionGraph { return jsonPlan; } - public Scheduler getScheduler() { - return scheduler; + public SlotProvider getSlotProvider() { + return slotProvider; } public JobID getJobID() { @@ -670,17 +670,17 @@ public class ExecutionGraph { } } - public void scheduleForExecution(Scheduler scheduler) throws JobException { - if (scheduler == null) { + public void scheduleForExecution(SlotProvider slotProvider) throws JobException { + if (slotProvider == null) { throw new IllegalArgumentException("Scheduler must not be null."); } - if (this.scheduler != null && this.scheduler != scheduler) { - throw new IllegalArgumentException("Cannot use different schedulers for the same job"); + if (this.slotProvider != null && this.slotProvider != slotProvider) { + throw new IllegalArgumentException("Cannot use different slot providers for the same job"); } if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { - this.scheduler = scheduler; + this.slotProvider = slotProvider; switch (scheduleMode) { @@ -688,14 +688,14 @@ public class ExecutionGraph { // simply take the vertices without inputs. for (ExecutionJobVertex ejv : this.tasks.values()) { if (ejv.getJobVertex().isInputVertex()) { - ejv.scheduleAll(scheduler, allowQueuedScheduling); + ejv.scheduleAll(slotProvider, allowQueuedScheduling); } } break; case EAGER: for (ExecutionJobVertex ejv : getVerticesTopologically()) { - ejv.scheduleAll(scheduler, allowQueuedScheduling); + ejv.scheduleAll(slotProvider, allowQueuedScheduling); } break; @@ -850,8 +850,8 @@ public class ExecutionGraph { throw new IllegalStateException("Can only restart job from state restarting."); } - if (scheduler == null) { - throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null."); + if (slotProvider == null) { + throw new IllegalStateException("The execution graph has not been scheduled before - slotProvider is null."); } this.currentExecutions.clear(); @@ -885,7 +885,7 @@ public class ExecutionGraph { } } - scheduleForExecution(scheduler); + scheduleForExecution(slotProvider); } catch (Throwable t) { fail(t); @@ -917,7 +917,7 @@ public class ExecutionGraph { // clear the non-serializable fields restartStrategy = null; - scheduler = null; + slotProvider = null; checkpointCoordinator = null; executionContext = null; kvStateLocationRegistry = null; http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index d3dc8fe..1ac9522 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -37,7 +38,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.util.Preconditions; @@ -289,12 +289,13 @@ public class ExecutionJobVertex { // Actions //--------------------------------------------------------------------------------------------- - public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException { + public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException { + ExecutionVertex[] vertices = this.taskVertices; // kick off the tasks for (ExecutionVertex ev : vertices) { - ev.scheduleForExecution(scheduler, queued); + ev.scheduleForExecution(slotProvider, queued); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 88e1b88..a8d5ee4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.SimpleSlot; @@ -40,12 +41,11 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.util.SerializedValue; - import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; + import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; @@ -443,8 +443,8 @@ public class ExecutionVertex { } } - public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException { - return this.currentExecution.scheduleForExecution(scheduler, queued); + public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException { + return this.currentExecution.scheduleForExecution(slotProvider, queued); } public void deployToSlot(SimpleSlot slot) throws JobException { http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java new file mode 100644 index 0000000..b2c23a5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; + +/** + * The slot provider is responsible for preparing slots for ready-to-run tasks. + * + * <p>It supports two allocating modes: + * <ul> + * <li>Immediate allocating: A request for a task slot immediately gets satisfied, we can call + * {@link SlotAllocationFuture#get()} to get the allocated slot.</li> + * <li>Queued allocating: A request for a task slot is queued and returns a future that will be + * fulfilled as soon as a slot becomes available.</li> + * </ul> + */ +public interface SlotProvider { + + /** + * Allocating slot with specific requirement. + * + * @param task The task to allocate the slot for + * @param allowQueued Whether allow the task be queued if we do not have enough resource + * @return The future of the allocation + * + * @throws NoResourceAvailableException + */ + SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 734972d..c9cdd00 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -39,6 +39,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.instance.SlotSharingGroupAssignment; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.instance.SharedSlot; @@ -65,7 +66,7 @@ import scala.concurrent.ExecutionContext; * fulfilled as soon as a slot becomes available.</li> * </ul> */ -public class Scheduler implements InstanceListener, SlotAvailabilityListener { +public class Scheduler implements InstanceListener, SlotAvailabilityListener, SlotProvider { /** Scheduler-wide logger */ private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class); @@ -129,30 +130,24 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { // ------------------------------------------------------------------------ // Scheduling // ------------------------------------------------------------------------ - - public SimpleSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException { - Object ret = scheduleTask(task, false); - if (ret instanceof SimpleSlot) { - return (SimpleSlot) ret; - } - else { - throw new RuntimeException(); - } - } - - public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException { - Object ret = scheduleTask(task, true); + + + @Override + public SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued) + throws NoResourceAvailableException { + + final Object ret = scheduleTask(task, allowQueued); if (ret instanceof SimpleSlot) { return new SlotAllocationFuture((SimpleSlot) ret); } - if (ret instanceof SlotAllocationFuture) { + else if (ret instanceof SlotAllocationFuture) { return (SlotAllocationFuture) ret; } else { throw new RuntimeException(); } } - + /** * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link SlotAllocationFuture}. */ http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index d5520fd..aa5925f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Slot; @@ -70,8 +71,8 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; + +import static org.mockito.Mockito.*; public class ExecutionGraphMetricsTest extends TestLogger { @@ -135,7 +136,8 @@ public class ExecutionGraphMetricsTest extends TestLogger { when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true); when(simpleSlot.getRoot()).thenReturn(rootSlot); - when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot); + when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean())) + .thenReturn(new SlotAllocationFuture(simpleSlot)); http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 5e9ee33..c576ce5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -18,25 +18,29 @@ package org.apache.flink.runtime.executiongraph; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.DummyActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.Test; +import org.junit.Test; import org.mockito.Matchers; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class ExecutionVertexSchedulingTest { @Test @@ -54,7 +58,8 @@ public class ExecutionVertexSchedulingTest { assertTrue(slot.isReleased()); Scheduler scheduler = mock(Scheduler.class); - when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot); + when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())) + .thenReturn(new SlotAllocationFuture(slot)); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot @@ -86,7 +91,7 @@ public class ExecutionVertexSchedulingTest { final SlotAllocationFuture future = new SlotAllocationFuture(); Scheduler scheduler = mock(Scheduler.class); - when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class))).thenReturn(future); + when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot @@ -117,7 +122,8 @@ public class ExecutionVertexSchedulingTest { final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); Scheduler scheduler = mock(Scheduler.class); - when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot); + when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())) + .thenReturn(new SlotAllocationFuture(slot)); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -130,4 +136,4 @@ public class ExecutionVertexSchedulingTest { fail(e.getMessage()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java index 870ae05..4cae7c2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -54,12 +55,12 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; public class TerminalStateDeadlockTest { - + private final Field stateField; private final Field resourceField; private final Field execGraphStateField; - private final Field execGraphSchedulerField; - + private final Field execGraphSlotProviderField; + private final SimpleSlot resource; @@ -75,8 +76,8 @@ public class TerminalStateDeadlockTest { this.execGraphStateField = ExecutionGraph.class.getDeclaredField("state"); this.execGraphStateField.setAccessible(true); - this.execGraphSchedulerField = ExecutionGraph.class.getDeclaredField("scheduler"); - this.execGraphSchedulerField.setAccessible(true); + this.execGraphSlotProviderField = ExecutionGraph.class.getDeclaredField("slotProvider"); + this.execGraphSlotProviderField.setAccessible(true); // the dummy resource ResourceID resourceId = ResourceID.generate(); @@ -96,11 +97,9 @@ public class TerminalStateDeadlockTest { throw new RuntimeException(); } } - - - + // ------------------------------------------------------------------------ - + @Test public void testProvokeDeadlock() { try { @@ -135,7 +134,7 @@ public class TerminalStateDeadlockTest { initializeExecution(e2); execGraphStateField.set(eg, JobStatus.FAILING); - execGraphSchedulerField.set(eg, scheduler); + execGraphSlotProviderField.set(eg, scheduler); Runnable r1 = new Runnable() { @Override @@ -173,12 +172,10 @@ public class TerminalStateDeadlockTest { static class TestExecGraph extends ExecutionGraph { - private static final long serialVersionUID = -7606144898417942044L; - private static final Configuration EMPTY_CONFIG = new Configuration(); private static final FiniteDuration TIMEOUT = new FiniteDuration(30, TimeUnit.SECONDS); - + private volatile boolean done; TestExecGraph(JobID jobId) throws IOException { @@ -193,14 +190,14 @@ public class TerminalStateDeadlockTest { } @Override - public void scheduleForExecution(Scheduler scheduler) { + public void scheduleForExecution(SlotProvider slotProvider) { // notify that we are done with the "restarting" synchronized (this) { done = true; this.notifyAll(); } } - + public void waitTillDone() { try { synchronized (this) { http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java index eab4fea..b803702 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java @@ -63,18 +63,18 @@ public class ScheduleWithCoLocationHintTest { CoLocationConstraint c6 = new CoLocationConstraint(ccg); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2)); - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3)); - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4)); - SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1)); - SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2)); - SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3)); - SimpleSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5)); - SimpleSlot s9 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6)); - SimpleSlot s10 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4)); - SimpleSlot s11 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5)); - SimpleSlot s12 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6)); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false).get(); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false).get(); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false).get(); + SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false).get(); + SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false).get(); + SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false).get(); + SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false).get(); + SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false).get(); assertNotNull(s1); assertNotNull(s2); @@ -109,22 +109,22 @@ public class ScheduleWithCoLocationHintTest { assertEquals(s4.getTaskManagerID(), s10.getTaskManagerID()); assertEquals(s8.getTaskManagerID(), s11.getTaskManagerID()); assertEquals(s9.getTaskManagerID(), s12.getTaskManagerID()); - + assertEquals(c1.getLocation(), s1.getTaskManagerLocation()); assertEquals(c2.getLocation(), s2.getTaskManagerLocation()); assertEquals(c3.getLocation(), s3.getTaskManagerLocation()); assertEquals(c4.getLocation(), s4.getTaskManagerLocation()); assertEquals(c5.getLocation(), s8.getTaskManagerLocation()); assertEquals(c6.getLocation(), s9.getTaskManagerLocation()); - + // check the scheduler's bookkeeping assertEquals(0, scheduler.getNumberOfAvailableSlots()); - + // the first assignments are unconstrained, co.-scheduling is constrained assertEquals(6, scheduler.getNumberOfLocalizedAssignments()); assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments()); assertEquals(6, scheduler.getNumberOfUnconstrainedAssignments()); - + // release some slots, be sure that new available ones come up s1.releaseSlot(); s2.releaseSlot(); @@ -135,10 +135,11 @@ public class ScheduleWithCoLocationHintTest { s11.releaseSlot(); s12.releaseSlot(); assertTrue(scheduler.getNumberOfAvailableSlots() >= 1); - - SimpleSlot single = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1))); + + SimpleSlot single = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false).get(); assertNotNull(single); - + s1.releaseSlot(); s2.releaseSlot(); s3.releaseSlot(); @@ -149,9 +150,9 @@ public class ScheduleWithCoLocationHintTest { s9.releaseSlot(); s11.releaseSlot(); s12.releaseSlot(); - + assertEquals(5, scheduler.getNumberOfAvailableSlots()); - + assertEquals(6, scheduler.getNumberOfLocalizedAssignments()); assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments()); assertEquals(7, scheduler.getNumberOfUnconstrainedAssignments()); @@ -161,7 +162,7 @@ public class ScheduleWithCoLocationHintTest { fail(e.getMessage()); } } - + @Test public void scheduleWithIntermediateRelease() { try { @@ -169,34 +170,37 @@ public class ScheduleWithCoLocationHintTest { JobVertexID jid2 = new JobVertexID(); JobVertexID jid3 = new JobVertexID(); JobVertexID jid4 = new JobVertexID(); - + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); - + Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); - + scheduler.newInstanceAvailable(i1); scheduler.newInstanceAvailable(i2); - + assertEquals(2, scheduler.getNumberOfAvailableSlots()); - + SlotSharingGroup sharingGroup = new SlotSharingGroup(); CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); - - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1)); - - SimpleSlot sSolo = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 1))); - + + SimpleSlot s1 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get(); + SimpleSlot s2 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false).get(); + + SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false).get(); + ResourceID taskManager = s1.getTaskManagerID(); - + s1.releaseSlot(); s2.releaseSlot(); sSolo.releaseSlot(); - - SimpleSlot sNew = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1)); + + SimpleSlot sNew = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get(); assertEquals(taskManager, sNew.getTaskManagerID()); - + assertEquals(2, scheduler.getNumberOfLocalizedAssignments()); assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments()); assertEquals(2, scheduler.getNumberOfUnconstrainedAssignments()); @@ -206,41 +210,41 @@ public class ScheduleWithCoLocationHintTest { fail(e.getMessage()); } } - + @Test public void scheduleWithReleaseNoResource() { try { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); JobVertexID jid3 = new JobVertexID(); - + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); - + Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); - + scheduler.newInstanceAvailable(i1); scheduler.newInstanceAvailable(i2); - + assertEquals(2, scheduler.getNumberOfAvailableSlots()); - + SlotSharingGroup sharingGroup = new SlotSharingGroup(); CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); - - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1)); + + SimpleSlot s1 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get(); s1.releaseSlot(); - - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1))); - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2))); - - + + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false).get(); + try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1)); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get(); fail("Scheduled even though no resource was available."); } catch (NoResourceAvailableException e) { // expected } - + assertEquals(0, scheduler.getNumberOfLocalizedAssignments()); assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments()); assertEquals(3, scheduler.getNumberOfUnconstrainedAssignments()); @@ -250,7 +254,7 @@ public class ScheduleWithCoLocationHintTest { fail(e.getMessage()); } } - + @Test public void scheduleMixedCoLocationSlotSharing() { try { @@ -276,27 +280,35 @@ public class ScheduleWithCoLocationHintTest { SlotSharingGroup shareGroup = new SlotSharingGroup(); // first wave - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup)); - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup)); - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup)); - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup)); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false); // second wave - SimpleSlot s21 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1)); - SimpleSlot s22 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2)); - SimpleSlot s23 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3)); - SimpleSlot s24 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4)); + SimpleSlot s21 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false).get(); + SimpleSlot s22 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false).get(); + SimpleSlot s23 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false).get(); + SimpleSlot s24 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false).get(); // third wave - SimpleSlot s31 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2)); - SimpleSlot s32 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3)); - SimpleSlot s33 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4)); - SimpleSlot s34 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1)); - - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup)); - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup)); - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup)); - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup)); + SimpleSlot s31 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false).get(); + SimpleSlot s32 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false).get(); + SimpleSlot s33 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false).get(); + SimpleSlot s34 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false).get(); + + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false); assertEquals(s21.getTaskManagerID(), s34.getTaskManagerID()); assertEquals(s22.getTaskManagerID(), s31.getTaskManagerID()); @@ -341,20 +353,26 @@ public class ScheduleWithCoLocationHintTest { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); // schedule something into the shared group so that both instances are in the sharing group - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup)); + SimpleSlot s1 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get(); + SimpleSlot s2 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get(); // schedule one locally to instance 1 - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1)); + SimpleSlot s3 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false).get(); // schedule with co location constraint (yet unassigned) and a preference for // instance 1, but it can only get instance 2 - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2)); + SimpleSlot s4 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get(); // schedule something into the assigned co-location constraints and check that they override the // other preferences - SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1)); - SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2)); + SimpleSlot s5 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false).get(); + SimpleSlot s6 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false).get(); // check that each slot got three assertEquals(3, s1.getRoot().getNumberLeaves()); @@ -386,13 +404,13 @@ public class ScheduleWithCoLocationHintTest { fail(e.getMessage()); } } - + @Test public void testSlotReleasedInBetween() { try { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); - + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); Instance i1 = getRandomInstance(1); @@ -403,36 +421,40 @@ public class ScheduleWithCoLocationHintTest { scheduler.newInstanceAvailable(i2); scheduler.newInstanceAvailable(i1); - + assertEquals(2, scheduler.getNumberOfAvailableSlots()); - + SlotSharingGroup sharingGroup = new SlotSharingGroup(); - + CoLocationGroup ccg = new CoLocationGroup(); CoLocationConstraint cc1 = new CoLocationConstraint(ccg); CoLocationConstraint cc2 = new CoLocationConstraint(ccg); - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2)); - + SimpleSlot s1 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + SimpleSlot s2 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get(); + s1.releaseSlot(); s2.releaseSlot(); - + assertEquals(2, scheduler.getNumberOfAvailableSlots()); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1)); - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2)); - + SimpleSlot s3 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get(); + SimpleSlot s4 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get(); + // still preserves the previous instance mapping) assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID()); assertEquals(i2.getTaskManagerID(), s4.getTaskManagerID()); - + s3.releaseSlot(); s4.releaseSlot(); assertEquals(2, scheduler.getNumberOfAvailableSlots()); - + assertEquals(4, scheduler.getNumberOfLocalizedAssignments()); assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments()); assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments()); @@ -442,14 +464,14 @@ public class ScheduleWithCoLocationHintTest { fail(e.getMessage()); } } - + @Test public void testSlotReleasedInBetweenAndNoNewLocal() { try { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); JobVertexID jidx = new JobVertexID(); - + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); Instance i1 = getRandomInstance(1); @@ -460,41 +482,46 @@ public class ScheduleWithCoLocationHintTest { scheduler.newInstanceAvailable(i2); scheduler.newInstanceAvailable(i1); - + assertEquals(2, scheduler.getNumberOfAvailableSlots()); - + SlotSharingGroup sharingGroup = new SlotSharingGroup(); - + CoLocationGroup ccg = new CoLocationGroup(); CoLocationConstraint cc1 = new CoLocationConstraint(ccg); CoLocationConstraint cc2 = new CoLocationConstraint(ccg); - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2)); - + SimpleSlot s1 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + SimpleSlot s2 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get(); + s1.releaseSlot(); s2.releaseSlot(); - + assertEquals(2, scheduler.getNumberOfAvailableSlots()); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); - SimpleSlot sa = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2))); - SimpleSlot sb = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2))); - + SimpleSlot sa = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false).get(); + SimpleSlot sb = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false).get(); + try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1)); + scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false); fail("should not be able to find a resource"); } catch (NoResourceAvailableException e) { // good } catch (Exception e) { fail("wrong exception"); } - + sa.releaseSlot(); sb.releaseSlot(); assertEquals(2, scheduler.getNumberOfAvailableSlots()); - + assertEquals(2, scheduler.getNumberOfLocalizedAssignments()); assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments()); assertEquals(2, scheduler.getNumberOfUnconstrainedAssignments()); @@ -504,15 +531,15 @@ public class ScheduleWithCoLocationHintTest { fail(e.getMessage()); } } - + @Test public void testScheduleOutOfOrder() { try { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); - + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); - + Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); @@ -520,11 +547,11 @@ public class ScheduleWithCoLocationHintTest { scheduler.newInstanceAvailable(i2); scheduler.newInstanceAvailable(i1); - + assertEquals(2, scheduler.getNumberOfAvailableSlots()); - + SlotSharingGroup sharingGroup = new SlotSharingGroup(); - + CoLocationGroup ccg = new CoLocationGroup(); CoLocationConstraint cc1 = new CoLocationConstraint(ccg); CoLocationConstraint cc2 = new CoLocationConstraint(ccg); @@ -532,33 +559,37 @@ public class ScheduleWithCoLocationHintTest { // schedule something from the second job vertex id before the first is filled, // and give locality preferences that hint at using the same shared slot for both // co location constraints (which we seek to prevent) - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2)); + SimpleSlot s1 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + SimpleSlot s2 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false).get(); + + SimpleSlot s3 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false).get(); + SimpleSlot s4 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false).get(); - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1)); - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2)); - // check that each slot got three assertEquals(2, s1.getRoot().getNumberLeaves()); assertEquals(2, s2.getRoot().getNumberLeaves()); - + assertEquals(s1.getTaskManagerID(), s3.getTaskManagerID()); assertEquals(s2.getTaskManagerID(), s4.getTaskManagerID()); - + // check the scheduler's bookkeeping assertEquals(0, scheduler.getNumberOfAvailableSlots()); - + assertEquals(3, scheduler.getNumberOfLocalizedAssignments()); assertEquals(1, scheduler.getNumberOfNonLocalizedAssignments()); assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments()); - + // release some slots, be sure that new available ones come up s1.releaseSlot(); s2.releaseSlot(); s3.releaseSlot(); s4.releaseSlot(); assertEquals(2, scheduler.getNumberOfAvailableSlots()); - + assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1)); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2)); @@ -568,15 +599,15 @@ public class ScheduleWithCoLocationHintTest { fail(e.getMessage()); } } - + @Test public void nonColocationFollowsCoLocation() { try { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); - + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); - + Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); @@ -585,32 +616,36 @@ public class ScheduleWithCoLocationHintTest { scheduler.newInstanceAvailable(i2); scheduler.newInstanceAvailable(i1); - + assertEquals(2, scheduler.getNumberOfAvailableSlots()); SlotSharingGroup sharingGroup = new SlotSharingGroup(); - + CoLocationGroup ccg = new CoLocationGroup(); CoLocationConstraint cc1 = new CoLocationConstraint(ccg); CoLocationConstraint cc2 = new CoLocationConstraint(ccg); - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2)); + SimpleSlot s1 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + SimpleSlot s2 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get(); + + SimpleSlot s3 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get(); + SimpleSlot s4 = scheduler.allocateSlot( + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false).get(); - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup)); - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup)); - // check that each slot got two assertEquals(2, s1.getRoot().getNumberLeaves()); assertEquals(2, s2.getRoot().getNumberLeaves()); - + s1.releaseSlot(); s2.releaseSlot(); s3.releaseSlot(); s4.releaseSlot(); - + assertEquals(2, scheduler.getNumberOfAvailableSlots()); - + assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1)); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2)); http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java index 25498c4..d78f551 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java @@ -35,7 +35,6 @@ import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.a import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getDummyTask; import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance; import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -122,17 +121,17 @@ public class SchedulerIsolatedTasksTest { assertEquals(5, scheduler.getNumberOfAvailableSlots()); // schedule something into all slots - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); - SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); // the slots should all be different assertTrue(areAllDistinct(s1, s2, s3, s4, s5)); try { - scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); + scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false); fail("Scheduler accepted scheduling request without available resource."); } catch (NoResourceAvailableException e) { @@ -145,8 +144,8 @@ public class SchedulerIsolatedTasksTest { assertEquals(2, scheduler.getNumberOfAvailableSlots()); // now we can schedule some more slots - SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); - SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7)); @@ -245,7 +244,7 @@ public class SchedulerIsolatedTasksTest { disposeThread.start(); for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) { - SlotAllocationFuture future = scheduler.scheduleQueued(new ScheduledUnit(getDummyTask())); + SlotAllocationFuture future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true); future.setFutureAction(action); allAllocatedSlots.add(future); } @@ -287,11 +286,11 @@ public class SchedulerIsolatedTasksTest { scheduler.newInstanceAvailable(i3); List<SimpleSlot> slots = new ArrayList<SimpleSlot>(); - slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()))); - slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()))); - slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()))); - slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()))); - slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()))); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); i2.markDead(); @@ -312,7 +311,7 @@ public class SchedulerIsolatedTasksTest { // cannot get another slot, since all instances are dead try { - scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); + scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); fail("Scheduler served a slot from a dead instance"); } catch (NoResourceAvailableException e) { @@ -347,7 +346,7 @@ public class SchedulerIsolatedTasksTest { scheduler.newInstanceAvailable(i3); // schedule something on an arbitrary instance - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new Instance[0]))); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false).get(); // figure out how we use the location hints Instance first = (Instance) s1.getOwner(); @@ -355,28 +354,28 @@ public class SchedulerIsolatedTasksTest { Instance third = first == i3 ? i2 : i3; // something that needs to go to the first instance again - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation()))); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false).get(); assertEquals(first, s2.getOwner()); // first or second --> second, because first is full - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, second))); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false).get(); assertEquals(second, s3.getOwner()); // first or third --> third (because first is full) - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third))); - SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third))); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); assertEquals(third, s4.getOwner()); assertEquals(third, s5.getOwner()); // first or third --> second, because all others are full - SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third))); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); assertEquals(second, s6.getOwner()); // release something on the first and second instance s2.releaseSlot(); s6.releaseSlot(); - SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third))); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); assertEquals(first, s7.getOwner()); assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments());
