Repository: flink
Updated Branches:
  refs/heads/master 647c552a2 -> d63bc75ff


http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index 1cc9301..aab8132 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -68,10 +70,10 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        testingSlotProvider.addTaskManager(2);
                        
                        // schedule 4 tasks from the first vertex group
-                       LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        
                        assertNotNull(s1);
                        assertNotNull(s2);
@@ -82,7 +84,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        
                        // we cannot schedule another task from the first 
vertex group
                        try {
-                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                                fail("Scheduler accepted too many tasks at the 
same time");
                        }
                        catch (ExecutionException e) {
@@ -96,7 +98,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        s3.releaseSlot();
                        
                        // allocate another slot from that group
-                       LogicalSlot s5 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s5 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        assertNotNull(s5);
                        
                        // release all old slots
@@ -104,9 +106,9 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        s2.releaseSlot();
                        s4.releaseSlot();
                        
-                       LogicalSlot s6 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 5, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s7 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 6, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s8 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 7, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s6 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 5, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s7 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 6, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s8 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 7, 8, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        
                        assertNotNull(s6);
                        assertNotNull(s7);
@@ -151,10 +153,10 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                testingSlotProvider.addTaskManager(2);
 
                // schedule 4 tasks from the first vertex group
-               LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-               LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-               LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-               LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+               LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+               LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+               LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+               LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
 
                assertNotNull(s1);
                assertNotNull(s2);
@@ -165,7 +167,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
 
                // we cannot schedule another task from the first vertex group
                try {
-                       testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        fail("Scheduler accepted too many tasks at the same 
time");
                }
                catch (ExecutionException e) {
@@ -176,10 +178,10 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                }
 
                // schedule some tasks from the second ID group
-               LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-               LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-               LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 2, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-               LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 3, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+               LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+               LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+               LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 2, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+               LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 3, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
 
                assertNotNull(s1_2);
                assertNotNull(s2_2);
@@ -188,7 +190,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
 
                // we cannot schedule another task from the second vertex group
                try {
-                       testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        fail("Scheduler accepted too many tasks at the same 
time");
                }
                catch (ExecutionException e) {
@@ -209,7 +211,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
 
                // we can still not schedule anything from the second group of 
vertices
                try {
-                       testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        fail("Scheduler accepted too many tasks at the same 
time");
                }
                catch (ExecutionException e) {
@@ -220,7 +222,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                }
 
                // we can schedule something from the first vertex group
-               LogicalSlot s5 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+               LogicalSlot s5 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                assertNotNull(s5);
 
                assertEquals(4, 
testingSlotProvider.getNumberOfSlots(sharingGroup));
@@ -230,7 +232,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
 
                // now we release a slot from the second vertex group and 
schedule another task from that group
                s2_2.releaseSlot();
-               LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+               LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                assertNotNull(s5_2);
 
                // release all slots
@@ -265,10 +267,10 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        testingSlotProvider.addTaskManager(2);
 
                        // schedule 4 tasks from the first vertex group
-                       LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        
                        assertEquals(4, 
testingSlotProvider.getNumberOfSlots(sharingGroup));
                        assertEquals(0, 
testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
@@ -284,10 +286,10 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        assertEquals(0, 
testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
                        
                        // schedule some tasks from the second ID group
-                       LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
 
                        assertEquals(4, 
testingSlotProvider.getNumberOfSlots(sharingGroup));
                        assertEquals(4, 
testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
@@ -329,10 +331,10 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        testingSlotProvider.addTaskManager(2);
 
                        // schedule 4 tasks from the first vertex group
-                       LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s3_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        
                        assertNotNull(s1_1);
                        assertNotNull(s2_1);
@@ -342,10 +344,10 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
                        
                        // schedule 4 tasks from the second vertex group
-                       LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 2, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 3, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 2, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 3, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        
                        assertNotNull(s1_2);
                        assertNotNull(s2_2);
@@ -355,10 +357,10 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
                        
                        // schedule 4 tasks from the third vertex group
-                       LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s3_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        
                        assertNotNull(s1_3);
                        assertNotNull(s2_3);
@@ -370,7 +372,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        
                        // we cannot schedule another task from the second 
vertex group
                        try {
-                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                                fail("Scheduler accepted too many tasks at the 
same time");
                        }
                        catch (ExecutionException e) {
@@ -386,9 +388,9 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        s3_2.releaseSlot();
                        s4_2.releaseSlot();
                        
-                       LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 5, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s6_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 6, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s7_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 7, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 5, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s6_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 6, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s7_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 7, 7, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        
                        assertNotNull(s5_2);
                        assertNotNull(s6_2);
@@ -438,9 +440,9 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        testingSlotProvider.addTaskManager(2);
 
                        // schedule 1 tasks from the first vertex group and 2 
from the second
-                       LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 2, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 2, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 2, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 2, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 2, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 2, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        
                        assertNotNull(s1_1);
                        assertNotNull(s2_1);
@@ -456,7 +458,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        
                        
                        // this should free one slot so we can allocate one 
non-shared
-                       LogicalSlot sx = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 0, 1, null)), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot sx = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 0, 1, null)), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                        assertNotNull(sx);
                        
                        assertEquals(1, 
testingSlotProvider.getNumberOfSlots(sharingGroup));
@@ -490,28 +492,28 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        testingSlotProvider.addTaskManager(2);
 
                        // schedule some individual vertices
-                       LogicalSlot sA2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidA, 1, 2, null)), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot sA1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidA, 0, 2, null)), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot sA2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidA, 1, 2, null)), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot sA1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidA, 0, 2, null)), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                        assertNotNull(sA1);
                        assertNotNull(sA2);
                        
                        // schedule some vertices in the sharing group
-                       LogicalSlot s1_0 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2_0 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1_0 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2_0 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        assertNotNull(s1_0);
                        assertNotNull(s1_1);
                        assertNotNull(s2_0);
                        assertNotNull(s2_1);
                        
                        // schedule another isolated vertex
-                       LogicalSlot sB1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidB, 1, 3, null)), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot sB1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidB, 1, 3, null)), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                        assertNotNull(sB1);
                        
                        // should not be able to schedule more vertices
                        try {
-                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                                fail("Scheduler accepted too many tasks at the 
same time");
                        }
                        catch (ExecutionException e) {
@@ -522,7 +524,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        }
                        
                        try {
-                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                                fail("Scheduler accepted too many tasks at the 
same time");
                        }
                        catch (ExecutionException e) {
@@ -533,7 +535,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        }
                        
                        try {
-                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                                fail("Scheduler accepted too many tasks at the 
same time");
                        }
                        catch (ExecutionException e) {
@@ -544,7 +546,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        }
                        
                        try {
-                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidC, 0, 1, null)), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                               testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidC, 0, 1, null)), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                                fail("Scheduler accepted too many tasks at the 
same time");
                        }
                        catch (ExecutionException e) {
@@ -557,8 +559,8 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        // release some isolated task and check that the 
sharing group may grow
                        sA1.releaseSlot();
                        
-                       LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        assertNotNull(s1_2);
                        assertNotNull(s2_2);
                        
@@ -570,19 +572,19 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        assertEquals(1, 
testingSlotProvider.getNumberOfAvailableSlots());
                        
                        // schedule one more no-shared task
-                       LogicalSlot sB0 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot sB0 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                        assertNotNull(sB0);
                        
                        // release the last of the original shared slots and 
allocate one more non-shared slot
                        s2_1.releaseSlot();
-                       LogicalSlot sB2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidB, 2, 3, null)), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot sB2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidB, 2, 3, null)), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                        assertNotNull(sB2);
                        
                        
                        // release on non-shared and add some shared slots
                        sA2.releaseSlot();
-                       LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        assertNotNull(s1_3);
                        assertNotNull(s2_3);
                        
@@ -592,8 +594,8 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        s1_3.releaseSlot();
                        s2_3.releaseSlot();
                        
-                       LogicalSlot sC0 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidC, 1, 2, null)), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot sC1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidC, 0, 2, null)), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot sC0 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidC, 1, 2, null)), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot sC1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertex(jidC, 0, 2, null)), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                        assertNotNull(sC0);
                        assertNotNull(sC1);
                        
@@ -633,8 +635,8 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        TaskManagerLocation loc2 = 
testingSlotProvider.addTaskManager(2);
                        
                        // schedule one to each instance
-                       LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), 
TestingUtils.infiniteTime()).get();
                        assertNotNull(s1);
                        assertNotNull(s2);
                        
@@ -643,8 +645,8 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        assertEquals(loc2, s2.getTaskManagerLocation());
                        
                        // schedule one from the other group to each instance
-                       LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), 
TestingUtils.infiniteTime()).get();
                        assertNotNull(s3);
                        assertNotNull(s4);
                        
@@ -679,8 +681,8 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        TaskManagerLocation loc2 = 
testingSlotProvider.addTaskManager(2);
 
                        // schedule one to each instance
-                       LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), 
TestingUtils.infiniteTime()).get();
                        assertNotNull(s1);
                        assertNotNull(s2);
                        
@@ -689,8 +691,8 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        assertEquals(loc1, s2.getTaskManagerLocation());
                        
                        // schedule one from the other group to each instance
-                       LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), 
TestingUtils.infiniteTime()).get();
                        assertNotNull(s3);
                        assertNotNull(s4);
                        
@@ -724,14 +726,14 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        TaskManagerLocation loc2 = 
testingSlotProvider.addTaskManager(2);
 
                        // schedule until the one instance is full
-                       LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), 
TestingUtils.infiniteTime()).get();
 
                        // schedule two more with preference of same instance 
--> need to go to other instance
-                       LogicalSlot s5 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s6 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s5 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s6 = testingSlotProvider.allocateSlot(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, sharingGroup, loc1), 
sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), 
TestingUtils.infiniteTime()).get();
                        
                        assertNotNull(s1);
                        assertNotNull(s2);
@@ -775,19 +777,19 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        testingSlotProvider.addTaskManager(4);
 
                        // allocate something from group 1 and 2 interleaved 
with schedule for group 3
-                       LogicalSlot slot_1_1 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
-                       LogicalSlot slot_1_2 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_1_1 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_1_2 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
 
-                       LogicalSlot slot_2_1 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
-                       LogicalSlot slot_2_2 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_2_1 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_2_2 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                        
-                       LogicalSlot slot_3 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_3 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                        
-                       LogicalSlot slot_1_3 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
-                       LogicalSlot slot_1_4 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_1_3 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_1_4 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                        
-                       LogicalSlot slot_2_3 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
-                       LogicalSlot slot_2_4 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_2_3 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_2_4 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                        
                        // release groups 1 and 2
                        
@@ -803,10 +805,10 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        
                        // allocate group 4
                        
-                       LogicalSlot slot_4_1 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
-                       LogicalSlot slot_4_2 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
-                       LogicalSlot slot_4_3 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
-                       LogicalSlot slot_4_4 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_4_1 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_4_2 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_4_3 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
+                       LogicalSlot slot_4_4 = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, 
sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                        
                        // release groups 3 and 4
                        
@@ -855,7 +857,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                                        @Override
                                        public void run() {
                                                try {
-                                                       LogicalSlot slot = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 
enumerator4.getAndIncrement(), 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                                                       LogicalSlot slot = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 
enumerator4.getAndIncrement(), 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                                                        
sleepUninterruptibly(rnd.nextInt(5));
                                                        slot.releaseSlot();
 
@@ -877,7 +879,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                                        public void run() {
                                                try {
                                                        if 
(flag3.compareAndSet(false, true)) {
-                                                               LogicalSlot 
slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 
0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
Collections.emptyList(), TestingUtils.infiniteTime()).get();
+                                                               LogicalSlot 
slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 
0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
                                                                
sleepUninterruptibly(5);
                                                                
                                                                
executor.execute(deploy4);
@@ -905,7 +907,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                                        @Override
                                        public void run() {
                                                try {
-                                                       LogicalSlot slot = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 
enumerator2.getAndIncrement(), 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                                                       LogicalSlot slot = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 
enumerator2.getAndIncrement(), 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
 
                                                        // wait a bit till 
scheduling the successor
                                                        
sleepUninterruptibly(rnd.nextInt(5));
@@ -932,7 +934,7 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                                        @Override
                                        public void run() {
                                                try {
-                                                       LogicalSlot slot = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 
enumerator1.getAndIncrement(), 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                                                       LogicalSlot slot = 
testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 
enumerator1.getAndIncrement(), 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
 
                                                        // wait a bit till 
scheduling the successor
                                                        
sleepUninterruptibly(rnd.nextInt(5));
@@ -1005,27 +1007,27 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        scheduler.newInstanceAvailable(getRandomInstance(4));
                        
                        // schedule one task for the first and second vertex
-                       LogicalSlot s1 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s2 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s1 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s2 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        
                        assertEquals( s1.getTaskManagerLocation(), 
s2.getTaskManagerLocation() );
                        assertEquals(3, scheduler.getNumberOfAvailableSlots());
                        
-                       LogicalSlot s3_0 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 0, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s3_1 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 1, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4_0 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4_1 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3_0 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 0, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3_1 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 1, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4_0 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4_1 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        
                        s1.releaseSlot();
                        s2.releaseSlot();
                        
-                       LogicalSlot s3_2 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 2, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s3_3 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 3, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4_2 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
-                       LogicalSlot s4_3 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3_2 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 2, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s3_3 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 3, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4_2 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
+                       LogicalSlot s4_3 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                        
                        try {
-                               scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList(), 
TestingUtils.infiniteTime()).get();
+                               scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 4, 5, sharingGroup), 
sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), 
TestingUtils.infiniteTime()).get();
                                fail("should throw an exception");
                        }
                        catch (ExecutionException e) {
@@ -1050,4 +1052,8 @@ public class SchedulerSlotSharingTest extends 
SchedulerTestBase {
                        fail(e.getMessage());
                }
        }
+
+       private static SlotProfile slotProfileForLocation(TaskManagerLocation 
location) {
+               return new SlotProfile(ResourceProfile.UNKNOWN, 
Collections.singletonList(location), Collections.emptyList());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
index a28e890..d9919ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -32,7 +33,6 @@ import org.apache.flink.util.TestLogger;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -123,7 +123,7 @@ public class SchedulerTest extends TestLogger {
                        new ScheduledUnit(
                                execution),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        Time.milliseconds(1L));
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 9cb9cff..9738449 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
@@ -156,8 +157,13 @@ public class SchedulerTestBase extends TestLogger {
                }
 
                @Override
-               public CompletableFuture<LogicalSlot> 
allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, boolean 
allowQueued, Collection<TaskManagerLocation> preferredLocations, Time 
allocationTimeout) {
-                       return scheduler.allocateSlot(task, allowQueued, 
preferredLocations, allocationTimeout);
+               public CompletableFuture<LogicalSlot> allocateSlot(
+                       SlotRequestId slotRequestId,
+                       ScheduledUnit task,
+                       boolean allowQueued,
+                       SlotProfile slotProfile,
+                       Time allocationTimeout) {
+                       return scheduler.allocateSlot(task, allowQueued, 
slotProfile, allocationTimeout);
                }
 
                @Override
@@ -349,8 +355,13 @@ public class SchedulerTestBase extends TestLogger {
                }
 
                @Override
-               public CompletableFuture<LogicalSlot> 
allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, boolean 
allowQueued, Collection<TaskManagerLocation> preferredLocations, Time 
allocationTimeout) {
-                       return slotProvider.allocateSlot(task, allowQueued, 
preferredLocations, allocationTimeout).thenApply(
+               public CompletableFuture<LogicalSlot> allocateSlot(
+                       SlotRequestId slotRequestId,
+                       ScheduledUnit task,
+                       boolean allowQueued,
+                       SlotProfile slotProfile,
+                       Time allocationTimeout) {
+                       return slotProvider.allocateSlot(task, allowQueued, 
slotProfile, allocationTimeout).thenApply(
                                (LogicalSlot logicalSlot) -> {
                                        switch (logicalSlot.getLocality()) {
                                                case LOCAL:

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
index 2d18c65..c0074ed 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -98,9 +99,9 @@ public class AvailableSlotsTest extends TestLogger {
                assertTrue(availableSlots.contains(slot1.getAllocationId()));
                assertTrue(availableSlots.containsTaskManager(resource1));
 
-               assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE, 
null));
+               
assertNull(availableSlots.poll(SlotProfile.noLocality(DEFAULT_TESTING_BIG_PROFILE)));
 
-               SlotAndLocality slotAndLocality = 
availableSlots.poll(DEFAULT_TESTING_PROFILE, null);
+               SlotAndLocality slotAndLocality = 
availableSlots.poll(SlotProfile.noLocality(DEFAULT_TESTING_PROFILE));
                assertEquals(slot1, slotAndLocality.getSlot());
                assertEquals(0, availableSlots.size());
                assertFalse(availableSlots.contains(slot1.getAllocationId()));

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
index 9a0256c..0375760 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -35,7 +36,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -79,7 +79,7 @@ public class SlotPoolCoLocationTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId,
                                coLocationConstraint1),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                CompletableFuture<LogicalSlot> logicalSlotFuture22 = 
slotProvider.allocateSlot(
@@ -88,7 +88,7 @@ public class SlotPoolCoLocationTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId,
                                coLocationConstraint2),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                CompletableFuture<LogicalSlot> logicalSlotFuture12 = 
slotProvider.allocateSlot(
@@ -97,7 +97,7 @@ public class SlotPoolCoLocationTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId,
                                coLocationConstraint1),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                CompletableFuture<LogicalSlot> logicalSlotFuture21 = 
slotProvider.allocateSlot(
@@ -106,7 +106,7 @@ public class SlotPoolCoLocationTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId,
                                coLocationConstraint2),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                final AllocationID allocationId1 = allocationIds.take();

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
index b2be97e..cc837bc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
@@ -58,7 +59,6 @@ import org.junit.experimental.categories.Category;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -121,8 +121,7 @@ public class SlotPoolRpcTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future = 
pool.allocateSlot(
                                new SlotRequestId(),
                                new 
ScheduledUnit(SchedulerTestUtils.getDummyTask()),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                fastTimeout);
 
@@ -158,8 +157,7 @@ public class SlotPoolRpcTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future = 
slotPoolGateway.allocateSlot(
                                requestId,
                                new 
ScheduledUnit(SchedulerTestUtils.getDummyTask()),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                fastTimeout);
 
@@ -204,8 +202,7 @@ public class SlotPoolRpcTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future = 
slotPoolGateway.allocateSlot(
                                requestId,
                                new DummyScheduledUnit(),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                fastTimeout);
 
@@ -252,8 +249,7 @@ public class SlotPoolRpcTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future = 
slotPoolGateway.allocateSlot(
                                requestId,
                                new 
ScheduledUnit(SchedulerTestUtils.getDummyTask()),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                fastTimeout);
 
@@ -313,7 +309,7 @@ public class SlotPoolRpcTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future = 
pool.getSlotProvider().allocateSlot(
                                new DummyScheduledUnit(),
                                true,
-                               Collections.emptyList(),
+                               SlotProfile.noRequirements(),
                                fastTimeout);
 
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
index b7fa484..f33e1a1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -35,7 +36,6 @@ import org.apache.flink.util.FlinkException;
 
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -67,7 +67,7 @@ public class SlotPoolSlotSharingTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId,
                                null),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                assertFalse(logicalSlotFuture.isDone());
@@ -104,7 +104,7 @@ public class SlotPoolSlotSharingTest extends 
SlotPoolSchedulingTestBase {
                                new SlotSharingGroupId(),
                                null),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                final AllocationID allocationId = allocationIdFuture.get();
@@ -143,7 +143,7 @@ public class SlotPoolSlotSharingTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId,
                                null),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                CompletableFuture<LogicalSlot> logicalSlotFuture2 = 
slotProvider.allocateSlot(
@@ -152,7 +152,7 @@ public class SlotPoolSlotSharingTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId,
                                null),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                assertFalse(logicalSlotFuture1.isDone());
@@ -166,7 +166,7 @@ public class SlotPoolSlotSharingTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId,
                                null),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                CompletableFuture<LogicalSlot> logicalSlotFuture4 = 
slotProvider.allocateSlot(
@@ -175,7 +175,7 @@ public class SlotPoolSlotSharingTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId,
                                null),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                assertFalse(logicalSlotFuture3.isDone());
@@ -242,7 +242,7 @@ public class SlotPoolSlotSharingTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId1,
                                null),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                CompletableFuture<LogicalSlot> logicalSlotFuture2 = 
slotProvider.allocateSlot(
@@ -251,7 +251,7 @@ public class SlotPoolSlotSharingTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId1,
                                null),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                CompletableFuture<LogicalSlot> logicalSlotFuture3 = 
slotProvider.allocateSlot(
@@ -260,7 +260,7 @@ public class SlotPoolSlotSharingTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId2,
                                null),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                CompletableFuture<LogicalSlot> logicalSlotFuture4 = 
slotProvider.allocateSlot(
@@ -269,7 +269,7 @@ public class SlotPoolSlotSharingTest extends 
SlotPoolSchedulingTestBase {
                                slotSharingGroupId2,
                                null),
                        true,
-                       Collections.emptyList(),
+                       SlotProfile.noRequirements(),
                        TestingUtils.infiniteTime());
 
                assertFalse(logicalSlotFuture1.isDone());

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index d6e0521..c529ceb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -122,8 +123,7 @@ public class SlotPoolTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future = 
slotPoolGateway.allocateSlot(
                                requestId,
                                new DummyScheduledUnit(),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                timeout);
                        assertFalse(future.isDone());
@@ -165,15 +165,13 @@ public class SlotPoolTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future1 = 
slotPoolGateway.allocateSlot(
                                new SlotRequestId(),
                                new DummyScheduledUnit(),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                timeout);
                        CompletableFuture<LogicalSlot> future2 = 
slotPoolGateway.allocateSlot(
                                new SlotRequestId(),
                                new DummyScheduledUnit(),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                timeout);
 
@@ -229,8 +227,7 @@ public class SlotPoolTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future1 = 
slotPoolGateway.allocateSlot(
                                new SlotRequestId(),
                                new DummyScheduledUnit(),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                timeout);
                        assertFalse(future1.isDone());
@@ -253,8 +250,7 @@ public class SlotPoolTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future2 = 
slotPoolGateway.allocateSlot(
                                new SlotRequestId(),
                                new DummyScheduledUnit(),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                timeout);
 
@@ -287,8 +283,7 @@ public class SlotPoolTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future = 
slotPoolGateway.allocateSlot(
                                new SlotRequestId(),
                                new DummyScheduledUnit(),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                timeout);
                        assertFalse(future.isDone());
@@ -362,8 +357,7 @@ public class SlotPoolTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future1 = 
slotPoolGateway.allocateSlot(
                                new SlotRequestId(),
                                new DummyScheduledUnit(),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                timeout);
 
@@ -372,8 +366,7 @@ public class SlotPoolTest extends TestLogger {
                        CompletableFuture<LogicalSlot> future2 = 
slotPoolGateway.allocateSlot(
                                new SlotRequestId(),
                                new DummyScheduledUnit(),
-                               DEFAULT_TESTING_PROFILE,
-                               Collections.emptyList(),
+                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
                                true,
                                timeout);
 
@@ -427,11 +420,15 @@ public class SlotPoolTest extends TestLogger {
                try {
                        final SlotPoolGateway slotPoolGateway = 
setupSlotPool(slotPool, resourceManagerGateway);
 
+                       SlotProfile slotProfile = new SlotProfile(
+                               ResourceProfile.UNKNOWN,
+                               Collections.emptyList(),
+                               Collections.emptyList());
+
                        CompletableFuture<LogicalSlot> slotFuture = 
slotPoolGateway.allocateSlot(
                                new SlotRequestId(),
                                scheduledUnit,
-                               ResourceProfile.UNKNOWN,
-                               Collections.emptyList(),
+                               slotProfile,
                                true,
                                timeout);
 
@@ -485,8 +482,7 @@ public class SlotPoolTest extends TestLogger {
                        CompletableFuture<LogicalSlot> slotFuture1 = 
slotPoolGateway.allocateSlot(
                                slotRequestId1,
                                scheduledUnit,
-                               ResourceProfile.UNKNOWN,
-                               Collections.emptyList(),
+                               SlotProfile.noRequirements(),
                                true,
                                timeout);
 
@@ -496,8 +492,7 @@ public class SlotPoolTest extends TestLogger {
                        CompletableFuture<LogicalSlot> slotFuture2 = 
slotPoolGateway.allocateSlot(
                                slotRequestId2,
                                scheduledUnit,
-                               ResourceProfile.UNKNOWN,
-                               Collections.emptyList(),
+                               SlotProfile.noRequirements(),
                                true,
                                timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index c18b3b3..ec6eae2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
@@ -415,7 +416,8 @@ public class SlotSharingManagerTest extends TestLogger {
                        new SlotRequestId());
 
                AbstractID groupId = new AbstractID();
-               SlotSharingManager.MultiTaskSlotLocality 
resolvedRootSlotLocality = slotSharingManager.getResolvedRootSlot(groupId, 
Collections.emptyList());
+               SlotSharingManager.MultiTaskSlotLocality 
resolvedRootSlotLocality =
+                       slotSharingManager.getResolvedRootSlot(groupId, 
SlotProfile.noRequirements().matcher());
 
                assertNotNull(resolvedRootSlotLocality);
                assertEquals(Locality.UNCONSTRAINED, 
resolvedRootSlotLocality.getLocality());
@@ -431,7 +433,7 @@ public class SlotSharingManagerTest extends TestLogger {
 
                SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot1 = 
slotSharingManager.getResolvedRootSlot(
                        groupId,
-                       Collections.emptyList());
+                       SlotProfile.noRequirements().matcher());
 
                assertNull(resolvedRootSlot1);
        }
@@ -470,7 +472,9 @@ public class SlotSharingManagerTest extends TestLogger {
                        new SlotRequestId());
 
                AbstractID groupId = new AbstractID();
-               SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot1 = 
slotSharingManager.getResolvedRootSlot(groupId, 
Collections.singleton(taskManagerLocation));
+               SlotProfile.LocalityAwareRequirementsToSlotMatcher matcher =
+                       new 
SlotProfile.LocalityAwareRequirementsToSlotMatcher(Collections.singleton(taskManagerLocation));
+               SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot1 = 
slotSharingManager.getResolvedRootSlot(groupId, matcher);
                assertNotNull(resolvedRootSlot1);
                assertEquals(Locality.LOCAL, resolvedRootSlot1.getLocality());
                assertEquals(rootSlot2.getSlotRequestId(), 
resolvedRootSlot1.getMultiTaskSlot().getSlotRequestId());
@@ -481,7 +485,7 @@ public class SlotSharingManagerTest extends TestLogger {
                        groupId,
                        resolvedRootSlot1.getLocality());
 
-               SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot2 = 
slotSharingManager.getResolvedRootSlot(groupId, 
Collections.singleton(taskManagerLocation));
+               SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot2 = 
slotSharingManager.getResolvedRootSlot(groupId,matcher);
 
                assertNotNull(resolvedRootSlot2);
                assertNotSame(Locality.LOCAL, 
(resolvedRootSlot2.getLocality()));

Reply via email to