http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java index b22ccd0..272a911 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java @@ -26,9 +26,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.apache.flink.runtime.instance.SimpleSlot; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -77,18 +77,18 @@ public class ScheduleWithCoLocationHintTest { CoLocationConstraint c6 = new CoLocationConstraint(ccg); // schedule 4 tasks from the first vertex group - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2)); - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3)); - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4)); - AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1)); - AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2)); - AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3)); - AllocatedSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5)); - AllocatedSlot s9 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6)); - AllocatedSlot s10 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4)); - AllocatedSlot s11 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5)); - AllocatedSlot s12 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2)); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3)); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4)); + SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1)); + SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2)); + SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3)); + SimpleSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5)); + SimpleSlot s9 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6)); + SimpleSlot s10 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4)); + SimpleSlot s11 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5)); + SimpleSlot s12 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6)); assertNotNull(s1); assertNotNull(s2); @@ -104,18 +104,18 @@ public class ScheduleWithCoLocationHintTest { assertNotNull(s12); // check that each slot got exactly two tasks - assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s3).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s4).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s5).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s6).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s7).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s8).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s9).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s10).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s11).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s12).getSharedSlot().getNumberOfAllocatedSubSlots()); + assertEquals(2, s1.getRoot().getNumberLeaves()); + assertEquals(2, s2.getRoot().getNumberLeaves()); + assertEquals(2, s3.getRoot().getNumberLeaves()); + assertEquals(2, s4.getRoot().getNumberLeaves()); + assertEquals(2, s5.getRoot().getNumberLeaves()); + assertEquals(2, s6.getRoot().getNumberLeaves()); + assertEquals(2, s7.getRoot().getNumberLeaves()); + assertEquals(2, s8.getRoot().getNumberLeaves()); + assertEquals(2, s9.getRoot().getNumberLeaves()); + assertEquals(2, s10.getRoot().getNumberLeaves()); + assertEquals(2, s11.getRoot().getNumberLeaves()); + assertEquals(2, s12.getRoot().getNumberLeaves()); assertEquals(s1.getInstance(), s5.getInstance()); assertEquals(s2.getInstance(), s6.getInstance()); @@ -150,7 +150,7 @@ public class ScheduleWithCoLocationHintTest { s12.releaseSlot(); assertTrue(scheduler.getNumberOfAvailableSlots() >= 1); - AllocatedSlot single = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1))); + SimpleSlot single = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1))); assertNotNull(single); s1.releaseSlot(); @@ -197,10 +197,10 @@ public class ScheduleWithCoLocationHintTest { SlotSharingGroup sharingGroup = new SlotSharingGroup(); CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1)); - AllocatedSlot sSolo = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 1))); + SimpleSlot sSolo = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 1))); Instance loc = s1.getInstance(); @@ -208,7 +208,7 @@ public class ScheduleWithCoLocationHintTest { s2.releaseSlot(); sSolo.releaseSlot(); - AllocatedSlot sNew = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1)); + SimpleSlot sNew = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1)); assertEquals(loc, sNew.getInstance()); assertEquals(2, scheduler.getNumberOfLocalizedAssignments()); @@ -241,7 +241,7 @@ public class ScheduleWithCoLocationHintTest { SlotSharingGroup sharingGroup = new SlotSharingGroup(); CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1)); s1.releaseSlot(); scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1))); @@ -296,16 +296,16 @@ public class ScheduleWithCoLocationHintTest { scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup)); // second wave - AllocatedSlot s21 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1)); - AllocatedSlot s22 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2)); - AllocatedSlot s23 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3)); - AllocatedSlot s24 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4)); + SimpleSlot s21 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1)); + SimpleSlot s22 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2)); + SimpleSlot s23 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3)); + SimpleSlot s24 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4)); // third wave - AllocatedSlot s31 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2)); - AllocatedSlot s32 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3)); - AllocatedSlot s33 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4)); - AllocatedSlot s34 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1)); + SimpleSlot s31 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2)); + SimpleSlot s32 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3)); + SimpleSlot s33 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4)); + SimpleSlot s34 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1)); scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup)); scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup)); @@ -352,24 +352,24 @@ public class ScheduleWithCoLocationHintTest { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); // schedule something into the shared group so that both instances are in the sharing group - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup)); // schedule one locally to instance 1 - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc1)); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc1)); // schedule with co location constraint (yet unassigned) and a preference for // instance 1, but it can only get instance 2 - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2)); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2)); // schedule something into the assigned co-location constraints and check that they override the // other preferences - AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, i2), sharingGroup, cc1)); - AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, i1), sharingGroup, cc2)); + SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, i2), sharingGroup, cc1)); + SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, i1), sharingGroup, cc2)); // check that each slot got three - assertEquals(3, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(3, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots()); + assertEquals(3, s1.getRoot().getNumberLeaves()); + assertEquals(3, s2.getRoot().getNumberLeaves()); assertEquals(s1.getInstance(), s3.getInstance()); assertEquals(s2.getInstance(), s4.getInstance()); @@ -420,8 +420,8 @@ public class ScheduleWithCoLocationHintTest { CoLocationConstraint cc1 = new CoLocationConstraint(ccg); CoLocationConstraint cc2 = new CoLocationConstraint(ccg); - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2)); s1.releaseSlot(); s2.releaseSlot(); @@ -429,8 +429,8 @@ public class ScheduleWithCoLocationHintTest { assertEquals(2, scheduler.getNumberOfAvailableSlots()); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1)); - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2)); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1)); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2)); // still preserves the previous instance mapping) assertEquals(i1, s3.getInstance()); @@ -474,8 +474,8 @@ public class ScheduleWithCoLocationHintTest { CoLocationConstraint cc1 = new CoLocationConstraint(ccg); CoLocationConstraint cc2 = new CoLocationConstraint(ccg); - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2)); s1.releaseSlot(); s2.releaseSlot(); @@ -483,8 +483,8 @@ public class ScheduleWithCoLocationHintTest { assertEquals(2, scheduler.getNumberOfAvailableSlots()); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); - AllocatedSlot sa = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2))); - AllocatedSlot sb = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2))); + SimpleSlot sa = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2))); + SimpleSlot sb = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2))); try { scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1)); @@ -535,15 +535,15 @@ public class ScheduleWithCoLocationHintTest { // schedule something from the second job vertex id before the first is filled, // and give locality preferences that hint at using the same shared slot for both // co location constraints (which we seek to prevent) - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc2)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc2)); - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc1)); - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup, cc2)); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc1)); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup, cc2)); // check that each slot got three - assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots()); + assertEquals(2, s1.getRoot().getNumberLeaves()); + assertEquals(2, s2.getRoot().getNumberLeaves()); assertEquals(s1.getInstance(), s3.getInstance()); assertEquals(s2.getInstance(), s4.getInstance()); @@ -594,15 +594,15 @@ public class ScheduleWithCoLocationHintTest { CoLocationConstraint cc1 = new CoLocationConstraint(ccg); CoLocationConstraint cc2 = new CoLocationConstraint(ccg); - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2)); - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup)); - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup)); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup)); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup)); - // check that each slot got three - assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots()); - assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots()); + // check that each slot got two + assertEquals(2, s1.getRoot().getNumberLeaves()); + assertEquals(2, s2.getRoot().getNumberLeaves()); s1.releaseSlot(); s2.releaseSlot();
http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java index 7209842..a7f0f04 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java @@ -24,6 +24,7 @@ import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.g import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance; import static org.junit.Assert.*; +import org.apache.flink.runtime.instance.SimpleSlot; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -40,7 +41,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; /** @@ -137,11 +137,11 @@ public class SchedulerIsolatedTasksTest { assertEquals(5, scheduler.getNumberOfAvailableSlots()); // schedule something into all slots - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); - AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); + SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); // the slots should all be different assertTrue(areAllDistinct(s1, s2, s3, s4, s5)); @@ -160,8 +160,8 @@ public class SchedulerIsolatedTasksTest { assertEquals(2, scheduler.getNumberOfAvailableSlots()); // now we can schedule some more slots - AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); - AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); + SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); + SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())); assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7)); @@ -215,7 +215,7 @@ public class SchedulerIsolatedTasksTest { List<SlotAllocationFuture> allAllocatedSlots = new ArrayList<SlotAllocationFuture>(); // slots that need to be released - final Set<AllocatedSlot> toRelease = new HashSet<AllocatedSlot>(); + final Set<SimpleSlot> toRelease = new HashSet<SimpleSlot>(); // flag to track errors in the concurrent thread final AtomicBoolean errored = new AtomicBoolean(false); @@ -223,7 +223,7 @@ public class SchedulerIsolatedTasksTest { SlotAllocationFutureAction action = new SlotAllocationFutureAction() { @Override - public void slotAllocated(AllocatedSlot slot) { + public void slotAllocated(SimpleSlot slot) { synchronized (toRelease) { toRelease.add(slot); toRelease.notifyAll(); @@ -244,8 +244,8 @@ public class SchedulerIsolatedTasksTest { toRelease.wait(); } - Iterator<AllocatedSlot> iter = toRelease.iterator(); - AllocatedSlot next = iter.next(); + Iterator<SimpleSlot> iter = toRelease.iterator(); + SimpleSlot next = iter.next(); iter.remove(); next.releaseSlot(); @@ -272,7 +272,7 @@ public class SchedulerIsolatedTasksTest { assertFalse("The slot releasing thread caused an error.", errored.get()); - List<AllocatedSlot> slotsAfter = new ArrayList<AllocatedSlot>(); + List<SimpleSlot> slotsAfter = new ArrayList<SimpleSlot>(); for (SlotAllocationFuture future : allAllocatedSlots) { slotsAfter.add(future.waitTillAllocated()); } @@ -303,7 +303,7 @@ public class SchedulerIsolatedTasksTest { scheduler.newInstanceAvailable(i2); scheduler.newInstanceAvailable(i3); - List<AllocatedSlot> slots = new ArrayList<AllocatedSlot>(); + List<SimpleSlot> slots = new ArrayList<SimpleSlot>(); slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()))); slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()))); slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()))); @@ -312,7 +312,7 @@ public class SchedulerIsolatedTasksTest { i2.markDead(); - for (AllocatedSlot slot : slots) { + for (SimpleSlot slot : slots) { if (slot.getInstance() == i2) { assertTrue(slot.isCanceled()); } else { @@ -364,7 +364,7 @@ public class SchedulerIsolatedTasksTest { scheduler.newInstanceAvailable(i3); // schedule something on an arbitrary instance - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Collections.<Instance>emptyList()))); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Collections.<Instance>emptyList()))); // figure out how we use the location hints Instance first = s1.getInstance(); @@ -372,28 +372,28 @@ public class SchedulerIsolatedTasksTest { Instance third = first == i3 ? i2 : i3; // something that needs to go to the first instance again - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Collections.singletonList(s1.getInstance())))); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Collections.singletonList(s1.getInstance())))); assertEquals(first, s2.getInstance()); // first or second --> second, because first is full - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, second)))); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, second)))); assertEquals(second, s3.getInstance()); // first or third --> third (because first is full) - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third)))); - AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third)))); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third)))); + SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third)))); assertEquals(third, s4.getInstance()); assertEquals(third, s5.getInstance()); // first or third --> second, because all others are full - AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third)))); + SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third)))); assertEquals(second, s6.getInstance()); // release something on the first and second instance s2.releaseSlot(); s6.releaseSlot(); - AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third)))); + SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third)))); assertEquals(first, s7.getInstance()); assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments()); http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java index de90701..d1c6938 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java @@ -31,13 +31,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.flink.runtime.instance.SimpleSlot; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -74,10 +74,10 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(i2); // schedule 4 tasks from the first vertex group - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup)); - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup)); - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup)); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup)); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup)); assertNotNull(s1); assertNotNull(s2); @@ -102,7 +102,7 @@ public class SchedulerSlotSharingTest { s3.releaseSlot(); // allocate another slot from that group - AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup)); + SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup)); assertNotNull(s5); // release all old slots @@ -110,9 +110,9 @@ public class SchedulerSlotSharingTest { s2.releaseSlot(); s4.releaseSlot(); - AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup)); - AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup)); - AllocatedSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup)); + SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup)); + SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup)); + SimpleSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup)); assertNotNull(s6); assertNotNull(s7); @@ -159,10 +159,10 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup)); - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup)); - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup)); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup)); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup)); assertNotNull(s1); assertNotNull(s2); @@ -184,10 +184,10 @@ public class SchedulerSlotSharingTest { } // schedule some tasks from the second ID group - AllocatedSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup)); - AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup)); - AllocatedSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup)); - AllocatedSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup)); + SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup)); + SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup)); + SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup)); + SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup)); assertNotNull(s1_2); assertNotNull(s2_2); @@ -228,7 +228,7 @@ public class SchedulerSlotSharingTest { } // we can schedule something from the first vertex group - AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup)); + SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup)); assertNotNull(s5); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); @@ -238,7 +238,7 @@ public class SchedulerSlotSharingTest { // now we release a slot from the second vertex group and schedule another task from that group s2_2.releaseSlot(); - AllocatedSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup)); + SimpleSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup)); assertNotNull(s5_2); // release all slots @@ -279,10 +279,10 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1)); @@ -298,10 +298,10 @@ public class SchedulerSlotSharingTest { assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2)); // schedule some tasks from the second ID group - AllocatedSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup)); - AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup)); - AllocatedSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup)); - AllocatedSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup)); + SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup)); + SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup)); + SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup)); + SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup)); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1)); @@ -344,10 +344,10 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - AllocatedSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); - AllocatedSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); - AllocatedSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); - AllocatedSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); + SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); + SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); + SimpleSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); + SimpleSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); assertNotNull(s1_1); assertNotNull(s2_1); @@ -357,10 +357,10 @@ public class SchedulerSlotSharingTest { assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1)); // schedule 4 tasks from the second vertex group - AllocatedSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup)); - AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup)); - AllocatedSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup)); - AllocatedSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup)); + SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup)); + SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup)); + SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup)); + SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup)); assertNotNull(s1_2); assertNotNull(s2_2); @@ -370,10 +370,10 @@ public class SchedulerSlotSharingTest { assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2)); // schedule 4 tasks from the third vertex group - AllocatedSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup)); - AllocatedSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup)); - AllocatedSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup)); - AllocatedSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup)); + SimpleSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup)); + SimpleSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup)); + SimpleSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup)); + SimpleSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup)); assertNotNull(s1_3); assertNotNull(s2_3); @@ -401,9 +401,9 @@ public class SchedulerSlotSharingTest { s3_2.releaseSlot(); s4_2.releaseSlot(); - AllocatedSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup)); - AllocatedSlot s6_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup)); - AllocatedSlot s7_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup)); + SimpleSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup)); + SimpleSlot s6_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup)); + SimpleSlot s7_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup)); assertNotNull(s5_2); assertNotNull(s6_2); @@ -454,9 +454,9 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 1 tasks from the first vertex group and 2 from the second - AllocatedSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup)); - AllocatedSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup)); - AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup)); + SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup)); + SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup)); + SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup)); assertNotNull(s1_1); assertNotNull(s2_1); @@ -472,7 +472,7 @@ public class SchedulerSlotSharingTest { // this should free one slot so we can allocate one non-shared - AllocatedSlot sx = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1))); + SimpleSlot sx = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1))); assertNotNull(sx); assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots()); @@ -507,23 +507,23 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule some individual vertices - AllocatedSlot sA1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 0, 2))); - AllocatedSlot sA2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 1, 2))); + SimpleSlot sA1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 0, 2))); + SimpleSlot sA2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 1, 2))); assertNotNull(sA1); assertNotNull(sA2); // schedule some vertices in the sharing group - AllocatedSlot s1_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); - AllocatedSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); - AllocatedSlot s2_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup)); - AllocatedSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup)); + SimpleSlot s1_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); + SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); + SimpleSlot s2_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup)); + SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup)); assertNotNull(s1_0); assertNotNull(s1_1); assertNotNull(s2_0); assertNotNull(s2_1); // schedule another isolated vertex - AllocatedSlot sB1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 1, 3))); + SimpleSlot sB1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 1, 3))); assertNotNull(sB1); // should not be able to schedule more vertices @@ -574,8 +574,8 @@ public class SchedulerSlotSharingTest { // release some isolated task and check that the sharing group may grow sA1.releaseSlot(); - AllocatedSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); - AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup)); + SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); + SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup)); assertNotNull(s1_2); assertNotNull(s2_2); @@ -587,19 +587,19 @@ public class SchedulerSlotSharingTest { assertEquals(1, scheduler.getNumberOfAvailableSlots()); // schedule one more no-shared task - AllocatedSlot sB0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3))); + SimpleSlot sB0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3))); assertNotNull(sB0); // release the last of the original shared slots and allocate one more non-shared slot s2_1.releaseSlot(); - AllocatedSlot sB2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 2, 3))); + SimpleSlot sB2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 2, 3))); assertNotNull(sB2); // release on non-shared and add some shared slots sA2.releaseSlot(); - AllocatedSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); - AllocatedSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup)); + SimpleSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); + SimpleSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup)); assertNotNull(s1_3); assertNotNull(s2_3); @@ -609,8 +609,8 @@ public class SchedulerSlotSharingTest { s1_3.releaseSlot(); s2_3.releaseSlot(); - AllocatedSlot sC0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 1, 2))); - AllocatedSlot sC1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 2))); + SimpleSlot sC0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 1, 2))); + SimpleSlot sC1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 2))); assertNotNull(sC0); assertNotNull(sC1); @@ -655,8 +655,8 @@ public class SchedulerSlotSharingTest { // schedule one to each instance - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup)); assertNotNull(s1); assertNotNull(s2); @@ -665,8 +665,8 @@ public class SchedulerSlotSharingTest { assertEquals(1, i2.getNumberOfAvailableSlots()); // schedule one from the other group to each instance - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup)); - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup)); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup)); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup)); assertNotNull(s3); assertNotNull(s4); @@ -705,8 +705,8 @@ public class SchedulerSlotSharingTest { // schedule one to each instance - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup)); assertNotNull(s1); assertNotNull(s2); @@ -715,8 +715,8 @@ public class SchedulerSlotSharingTest { assertEquals(2, i2.getNumberOfAvailableSlots()); // schedule one from the other group to each instance - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup)); - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup)); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup)); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup)); assertNotNull(s3); assertNotNull(s4); @@ -754,14 +754,14 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(i2); // schedule until the one instance is full - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup)); - AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, i1), sharingGroup)); - AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, i1), sharingGroup)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup)); + SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, i1), sharingGroup)); + SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, i1), sharingGroup)); // schedule two more with preference of same instance --> need to go to other instance - AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, i1), sharingGroup)); - AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, i1), sharingGroup)); + SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, i1), sharingGroup)); + SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, i1), sharingGroup)); assertNotNull(s1); assertNotNull(s2); @@ -808,19 +808,19 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(4)); // allocate something from group 1 and 2 interleaved with schedule for group 3 - AllocatedSlot slot_1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); - AllocatedSlot slot_1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); + SimpleSlot slot_1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); + SimpleSlot slot_1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); - AllocatedSlot slot_2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup)); - AllocatedSlot slot_2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup)); + SimpleSlot slot_2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup)); + SimpleSlot slot_2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup)); - AllocatedSlot slot_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup)); + SimpleSlot slot_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup)); - AllocatedSlot slot_1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); - AllocatedSlot slot_1_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); + SimpleSlot slot_1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); + SimpleSlot slot_1_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); - AllocatedSlot slot_2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup)); - AllocatedSlot slot_2_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup)); + SimpleSlot slot_2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup)); + SimpleSlot slot_2_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup)); // release groups 1 and 2 @@ -836,10 +836,10 @@ public class SchedulerSlotSharingTest { // allocate group 4 - AllocatedSlot slot_4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup)); - AllocatedSlot slot_4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup)); - AllocatedSlot slot_4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup)); - AllocatedSlot slot_4_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup)); + SimpleSlot slot_4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup)); + SimpleSlot slot_4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup)); + SimpleSlot slot_4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup)); + SimpleSlot slot_4_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup)); // release groups 3 and 4 @@ -892,11 +892,11 @@ public class SchedulerSlotSharingTest { @Override public void run() { try { - AllocatedSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup)); - + SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup)); + sleepUninterruptibly(rnd.nextInt(5)); slot.releaseSlot(); - + if (completed.incrementAndGet() == 13) { synchronized (completed) { completed.notifyAll(); @@ -915,7 +915,7 @@ public class SchedulerSlotSharingTest { public void run() { try { if (flag3.compareAndSet(false, true)) { - AllocatedSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup)); + SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup)); sleepUninterruptibly(5); @@ -944,7 +944,7 @@ public class SchedulerSlotSharingTest { @Override public void run() { try { - AllocatedSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup)); + SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup)); // wait a bit till scheduling the successor sleepUninterruptibly(rnd.nextInt(5)); @@ -971,7 +971,7 @@ public class SchedulerSlotSharingTest { @Override public void run() { try { - AllocatedSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup)); + SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup)); // wait a bit till scheduling the successor sleepUninterruptibly(rnd.nextInt(5)); @@ -1049,24 +1049,24 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(4)); // schedule one task for the first and second vertex - AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup)); - AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup)); + SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup)); + SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup)); - assertTrue( ((SubSlot) s1).getSharedSlot() == ((SubSlot) s2).getSharedSlot() ); + assertTrue( s1.getParent() == s2.getParent() ); assertEquals(3, scheduler.getNumberOfAvailableSlots()); - AllocatedSlot s3_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup)); - AllocatedSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup)); - AllocatedSlot s4_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup)); - AllocatedSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup)); + SimpleSlot s3_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup)); + SimpleSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup)); + SimpleSlot s4_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup)); + SimpleSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup)); s1.releaseSlot(); s2.releaseSlot(); - AllocatedSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup)); - AllocatedSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup)); - AllocatedSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup)); - AllocatedSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup)); + SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup)); + SimpleSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup)); + SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup)); + SimpleSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup)); try { scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup)); http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java index feb3005..fcb638f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java @@ -23,6 +23,8 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Matchers.any; import static org.junit.Assert.*; +import org.apache.flink.runtime.instance.SharedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -39,41 +41,45 @@ public class SharedSlotsTest { doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - final SubSlot sub = (SubSlot) invocation.getArguments()[0]; - final SharedSlot shared = (SharedSlot) invocation.getArguments()[1]; - shared.releaseSlot(sub); + final SimpleSlot simpleSlot = (SimpleSlot) invocation.getArguments()[0]; + final SharedSlot sharedSlot = simpleSlot.getParent(); + + sharedSlot.freeSubSlot(simpleSlot); + return null; } - }).when(assignment).releaseSubSlot(any(SubSlot.class), any(SharedSlot.class)); + }).when(assignment).releaseSimpleSlot(any(SimpleSlot.class)); + + JobVertexID id1 = new JobVertexID(); Instance instance = SchedulerTestUtils.getRandomInstance(1); - SharedSlot slot = new SharedSlot(instance.allocateSlot(new JobID()), assignment); - assertFalse(slot.isDisposed()); + SharedSlot slot = instance.allocateSharedSlot(new JobID(), assignment, id1); + assertFalse(slot.isDead()); - SubSlot ss1 = slot.allocateSubSlot(new JobVertexID()); + SimpleSlot ss1 = slot.allocateSubSlot(id1); assertNotNull(ss1); // verify resources assertEquals(instance, ss1.getInstance()); assertEquals(0, ss1.getSlotNumber()); - assertEquals(slot.getAllocatedSlot().getJobID(), ss1.getJobID()); + assertEquals(slot.getJobID(), ss1.getJobID()); - SubSlot ss2 = slot.allocateSubSlot(new JobVertexID()); + SimpleSlot ss2 = slot.allocateSubSlot(new JobVertexID()); assertNotNull(ss2); - assertEquals(2, slot.getNumberOfAllocatedSubSlots()); + assertEquals(2, slot.getNumberLeaves()); // release first slot, should not trigger release ss1.releaseSlot(); - assertFalse(slot.isDisposed()); + assertFalse(slot.isDead()); ss2.releaseSlot(); - assertFalse(slot.isDisposed()); + assertFalse(slot.isDead()); // the shared slot should now dispose itself - assertEquals(0, slot.getNumberOfAllocatedSubSlots()); + assertEquals(0, slot.getNumberLeaves()); } catch (Exception e) { e.printStackTrace(); @@ -85,46 +91,49 @@ public class SharedSlotsTest { public void createAndRelease() { try { SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class); - doAnswer(new Answer<Void>() { + doAnswer(new Answer<Boolean>() { @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - final SubSlot sub = (SubSlot) invocation.getArguments()[0]; - final SharedSlot shared = (SharedSlot) invocation.getArguments()[1]; - if (shared.releaseSlot(sub) == 0) { - shared.dispose(); + public Boolean answer(InvocationOnMock invocation) throws Throwable { + final SimpleSlot slot = (SimpleSlot) invocation.getArguments()[0]; + final SharedSlot shared = slot.getParent(); + if (shared.freeSubSlot(slot) == 0) { + shared.markDead(); + return true; } - return null; + return false; } - - }).when(assignment).releaseSubSlot(any(SubSlot.class), any(SharedSlot.class)); - + + }).when(assignment).releaseSimpleSlot(any(SimpleSlot.class)); + + JobVertexID id1 = new JobVertexID(); + Instance instance = SchedulerTestUtils.getRandomInstance(1); - SharedSlot slot = new SharedSlot(instance.allocateSlot(new JobID()), assignment); - assertFalse(slot.isDisposed()); + SharedSlot slot = instance.allocateSharedSlot(new JobID(), assignment, id1); + assertFalse(slot.isDead()); - SubSlot ss1 = slot.allocateSubSlot(new JobVertexID()); + SimpleSlot ss1 = slot.allocateSubSlot(id1); assertNotNull(ss1); // verify resources assertEquals(instance, ss1.getInstance()); assertEquals(0, ss1.getSlotNumber()); - assertEquals(slot.getAllocatedSlot().getJobID(), ss1.getJobID()); + assertEquals(slot.getJobID(), ss1.getJobID()); - SubSlot ss2 = slot.allocateSubSlot(new JobVertexID()); + SimpleSlot ss2 = slot.allocateSubSlot(new JobVertexID()); assertNotNull(ss2); - assertEquals(2, slot.getNumberOfAllocatedSubSlots()); + assertEquals(2, slot.getNumberLeaves()); // release first slot, should not trigger release ss1.releaseSlot(); - assertFalse(slot.isDisposed()); + assertFalse(slot.isDead()); ss2.releaseSlot(); - assertTrue(slot.isDisposed()); + assertTrue(slot.isDead()); // the shared slot should now dispose itself - assertEquals(0, slot.getNumberOfAllocatedSubSlots()); + assertEquals(0, slot.getNumberLeaves()); assertNull(slot.allocateSubSlot(new JobVertexID())); } http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java index 47afacb..23a5c94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java @@ -23,7 +23,7 @@ import static org.junit.Assert.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobID; import org.junit.Test; @@ -36,7 +36,7 @@ public class SlotAllocationFutureTest { SlotAllocationFutureAction action = new SlotAllocationFutureAction() { @Override - public void slotAllocated(AllocatedSlot slot) {} + public void slotAllocated(SimpleSlot slot) {} }; future.setFutureAction(action); @@ -47,8 +47,8 @@ public class SlotAllocationFutureTest { // expected } - final AllocatedSlot slot1 = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0); - final AllocatedSlot slot2 = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0); + final SimpleSlot slot1 = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null); + final SimpleSlot slot2 = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null); future.setSlot(slot1); try { @@ -71,13 +71,13 @@ public class SlotAllocationFutureTest { // action before the slot { final AtomicInteger invocations = new AtomicInteger(); - final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0); + final SimpleSlot thisSlot = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null); SlotAllocationFuture future = new SlotAllocationFuture(); future.setFutureAction(new SlotAllocationFutureAction() { @Override - public void slotAllocated(AllocatedSlot slot) { + public void slotAllocated(SimpleSlot slot) { assertEquals(thisSlot, slot); invocations.incrementAndGet(); } @@ -91,14 +91,14 @@ public class SlotAllocationFutureTest { // slot before action { final AtomicInteger invocations = new AtomicInteger(); - final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0); + final SimpleSlot thisSlot = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null); SlotAllocationFuture future = new SlotAllocationFuture(); future.setSlot(thisSlot); future.setFutureAction(new SlotAllocationFutureAction() { @Override - public void slotAllocated(AllocatedSlot slot) { + public void slotAllocated(SimpleSlot slot) { assertEquals(thisSlot, slot); invocations.incrementAndGet(); } @@ -121,7 +121,7 @@ public class SlotAllocationFutureTest { final AtomicInteger invocations = new AtomicInteger(); final AtomicBoolean error = new AtomicBoolean(); - final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0); + final SimpleSlot thisSlot = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null); final SlotAllocationFuture future = new SlotAllocationFuture(); @@ -130,7 +130,7 @@ public class SlotAllocationFutureTest { @Override public void run() { try { - AllocatedSlot syncSlot = future.waitTillAllocated(); + SimpleSlot syncSlot = future.waitTillAllocated(); if (syncSlot == null || syncSlot != thisSlot) { error.set(true); return; @@ -158,12 +158,12 @@ public class SlotAllocationFutureTest { // setting slot before syncing { - final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0); + final SimpleSlot thisSlot = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null); final SlotAllocationFuture future = new SlotAllocationFuture(); future.setSlot(thisSlot); - AllocatedSlot retrieved = future.waitTillAllocated(); + SimpleSlot retrieved = future.waitTillAllocated(); assertNotNull(retrieved); assertEquals(thisSlot, retrieved); http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 36f0f92..9bcd163 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -62,7 +62,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(1 second) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionFailure(jobGraph.getJobID, new NoResourceAvailableException(1,1))) + expectMsg(SubmissionFailure(jobGraph.getJobID, new NoResourceAvailableException(1,1,0))) expectNoMsg() } http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala index 0f6eeca..22ee2c1 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.jobmanager -import akka.actor.{ActorSystem, PoisonPill} +import akka.actor.{Kill, ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender} @@ -41,7 +41,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { } "The JobManager" should { - "handle failing task manager" in { + "handle gracefully failing task manager" in { val num_tasks = 31 val sender = new AbstractJobVertex("Sender") val receiver = new AbstractJobVertex("Receiver") @@ -78,6 +78,41 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { cluster.stop() } } + + "handle hard failing task manager" in { + val num_tasks = 31 + val sender = new AbstractJobVertex("Sender") + val receiver = new AbstractJobVertex("Receiver") + sender.setInvokableClass(classOf[Sender]) + receiver.setInvokableClass(classOf[BlockingReceiver]) + sender.setParallelism(num_tasks) + receiver.setParallelism(num_tasks) + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) + + val jobGraph = new JobGraph("Pointwise Job", sender, receiver) + val jobID = jobGraph.getJobID + + val cluster = TestingUtils.startTestingCluster(num_tasks, 2) + + val taskManagers = cluster.getTaskManagers + val jm = cluster.getJobManager + + try { + within(TestingUtils.TESTING_DURATION) { + jm ! SubmitJob(jobGraph) + expectMsg(SubmissionSuccess(jobGraph.getJobID)) + + jm ! WaitForAllVerticesToBeRunningOrFinished(jobID) + expectMsg(AllVerticesRunning(jobID)) + + // kill one task manager + taskManagers(0) ! Kill + expectMsgType[JobResultFailed] + } + }finally{ + cluster.stop() + } + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala index fba7c76..626c518 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.jobmanager -import akka.actor.{ActorSystem, PoisonPill} +import akka.actor.{Kill, ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender} @@ -41,7 +41,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { } "The JobManager" should { - "handle task manager failures with slot sharing" in { + "handle gracefully failing task manager with slot sharing" in { val num_tasks = 20 val sender = new AbstractJobVertex("Sender") @@ -83,6 +83,48 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { cluster.stop() } } + + "handle hard failing task manager with slot sharing" in { + val num_tasks = 20 + + val sender = new AbstractJobVertex("Sender") + val receiver = new AbstractJobVertex("Receiver") + + sender.setInvokableClass(classOf[Sender]) + receiver.setInvokableClass(classOf[BlockingReceiver]) + + sender.setParallelism(num_tasks) + receiver.setParallelism(num_tasks) + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) + + val sharingGroup = new SlotSharingGroup() + sender.setSlotSharingGroup(sharingGroup) + receiver.setSlotSharingGroup(sharingGroup) + + val jobGraph = new JobGraph("Pointwise Job", sender, receiver) + val jobID = jobGraph.getJobID + + val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2) + val jm = cluster.getJobManager + val taskManagers = cluster.getTaskManagers + + try{ + within(TestingUtils.TESTING_DURATION) { + jm ! SubmitJob(jobGraph) + expectMsg(SubmissionSuccess(jobGraph.getJobID)) + + jm ! WaitForAllVerticesToBeRunningOrFinished(jobID) + expectMsg(AllVerticesRunning(jobID)) + + //kill task manager + taskManagers(0) ! Kill + + expectMsgType[JobResultFailed] + } + }finally{ + cluster.stop() + } + } } }
