http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index b22ccd0..272a911 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -26,9 +26,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.runtime.instance.SimpleSlot;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -77,18 +77,18 @@ public class ScheduleWithCoLocationHintTest {
                        CoLocationConstraint c6 = new CoLocationConstraint(ccg);
                        
                        // schedule 4 tasks from the first vertex group
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2));
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3));
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4));
-                       AllocatedSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1));
-                       AllocatedSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2));
-                       AllocatedSlot s7 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3));
-                       AllocatedSlot s8 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5));
-                       AllocatedSlot s9 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6));
-                       AllocatedSlot s10 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4));
-                       AllocatedSlot s11 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5));
-                       AllocatedSlot s12 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4));
+                       SimpleSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1));
+                       SimpleSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2));
+                       SimpleSlot s7 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3));
+                       SimpleSlot s8 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5));
+                       SimpleSlot s9 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6));
+                       SimpleSlot s10 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4));
+                       SimpleSlot s11 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5));
+                       SimpleSlot s12 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6));
 
                        assertNotNull(s1);
                        assertNotNull(s2);
@@ -104,18 +104,18 @@ public class ScheduleWithCoLocationHintTest {
                        assertNotNull(s12);
                        
                        // check that each slot got exactly two tasks
-                       assertEquals(2, ((SubSlot) 
s1).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s2).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s3).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s4).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s5).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s6).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s7).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s8).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s9).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s10).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s11).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s12).getSharedSlot().getNumberOfAllocatedSubSlots());
+                       assertEquals(2, s1.getRoot().getNumberLeaves());
+                       assertEquals(2, s2.getRoot().getNumberLeaves());
+                       assertEquals(2, s3.getRoot().getNumberLeaves());
+                       assertEquals(2, s4.getRoot().getNumberLeaves());
+                       assertEquals(2, s5.getRoot().getNumberLeaves());
+                       assertEquals(2, s6.getRoot().getNumberLeaves());
+                       assertEquals(2, s7.getRoot().getNumberLeaves());
+                       assertEquals(2, s8.getRoot().getNumberLeaves());
+                       assertEquals(2, s9.getRoot().getNumberLeaves());
+                       assertEquals(2, s10.getRoot().getNumberLeaves());
+                       assertEquals(2, s11.getRoot().getNumberLeaves());
+                       assertEquals(2, s12.getRoot().getNumberLeaves());
                        
                        assertEquals(s1.getInstance(), s5.getInstance());
                        assertEquals(s2.getInstance(), s6.getInstance());
@@ -150,7 +150,7 @@ public class ScheduleWithCoLocationHintTest {
                        s12.releaseSlot();
                        assertTrue(scheduler.getNumberOfAvailableSlots() >= 1);
                        
-                       AllocatedSlot single = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new 
JobVertexID(), 0, 1)));
+                       SimpleSlot single = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)));
                        assertNotNull(single);
                        
                        s1.releaseSlot();
@@ -197,10 +197,10 @@ public class ScheduleWithCoLocationHintTest {
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
                        CoLocationConstraint c1 = new CoLocationConstraint(new 
CoLocationGroup());
                        
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1));
                        
-                       AllocatedSlot sSolo = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 0, 1)));
+                       SimpleSlot sSolo = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 0, 1)));
                        
                        Instance loc = s1.getInstance();
                        
@@ -208,7 +208,7 @@ public class ScheduleWithCoLocationHintTest {
                        s2.releaseSlot();
                        sSolo.releaseSlot();
                        
-                       AllocatedSlot sNew = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1));
+                       SimpleSlot sNew = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1));
                        assertEquals(loc, sNew.getInstance());
                        
                        assertEquals(2, 
scheduler.getNumberOfLocalizedAssignments());
@@ -241,7 +241,7 @@ public class ScheduleWithCoLocationHintTest {
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
                        CoLocationConstraint c1 = new CoLocationConstraint(new 
CoLocationGroup());
                        
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
                        s1.releaseSlot();
                        
                        scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 1)));
@@ -296,16 +296,16 @@ public class ScheduleWithCoLocationHintTest {
                        scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup));
                        
                        // second wave
-                       AllocatedSlot s21 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1));
-                       AllocatedSlot s22 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2));
-                       AllocatedSlot s23 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3));
-                       AllocatedSlot s24 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4));
+                       SimpleSlot s21 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1));
+                       SimpleSlot s22 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2));
+                       SimpleSlot s23 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3));
+                       SimpleSlot s24 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4));
                        
                        // third wave
-                       AllocatedSlot s31 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2));
-                       AllocatedSlot s32 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3));
-                       AllocatedSlot s33 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4));
-                       AllocatedSlot s34 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1));
+                       SimpleSlot s31 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2));
+                       SimpleSlot s32 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3));
+                       SimpleSlot s33 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4));
+                       SimpleSlot s34 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1));
                        
                        scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup));
                        scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup));
@@ -352,24 +352,24 @@ public class ScheduleWithCoLocationHintTest {
                        CoLocationConstraint cc2 = new 
CoLocationConstraint(ccg);
 
                        // schedule something into the shared group so that 
both instances are in the sharing group
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
                        
                        // schedule one locally to instance 1
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc1));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc1));
 
                        // schedule with co location constraint (yet 
unassigned) and a preference for
                        // instance 1, but it can only get instance 2
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
                        
                        // schedule something into the assigned co-location 
constraints and check that they override the
                        // other preferences
-                       AllocatedSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, i2), sharingGroup, cc1));
-                       AllocatedSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, i1), sharingGroup, cc2));
+                       SimpleSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, i2), sharingGroup, cc1));
+                       SimpleSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, i1), sharingGroup, cc2));
                        
                        // check that each slot got three
-                       assertEquals(3, ((SubSlot) 
s1).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(3, ((SubSlot) 
s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+                       assertEquals(3, s1.getRoot().getNumberLeaves());
+                       assertEquals(3, s2.getRoot().getNumberLeaves());
                        
                        assertEquals(s1.getInstance(), s3.getInstance());
                        assertEquals(s2.getInstance(), s4.getInstance());
@@ -420,8 +420,8 @@ public class ScheduleWithCoLocationHintTest {
                        CoLocationConstraint cc1 = new 
CoLocationConstraint(ccg);
                        CoLocationConstraint cc2 = new 
CoLocationConstraint(ccg);
 
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
                        
                        s1.releaseSlot();
                        s2.releaseSlot();
@@ -429,8 +429,8 @@ public class ScheduleWithCoLocationHintTest {
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
 
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
                        
                        // still preserves the previous instance mapping
                        assertEquals(i1, s3.getInstance());
@@ -474,8 +474,8 @@ public class ScheduleWithCoLocationHintTest {
                        CoLocationConstraint cc1 = new 
CoLocationConstraint(ccg);
                        CoLocationConstraint cc2 = new 
CoLocationConstraint(ccg);
 
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
                        
                        s1.releaseSlot();
                        s2.releaseSlot();
@@ -483,8 +483,8 @@ public class ScheduleWithCoLocationHintTest {
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
 
-                       AllocatedSlot sa = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)));
-                       AllocatedSlot sb = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)));
+                       SimpleSlot sa = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)));
+                       SimpleSlot sb = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)));
                        
                        try {
                                scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
@@ -535,15 +535,15 @@ public class ScheduleWithCoLocationHintTest {
                        // schedule something from the second job vertex id 
before the first is filled,
                        // and give locality preferences that hint at using the 
same shared slot for both
                        // co location constraints (which we seek to prevent)
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc2));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc2));
 
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc1));
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup, cc2));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc1));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup, cc2));
                        
                        // check that each slot got two
-                       assertEquals(2, ((SubSlot) 
s1).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+                       assertEquals(2, s1.getRoot().getNumberLeaves());
+                       assertEquals(2, s2.getRoot().getNumberLeaves());
                        
                        assertEquals(s1.getInstance(), s3.getInstance());
                        assertEquals(s2.getInstance(), s4.getInstance());
@@ -594,15 +594,15 @@ public class ScheduleWithCoLocationHintTest {
                        CoLocationConstraint cc1 = new 
CoLocationConstraint(ccg);
                        CoLocationConstraint cc2 = new 
CoLocationConstraint(ccg);
 
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
 
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup));
                        
-                       // check that each slot got three
-                       assertEquals(2, ((SubSlot) 
s1).getSharedSlot().getNumberOfAllocatedSubSlots());
-                       assertEquals(2, ((SubSlot) 
s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+                       // check that each slot got two
+                       assertEquals(2, s1.getRoot().getNumberLeaves());
+                       assertEquals(2, s2.getRoot().getNumberLeaves());
                        
                        s1.releaseSlot();
                        s2.releaseSlot();

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 7209842..a7f0f04 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.g
 import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
 import static org.junit.Assert.*;
 
+import org.apache.flink.runtime.instance.SimpleSlot;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -40,7 +41,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 
 /**
@@ -137,11 +137,11 @@ public class SchedulerIsolatedTasksTest {
                        assertEquals(5, scheduler.getNumberOfAvailableSlots());
                        
                        // schedule something into all slots
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
-                       AllocatedSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
+                       SimpleSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
                        
                        // the slots should all be different
                        assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
@@ -160,8 +160,8 @@ public class SchedulerIsolatedTasksTest {
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
                        
                        // now we can schedule some more slots
-                       AllocatedSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
-                       AllocatedSlot s7 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
+                       SimpleSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
+                       SimpleSlot s7 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
                        
                        assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
                        
@@ -215,7 +215,7 @@ public class SchedulerIsolatedTasksTest {
                        List<SlotAllocationFuture> allAllocatedSlots = new 
ArrayList<SlotAllocationFuture>();
                        
                        // slots that need to be released
-                       final Set<AllocatedSlot> toRelease = new 
HashSet<AllocatedSlot>();
+                       final Set<SimpleSlot> toRelease = new 
HashSet<SimpleSlot>();
                        
                        // flag to track errors in the concurrent thread
                        final AtomicBoolean errored = new AtomicBoolean(false);
@@ -223,7 +223,7 @@ public class SchedulerIsolatedTasksTest {
                        
                        SlotAllocationFutureAction action = new 
SlotAllocationFutureAction() {
                                @Override
-                               public void slotAllocated(AllocatedSlot slot) {
+                               public void slotAllocated(SimpleSlot slot) {
                                        synchronized (toRelease) {
                                                toRelease.add(slot);
                                                toRelease.notifyAll();
@@ -244,8 +244,8 @@ public class SchedulerIsolatedTasksTest {
                                                                        
toRelease.wait();
                                                                }
                                                                
-                                                               
Iterator<AllocatedSlot> iter = toRelease.iterator();
-                                                               AllocatedSlot 
next = iter.next();
+                                                               
Iterator<SimpleSlot> iter = toRelease.iterator();
+                                                               SimpleSlot next 
= iter.next();
                                                                iter.remove();
                                                                
                                                                
next.releaseSlot();
@@ -272,7 +272,7 @@ public class SchedulerIsolatedTasksTest {
                        
                        assertFalse("The slot releasing thread caused an 
error.", errored.get());
                        
-                       List<AllocatedSlot> slotsAfter = new 
ArrayList<AllocatedSlot>();
+                       List<SimpleSlot> slotsAfter = new 
ArrayList<SimpleSlot>();
                        for (SlotAllocationFuture future : allAllocatedSlots) {
                                slotsAfter.add(future.waitTillAllocated());
                        }
@@ -303,7 +303,7 @@ public class SchedulerIsolatedTasksTest {
                        scheduler.newInstanceAvailable(i2);
                        scheduler.newInstanceAvailable(i3);
                        
-                       List<AllocatedSlot> slots = new 
ArrayList<AllocatedSlot>();
+                       List<SimpleSlot> slots = new ArrayList<SimpleSlot>();
                        slots.add(scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask())));
                        slots.add(scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask())));
                        slots.add(scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask())));
@@ -312,7 +312,7 @@ public class SchedulerIsolatedTasksTest {
                        
                        i2.markDead();
                        
-                       for (AllocatedSlot slot : slots) {
+                       for (SimpleSlot slot : slots) {
                                if (slot.getInstance() == i2) {
                                        assertTrue(slot.isCanceled());
                                } else {
@@ -364,7 +364,7 @@ public class SchedulerIsolatedTasksTest {
                        scheduler.newInstanceAvailable(i3);
                        
                        // schedule something on an arbitrary instance
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Collections.<Instance>emptyList())));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Collections.<Instance>emptyList())));
                        
                        // figure out how we use the location hints
                        Instance first = s1.getInstance();
@@ -372,28 +372,28 @@ public class SchedulerIsolatedTasksTest {
                        Instance third = first == i3 ? i2 : i3;
                        
                        // something that needs to go to the first instance 
again
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Collections.singletonList(s1.getInstance()))));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Collections.singletonList(s1.getInstance()))));
                        assertEquals(first, s2.getInstance());
 
                        // first or second --> second, because first is full
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Arrays.asList(first, second))));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Arrays.asList(first, second))));
                        assertEquals(second, s3.getInstance());
                        
                        // first or third --> third (because first is full)
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
-                       AllocatedSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
+                       SimpleSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
                        assertEquals(third, s4.getInstance());
                        assertEquals(third, s5.getInstance());
                        
                        // first or third --> second, because all others are 
full
-                       AllocatedSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
+                       SimpleSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
                        assertEquals(second, s6.getInstance());
                        
                        // release something on the first and second instance
                        s2.releaseSlot();
                        s6.releaseSlot();
                        
-                       AllocatedSlot s7 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
+                       SimpleSlot s7 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
                        assertEquals(first, s7.getInstance());
                        
                        assertEquals(1, 
scheduler.getNumberOfUnconstrainedAssignments());

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index de90701..d1c6938 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -31,13 +31,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.flink.runtime.instance.SimpleSlot;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -74,10 +74,10 @@ public class SchedulerSlotSharingTest {
                        scheduler.newInstanceAvailable(i2);
                        
                        // schedule 4 tasks from the first vertex group
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup));
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup));
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup));
                        
                        assertNotNull(s1);
                        assertNotNull(s2);
@@ -102,7 +102,7 @@ public class SchedulerSlotSharingTest {
                        s3.releaseSlot();
                        
                        // allocate another slot from that group
-                       AllocatedSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup));
+                       SimpleSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup));
                        assertNotNull(s5);
                        
                        // release all old slots
@@ -110,9 +110,9 @@ public class SchedulerSlotSharingTest {
                        s2.releaseSlot();
                        s4.releaseSlot();
                        
-                       AllocatedSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup));
-                       AllocatedSlot s7 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup));
-                       AllocatedSlot s8 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup));
+                       SimpleSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup));
+                       SimpleSlot s7 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup));
+                       SimpleSlot s8 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup));
                        
                        assertNotNull(s6);
                        assertNotNull(s7);
@@ -159,10 +159,10 @@ public class SchedulerSlotSharingTest {
                        scheduler.newInstanceAvailable(getRandomInstance(2));
                        
                        // schedule 4 tasks from the first vertex group
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup));
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup));
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup));
                        
                        assertNotNull(s1);
                        assertNotNull(s2);
@@ -184,10 +184,10 @@ public class SchedulerSlotSharingTest {
                        }
                        
                        // schedule some tasks from the second ID group
-                       AllocatedSlot s1_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup));
-                       AllocatedSlot s2_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup));
-                       AllocatedSlot s3_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup));
-                       AllocatedSlot s4_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup));
+                       SimpleSlot s1_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup));
+                       SimpleSlot s2_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup));
+                       SimpleSlot s3_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup));
+                       SimpleSlot s4_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup));
                        
                        assertNotNull(s1_2);
                        assertNotNull(s2_2);
@@ -228,7 +228,7 @@ public class SchedulerSlotSharingTest {
                        }
                        
                        // we can schedule something from the first vertex group
-                       AllocatedSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
+                       SimpleSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
                        assertNotNull(s5);
                        
                        assertEquals(4, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -238,7 +238,7 @@ public class SchedulerSlotSharingTest {
                        
                        // now we release a slot from the second vertex group 
and schedule another task from that group
                        s2_2.releaseSlot();
-                       AllocatedSlot s5_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
+                       SimpleSlot s5_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
                        assertNotNull(s5_2);
                        
                        // release all slots
@@ -279,10 +279,10 @@ public class SchedulerSlotSharingTest {
                        scheduler.newInstanceAvailable(getRandomInstance(2));
                        
                        // schedule 4 tasks from the first vertex group
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
                        
                        assertEquals(4, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
@@ -298,10 +298,10 @@ public class SchedulerSlotSharingTest {
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
                        
                        // schedule some tasks from the second ID group
-                       AllocatedSlot s1_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
-                       AllocatedSlot s2_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
-                       AllocatedSlot s3_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
-                       AllocatedSlot s4_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+                       SimpleSlot s1_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
+                       SimpleSlot s2_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
+                       SimpleSlot s3_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+                       SimpleSlot s4_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
 
                        assertEquals(4, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
                        assertEquals(4, 
sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
@@ -344,10 +344,10 @@ public class SchedulerSlotSharingTest {
                        scheduler.newInstanceAvailable(getRandomInstance(2));
                        
                        // schedule 4 tasks from the first vertex group
-                       AllocatedSlot s1_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-                       AllocatedSlot s2_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-                       AllocatedSlot s3_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-                       AllocatedSlot s4_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+                       SimpleSlot s1_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
+                       SimpleSlot s2_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
+                       SimpleSlot s3_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+                       SimpleSlot s4_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
                        
                        assertNotNull(s1_1);
                        assertNotNull(s2_1);
@@ -357,10 +357,10 @@ public class SchedulerSlotSharingTest {
                        assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
                        
                        // schedule 4 tasks from the second vertex group
-                       AllocatedSlot s1_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup));
-                       AllocatedSlot s2_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup));
-                       AllocatedSlot s3_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup));
-                       AllocatedSlot s4_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup));
+                       SimpleSlot s1_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup));
+                       SimpleSlot s2_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup));
+                       SimpleSlot s3_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup));
+                       SimpleSlot s4_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup));
                        
                        assertNotNull(s1_2);
                        assertNotNull(s2_2);
@@ -370,10 +370,10 @@ public class SchedulerSlotSharingTest {
                        assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
                        
                        // schedule 4 tasks from the third vertex group
-                       AllocatedSlot s1_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup));
-                       AllocatedSlot s2_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup));
-                       AllocatedSlot s3_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup));
-                       AllocatedSlot s4_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup));
+                       SimpleSlot s1_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup));
+                       SimpleSlot s2_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup));
+                       SimpleSlot s3_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup));
+                       SimpleSlot s4_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup));
                        
                        assertNotNull(s1_3);
                        assertNotNull(s2_3);
@@ -401,9 +401,9 @@ public class SchedulerSlotSharingTest {
                        s3_2.releaseSlot();
                        s4_2.releaseSlot();
                        
-                       AllocatedSlot s5_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup));
-                       AllocatedSlot s6_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup));
-                       AllocatedSlot s7_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup));
+                       SimpleSlot s5_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup));
+                       SimpleSlot s6_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup));
+                       SimpleSlot s7_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup));
                        
                        assertNotNull(s5_2);
                        assertNotNull(s6_2);
@@ -454,9 +454,9 @@ public class SchedulerSlotSharingTest {
                        scheduler.newInstanceAvailable(getRandomInstance(2));
                        
                        // schedule 1 tasks from the first vertex group and 2 
from the second
-                       AllocatedSlot s1_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup));
-                       AllocatedSlot s2_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup));
-                       AllocatedSlot s2_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup));
+                       SimpleSlot s1_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup));
+                       SimpleSlot s2_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup));
+                       SimpleSlot s2_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup));
                        
                        assertNotNull(s1_1);
                        assertNotNull(s2_1);
@@ -472,7 +472,7 @@ public class SchedulerSlotSharingTest {
                        
                        
                        // this should free one slot so we can allocate one 
non-shared
-                       AllocatedSlot sx = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 1)));
+                       SimpleSlot sx = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 1)));
                        assertNotNull(sx);
                        
                        assertEquals(1, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -507,23 +507,23 @@ public class SchedulerSlotSharingTest {
                        scheduler.newInstanceAvailable(getRandomInstance(2));
                        
                        // schedule some individual vertices
-                       AllocatedSlot sA1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidA, 0, 2)));
-                       AllocatedSlot sA2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidA, 1, 2)));
+                       SimpleSlot sA1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidA, 0, 2)));
+                       SimpleSlot sA2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidA, 1, 2)));
                        assertNotNull(sA1);
                        assertNotNull(sA2);
                        
                        // schedule some vertices in the sharing group
-                       AllocatedSlot s1_0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-                       AllocatedSlot s1_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-                       AllocatedSlot s2_0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
-                       AllocatedSlot s2_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
+                       SimpleSlot s1_0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
+                       SimpleSlot s1_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
+                       SimpleSlot s2_0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
+                       SimpleSlot s2_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
                        assertNotNull(s1_0);
                        assertNotNull(s1_1);
                        assertNotNull(s2_0);
                        assertNotNull(s2_1);
                        
                        // schedule another isolated vertex
-                       AllocatedSlot sB1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidB, 1, 3)));
+                       SimpleSlot sB1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidB, 1, 3)));
                        assertNotNull(sB1);
                        
                        // should not be able to schedule more vertices
@@ -574,8 +574,8 @@ public class SchedulerSlotSharingTest {
                        // release some isolated task and check that the 
sharing group may grow
                        sA1.releaseSlot();
                        
-                       AllocatedSlot s1_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-                       AllocatedSlot s2_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+                       SimpleSlot s1_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+                       SimpleSlot s2_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
                        assertNotNull(s1_2);
                        assertNotNull(s2_2);
                        
@@ -587,19 +587,19 @@ public class SchedulerSlotSharingTest {
                        assertEquals(1, scheduler.getNumberOfAvailableSlots());
                        
                        // schedule one more no-shared task
-                       AllocatedSlot sB0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidB, 0, 3)));
+                       SimpleSlot sB0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidB, 0, 3)));
                        assertNotNull(sB0);
                        
                        // release the last of the original shared slots and 
allocate one more non-shared slot
                        s2_1.releaseSlot();
-                       AllocatedSlot sB2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidB, 2, 3)));
+                       SimpleSlot sB2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidB, 2, 3)));
                        assertNotNull(sB2);
                        
                        
                        // release on non-shared and add some shared slots
                        sA2.releaseSlot();
-                       AllocatedSlot s1_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
-                       AllocatedSlot s2_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+                       SimpleSlot s1_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+                       SimpleSlot s2_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
                        assertNotNull(s1_3);
                        assertNotNull(s2_3);
                        
@@ -609,8 +609,8 @@ public class SchedulerSlotSharingTest {
                        s1_3.releaseSlot();
                        s2_3.releaseSlot();
                        
-                       AllocatedSlot sC0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidC, 1, 2)));
-                       AllocatedSlot sC1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidC, 0, 2)));
+                       SimpleSlot sC0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidC, 1, 2)));
+                       SimpleSlot sC1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jidC, 0, 2)));
                        assertNotNull(sC0);
                        assertNotNull(sC1);
                        
@@ -655,8 +655,8 @@ public class SchedulerSlotSharingTest {
                        
                        
                        // schedule one to each instance
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
                        assertNotNull(s1);
                        assertNotNull(s2);
                        
@@ -665,8 +665,8 @@ public class SchedulerSlotSharingTest {
                        assertEquals(1, i2.getNumberOfAvailableSlots());
                        
                        // schedule one from the other group to each instance
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup));
                        assertNotNull(s3);
                        assertNotNull(s4);
                        
@@ -705,8 +705,8 @@ public class SchedulerSlotSharingTest {
                        
                        
                        // schedule one to each instance
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup));
                        assertNotNull(s1);
                        assertNotNull(s2);
                        
@@ -715,8 +715,8 @@ public class SchedulerSlotSharingTest {
                        assertEquals(2, i2.getNumberOfAvailableSlots());
                        
                        // schedule one from the other group to each instance
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup));
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup));
                        assertNotNull(s3);
                        assertNotNull(s4);
                        
@@ -754,14 +754,14 @@ public class SchedulerSlotSharingTest {
                        scheduler.newInstanceAvailable(i2);
                        
                        // schedule until the one instance is full
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup));
-                       AllocatedSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, i1), sharingGroup));
-                       AllocatedSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, i1), sharingGroup));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup));
+                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, i1), sharingGroup));
+                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, i1), sharingGroup));
 
                        // schedule two more with preference of same instance 
--> need to go to other instance
-                       AllocatedSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, i1), sharingGroup));
-                       AllocatedSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, i1), sharingGroup));
+                       SimpleSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, i1), sharingGroup));
+                       SimpleSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, i1), sharingGroup));
                        
                        assertNotNull(s1);
                        assertNotNull(s2);
@@ -808,19 +808,19 @@ public class SchedulerSlotSharingTest {
                        scheduler.newInstanceAvailable(getRandomInstance(4));
                        
                        // allocate something from group 1 and 2 interleaved 
with schedule for group 3
-                       AllocatedSlot slot_1_1 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), 
sharingGroup));
-                       AllocatedSlot slot_1_2 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), 
sharingGroup));
+                       SimpleSlot slot_1_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
+                       SimpleSlot slot_1_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
 
-                       AllocatedSlot slot_2_1 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), 
sharingGroup));
-                       AllocatedSlot slot_2_2 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), 
sharingGroup));
+                       SimpleSlot slot_2_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
+                       SimpleSlot slot_2_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
                        
-                       AllocatedSlot slot_3 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), 
sharingGroup));
+                       SimpleSlot slot_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup));
                        
-                       AllocatedSlot slot_1_3 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), 
sharingGroup));
-                       AllocatedSlot slot_1_4 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), 
sharingGroup));
+                       SimpleSlot slot_1_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+                       SimpleSlot slot_1_4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
                        
-                       AllocatedSlot slot_2_3 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), 
sharingGroup));
-                       AllocatedSlot slot_2_4 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), 
sharingGroup));
+                       SimpleSlot slot_2_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+                       SimpleSlot slot_2_4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
                        
                        // release groups 1 and 2
                        
@@ -836,10 +836,10 @@ public class SchedulerSlotSharingTest {
                        
                        // allocate group 4
                        
-                       AllocatedSlot slot_4_1 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), 
sharingGroup));
-                       AllocatedSlot slot_4_2 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), 
sharingGroup));
-                       AllocatedSlot slot_4_3 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), 
sharingGroup));
-                       AllocatedSlot slot_4_4 = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), 
sharingGroup));
+                       SimpleSlot slot_4_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup));
+                       SimpleSlot slot_4_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup));
+                       SimpleSlot slot_4_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup));
+                       SimpleSlot slot_4_4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup));
                        
                        // release groups 3 and 4
                        
@@ -892,11 +892,11 @@ public class SchedulerSlotSharingTest {
                                        @Override
                                        public void run() {
                                                try {
-                                                       AllocatedSlot slot = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 
enumerator4.getAndIncrement(), 4), sharingGroup));
-                                                       
+                                                       SimpleSlot slot = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 
enumerator4.getAndIncrement(), 4), sharingGroup));
+
                                                        
sleepUninterruptibly(rnd.nextInt(5));
                                                        slot.releaseSlot();
-                                                       
+
                                                        if 
(completed.incrementAndGet() == 13) {
                                                                synchronized 
(completed) {
                                                                        
completed.notifyAll();
@@ -915,7 +915,7 @@ public class SchedulerSlotSharingTest {
                                        public void run() {
                                                try {
                                                        if 
(flag3.compareAndSet(false, true)) {
-                                                               AllocatedSlot 
slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 
1), sharingGroup));
+                                                               SimpleSlot slot 
= scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), 
sharingGroup));
                                                                
                                                                
sleepUninterruptibly(5);
                                                                
@@ -944,7 +944,7 @@ public class SchedulerSlotSharingTest {
                                        @Override
                                        public void run() {
                                                try {
-                                                       AllocatedSlot slot = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 
enumerator2.getAndIncrement(), 4), sharingGroup));
+                                                       SimpleSlot slot = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 
enumerator2.getAndIncrement(), 4), sharingGroup));
                                                        
                                                        // wait a bit till 
scheduling the successor
                                                        
sleepUninterruptibly(rnd.nextInt(5));
@@ -971,7 +971,7 @@ public class SchedulerSlotSharingTest {
                                        @Override
                                        public void run() {
                                                try {
-                                                       AllocatedSlot slot = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 
enumerator1.getAndIncrement(), 4), sharingGroup));
+                                                       SimpleSlot slot = 
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 
enumerator1.getAndIncrement(), 4), sharingGroup));
                                                        
                                                        // wait a bit till 
scheduling the successor
                                                        
sleepUninterruptibly(rnd.nextInt(5));
@@ -1049,24 +1049,24 @@ public class SchedulerSlotSharingTest {
                        scheduler.newInstanceAvailable(getRandomInstance(4));
                        
                        // schedule one task for the first and second vertex
-                       AllocatedSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup));
-                       AllocatedSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup));
+                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup));
+                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup));
                        
-                       assertTrue( ((SubSlot) s1).getSharedSlot() == 
((SubSlot) s2).getSharedSlot() );
+                       assertTrue(  s1.getParent() == s2.getParent() );
                        assertEquals(3, scheduler.getNumberOfAvailableSlots());
                        
-                       AllocatedSlot s3_0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup));
-                       AllocatedSlot s3_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup));
-                       AllocatedSlot s4_0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup));
-                       AllocatedSlot s4_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup));
+                       SimpleSlot s3_0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup));
+                       SimpleSlot s3_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup));
+                       SimpleSlot s4_0 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup));
+                       SimpleSlot s4_1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup));
                        
                        s1.releaseSlot();
                        s2.releaseSlot();
                        
-                       AllocatedSlot s3_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup));
-                       AllocatedSlot s3_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup));
-                       AllocatedSlot s4_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup));
-                       AllocatedSlot s4_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup));
+                       SimpleSlot s3_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup));
+                       SimpleSlot s3_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup));
+                       SimpleSlot s4_2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup));
+                       SimpleSlot s4_3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup));
                        
                        try {
                                scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup));

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
index feb3005..fcb638f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Matchers.any;
 import static org.junit.Assert.*;
 
+import org.apache.flink.runtime.instance.SharedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -39,41 +41,45 @@ public class SharedSlotsTest {
                        doAnswer(new Answer<Void>() {
                                @Override
                                public Void answer(InvocationOnMock invocation) 
throws Throwable {
-                                       final SubSlot sub = (SubSlot) 
invocation.getArguments()[0];
-                                       final SharedSlot shared = (SharedSlot) 
invocation.getArguments()[1];
-                                       shared.releaseSlot(sub);
+                                       final SimpleSlot simpleSlot = 
(SimpleSlot) invocation.getArguments()[0];
+                                       final SharedSlot sharedSlot = 
simpleSlot.getParent();
+
+                                       sharedSlot.freeSubSlot(simpleSlot);
+
                                        return null;
                                }
                                
-                       }).when(assignment).releaseSubSlot(any(SubSlot.class), 
any(SharedSlot.class));
+                       
}).when(assignment).releaseSimpleSlot(any(SimpleSlot.class));
+
+                       JobVertexID id1 = new JobVertexID();
                        
                        Instance instance = 
SchedulerTestUtils.getRandomInstance(1);
                        
-                       SharedSlot slot = new 
SharedSlot(instance.allocateSlot(new JobID()), assignment);
-                       assertFalse(slot.isDisposed());
+                       SharedSlot slot = instance.allocateSharedSlot(new 
JobID(), assignment, id1);
+                       assertFalse(slot.isDead());
                        
-                       SubSlot ss1 = slot.allocateSubSlot(new JobVertexID());
+                       SimpleSlot ss1 = slot.allocateSubSlot(id1);
                        assertNotNull(ss1);
                        
                        // verify resources
                        assertEquals(instance, ss1.getInstance());
                        assertEquals(0, ss1.getSlotNumber());
-                       assertEquals(slot.getAllocatedSlot().getJobID(), 
ss1.getJobID());
+                       assertEquals(slot.getJobID(), ss1.getJobID());
                        
-                       SubSlot ss2 = slot.allocateSubSlot(new JobVertexID());
+                       SimpleSlot ss2 = slot.allocateSubSlot(new 
JobVertexID());
                        assertNotNull(ss2);
                        
-                       assertEquals(2, slot.getNumberOfAllocatedSubSlots());
+                       assertEquals(2, slot.getNumberLeaves());
                        
                        // release first slot, should not trigger release
                        ss1.releaseSlot();
-                       assertFalse(slot.isDisposed());
+                       assertFalse(slot.isDead());
                        
                        ss2.releaseSlot();
-                       assertFalse(slot.isDisposed());
+                       assertFalse(slot.isDead());
                        
                        // the shared slot should now dispose itself
-                       assertEquals(0, slot.getNumberOfAllocatedSubSlots());
+                       assertEquals(0, slot.getNumberLeaves());
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -85,46 +91,49 @@ public class SharedSlotsTest {
        public void createAndRelease() {
                try {
                        SlotSharingGroupAssignment assignment = 
mock(SlotSharingGroupAssignment.class);
-                       doAnswer(new Answer<Void>() {
+                       doAnswer(new Answer<Boolean>() {
                                @Override
-                               public Void answer(InvocationOnMock invocation) 
throws Throwable {
-                                       final SubSlot sub = (SubSlot) 
invocation.getArguments()[0];
-                                       final SharedSlot shared = (SharedSlot) 
invocation.getArguments()[1];
-                                       if (shared.releaseSlot(sub) == 0) {
-                                               shared.dispose();
+                               public Boolean answer(InvocationOnMock 
invocation) throws Throwable {
+                                       final SimpleSlot slot = (SimpleSlot) 
invocation.getArguments()[0];
+                                       final SharedSlot shared = 
slot.getParent();
+                                       if (shared.freeSubSlot(slot) == 0) {
+                                               shared.markDead();
+                                               return true;
                                        }
-                                       return null;
+                                       return false;
                                }
-                               
-                       }).when(assignment).releaseSubSlot(any(SubSlot.class), 
any(SharedSlot.class));
-                       
+
+                       
}).when(assignment).releaseSimpleSlot(any(SimpleSlot.class));
+
+                       JobVertexID id1 = new JobVertexID();
+
                        Instance instance = 
SchedulerTestUtils.getRandomInstance(1);
                        
-                       SharedSlot slot = new 
SharedSlot(instance.allocateSlot(new JobID()), assignment);
-                       assertFalse(slot.isDisposed());
+                       SharedSlot slot = instance.allocateSharedSlot(new 
JobID(), assignment, id1);
+                       assertFalse(slot.isDead());
                        
-                       SubSlot ss1 = slot.allocateSubSlot(new JobVertexID());
+                       SimpleSlot ss1 = slot.allocateSubSlot(id1);
                        assertNotNull(ss1);
                        
                        // verify resources
                        assertEquals(instance, ss1.getInstance());
                        assertEquals(0, ss1.getSlotNumber());
-                       assertEquals(slot.getAllocatedSlot().getJobID(), 
ss1.getJobID());
+                       assertEquals(slot.getJobID(), ss1.getJobID());
                        
-                       SubSlot ss2 = slot.allocateSubSlot(new JobVertexID());
+                       SimpleSlot ss2 = slot.allocateSubSlot(new 
JobVertexID());
                        assertNotNull(ss2);
                        
-                       assertEquals(2, slot.getNumberOfAllocatedSubSlots());
+                       assertEquals(2, slot.getNumberLeaves());
                        
                        // release first slot, should not trigger release
                        ss1.releaseSlot();
-                       assertFalse(slot.isDisposed());
+                       assertFalse(slot.isDead());
                        
                        ss2.releaseSlot();
-                       assertTrue(slot.isDisposed());
+                       assertTrue(slot.isDead());
                        
                        // the shared slot should now dispose itself
-                       assertEquals(0, slot.getNumberOfAllocatedSubSlots());
+                       assertEquals(0, slot.getNumberLeaves());
                        
                        assertNull(slot.allocateSubSlot(new JobVertexID()));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
index 47afacb..23a5c94 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.junit.Test;
 
@@ -36,7 +36,7 @@ public class SlotAllocationFutureTest {
                        
                        SlotAllocationFutureAction action = new 
SlotAllocationFutureAction() {
                                @Override
-                               public void slotAllocated(AllocatedSlot slot) {}
+                               public void slotAllocated(SimpleSlot slot) {}
                        };
                        
                        future.setFutureAction(action);
@@ -47,8 +47,8 @@ public class SlotAllocationFutureTest {
                                // expected
                        }
                        
-                       final AllocatedSlot slot1 = new AllocatedSlot(new 
JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
-                       final AllocatedSlot slot2 = new AllocatedSlot(new 
JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+                       final SimpleSlot slot1 = new SimpleSlot(new JobID(), 
SchedulerTestUtils.getRandomInstance(1), 0, null, null);
+                       final SimpleSlot slot2 = new SimpleSlot(new JobID(), 
SchedulerTestUtils.getRandomInstance(1), 0, null, null);
                        
                        future.setSlot(slot1);
                        try {
@@ -71,13 +71,13 @@ public class SlotAllocationFutureTest {
                        // action before the slot
                        {
                                final AtomicInteger invocations = new 
AtomicInteger();
-                               final AllocatedSlot thisSlot = new 
AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+                               final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null);
                                
                                SlotAllocationFuture future = new 
SlotAllocationFuture();
                                
                                future.setFutureAction(new 
SlotAllocationFutureAction() {
                                        @Override
-                                       public void slotAllocated(AllocatedSlot 
slot) {
+                                       public void slotAllocated(SimpleSlot 
slot) {
                                                assertEquals(thisSlot, slot);
                                                invocations.incrementAndGet();
                                        }
@@ -91,14 +91,14 @@ public class SlotAllocationFutureTest {
                        // slot before action
                        {
                                final AtomicInteger invocations = new 
AtomicInteger();
-                               final AllocatedSlot thisSlot = new 
AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+                               final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null);
                                
                                SlotAllocationFuture future = new 
SlotAllocationFuture();
                                future.setSlot(thisSlot);
                                
                                future.setFutureAction(new 
SlotAllocationFutureAction() {
                                        @Override
-                                       public void slotAllocated(AllocatedSlot 
slot) {
+                                       public void slotAllocated(SimpleSlot 
slot) {
                                                assertEquals(thisSlot, slot);
                                                invocations.incrementAndGet();
                                        }
@@ -121,7 +121,7 @@ public class SlotAllocationFutureTest {
                                final AtomicInteger invocations = new 
AtomicInteger();
                                final AtomicBoolean error = new AtomicBoolean();
                                
-                               final AllocatedSlot thisSlot = new 
AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+                               final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null);
                                
                                final SlotAllocationFuture future = new 
SlotAllocationFuture();
                                
@@ -130,7 +130,7 @@ public class SlotAllocationFutureTest {
                                        @Override
                                        public void run() {
                                                try {
-                                                       AllocatedSlot syncSlot 
= future.waitTillAllocated();
+                                                       SimpleSlot syncSlot = 
future.waitTillAllocated();
                                                        if (syncSlot == null || 
syncSlot != thisSlot) {
                                                                error.set(true);
                                                                return;
@@ -158,12 +158,12 @@ public class SlotAllocationFutureTest {
                        
                        // setting slot before syncing
                        {
-                               final AllocatedSlot thisSlot = new 
AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+                               final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null);
                                final SlotAllocationFuture future = new 
SlotAllocationFuture();
 
                                future.setSlot(thisSlot);
                                
-                               AllocatedSlot retrieved = 
future.waitTillAllocated();
+                               SimpleSlot retrieved = 
future.waitTillAllocated();
                                
                                assertNotNull(retrieved);
                                assertEquals(thisSlot, retrieved);

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 36f0f92..9bcd163 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -62,7 +62,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(1 second) {
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionFailure(jobGraph.getJobID, new 
NoResourceAvailableException(1,1)))
+          expectMsg(SubmissionFailure(jobGraph.getJobID, new 
NoResourceAvailableException(1,1,0)))
 
           expectNoMsg()
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
index 0f6eeca..22ee2c1 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.{ActorSystem, PoisonPill}
+import akka.actor.{Kill, ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, 
DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
@@ -41,7 +41,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
   }
 
   "The JobManager" should {
-    "handle failing task manager" in {
+    "handle gracefully failing task manager" in {
       val num_tasks = 31
       val sender = new AbstractJobVertex("Sender")
       val receiver = new AbstractJobVertex("Receiver")
@@ -78,6 +78,41 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
         cluster.stop()
       }
     }
+
+    "handle hard failing task manager" in {
+      val num_tasks = 31
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
+      sender.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[BlockingReceiver])
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobID = jobGraph.getJobID
+
+      val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
+
+      val taskManagers = cluster.getTaskManagers
+      val jm = cluster.getJobManager
+
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+
+          jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
+          expectMsg(AllVerticesRunning(jobID))
+
+          // kill one task manager
+          taskManagers(0) ! Kill
+          expectMsgType[JobResultFailed]
+        }
+      }finally{
+        cluster.stop()
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index fba7c76..626c518 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.{ActorSystem, PoisonPill}
+import akka.actor.{Kill, ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
@@ -41,7 +41,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
   }
 
   "The JobManager" should {
-    "handle task manager failures with slot sharing" in {
+    "handle gracefully failing task manager with slot sharing" in {
       val num_tasks = 20
 
       val sender = new AbstractJobVertex("Sender")
@@ -83,6 +83,48 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
         cluster.stop()
       }
     }
+
+    "handle hard failing task manager with slot sharing" in {
+      val num_tasks = 20
+
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
+
+      sender.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[BlockingReceiver])
+
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+
+      val sharingGroup = new SlotSharingGroup()
+      sender.setSlotSharingGroup(sharingGroup)
+      receiver.setSlotSharingGroup(sharingGroup)
+
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobID = jobGraph.getJobID
+
+      val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)
+      val jm = cluster.getJobManager
+      val taskManagers = cluster.getTaskManagers
+
+      try{
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+
+          jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
+          expectMsg(AllVerticesRunning(jobID))
+
+          //kill task manager
+          taskManagers(0) ! Kill
+
+          expectMsgType[JobResultFailed]
+        }
+      }finally{
+        cluster.stop()
+      }
+    }
   }
 
 }

Reply via email to