http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java index 9f4a675..2d35ce2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java @@ -37,6 +37,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.util.Collections; import java.util.concurrent.ExecutionException; public class ScheduleWithCoLocationHintTest extends TestLogger { @@ -66,18 +67,18 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { CoLocationConstraint c6 = new CoLocationConstraint(ccg); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false).get(); - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false).get(); - SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false).get(); - SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false).get(); - SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false).get(); - SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false).get(); - SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false).get(); - SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false).get(); - SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false).get(); - SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get(); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get(); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get(); + SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get(); + SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get(); + SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get(); + SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get(); + SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get(); assertNotNull(s1); assertNotNull(s2); @@ -140,7 +141,7 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { assertTrue(scheduler.getNumberOfAvailableSlots() >= 1); SimpleSlot single = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false).get(); + new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false, Collections.emptyList()).get(); assertNotNull(single); s1.releaseSlot(); @@ -188,11 +189,11 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get(); + new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false).get(); + new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get(); - SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false).get(); + SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false, Collections.emptyList()).get(); ResourceID taskManager = s1.getTaskManagerID(); @@ -201,7 +202,7 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { sSolo.releaseSlot(); SimpleSlot sNew = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get(); + new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get(); assertEquals(taskManager, sNew.getTaskManagerID()); assertEquals(2, scheduler.getNumberOfLocalizedAssignments()); @@ -235,14 +236,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get(); + new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get(); s1.releaseSlot(); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false).get(); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false, Collections.emptyList()).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false, Collections.emptyList()).get(); try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get(); fail("Scheduled even though no resource was available."); } catch (ExecutionException e) { assertTrue(e.getCause() instanceof NoResourceAvailableException); @@ -283,35 +284,35 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { SlotSharingGroup shareGroup = new SlotSharingGroup(); // first wave - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false, Collections.emptyList()); // second wave SimpleSlot s21 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false).get(); + new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get(); SimpleSlot s22 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false).get(); + new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false, Collections.emptyList()).get(); SimpleSlot s23 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false).get(); + new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false, Collections.emptyList()).get(); SimpleSlot s24 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false).get(); + new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get(); // third wave SimpleSlot s31 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false).get(); + new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false, Collections.emptyList()).get(); SimpleSlot s32 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false).get(); + new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false, Collections.emptyList()).get(); SimpleSlot s33 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false).get(); + new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get(); SimpleSlot s34 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false).get(); + new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get(); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false, Collections.emptyList()); assertEquals(s21.getTaskManagerID(), s34.getTaskManagerID()); assertEquals(s22.getTaskManagerID(), s31.getTaskManagerID()); @@ -357,25 +358,25 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { // schedule something into the shared group so that both instances are in the sharing group SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get(); // schedule one locally to instance 1 SimpleSlot s3 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get(); // schedule with co location constraint (yet unassigned) and a preference for // instance 1, but it can only get instance 2 SimpleSlot s4 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get(); // schedule something into the assigned co-location constraints and check that they override the // other preferences SimpleSlot s5 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get(); SimpleSlot s6 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get(); // check that each slot got three assertEquals(3, s1.getRoot().getNumberLeaves()); @@ -434,9 +435,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get(); s1.releaseSlot(); s2.releaseSlot(); @@ -445,9 +446,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); SimpleSlot s3 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get(); SimpleSlot s4 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get(); // still preserves the previous instance mapping) assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID()); @@ -495,9 +496,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get(); s1.releaseSlot(); s2.releaseSlot(); @@ -506,13 +507,13 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); SimpleSlot sa = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false, Collections.emptyList()).get(); SimpleSlot sb = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false, Collections.emptyList()).get(); try { scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get(); fail("should not be able to find a resource"); } catch (ExecutionException e) { @@ -565,14 +566,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { // and give locality preferences that hint at using the same shared slot for both // co location constraints (which we seek to prevent) SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get(); SimpleSlot s3 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get(); SimpleSlot s4 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get(); // check that each slot got three assertEquals(2, s1.getRoot().getNumberLeaves()); @@ -631,14 +632,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.emptyList()).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.emptyList()).get(); SimpleSlot s3 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.emptyList()).get(); SimpleSlot s4 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false, Collections.emptyList()).get(); // check that each slot got two assertEquals(2, s1.getRoot().getNumberLeaves());
http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java index a05c1a3..7882f4a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -123,17 +125,17 @@ public class SchedulerIsolatedTasksTest { assertEquals(5, scheduler.getNumberOfAvailableSlots()); // schedule something into all slots - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); - SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); // the slots should all be different assertTrue(areAllDistinct(s1, s2, s3, s4, s5)); try { - scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); fail("Scheduler accepted scheduling request without available resource."); } catch (ExecutionException e) { @@ -146,8 +148,8 @@ public class SchedulerIsolatedTasksTest { assertEquals(2, scheduler.getNumberOfAvailableSlots()); // now we can schedule some more slots - SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); - SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7)); @@ -235,7 +237,7 @@ public class SchedulerIsolatedTasksTest { disposeThread.start(); for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) { - CompletableFuture<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true); + CompletableFuture<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true, Collections.emptyList()); future.thenAcceptAsync( (SimpleSlot slot) -> { synchronized (toRelease) { @@ -284,11 +286,11 @@ public class SchedulerIsolatedTasksTest { scheduler.newInstanceAvailable(i3); List<SimpleSlot> slots = new ArrayList<SimpleSlot>(); - slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); - slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); - slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); - slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); - slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get()); i2.markDead(); @@ -309,7 +311,7 @@ public class SchedulerIsolatedTasksTest { // cannot get another slot, since all instances are dead try { - scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); fail("Scheduler served a slot from a dead instance"); } catch (ExecutionException e) { @@ -344,7 +346,7 @@ public class SchedulerIsolatedTasksTest { scheduler.newInstanceAvailable(i3); // schedule something on an arbitrary instance - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false, Collections.emptyList()).get(); // figure out how we use the location hints Instance first = (Instance) s1.getOwner(); @@ -352,28 +354,28 @@ public class SchedulerIsolatedTasksTest { Instance third = first == i3 ? i2 : i3; // something that needs to go to the first instance again - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false, Collections.singleton(s1.getTaskManagerLocation())).get(); assertEquals(first, s2.getOwner()); // first or second --> second, because first is full - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false, Arrays.asList(first.getTaskManagerLocation(), second.getTaskManagerLocation())).get(); assertEquals(second, s3.getOwner()); // first or third --> third (because first is full) - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); - SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get(); assertEquals(third, s4.getOwner()); assertEquals(third, s5.getOwner()); // first or third --> second, because all others are full - SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get(); assertEquals(second, s6.getOwner()); // release something on the first and second instance s2.releaseSlot(); s6.releaseSlot(); - SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get(); assertEquals(first, s7.getOwner()); assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments()); http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java index 1f88dd8..a478eb9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java @@ -27,6 +27,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.util.Collections; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -64,10 +65,10 @@ public class SchedulerSlotSharingTest extends TestLogger { scheduler.newInstanceAvailable(i2); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false).get(); - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1); assertNotNull(s2); @@ -78,7 +79,7 @@ public class SchedulerSlotSharingTest extends TestLogger { // we cannot schedule another task from the first vertex group try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -92,7 +93,7 @@ public class SchedulerSlotSharingTest extends TestLogger { s3.releaseSlot(); // allocate another slot from that group - SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s5); // release all old slots @@ -100,9 +101,9 @@ public class SchedulerSlotSharingTest extends TestLogger { s2.releaseSlot(); s4.releaseSlot(); - SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false).get(); - SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false).get(); - SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false).get(); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s6); assertNotNull(s7); @@ -149,10 +150,10 @@ public class SchedulerSlotSharingTest extends TestLogger { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false).get(); - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1); assertNotNull(s2); @@ -163,7 +164,7 @@ public class SchedulerSlotSharingTest extends TestLogger { // we cannot schedule another task from the first vertex group try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -174,10 +175,10 @@ public class SchedulerSlotSharingTest extends TestLogger { } // schedule some tasks from the second ID group - SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false).get(); - SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false).get(); - SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false).get(); - SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false).get(); + SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_2); assertNotNull(s2_2); @@ -186,7 +187,7 @@ public class SchedulerSlotSharingTest extends TestLogger { // we cannot schedule another task from the second vertex group try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -207,7 +208,7 @@ public class SchedulerSlotSharingTest extends TestLogger { // we can still not schedule anything from the second group of vertices try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -218,7 +219,7 @@ public class SchedulerSlotSharingTest extends TestLogger { } // we can schedule something from the first vertex group - SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s5); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); @@ -228,7 +229,7 @@ public class SchedulerSlotSharingTest extends TestLogger { // now we release a slot from the second vertex group and schedule another task from that group s2_2.releaseSlot(); - SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get(); + SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s5_2); // release all slots @@ -269,10 +270,10 @@ public class SchedulerSlotSharingTest extends TestLogger { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get(); - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get(); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1)); @@ -288,10 +289,10 @@ public class SchedulerSlotSharingTest extends TestLogger { assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2)); // schedule some tasks from the second ID group - SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get(); - SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get(); - SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get(); - SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get(); + SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get(); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1)); @@ -334,10 +335,10 @@ public class SchedulerSlotSharingTest extends TestLogger { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get(); - SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get(); - SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); - SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get(); + SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_1); assertNotNull(s2_1); @@ -347,10 +348,10 @@ public class SchedulerSlotSharingTest extends TestLogger { assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1)); // schedule 4 tasks from the second vertex group - SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false).get(); - SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false).get(); - SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false).get(); - SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false).get(); + SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_2); assertNotNull(s2_2); @@ -360,10 +361,10 @@ public class SchedulerSlotSharingTest extends TestLogger { assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2)); // schedule 4 tasks from the third vertex group - SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false).get(); - SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false).get(); - SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false).get(); - SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false).get(); + SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_3); assertNotNull(s2_3); @@ -375,7 +376,7 @@ public class SchedulerSlotSharingTest extends TestLogger { // we cannot schedule another task from the second vertex group try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -391,9 +392,9 @@ public class SchedulerSlotSharingTest extends TestLogger { s3_2.releaseSlot(); s4_2.releaseSlot(); - SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false).get(); - SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false).get(); - SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false).get(); + SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s5_2); assertNotNull(s6_2); @@ -444,9 +445,9 @@ public class SchedulerSlotSharingTest extends TestLogger { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 1 tasks from the first vertex group and 2 from the second - SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false).get(); - SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false).get(); - SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false).get(); + SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_1); assertNotNull(s2_1); @@ -462,7 +463,7 @@ public class SchedulerSlotSharingTest extends TestLogger { // this should free one slot so we can allocate one non-shared - SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false).get(); + SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false, Collections.emptyList()).get(); assertNotNull(sx); assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots()); @@ -497,28 +498,28 @@ public class SchedulerSlotSharingTest extends TestLogger { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule some individual vertices - SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false).get(); - SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false).get(); + SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false, Collections.emptyList()).get(); + SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false, Collections.emptyList()).get(); assertNotNull(sA1); assertNotNull(sA2); // schedule some vertices in the sharing group - SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get(); - SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get(); - SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get(); - SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get(); + SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_0); assertNotNull(s1_1); assertNotNull(s2_0); assertNotNull(s2_1); // schedule another isolated vertex - SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false).get(); + SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false, Collections.emptyList()).get(); assertNotNull(sB1); // should not be able to schedule more vertices try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -529,7 +530,7 @@ public class SchedulerSlotSharingTest extends TestLogger { } try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -540,7 +541,7 @@ public class SchedulerSlotSharingTest extends TestLogger { } try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -551,7 +552,7 @@ public class SchedulerSlotSharingTest extends TestLogger { } try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -564,8 +565,8 @@ public class SchedulerSlotSharingTest extends TestLogger { // release some isolated task and check that the sharing group may grow sA1.releaseSlot(); - SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); - SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get(); + SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_2); assertNotNull(s2_2); @@ -577,19 +578,19 @@ public class SchedulerSlotSharingTest extends TestLogger { assertEquals(1, scheduler.getNumberOfAvailableSlots()); // schedule one more no-shared task - SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get(); + SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get(); assertNotNull(sB0); // release the last of the original shared slots and allocate one more non-shared slot s2_1.releaseSlot(); - SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false).get(); + SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false, Collections.emptyList()).get(); assertNotNull(sB2); // release on non-shared and add some shared slots sA2.releaseSlot(); - SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get(); - SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get(); + SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_3); assertNotNull(s2_3); @@ -599,8 +600,8 @@ public class SchedulerSlotSharingTest extends TestLogger { s1_3.releaseSlot(); s2_3.releaseSlot(); - SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false).get(); - SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false).get(); + SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false, Collections.emptyList()).get(); + SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false, Collections.emptyList()).get(); assertNotNull(sC0); assertNotNull(sC1); @@ -648,8 +649,8 @@ public class SchedulerSlotSharingTest extends TestLogger { // schedule one to each instance - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get(); assertNotNull(s1); assertNotNull(s2); @@ -658,8 +659,8 @@ public class SchedulerSlotSharingTest extends TestLogger { assertEquals(1, i2.getNumberOfAvailableSlots()); // schedule one from the other group to each instance - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get(); assertNotNull(s3); assertNotNull(s4); @@ -701,8 +702,8 @@ public class SchedulerSlotSharingTest extends TestLogger { // schedule one to each instance - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); assertNotNull(s1); assertNotNull(s2); @@ -711,8 +712,8 @@ public class SchedulerSlotSharingTest extends TestLogger { assertEquals(2, i2.getNumberOfAvailableSlots()); // schedule one from the other group to each instance - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get(); assertNotNull(s3); assertNotNull(s4); @@ -752,14 +753,14 @@ public class SchedulerSlotSharingTest extends TestLogger { scheduler.newInstanceAvailable(i2); // schedule until the one instance is full - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get(); - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); // schedule two more with preference of same instance --> need to go to other instance - SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false).get(); - SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); assertNotNull(s1); assertNotNull(s2); @@ -805,19 +806,19 @@ public class SchedulerSlotSharingTest extends TestLogger { scheduler.newInstanceAvailable(getRandomInstance(4)); // allocate something from group 1 and 2 interleaved with schedule for group 3 - SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get(); - SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get(); + SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get(); - SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get(); - SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get(); + SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get(); - SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get(); + SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get(); - SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); - SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get(); + SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get(); - SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get(); - SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get(); + SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get(); // release groups 1 and 2 @@ -833,10 +834,10 @@ public class SchedulerSlotSharingTest extends TestLogger { // allocate group 4 - SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get(); - SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get(); - SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get(); - SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get(); + SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get(); // release groups 3 and 4 @@ -887,7 +888,7 @@ public class SchedulerSlotSharingTest extends TestLogger { @Override public void run() { try { - SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false).get(); + SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get(); sleepUninterruptibly(rnd.nextInt(5)); slot.releaseSlot(); @@ -910,7 +911,7 @@ public class SchedulerSlotSharingTest extends TestLogger { public void run() { try { if (flag3.compareAndSet(false, true)) { - SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get(); + SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get(); sleepUninterruptibly(5); @@ -939,7 +940,7 @@ public class SchedulerSlotSharingTest extends TestLogger { @Override public void run() { try { - SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false).get(); + SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get(); // wait a bit till scheduling the successor sleepUninterruptibly(rnd.nextInt(5)); @@ -966,7 +967,7 @@ public class SchedulerSlotSharingTest extends TestLogger { @Override public void run() { try { - SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false).get(); + SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get(); // wait a bit till scheduling the successor sleepUninterruptibly(rnd.nextInt(5)); @@ -1041,27 +1042,27 @@ public class SchedulerSlotSharingTest extends TestLogger { scheduler.newInstanceAvailable(getRandomInstance(4)); // schedule one task for the first and second vertex - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false, Collections.emptyList()).get(); assertTrue( s1.getParent() == s2.getParent() ); assertEquals(3, scheduler.getNumberOfAvailableSlots()); - SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false).get(); - SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false).get(); - SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get(); - SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get(); + SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get(); s1.releaseSlot(); s2.releaseSlot(); - SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false).get(); - SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false).get(); - SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get(); - SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get(); + SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get(); try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false, Collections.emptyList()).get(); fail("should throw an exception"); } catch (ExecutionException e) { http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java index c7d0f09..98dca03 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobmanager.scheduler; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -106,20 +107,26 @@ public class SchedulerTestUtils { public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) { - ExecutionVertex vertex = mock(ExecutionVertex.class); - Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = new ArrayList<>(4); for (TaskManagerLocation preferredLocation : preferredLocations) { preferredLocationFutures.add(CompletableFuture.completedFuture(preferredLocation)); } + + return getTestVertex(preferredLocationFutures); + } + + public static Execution getTestVertex(Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures) { + ExecutionVertex vertex = mock(ExecutionVertex.class); + when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures); when(vertex.getJobId()).thenReturn(new JobID()); when(vertex.toString()).thenReturn("TEST-VERTEX"); - + Execution execution = mock(Execution.class); when(execution.getVertex()).thenReturn(vertex); - + when(execution.calculatePreferredLocations(any(LocationPreferenceConstraint.class))).thenCallRealMethod(); + return execution; } http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java new file mode 100644 index 0000000..60dddbb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; + +import java.net.InetAddress; + +/** + * Dummy local task manager location for testing purposes. + */ +public class LocalTaskManagerLocation extends TaskManagerLocation { + + private static final long serialVersionUID = 2396142513336559461L; + + public LocalTaskManagerLocation() { + super(ResourceID.generate(), InetAddress.getLoopbackAddress(), -1); + } +}
