http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/Clock.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/Clock.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/Clock.java
new file mode 100644
index 0000000..d33bba4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/Clock.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.clock;
+
+/**
+ * A clock that gives access to time. This clock returns two flavors of time:
+ * 
+ * <p><b>Absolute Time:</b> This refers to real world wall clock time, and it 
typically
+ * derived from a system clock. It is subject to clock drift and inaccuracy, 
and can jump
+ * if the system clock is adjusted.
+ * 
+ * <p><b>Relative Time:</b> This time advances at the same speed as the 
<i>absolute time</i>,
+ * but the timestamps can only be referred to relative to each other. The 
timestamps have
+ * no absolute meaning and cannot be compared across JVM processes. The source 
for the
+ * timestamps is not affected by adjustments to the system clock, so it never 
jumps.
+ */
+public abstract class Clock {
+
+       public abstract long absoluteTimeMillis();
+
+       public abstract long relativeTimeMillis();
+
+       public abstract long relativeTimeNanos();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/SystemClock.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/SystemClock.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/SystemClock.java
new file mode 100644
index 0000000..789a0b7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/SystemClock.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.clock;
+
+/**
+ * A clock that returns the time of the system / process.
+ * 
+ * <p>This clock uses {@link System#currentTimeMillis()} for <i>absolute 
time</i>
+ * and {@link System#nanoTime()} for <i>relative time</i>.
+ * 
+ * <p>This SystemClock exists as a singleton instance.
+ */
+public class SystemClock extends Clock {
+
+       private static final SystemClock INSTANCE = new SystemClock();
+
+       public static SystemClock getInstance() {
+               return INSTANCE;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public long absoluteTimeMillis() {
+               return System.currentTimeMillis();
+       }
+
+       @Override
+       public long relativeTimeMillis() {
+               return System.nanoTime() / 1_000_000;
+       }
+
+       @Override
+       public long relativeTimeNanos() {
+               return System.nanoTime();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private SystemClock() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index bc5ddaa..cd1d895 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -46,4 +46,9 @@ public class ResourceProfileTest {
                assertTrue(rp4.isMatching(rp3));
                assertTrue(rp4.isMatching(rp4));
        }
+
+       @Test
+       public void testUnknownMatchesUnknown() {
+               
assertTrue(ResourceProfile.UNKNOWN.isMatching(ResourceProfile.UNKNOWN));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
index 655a3ea..33ed679 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -1,135 +1,135 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class AllocatedSlotsTest {
-
-       @Test
-       public void testOperations() throws Exception {
-               SlotPool.AllocatedSlots allocatedSlots = new 
SlotPool.AllocatedSlots();
-
-               final AllocationID allocation1 = new AllocationID();
-               final ResourceID resource1 = new ResourceID("resource1");
-               final Slot slot1 = createSlot(resource1);
-
-               allocatedSlots.add(allocation1, new SlotDescriptor(slot1), 
slot1);
-
-               assertTrue(allocatedSlots.contains(slot1));
-               assertTrue(allocatedSlots.containResource(resource1));
-
-               assertEquals(slot1, allocatedSlots.get(allocation1));
-               assertEquals(1, 
allocatedSlots.getSlotsByResource(resource1).size());
-               assertEquals(1, allocatedSlots.size());
-
-               final AllocationID allocation2 = new AllocationID();
-               final Slot slot2 = createSlot(resource1);
-
-               allocatedSlots.add(allocation2, new SlotDescriptor(slot2), 
slot2);
-
-               assertTrue(allocatedSlots.contains(slot1));
-               assertTrue(allocatedSlots.contains(slot2));
-               assertTrue(allocatedSlots.containResource(resource1));
-
-               assertEquals(slot1, allocatedSlots.get(allocation1));
-               assertEquals(slot2, allocatedSlots.get(allocation2));
-               assertEquals(2, 
allocatedSlots.getSlotsByResource(resource1).size());
-               assertEquals(2, allocatedSlots.size());
-
-               final AllocationID allocation3 = new AllocationID();
-               final ResourceID resource2 = new ResourceID("resource2");
-               final Slot slot3 = createSlot(resource2);
-
-               allocatedSlots.add(allocation3, new SlotDescriptor(slot2), 
slot3);
-
-               assertTrue(allocatedSlots.contains(slot1));
-               assertTrue(allocatedSlots.contains(slot2));
-               assertTrue(allocatedSlots.contains(slot3));
-               assertTrue(allocatedSlots.containResource(resource1));
-               assertTrue(allocatedSlots.containResource(resource2));
-
-               assertEquals(slot1, allocatedSlots.get(allocation1));
-               assertEquals(slot2, allocatedSlots.get(allocation2));
-               assertEquals(slot3, allocatedSlots.get(allocation3));
-               assertEquals(2, 
allocatedSlots.getSlotsByResource(resource1).size());
-               assertEquals(1, 
allocatedSlots.getSlotsByResource(resource2).size());
-               assertEquals(3, allocatedSlots.size());
-
-               allocatedSlots.remove(slot2);
-
-               assertTrue(allocatedSlots.contains(slot1));
-               assertFalse(allocatedSlots.contains(slot2));
-               assertTrue(allocatedSlots.contains(slot3));
-               assertTrue(allocatedSlots.containResource(resource1));
-               assertTrue(allocatedSlots.containResource(resource2));
-
-               assertEquals(slot1, allocatedSlots.get(allocation1));
-               assertNull(allocatedSlots.get(allocation2));
-               assertEquals(slot3, allocatedSlots.get(allocation3));
-               assertEquals(1, 
allocatedSlots.getSlotsByResource(resource1).size());
-               assertEquals(1, 
allocatedSlots.getSlotsByResource(resource2).size());
-               assertEquals(2, allocatedSlots.size());
-
-               allocatedSlots.remove(slot1);
-
-               assertFalse(allocatedSlots.contains(slot1));
-               assertFalse(allocatedSlots.contains(slot2));
-               assertTrue(allocatedSlots.contains(slot3));
-               assertFalse(allocatedSlots.containResource(resource1));
-               assertTrue(allocatedSlots.containResource(resource2));
-
-               assertNull(allocatedSlots.get(allocation1));
-               assertNull(allocatedSlots.get(allocation2));
-               assertEquals(slot3, allocatedSlots.get(allocation3));
-               assertEquals(0, 
allocatedSlots.getSlotsByResource(resource1).size());
-               assertEquals(1, 
allocatedSlots.getSlotsByResource(resource2).size());
-               assertEquals(1, allocatedSlots.size());
-
-               allocatedSlots.remove(slot3);
-
-               assertFalse(allocatedSlots.contains(slot1));
-               assertFalse(allocatedSlots.contains(slot2));
-               assertFalse(allocatedSlots.contains(slot3));
-               assertFalse(allocatedSlots.containResource(resource1));
-               assertFalse(allocatedSlots.containResource(resource2));
-
-               assertNull(allocatedSlots.get(allocation1));
-               assertNull(allocatedSlots.get(allocation2));
-               assertNull(allocatedSlots.get(allocation3));
-               assertEquals(0, 
allocatedSlots.getSlotsByResource(resource1).size());
-               assertEquals(0, 
allocatedSlots.getSlotsByResource(resource2).size());
-               assertEquals(0, allocatedSlots.size());
-       }
-
-       private Slot createSlot(final ResourceID resourceId) {
-               Slot slot = mock(Slot.class);
-               when(slot.getTaskManagerID()).thenReturn(resourceId);
-               return slot;
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.runtime.instance;
+//
+//import org.apache.flink.runtime.clusterframework.types.AllocationID;
+//import org.apache.flink.runtime.clusterframework.types.ResourceID;
+//import org.junit.Test;
+//
+//import static org.junit.Assert.assertEquals;
+//import static org.junit.Assert.assertFalse;
+//import static org.junit.Assert.assertNull;
+//import static org.junit.Assert.assertTrue;
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.when;
+//
+//public class AllocatedSlotsTest {
+//
+//     @Test
+//     public void testOperations() throws Exception {
+//             SlotPool.AllocatedSlots allocatedSlots = new 
SlotPool.AllocatedSlots();
+//
+//             final AllocationID allocation1 = new AllocationID();
+//             final ResourceID resource1 = new ResourceID("resource1");
+//             final Slot slot1 = createSlot(resource1);
+//
+//             allocatedSlots.add(allocation1, new SlotDescriptor(slot1), 
slot1);
+//
+//             assertTrue(allocatedSlots.contains(slot1));
+//             assertTrue(allocatedSlots.containResource(resource1));
+//
+//             assertEquals(slot1, allocatedSlots.get(allocation1));
+//             assertEquals(1, 
allocatedSlots.getSlotsByResource(resource1).size());
+//             assertEquals(1, allocatedSlots.size());
+//
+//             final AllocationID allocation2 = new AllocationID();
+//             final Slot slot2 = createSlot(resource1);
+//
+//             allocatedSlots.add(allocation2, new SlotDescriptor(slot2), 
slot2);
+//
+//             assertTrue(allocatedSlots.contains(slot1));
+//             assertTrue(allocatedSlots.contains(slot2));
+//             assertTrue(allocatedSlots.containResource(resource1));
+//
+//             assertEquals(slot1, allocatedSlots.get(allocation1));
+//             assertEquals(slot2, allocatedSlots.get(allocation2));
+//             assertEquals(2, 
allocatedSlots.getSlotsByResource(resource1).size());
+//             assertEquals(2, allocatedSlots.size());
+//
+//             final AllocationID allocation3 = new AllocationID();
+//             final ResourceID resource2 = new ResourceID("resource2");
+//             final Slot slot3 = createSlot(resource2);
+//
+//             allocatedSlots.add(allocation3, new SlotDescriptor(slot2), 
slot3);
+//
+//             assertTrue(allocatedSlots.contains(slot1));
+//             assertTrue(allocatedSlots.contains(slot2));
+//             assertTrue(allocatedSlots.contains(slot3));
+//             assertTrue(allocatedSlots.containResource(resource1));
+//             assertTrue(allocatedSlots.containResource(resource2));
+//
+//             assertEquals(slot1, allocatedSlots.get(allocation1));
+//             assertEquals(slot2, allocatedSlots.get(allocation2));
+//             assertEquals(slot3, allocatedSlots.get(allocation3));
+//             assertEquals(2, 
allocatedSlots.getSlotsByResource(resource1).size());
+//             assertEquals(1, 
allocatedSlots.getSlotsByResource(resource2).size());
+//             assertEquals(3, allocatedSlots.size());
+//
+//             allocatedSlots.remove(slot2);
+//
+//             assertTrue(allocatedSlots.contains(slot1));
+//             assertFalse(allocatedSlots.contains(slot2));
+//             assertTrue(allocatedSlots.contains(slot3));
+//             assertTrue(allocatedSlots.containResource(resource1));
+//             assertTrue(allocatedSlots.containResource(resource2));
+//
+//             assertEquals(slot1, allocatedSlots.get(allocation1));
+//             assertNull(allocatedSlots.get(allocation2));
+//             assertEquals(slot3, allocatedSlots.get(allocation3));
+//             assertEquals(1, 
allocatedSlots.getSlotsByResource(resource1).size());
+//             assertEquals(1, 
allocatedSlots.getSlotsByResource(resource2).size());
+//             assertEquals(2, allocatedSlots.size());
+//
+//             allocatedSlots.remove(slot1);
+//
+//             assertFalse(allocatedSlots.contains(slot1));
+//             assertFalse(allocatedSlots.contains(slot2));
+//             assertTrue(allocatedSlots.contains(slot3));
+//             assertFalse(allocatedSlots.containResource(resource1));
+//             assertTrue(allocatedSlots.containResource(resource2));
+//
+//             assertNull(allocatedSlots.get(allocation1));
+//             assertNull(allocatedSlots.get(allocation2));
+//             assertEquals(slot3, allocatedSlots.get(allocation3));
+//             assertEquals(0, 
allocatedSlots.getSlotsByResource(resource1).size());
+//             assertEquals(1, 
allocatedSlots.getSlotsByResource(resource2).size());
+//             assertEquals(1, allocatedSlots.size());
+//
+//             allocatedSlots.remove(slot3);
+//
+//             assertFalse(allocatedSlots.contains(slot1));
+//             assertFalse(allocatedSlots.contains(slot2));
+//             assertFalse(allocatedSlots.contains(slot3));
+//             assertFalse(allocatedSlots.containResource(resource1));
+//             assertFalse(allocatedSlots.containResource(resource2));
+//
+//             assertNull(allocatedSlots.get(allocation1));
+//             assertNull(allocatedSlots.get(allocation2));
+//             assertNull(allocatedSlots.get(allocation3));
+//             assertEquals(0, 
allocatedSlots.getSlotsByResource(resource1).size());
+//             assertEquals(0, 
allocatedSlots.getSlotsByResource(resource2).size());
+//             assertEquals(0, allocatedSlots.size());
+//     }
+//
+//     private Slot createSlot(final ResourceID resourceId) {
+//             Slot slot = mock(Slot.class);
+//             when(slot.getTaskManagerID()).thenReturn(resourceId);
+//             return slot;
+//     }
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
index 8e31085..4d58a31 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -1,124 +1,123 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class AvailableSlotsTest {
-
-       static final ResourceProfile DEFAULT_TESTING_PROFILE = new 
ResourceProfile(1.0, 512);
-
-       static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new 
ResourceProfile(2.0, 1024);
-
-       @Test
-       public void testAddAndRemove() throws Exception {
-               SlotPool.AvailableSlots availableSlots = new 
SlotPool.AvailableSlots();
-
-               final ResourceID resource1 = new ResourceID("resource1");
-               final ResourceID resource2 = new ResourceID("resource2");
-
-               final SlotDescriptor slot1 = createSlotDescriptor(resource1);
-               final SlotDescriptor slot2 = createSlotDescriptor(resource1);
-               final SlotDescriptor slot3 = createSlotDescriptor(resource2);
-
-               availableSlots.add(slot1);
-               availableSlots.add(slot2);
-               availableSlots.add(slot3);
-
-               assertEquals(3, availableSlots.size());
-               assertTrue(availableSlots.contains(slot1));
-               assertTrue(availableSlots.contains(slot2));
-               assertTrue(availableSlots.contains(slot3));
-               assertTrue(availableSlots.containResource(resource1));
-               assertTrue(availableSlots.containResource(resource2));
-
-               availableSlots.removeByResource(resource1);
-
-               assertEquals(1, availableSlots.size());
-               assertFalse(availableSlots.contains(slot1));
-               assertFalse(availableSlots.contains(slot2));
-               assertTrue(availableSlots.contains(slot3));
-               assertFalse(availableSlots.containResource(resource1));
-               assertTrue(availableSlots.containResource(resource2));
-
-               availableSlots.removeByResource(resource2);
-
-               assertEquals(0, availableSlots.size());
-               assertFalse(availableSlots.contains(slot1));
-               assertFalse(availableSlots.contains(slot2));
-               assertFalse(availableSlots.contains(slot3));
-               assertFalse(availableSlots.containResource(resource1));
-               assertFalse(availableSlots.containResource(resource2));
-       }
-
-       @Test
-       public void testPollFreeSlot() {
-               SlotPool.AvailableSlots availableSlots = new 
SlotPool.AvailableSlots();
-
-               final ResourceID resource1 = new ResourceID("resource1");
-               final SlotDescriptor slot1 = createSlotDescriptor(resource1);
-
-               availableSlots.add(slot1);
-
-               assertEquals(1, availableSlots.size());
-               assertTrue(availableSlots.contains(slot1));
-               assertTrue(availableSlots.containResource(resource1));
-
-               assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE));
-
-               assertEquals(slot1, 
availableSlots.poll(DEFAULT_TESTING_PROFILE));
-               assertEquals(0, availableSlots.size());
-               assertFalse(availableSlots.contains(slot1));
-               assertFalse(availableSlots.containResource(resource1));
-       }
-
-       static SlotDescriptor createSlotDescriptor(final ResourceID resourceID) 
{
-               return createSlotDescriptor(resourceID, new JobID());
-       }
-
-       static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID) {
-               return createSlotDescriptor(resourceID, jobID, 
DEFAULT_TESTING_PROFILE);
-       }
-
-       static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID,
-               final ResourceProfile resourceProfile)
-       {
-               return createSlotDescriptor(resourceID, jobID, resourceProfile, 
0);
-       }
-
-       static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID,
-               final ResourceProfile resourceProfile, final int slotNumber)
-       {
-               TaskManagerLocation location = mock(TaskManagerLocation.class);
-               when(location.getResourceID()).thenReturn(resourceID);
-               return new SlotDescriptor(jobID, location, slotNumber, 
resourceProfile, mock(TaskManagerGateway.class));
-       }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.runtime.instance;
+//
+//import org.apache.flink.api.common.JobID;
+//import org.apache.flink.runtime.clusterframework.types.ResourceID;
+//import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+//import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+//import org.junit.Test;
+//
+//import static org.junit.Assert.assertEquals;
+//import static org.junit.Assert.assertFalse;
+//import static org.junit.Assert.assertNull;
+//import static org.junit.Assert.assertTrue;
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.when;
+//
+//public class AvailableSlotsTest {
+//
+//     static final ResourceProfile DEFAULT_TESTING_PROFILE = new 
ResourceProfile(1.0, 512);
+//
+//     static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new 
ResourceProfile(2.0, 1024);
+//
+//     @Test
+//     public void testAddAndRemove() throws Exception {
+//             SlotPool.AvailableSlots availableSlots = new 
SlotPool.AvailableSlots();
+//
+//             final ResourceID resource1 = new ResourceID("resource1");
+//             final ResourceID resource2 = new ResourceID("resource2");
+//
+//             final SlotDescriptor slot1 = createSlotDescriptor(resource1);
+//             final SlotDescriptor slot2 = createSlotDescriptor(resource1);
+//             final SlotDescriptor slot3 = createSlotDescriptor(resource2);
+//
+//             availableSlots.add(slot1);
+//             availableSlots.add(slot2);
+//             availableSlots.add(slot3);
+//
+//             assertEquals(3, availableSlots.size());
+//             assertTrue(availableSlots.contains(slot1));
+//             assertTrue(availableSlots.contains(slot2));
+//             assertTrue(availableSlots.contains(slot3));
+//             assertTrue(availableSlots.containResource(resource1));
+//             assertTrue(availableSlots.containResource(resource2));
+//
+//             availableSlots.removeByResource(resource1);
+//
+//             assertEquals(1, availableSlots.size());
+//             assertFalse(availableSlots.contains(slot1));
+//             assertFalse(availableSlots.contains(slot2));
+//             assertTrue(availableSlots.contains(slot3));
+//             assertFalse(availableSlots.containResource(resource1));
+//             assertTrue(availableSlots.containResource(resource2));
+//
+//             availableSlots.removeByResource(resource2);
+//
+//             assertEquals(0, availableSlots.size());
+//             assertFalse(availableSlots.contains(slot1));
+//             assertFalse(availableSlots.contains(slot2));
+//             assertFalse(availableSlots.contains(slot3));
+//             assertFalse(availableSlots.containResource(resource1));
+//             assertFalse(availableSlots.containResource(resource2));
+//     }
+//
+//     @Test
+//     public void testPollFreeSlot() {
+//             SlotPool.AvailableSlots availableSlots = new 
SlotPool.AvailableSlots();
+//
+//             final ResourceID resource1 = new ResourceID("resource1");
+//             final SlotDescriptor slot1 = createSlotDescriptor(resource1);
+//
+//             availableSlots.add(slot1);
+//
+//             assertEquals(1, availableSlots.size());
+//             assertTrue(availableSlots.contains(slot1));
+//             assertTrue(availableSlots.containResource(resource1));
+//
+//             assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE));
+//
+//             assertEquals(slot1, 
availableSlots.poll(DEFAULT_TESTING_PROFILE));
+//             assertEquals(0, availableSlots.size());
+//             assertFalse(availableSlots.contains(slot1));
+//             assertFalse(availableSlots.containResource(resource1));
+//     }
+//
+//     static SlotDescriptor createSlotDescriptor(final ResourceID resourceID) 
{
+//             return createSlotDescriptor(resourceID, new JobID());
+//     }
+//
+//     static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID) {
+//             return createSlotDescriptor(resourceID, jobID, 
DEFAULT_TESTING_PROFILE);
+//     }
+//
+//     static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID,
+//             final ResourceProfile resourceProfile)
+//     {
+//             return createSlotDescriptor(resourceID, jobID, resourceProfile, 
0);
+//     }
+//
+//     static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID,
+//             final ResourceProfile resourceProfile, final int slotNumber)
+//     {
+//             TaskManagerLocation location = mock(TaskManagerLocation.class);
+//             when(location.getResourceID()).thenReturn(resourceID);
+//             return new SlotDescriptor(jobID, location, slotNumber, 
resourceProfile, mock(ActorGateway.class));
+//     }
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 30cdbd6..cc1d194 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -1,297 +1,299 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
-import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.createSlotDescriptor;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class SlotPoolTest extends TestLogger {
-
-       private Executor executor;
-
-       private SlotPool slotPool;
-
-       private ResourceManagerGateway resourceManagerGateway;
-
-       @Before
-       public void setUp() throws Exception {
-               this.executor = Executors.newFixedThreadPool(1);
-               this.slotPool = new SlotPool(executor);
-               this.resourceManagerGateway = 
mock(ResourceManagerGateway.class);
-               when(resourceManagerGateway
-                       .requestSlot(any(UUID.class), any(UUID.class), 
any(SlotRequest.class), any(Time.class)))
-                       .thenReturn(mock(Future.class));
-               slotPool.setResourceManager(UUID.randomUUID(), 
resourceManagerGateway);
-               slotPool.setJobManagerLeaderId(UUID.randomUUID());
-       }
-
-       @After
-       public void tearDown() throws Exception {
-       }
-
-       @Test
-       public void testAllocateSimpleSlot() throws Exception {
-               ResourceID resourceID = new ResourceID("resource");
-               slotPool.registerResource(resourceID);
-
-               JobID jobID = new JobID();
-               AllocationID allocationID = new AllocationID();
-               Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID);
-               assertFalse(future.isDone());
-               verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), any(SlotRequest.class), any(Time.class));
-
-               SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-               assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-
-               SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
-               assertTrue(future.isDone());
-               assertTrue(slot.isAlive());
-               assertEquals(resourceID, slot.getTaskManagerID());
-               assertEquals(jobID, slot.getJobID());
-               assertEquals(slotPool, slot.getOwner());
-       }
-
-       @Test
-       public void testAllocateSharedSlot() throws Exception {
-               ResourceID resourceID = new ResourceID("resource");
-               slotPool.registerResource(resourceID);
-
-               JobVertexID vid = new JobVertexID();
-               SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
-               SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
-
-               JobID jobID = new JobID();
-               AllocationID allocationID = new AllocationID();
-               Future<SharedSlot> future = slotPool.allocateSharedSlot(jobID, 
DEFAULT_TESTING_PROFILE, assignment, allocationID);
-
-               assertFalse(future.isDone());
-               verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), any(SlotRequest.class), any(Time.class));
-
-               SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-               assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-
-               SharedSlot slot = future.get(1, TimeUnit.SECONDS);
-               assertTrue(future.isDone());
-               assertTrue(slot.isAlive());
-               assertEquals(resourceID, slot.getTaskManagerID());
-               assertEquals(jobID, slot.getJobID());
-               assertEquals(slotPool, slot.getOwner());
-
-               SimpleSlot simpleSlot = slot.allocateSubSlot(vid);
-               assertNotNull(simpleSlot);
-               assertTrue(simpleSlot.isAlive());
-       }
-
-       @Test
-       public void testAllocateSlotWithoutResourceManager() throws Exception {
-               slotPool.disconnectResourceManager();
-               Future<SimpleSlot> future = slotPool.allocateSimpleSlot(new 
JobID(), DEFAULT_TESTING_PROFILE);
-               future.handleAsync(
-                       new BiFunction<SimpleSlot, Throwable, Void>() {
-                               @Override
-                               public Void apply(SimpleSlot simpleSlot, 
Throwable throwable) {
-                                       assertNull(simpleSlot);
-                                       assertNotNull(throwable);
-                                       return null;
-                               }
-                       },
-                       executor);
-               try {
-                       future.get(1, TimeUnit.SECONDS);
-                       fail("We expected a ExecutionException.");
-               } catch (ExecutionException ex) {
-                       // we expect the exception
-               }
-       }
-
-       @Test
-       public void testAllocationFulfilledByReturnedSlot() throws Exception {
-               ResourceID resourceID = new ResourceID("resource");
-               slotPool.registerResource(resourceID);
-
-               JobID jobID = new JobID();
-
-               AllocationID allocationID1 = new AllocationID();
-               Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID1);
-
-               AllocationID allocationID2 = new AllocationID();
-               Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID2);
-
-               assertFalse(future1.isDone());
-               assertFalse(future2.isDone());
-               verify(resourceManagerGateway, times(2))
-                       .requestSlot(any(UUID.class), any(UUID.class), 
any(SlotRequest.class), any(Time.class));
-
-               SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-               assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-
-               SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-               assertTrue(future1.isDone());
-               assertFalse(future2.isDone());
-
-               // return this slot to pool
-               slot1.releaseSlot();
-
-               // second allocation fulfilled by previous slot returning
-               SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-               assertTrue(future2.isDone());
-
-               assertNotEquals(slot1, slot2);
-               assertTrue(slot1.isReleased());
-               assertTrue(slot2.isAlive());
-               assertEquals(slot1.getTaskManagerID(), 
slot2.getTaskManagerID());
-               assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
-       }
-
-       @Test
-       public void testAllocateWithFreeSlot() throws Exception {
-               ResourceID resourceID = new ResourceID("resource");
-               slotPool.registerResource(resourceID);
-
-               JobID jobID = new JobID();
-               AllocationID allocationID1 = new AllocationID();
-               Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID1);
-               assertFalse(future1.isDone());
-
-               SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-               assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-
-               SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-               assertTrue(future1.isDone());
-
-               // return this slot to pool
-               slot1.releaseSlot();
-
-               AllocationID allocationID2 = new AllocationID();
-               Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID2);
-
-               // second allocation fulfilled by previous slot returning
-               SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-               assertTrue(future2.isDone());
-
-               assertNotEquals(slot1, slot2);
-               assertTrue(slot1.isReleased());
-               assertTrue(slot2.isAlive());
-               assertEquals(slot1.getTaskManagerID(), 
slot2.getTaskManagerID());
-               assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
-       }
-
-       @Test
-       public void testOfferSlot() throws Exception {
-               ResourceID resourceID = new ResourceID("resource");
-               slotPool.registerResource(resourceID);
-
-               JobID jobID = new JobID();
-               AllocationID allocationID = new AllocationID();
-               Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID);
-               assertFalse(future.isDone());
-               verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), any(SlotRequest.class), any(Time.class));
-
-               // slot from unregistered resource
-               SlotDescriptor invalid = createSlotDescriptor(new 
ResourceID("unregistered"), jobID, DEFAULT_TESTING_PROFILE);
-               assertFalse(slotPool.offerSlot(allocationID, invalid));
-
-               SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-
-               // reject offering with mismatch allocation id
-               assertFalse(slotPool.offerSlot(new AllocationID(), 
slotDescriptor));
-
-               // accepted slot
-               assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-               SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
-               assertTrue(future.isDone());
-               assertTrue(slot.isAlive());
-
-               // conflict offer with using slot
-               SlotDescriptor conflict = createSlotDescriptor(resourceID, 
jobID, DEFAULT_TESTING_PROFILE);
-               assertFalse(slotPool.offerSlot(allocationID, conflict));
-
-               // duplicated offer with using slot
-               assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-               assertTrue(future.isDone());
-               assertTrue(slot.isAlive());
-
-               // duplicated offer with free slot
-               slot.releaseSlot();
-               assertTrue(slot.isReleased());
-               assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-       }
-
-       @Test
-       public void testReleaseResource() throws Exception {
-               ResourceID resourceID = new ResourceID("resource");
-               slotPool.registerResource(resourceID);
-
-               JobID jobID = new JobID();
-
-               AllocationID allocationID1 = new AllocationID();
-               Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID1);
-
-               AllocationID allocationID2 = new AllocationID();
-               Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID2);
-
-               SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-               assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-
-               SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-               assertTrue(future1.isDone());
-               assertFalse(future2.isDone());
-
-               slotPool.releaseResource(resourceID);
-               assertTrue(slot1.isReleased());
-
-               // slot released and not usable, second allocation still not 
fulfilled
-               Thread.sleep(10);
-               assertFalse(future2.isDone());
-       }
-
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.runtime.instance;
+//
+//import org.apache.flink.api.common.JobID;
+//import org.apache.flink.api.common.time.Time;
+//import org.apache.flink.runtime.clusterframework.types.AllocationID;
+//import org.apache.flink.runtime.clusterframework.types.ResourceID;
+//import org.apache.flink.runtime.concurrent.BiFunction;
+//import org.apache.flink.runtime.concurrent.Future;
+//import org.apache.flink.runtime.jobgraph.JobVertexID;
+//import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+//import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+//import org.apache.flink.runtime.resourcemanager.SlotRequest;
+//import org.apache.flink.util.TestLogger;
+//import org.junit.After;
+//import org.junit.Before;
+//import org.junit.Test;
+//
+//import java.util.UUID;
+//import java.util.concurrent.ExecutionException;
+//import java.util.concurrent.Executor;
+//import java.util.concurrent.ExecutorService;
+//import java.util.concurrent.Executors;
+//import java.util.concurrent.TimeUnit;
+//
+//import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+//import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.createSlotDescriptor;
+//import static org.junit.Assert.assertEquals;
+//import static org.junit.Assert.assertFalse;
+//import static org.junit.Assert.assertNotEquals;
+//import static org.junit.Assert.assertNotNull;
+//import static org.junit.Assert.assertNull;
+//import static org.junit.Assert.assertTrue;
+//import static org.junit.Assert.fail;
+//import static org.mockito.Matchers.any;
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.times;
+//import static org.mockito.Mockito.verify;
+//import static org.mockito.Mockito.when;
+//
+//public class SlotPoolTest extends TestLogger {
+//
+//     private ExecutorService executor;
+//
+//     private SlotPool slotPool;
+//
+//     private ResourceManagerGateway resourceManagerGateway;
+//
+//     @Before
+//     public void setUp() throws Exception {
+//             this.executor = Executors.newFixedThreadPool(1);
+//             this.slotPool = new SlotPool(executor);
+//             this.resourceManagerGateway = 
mock(ResourceManagerGateway.class);
+//             when(resourceManagerGateway
+//                     .requestSlot(any(UUID.class), any(UUID.class), 
any(SlotRequest.class), any(Time.class)))
+//                     .thenReturn(mock(Future.class));
+//
+//             slotPool.setResourceManager(UUID.randomUUID(), 
resourceManagerGateway);
+//             slotPool.setJobManagerLeaderId(UUID.randomUUID());
+//     }
+//
+//     @After
+//     public void tearDown() throws Exception {
+//     }
+//
+//     @Test
+//     public void testAllocateSimpleSlot() throws Exception {
+//             ResourceID resourceID = new ResourceID("resource");
+//             slotPool.registerResource(resourceID);
+//
+//             JobID jobID = new JobID();
+//             AllocationID allocationID = new AllocationID();
+//             Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID);
+//             assertFalse(future.isDone());
+//             verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), any(SlotRequest.class), any(Time.class));
+//
+//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//             assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+//
+//             SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+//             assertTrue(future.isDone());
+//             assertTrue(slot.isAlive());
+//             assertEquals(resourceID, slot.getTaskManagerID());
+//             assertEquals(jobID, slot.getJobID());
+//             assertEquals(slotPool, slot.getOwner());
+//     }
+//
+//     @Test
+//     public void testAllocateSharedSlot() throws Exception {
+//             ResourceID resourceID = new ResourceID("resource");
+//             slotPool.registerResource(resourceID);
+//
+//             JobVertexID vid = new JobVertexID();
+//             SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
+//             SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
+//
+//             JobID jobID = new JobID();
+//             AllocationID allocationID = new AllocationID();
+//             Future<SharedSlot> future = slotPool.allocateSharedSlot(jobID, 
DEFAULT_TESTING_PROFILE, assignment, allocationID);
+//
+//             assertFalse(future.isDone());
+//             verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), any(SlotRequest.class), any(Time.class));
+//
+//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//             assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+//
+//             SharedSlot slot = future.get(1, TimeUnit.SECONDS);
+//             assertTrue(future.isDone());
+//             assertTrue(slot.isAlive());
+//             assertEquals(resourceID, slot.getTaskManagerID());
+//             assertEquals(jobID, slot.getJobID());
+//             assertEquals(slotPool, slot.getOwner());
+//
+//             SimpleSlot simpleSlot = slot.allocateSubSlot(vid);
+//             assertNotNull(simpleSlot);
+//             assertTrue(simpleSlot.isAlive());
+//     }
+//
+//     @Test
+//     public void testAllocateSlotWithoutResourceManager() throws Exception {
+//             slotPool.disconnectResourceManager();
+//             Future<SimpleSlot> future = slotPool.allocateSimpleSlot(new 
JobID(), DEFAULT_TESTING_PROFILE);
+//             future.handleAsync(
+//                     new BiFunction<SimpleSlot, Throwable, Void>() {
+//                             @Override
+//                             public Void apply(SimpleSlot simpleSlot, 
Throwable throwable) {
+//                                     assertNull(simpleSlot);
+//                                     assertNotNull(throwable);
+//                                     return null;
+//                             }
+//                     },
+//                     executor);
+//             try {
+//                     future.get(1, TimeUnit.SECONDS);
+//                     fail("We expected a ExecutionException.");
+//             } catch (ExecutionException ex) {
+//                     // we expect the exception
+//             }
+//     }
+//
+//     @Test
+//     public void testAllocationFulfilledByReturnedSlot() throws Exception {
+//             ResourceID resourceID = new ResourceID("resource");
+//             slotPool.registerResource(resourceID);
+//
+//             JobID jobID = new JobID();
+//
+//             AllocationID allocationID1 = new AllocationID();
+//             Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID1);
+//
+//             AllocationID allocationID2 = new AllocationID();
+//             Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID2);
+//
+//             assertFalse(future1.isDone());
+//             assertFalse(future2.isDone());
+//             verify(resourceManagerGateway, times(2))
+//                     .requestSlot(any(UUID.class), any(UUID.class), 
any(SlotRequest.class), any(Time.class));
+//
+//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//             assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+//
+//             SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+//             assertTrue(future1.isDone());
+//             assertFalse(future2.isDone());
+//
+//             // return this slot to pool
+//             slot1.releaseSlot();
+//
+//             // second allocation fulfilled by previous slot returning
+//             SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+//             assertTrue(future2.isDone());
+//
+//             assertNotEquals(slot1, slot2);
+//             assertTrue(slot1.isReleased());
+//             assertTrue(slot2.isAlive());
+//             assertEquals(slot1.getTaskManagerID(), 
slot2.getTaskManagerID());
+//             assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+//     }
+//
+//     @Test
+//     public void testAllocateWithFreeSlot() throws Exception {
+//             ResourceID resourceID = new ResourceID("resource");
+//             slotPool.registerResource(resourceID);
+//
+//             JobID jobID = new JobID();
+//             AllocationID allocationID1 = new AllocationID();
+//             Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID1);
+//             assertFalse(future1.isDone());
+//
+//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//             assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+//
+//             SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+//             assertTrue(future1.isDone());
+//
+//             // return this slot to pool
+//             slot1.releaseSlot();
+//
+//             AllocationID allocationID2 = new AllocationID();
+//             Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID2);
+//
+//             // second allocation fulfilled by previous slot returning
+//             SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+//             assertTrue(future2.isDone());
+//
+//             assertNotEquals(slot1, slot2);
+//             assertTrue(slot1.isReleased());
+//             assertTrue(slot2.isAlive());
+//             assertEquals(slot1.getTaskManagerID(), 
slot2.getTaskManagerID());
+//             assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+//     }
+//
+//     @Test
+//     public void testOfferSlot() throws Exception {
+//             ResourceID resourceID = new ResourceID("resource");
+//             slotPool.registerResource(resourceID);
+//
+//             JobID jobID = new JobID();
+//             AllocationID allocationID = new AllocationID();
+//             Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID);
+//             assertFalse(future.isDone());
+//             verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), any(SlotRequest.class), any(Time.class));
+//
+//             // slot from unregistered resource
+//             SlotDescriptor invalid = createSlotDescriptor(new 
ResourceID("unregistered"), jobID, DEFAULT_TESTING_PROFILE);
+//             assertFalse(slotPool.offerSlot(allocationID, invalid));
+//
+//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//
+//             // reject offering with mismatch allocation id
+//             assertFalse(slotPool.offerSlot(new AllocationID(), 
slotDescriptor));
+//
+//             // accepted slot
+//             assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+//             SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+//             assertTrue(future.isDone());
+//             assertTrue(slot.isAlive());
+//
+//             // conflict offer with using slot
+//             SlotDescriptor conflict = createSlotDescriptor(resourceID, 
jobID, DEFAULT_TESTING_PROFILE);
+//             assertFalse(slotPool.offerSlot(allocationID, conflict));
+//
+//             // duplicated offer with using slot
+//             assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+//             assertTrue(future.isDone());
+//             assertTrue(slot.isAlive());
+//
+//             // duplicated offer with free slot
+//             slot.releaseSlot();
+//             assertTrue(slot.isReleased());
+//             assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+//     }
+//
+//     @Test
+//     public void testReleaseResource() throws Exception {
+//             ResourceID resourceID = new ResourceID("resource");
+//             slotPool.registerResource(resourceID);
+//
+//             JobID jobID = new JobID();
+//
+//             AllocationID allocationID1 = new AllocationID();
+//             Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID1);
+//
+//             AllocationID allocationID2 = new AllocationID();
+//             Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID2);
+//
+//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//             assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+//
+//             SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+//             assertTrue(future1.isDone());
+//             assertFalse(future2.isDone());
+//
+//             slotPool.releaseResource(resourceID);
+//             assertTrue(slot1.isReleased());
+//
+//             // slot released and not usable, second allocation still not 
fulfilled
+//             Thread.sleep(10);
+//             assertFalse(future2.isDone());
+//     }
+//
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index dd43337..f5b3892 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.minicluster;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -31,7 +33,7 @@ import org.junit.Test;
  */
 public class MiniClusterITCase extends TestLogger {
 
-//     @Test
+       @Test
        public void runJobWithSingleRpcService() throws Exception {
                MiniClusterConfiguration cfg = new MiniClusterConfiguration();
 
@@ -74,6 +76,13 @@ public class MiniClusterITCase extends TestLogger {
                task.setMaxParallelism(1);
                task.setInvokableClass(NoOpInvokable.class);
 
-               return new JobGraph(new JobID(), "Test Job", task);
+               JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
+               jg.setAllowQueuedScheduling(true);
+
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
 1000));
+               jg.setExecutionConfig(executionConfig);
+
+               return jg;
        }
 }

Reply via email to