Repository: flink Updated Branches: refs/heads/master 647c552a2 -> d63bc75ff
http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java index 1cc9301..aab8132 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.jobmanager.scheduler; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -68,10 +70,10 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { testingSlotProvider.addTaskManager(2); // schedule 4 tasks from the first vertex group - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1); assertNotNull(s2); @@ -82,7 +84,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { // we cannot schedule another task from the first vertex group try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -96,7 +98,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { s3.releaseSlot(); // allocate another slot from that group - LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s5); // release all old slots @@ -104,9 +106,9 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { s2.releaseSlot(); s4.releaseSlot(); - LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s8 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s8 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s6); assertNotNull(s7); @@ -151,10 +153,10 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { testingSlotProvider.addTaskManager(2); // schedule 4 tasks from the first vertex group - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1); assertNotNull(s2); @@ -165,7 +167,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { // we cannot schedule another task from the first vertex group try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -176,10 +178,10 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { } // schedule some tasks from the second ID group - LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1_2); assertNotNull(s2_2); @@ -188,7 +190,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { // we cannot schedule another task from the second vertex group try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -209,7 +211,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { // we can still not schedule anything from the second group of vertices try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -220,7 +222,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { } // we can schedule something from the first vertex group - LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s5); assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup)); @@ -230,7 +232,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { // now we release a slot from the second vertex group and schedule another task from that group s2_2.releaseSlot(); - LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s5_2); // release all slots @@ -265,10 +267,10 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { testingSlotProvider.addTaskManager(2); // schedule 4 tasks from the first vertex group - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup)); assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1)); @@ -284,10 +286,10 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2)); // schedule some tasks from the second ID group - LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup)); assertEquals(4, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1)); @@ -329,10 +331,10 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { testingSlotProvider.addTaskManager(2); // schedule 4 tasks from the first vertex group - LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1_1); assertNotNull(s2_1); @@ -342,10 +344,10 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1)); // schedule 4 tasks from the second vertex group - LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1_2); assertNotNull(s2_2); @@ -355,10 +357,10 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2)); // schedule 4 tasks from the third vertex group - LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1_3); assertNotNull(s2_3); @@ -370,7 +372,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { // we cannot schedule another task from the second vertex group try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -386,9 +388,9 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { s3_2.releaseSlot(); s4_2.releaseSlot(); - LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s6_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s7_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s6_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s7_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s5_2); assertNotNull(s6_2); @@ -438,9 +440,9 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { testingSlotProvider.addTaskManager(2); // schedule 1 tasks from the first vertex group and 2 from the second - LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1_1); assertNotNull(s2_1); @@ -456,7 +458,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { // this should free one slot so we can allocate one non-shared - LogicalSlot sx = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot sx = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(sx); assertEquals(1, testingSlotProvider.getNumberOfSlots(sharingGroup)); @@ -490,28 +492,28 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { testingSlotProvider.addTaskManager(2); // schedule some individual vertices - LogicalSlot sA2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot sA1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot sA2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot sA1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(sA1); assertNotNull(sA2); // schedule some vertices in the sharing group - LogicalSlot s1_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1_0); assertNotNull(s1_1); assertNotNull(s2_0); assertNotNull(s2_1); // schedule another isolated vertex - LogicalSlot sB1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot sB1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(sB1); // should not be able to schedule more vertices try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -522,7 +524,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { } try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -533,7 +535,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { } try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -544,7 +546,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { } try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -557,8 +559,8 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { // release some isolated task and check that the sharing group may grow sA1.releaseSlot(); - LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1_2); assertNotNull(s2_2); @@ -570,19 +572,19 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { assertEquals(1, testingSlotProvider.getNumberOfAvailableSlots()); // schedule one more no-shared task - LogicalSlot sB0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot sB0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(sB0); // release the last of the original shared slots and allocate one more non-shared slot s2_1.releaseSlot(); - LogicalSlot sB2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot sB2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(sB2); // release on non-shared and add some shared slots sA2.releaseSlot(); - LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1_3); assertNotNull(s2_3); @@ -592,8 +594,8 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { s1_3.releaseSlot(); s2_3.releaseSlot(); - LogicalSlot sC0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot sC1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot sC0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot sC1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(sC0); assertNotNull(sC1); @@ -633,8 +635,8 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2); // schedule one to each instance - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); assertNotNull(s1); assertNotNull(s2); @@ -643,8 +645,8 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { assertEquals(loc2, s2.getTaskManagerLocation()); // schedule one from the other group to each instance - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2), TestingUtils.infiniteTime()).get(); + LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); assertNotNull(s3); assertNotNull(s4); @@ -679,8 +681,8 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2); // schedule one to each instance - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); assertNotNull(s1); assertNotNull(s2); @@ -689,8 +691,8 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { assertEquals(loc1, s2.getTaskManagerLocation()); // schedule one from the other group to each instance - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2), TestingUtils.infiniteTime()).get(); - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2), TestingUtils.infiniteTime()).get(); + LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); + LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); assertNotNull(s3); assertNotNull(s4); @@ -724,14 +726,14 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2); // schedule until the one instance is full - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // schedule two more with preference of same instance --> need to go to other instance - LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); - LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); assertNotNull(s1); assertNotNull(s2); @@ -775,19 +777,19 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { testingSlotProvider.addTaskManager(4); // allocate something from group 1 and 2 interleaved with schedule for group 3 - LogicalSlot slot_1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot slot_1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot slot_2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot slot_2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot slot_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot slot_1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot slot_1_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_1_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot slot_2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot slot_2_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_2_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); // release groups 1 and 2 @@ -803,10 +805,10 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { // allocate group 4 - LogicalSlot slot_4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot slot_4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot slot_4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot slot_4_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot_4_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); // release groups 3 and 4 @@ -855,7 +857,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { @Override public void run() { try { - LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); sleepUninterruptibly(rnd.nextInt(5)); slot.releaseSlot(); @@ -877,7 +879,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { public void run() { try { if (flag3.compareAndSet(false, true)) { - LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); sleepUninterruptibly(5); executor.execute(deploy4); @@ -905,7 +907,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { @Override public void run() { try { - LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); // wait a bit till scheduling the successor sleepUninterruptibly(rnd.nextInt(5)); @@ -932,7 +934,7 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { @Override public void run() { try { - LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); // wait a bit till scheduling the successor sleepUninterruptibly(rnd.nextInt(5)); @@ -1005,27 +1007,27 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { scheduler.newInstanceAvailable(getRandomInstance(4)); // schedule one task for the first and second vertex - LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertEquals( s1.getTaskManagerLocation(), s2.getTaskManagerLocation() ); assertEquals(3, scheduler.getNumberOfAvailableSlots()); - LogicalSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); s1.releaseSlot(); s2.releaseSlot(); - LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("should throw an exception"); } catch (ExecutionException e) { @@ -1050,4 +1052,8 @@ public class SchedulerSlotSharingTest extends SchedulerTestBase { fail(e.getMessage()); } } + + private static SlotProfile slotProfileForLocation(TaskManagerLocation location) { + return new SlotProfile(ResourceProfile.UNKNOWN, Collections.singletonList(location), Collections.emptyList()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java index a28e890..d9919ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager.scheduler; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -32,7 +33,6 @@ import org.apache.flink.util.TestLogger; import org.hamcrest.Matchers; import org.junit.Test; -import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -123,7 +123,7 @@ public class SchedulerTest extends TestLogger { new ScheduledUnit( execution), true, - Collections.emptyList(), + SlotProfile.noRequirements(), Time.milliseconds(1L)); try { http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java index 9cb9cff..9738449 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SlotSharingGroupId; @@ -156,8 +157,13 @@ public class SchedulerTestBase extends TestLogger { } @Override - public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations, Time allocationTimeout) { - return scheduler.allocateSlot(task, allowQueued, preferredLocations, allocationTimeout); + public CompletableFuture<LogicalSlot> allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + boolean allowQueued, + SlotProfile slotProfile, + Time allocationTimeout) { + return scheduler.allocateSlot(task, allowQueued, slotProfile, allocationTimeout); } @Override @@ -349,8 +355,13 @@ public class SchedulerTestBase extends TestLogger { } @Override - public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations, Time allocationTimeout) { - return slotProvider.allocateSlot(task, allowQueued, preferredLocations, allocationTimeout).thenApply( + public CompletableFuture<LogicalSlot> allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + boolean allowQueued, + SlotProfile slotProfile, + Time allocationTimeout) { + return slotProvider.allocateSlot(task, allowQueued, slotProfile, allocationTimeout).thenApply( (LogicalSlot logicalSlot) -> { switch (logicalSlot.getLocality()) { case LOCAL: http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java index 2d18c65..c0074ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -98,9 +99,9 @@ public class AvailableSlotsTest extends TestLogger { assertTrue(availableSlots.contains(slot1.getAllocationId())); assertTrue(availableSlots.containsTaskManager(resource1)); - assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE, null)); + assertNull(availableSlots.poll(SlotProfile.noLocality(DEFAULT_TESTING_BIG_PROFILE))); - SlotAndLocality slotAndLocality = availableSlots.poll(DEFAULT_TESTING_PROFILE, null); + SlotAndLocality slotAndLocality = availableSlots.poll(SlotProfile.noLocality(DEFAULT_TESTING_PROFILE)); assertEquals(slot1, slotAndLocality.getSlot()); assertEquals(0, availableSlots.size()); assertFalse(availableSlots.contains(slot1.getAllocationId())); http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java index 9a0256c..0375760 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -35,7 +36,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; -import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -79,7 +79,7 @@ public class SlotPoolCoLocationTest extends SlotPoolSchedulingTestBase { slotSharingGroupId, coLocationConstraint1), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); CompletableFuture<LogicalSlot> logicalSlotFuture22 = slotProvider.allocateSlot( @@ -88,7 +88,7 @@ public class SlotPoolCoLocationTest extends SlotPoolSchedulingTestBase { slotSharingGroupId, coLocationConstraint2), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); CompletableFuture<LogicalSlot> logicalSlotFuture12 = slotProvider.allocateSlot( @@ -97,7 +97,7 @@ public class SlotPoolCoLocationTest extends SlotPoolSchedulingTestBase { slotSharingGroupId, coLocationConstraint1), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); CompletableFuture<LogicalSlot> logicalSlotFuture21 = slotProvider.allocateSlot( @@ -106,7 +106,7 @@ public class SlotPoolCoLocationTest extends SlotPoolSchedulingTestBase { slotSharingGroupId, coLocationConstraint2), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); final AllocationID allocationId1 = allocationIds.take(); http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java index b2be97e..cc837bc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit; @@ -58,7 +59,6 @@ import org.junit.experimental.categories.Category; import javax.annotation.Nullable; -import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -121,8 +121,7 @@ public class SlotPoolRpcTest extends TestLogger { CompletableFuture<LogicalSlot> future = pool.allocateSlot( new SlotRequestId(), new ScheduledUnit(SchedulerTestUtils.getDummyTask()), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, fastTimeout); @@ -158,8 +157,7 @@ public class SlotPoolRpcTest extends TestLogger { CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot( requestId, new ScheduledUnit(SchedulerTestUtils.getDummyTask()), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, fastTimeout); @@ -204,8 +202,7 @@ public class SlotPoolRpcTest extends TestLogger { CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot( requestId, new DummyScheduledUnit(), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, fastTimeout); @@ -252,8 +249,7 @@ public class SlotPoolRpcTest extends TestLogger { CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot( requestId, new ScheduledUnit(SchedulerTestUtils.getDummyTask()), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, fastTimeout); @@ -313,7 +309,7 @@ public class SlotPoolRpcTest extends TestLogger { CompletableFuture<LogicalSlot> future = pool.getSlotProvider().allocateSlot( new DummyScheduledUnit(), true, - Collections.emptyList(), + SlotProfile.noRequirements(), fastTimeout); try { http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java index b7fa484..f33e1a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -35,7 +36,6 @@ import org.apache.flink.util.FlinkException; import org.junit.Test; -import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -67,7 +67,7 @@ public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase { slotSharingGroupId, null), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); assertFalse(logicalSlotFuture.isDone()); @@ -104,7 +104,7 @@ public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase { new SlotSharingGroupId(), null), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); final AllocationID allocationId = allocationIdFuture.get(); @@ -143,7 +143,7 @@ public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase { slotSharingGroupId, null), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot( @@ -152,7 +152,7 @@ public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase { slotSharingGroupId, null), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); assertFalse(logicalSlotFuture1.isDone()); @@ -166,7 +166,7 @@ public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase { slotSharingGroupId, null), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); CompletableFuture<LogicalSlot> logicalSlotFuture4 = slotProvider.allocateSlot( @@ -175,7 +175,7 @@ public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase { slotSharingGroupId, null), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); assertFalse(logicalSlotFuture3.isDone()); @@ -242,7 +242,7 @@ public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase { slotSharingGroupId1, null), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot( @@ -251,7 +251,7 @@ public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase { slotSharingGroupId1, null), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot( @@ -260,7 +260,7 @@ public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase { slotSharingGroupId2, null), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); CompletableFuture<LogicalSlot> logicalSlotFuture4 = slotProvider.allocateSlot( @@ -269,7 +269,7 @@ public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase { slotSharingGroupId2, null), true, - Collections.emptyList(), + SlotProfile.noRequirements(), TestingUtils.infiniteTime()); assertFalse(logicalSlotFuture1.isDone()); http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java index d6e0521..c529ceb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -122,8 +123,7 @@ public class SlotPoolTest extends TestLogger { CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot( requestId, new DummyScheduledUnit(), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, timeout); assertFalse(future.isDone()); @@ -165,15 +165,13 @@ public class SlotPoolTest extends TestLogger { CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot( new SlotRequestId(), new DummyScheduledUnit(), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, timeout); CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot( new SlotRequestId(), new DummyScheduledUnit(), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, timeout); @@ -229,8 +227,7 @@ public class SlotPoolTest extends TestLogger { CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot( new SlotRequestId(), new DummyScheduledUnit(), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, timeout); assertFalse(future1.isDone()); @@ -253,8 +250,7 @@ public class SlotPoolTest extends TestLogger { CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot( new SlotRequestId(), new DummyScheduledUnit(), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, timeout); @@ -287,8 +283,7 @@ public class SlotPoolTest extends TestLogger { CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot( new SlotRequestId(), new DummyScheduledUnit(), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, timeout); assertFalse(future.isDone()); @@ -362,8 +357,7 @@ public class SlotPoolTest extends TestLogger { CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot( new SlotRequestId(), new DummyScheduledUnit(), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, timeout); @@ -372,8 +366,7 @@ public class SlotPoolTest extends TestLogger { CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot( new SlotRequestId(), new DummyScheduledUnit(), - DEFAULT_TESTING_PROFILE, - Collections.emptyList(), + SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), true, timeout); @@ -427,11 +420,15 @@ public class SlotPoolTest extends TestLogger { try { final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); + SlotProfile slotProfile = new SlotProfile( + ResourceProfile.UNKNOWN, + Collections.emptyList(), + Collections.emptyList()); + CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot( new SlotRequestId(), scheduledUnit, - ResourceProfile.UNKNOWN, - Collections.emptyList(), + slotProfile, true, timeout); @@ -485,8 +482,7 @@ public class SlotPoolTest extends TestLogger { CompletableFuture<LogicalSlot> slotFuture1 = slotPoolGateway.allocateSlot( slotRequestId1, scheduledUnit, - ResourceProfile.UNKNOWN, - Collections.emptyList(), + SlotProfile.noRequirements(), true, timeout); @@ -496,8 +492,7 @@ public class SlotPoolTest extends TestLogger { CompletableFuture<LogicalSlot> slotFuture2 = slotPoolGateway.allocateSlot( slotRequestId2, scheduledUnit, - ResourceProfile.UNKNOWN, - Collections.emptyList(), + SlotProfile.noRequirements(), true, timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java index c18b3b3..ec6eae2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.instance.SlotSharingGroupId; @@ -415,7 +416,8 @@ public class SlotSharingManagerTest extends TestLogger { new SlotRequestId()); AbstractID groupId = new AbstractID(); - SlotSharingManager.MultiTaskSlotLocality resolvedRootSlotLocality = slotSharingManager.getResolvedRootSlot(groupId, Collections.emptyList()); + SlotSharingManager.MultiTaskSlotLocality resolvedRootSlotLocality = + slotSharingManager.getResolvedRootSlot(groupId, SlotProfile.noRequirements().matcher()); assertNotNull(resolvedRootSlotLocality); assertEquals(Locality.UNCONSTRAINED, resolvedRootSlotLocality.getLocality()); @@ -431,7 +433,7 @@ public class SlotSharingManagerTest extends TestLogger { SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot1 = slotSharingManager.getResolvedRootSlot( groupId, - Collections.emptyList()); + SlotProfile.noRequirements().matcher()); assertNull(resolvedRootSlot1); } @@ -470,7 +472,9 @@ public class SlotSharingManagerTest extends TestLogger { new SlotRequestId()); AbstractID groupId = new AbstractID(); - SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot1 = slotSharingManager.getResolvedRootSlot(groupId, Collections.singleton(taskManagerLocation)); + SlotProfile.LocalityAwareRequirementsToSlotMatcher matcher = + new SlotProfile.LocalityAwareRequirementsToSlotMatcher(Collections.singleton(taskManagerLocation)); + SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot1 = slotSharingManager.getResolvedRootSlot(groupId, matcher); assertNotNull(resolvedRootSlot1); assertEquals(Locality.LOCAL, resolvedRootSlot1.getLocality()); assertEquals(rootSlot2.getSlotRequestId(), resolvedRootSlot1.getMultiTaskSlot().getSlotRequestId()); @@ -481,7 +485,7 @@ public class SlotSharingManagerTest extends TestLogger { groupId, resolvedRootSlot1.getLocality()); - SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot2 = slotSharingManager.getResolvedRootSlot(groupId, Collections.singleton(taskManagerLocation)); + SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot2 = slotSharingManager.getResolvedRootSlot(groupId,matcher); assertNotNull(resolvedRootSlot2); assertNotSame(Locality.LOCAL, (resolvedRootSlot2.getLocality()));
