http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java index ecd88bc..ed3c361 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java @@ -19,8 +19,10 @@ package org.apache.flink.runtime.jobmanager.scheduler; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -67,18 +69,18 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { CoLocationConstraint c6 = new CoLocationConstraint(ccg); // schedule 4 tasks from the first vertex group - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c2), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c3), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c4), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c2), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c3), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s8 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c5), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s9 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c6), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s10 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c4), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s11 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c5), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s12 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c6), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c2), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c3), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c4), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c2), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c3), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s8 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c5), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s9 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c6), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s10 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c4), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s11 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c5), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s12 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c6), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1); assertNotNull(s2); @@ -128,7 +130,7 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { assertTrue(testingSlotProvider.getNumberOfAvailableSlots() >= 1); LogicalSlot single = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(single); s1.releaseSlot(); @@ -165,11 +167,11 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot sSolo = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot sSolo = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); ResourceID taskManager = s1.getTaskManagerLocation().getResourceID(); @@ -178,7 +180,7 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { sSolo.releaseSlot(); LogicalSlot sNew = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertEquals(taskManager, sNew.getTaskManagerLocation().getResourceID()); assertEquals(2, testingSlotProvider.getNumberOfLocalizedAssignments()); @@ -201,14 +203,14 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); s1.releaseSlot(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduled even though no resource was available."); } catch (ExecutionException e) { assertTrue(e.getCause() instanceof NoResourceAvailableException); @@ -242,35 +244,35 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { SlotSharingGroup shareGroup = new SlotSharingGroup(); // first wave - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); // second wave LogicalSlot s21 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc1), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid2, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s22 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc2), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid2, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc2), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s23 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc3), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid2, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc3), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s24 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc4), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid2, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc4), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); // third wave LogicalSlot s31 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc2), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid3, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc2), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s32 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc3), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid3, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc3), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s33 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc4), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid3, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc4), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s34 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc1), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid3, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertEquals(s21.getTaskManagerLocation(), s34.getTaskManagerLocation()); assertEquals(s22.getTaskManagerLocation(), s31.getTaskManagerLocation()); @@ -302,25 +304,25 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { // schedule something into the shared group so that both instances are in the sharing group LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); // schedule one locally to instance 1 LogicalSlot s3 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // schedule with co location constraint (yet unassigned) and a preference for // instance 1, but it can only get instance 2 LogicalSlot s4 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // schedule something into the assigned co-location constraints and check that they override the // other preferences LogicalSlot s5 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), false, Collections.singleton(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); LogicalSlot s6 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // check that each slot got three assertEquals(s1.getTaskManagerLocation(), s3.getTaskManagerLocation()); @@ -362,9 +364,9 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), false, Collections.singleton(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); s1.releaseSlot(); s2.releaseSlot(); @@ -373,9 +375,9 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); LogicalSlot s3 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), false, Collections.singleton(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); LogicalSlot s4 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // still preserves the previous instance mapping) assertEquals(loc1, s3.getTaskManagerLocation()); @@ -409,9 +411,9 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), false, Collections.singleton(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); s1.releaseSlot(); s2.releaseSlot(); @@ -420,13 +422,13 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); LogicalSlot sa = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot sb = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2, null)), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); try { testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), false, Collections.singleton(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); fail("should not be able to find a resource"); } catch (ExecutionException e) { @@ -466,14 +468,14 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { // and give locality preferences that hint at using the same shared slot for both // co location constraints (which we seek to prevent) LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s3 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s4 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // check that each slot got three assertEquals(s1.getTaskManagerLocation(), s3.getTaskManagerLocation()); @@ -515,14 +517,14 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), false, Collections.singleton(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); LogicalSlot s3 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s4 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // check that each slot got two assertEquals(s1.getTaskManagerLocation(), s3.getTaskManagerLocation()); @@ -539,4 +541,8 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1)); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2)); } + + private static SlotProfile slotProfileForLocation(TaskManagerLocation location) { + return new SlotProfile(ResourceProfile.UNKNOWN, Collections.singletonList(location), Collections.emptyList()); + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java index 86f5659..2abf9fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.jobmanager.scheduler; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -68,17 +70,17 @@ public class SchedulerIsolatedTasksTest extends SchedulerTestBase { assertEquals(5, testingSlotProvider.getNumberOfAvailableSlots()); // schedule something into all slots - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); // the slots should all be different assertTrue(areAllDistinct(s1, s2, s3, s4, s5)); try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler accepted scheduling request without available resource."); } catch (ExecutionException e) { @@ -91,8 +93,8 @@ public class SchedulerIsolatedTasksTest extends SchedulerTestBase { assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots()); // now we can schedule some more slots - LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); - LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7)); @@ -172,7 +174,7 @@ public class SchedulerIsolatedTasksTest extends SchedulerTestBase { disposeThread.start(); for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) { - CompletableFuture<LogicalSlot> future = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), true, Collections.emptyList(), TestingUtils.infiniteTime()); + CompletableFuture<LogicalSlot> future = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); future.thenAcceptAsync( (LogicalSlot slot) -> { synchronized (toRelease) { @@ -207,11 +209,11 @@ public class SchedulerIsolatedTasksTest extends SchedulerTestBase { final TaskManagerLocation taskManagerLocation3 = testingSlotProvider.addTaskManager(1); List<LogicalSlot> slots = new ArrayList<>(); - slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get()); - slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get()); - slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get()); - slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get()); - slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get()); + slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); + slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); + slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); + slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); + slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); testingSlotProvider.releaseTaskManager(taskManagerLocation2.getResourceID()); @@ -232,7 +234,7 @@ public class SchedulerIsolatedTasksTest extends SchedulerTestBase { // cannot get another slot, since all instances are dead try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler served a slot from a dead instance"); } catch (ExecutionException e) { @@ -254,7 +256,7 @@ public class SchedulerIsolatedTasksTest extends SchedulerTestBase { final TaskManagerLocation taskManagerLocation3 = testingSlotProvider.addTaskManager(2); // schedule something on an arbitrary instance - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false, Collections.emptyList(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); // figure out how we use the location hints ResourceID firstResourceId = s1.getTaskManagerLocation().getResourceID(); @@ -276,32 +278,36 @@ public class SchedulerIsolatedTasksTest extends SchedulerTestBase { TaskManagerLocation third = taskManagerLocations.get((index + 2) % taskManagerLocations.size()); // something that needs to go to the first instance again - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false, Collections.singleton(s1.getTaskManagerLocation()), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false, slotProfileForLocation(s1.getTaskManagerLocation()), TestingUtils.infiniteTime()).get(); assertEquals(first.getResourceID(), s2.getTaskManagerLocation().getResourceID()); // first or second --> second, because first is full - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false, Arrays.asList(first, second), TestingUtils.infiniteTime()).get(); + LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false, slotProfileForLocation(first, second), TestingUtils.infiniteTime()).get(); assertEquals(second.getResourceID(), s3.getTaskManagerLocation().getResourceID()); // first or third --> third (because first is full) - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first, third), TestingUtils.infiniteTime()).get(); - LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first, third), TestingUtils.infiniteTime()).get(); + LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); + LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); assertEquals(third.getResourceID(), s4.getTaskManagerLocation().getResourceID()); assertEquals(third.getResourceID(), s5.getTaskManagerLocation().getResourceID()); // first or third --> second, because all others are full - LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first, third), TestingUtils.infiniteTime()).get(); + LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); assertEquals(second.getResourceID(), s6.getTaskManagerLocation().getResourceID()); // release something on the first and second instance s2.releaseSlot(); s6.releaseSlot(); - LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first, third), TestingUtils.infiniteTime()).get(); + LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); assertEquals(first.getResourceID(), s7.getTaskManagerLocation().getResourceID()); assertEquals(1, testingSlotProvider.getNumberOfUnconstrainedAssignments()); assertTrue(1 == testingSlotProvider.getNumberOfNonLocalizedAssignments() || 1 == testingSlotProvider.getNumberOfHostLocalizedAssignments()); assertEquals(5, testingSlotProvider.getNumberOfLocalizedAssignments()); } + + private static SlotProfile slotProfileForLocation(TaskManagerLocation... location) { + return new SlotProfile(ResourceProfile.UNKNOWN, Arrays.asList(location), Collections.emptyList()); + } }
