[FLINK-1376] [runtime] Add proper shared slot release in case of a fatal 
TaskManager failure.

Fixes a ConcurrentModificationException on SharedSlot's subSlots field by 
synchronizing all state-changing operations through the associated assignment 
group. Fixes a deadlock in which Instance.markDead first acquires the instance 
lock and then, while releasing the associated slots, requests the assignment 
group lock. This can block against a direct releaseSlot call on a SharedSlot, 
which first acquires the assignment group lock and then the instance lock in 
order to return the slot to the instance.

Fixes the release of co-location shared slots. A co-location constraint is now 
realized as a SharedSlot nested inside a SharedSlot, in which the co-located 
tasks allocate sub-slots.

This closes #317


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db1b8b99
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db1b8b99
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db1b8b99

Branch: refs/heads/master
Commit: db1b8b993c12f2e74b6cc9a48414265666dc0e69
Parents: 9d181a8
Author: Till Rohrmann <[email protected]>
Authored: Mon Jan 12 10:58:45 2015 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Feb 5 12:17:15 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/deployment/PartitionInfo.java |   8 +-
 .../flink/runtime/executiongraph/Execution.java |  22 +-
 .../runtime/executiongraph/ExecutionVertex.java |  10 +-
 .../flink/runtime/instance/AllocatedSlot.java   | 192 -------
 .../apache/flink/runtime/instance/Instance.java |  81 ++-
 .../flink/runtime/instance/SharedSlot.java      | 154 ++++++
 .../flink/runtime/instance/SimpleSlot.java      | 124 +++++
 .../org/apache/flink/runtime/instance/Slot.java | 191 +++++++
 .../scheduler/CoLocationConstraint.java         |  12 +-
 .../scheduler/NoResourceAvailableException.java |  17 +-
 .../runtime/jobmanager/scheduler/Scheduler.java | 227 +++++---
 .../jobmanager/scheduler/SharedSlot.java        | 114 ----
 .../scheduler/SlotAllocationFuture.java         |  12 +-
 .../scheduler/SlotAllocationFutureAction.java   |   4 +-
 .../scheduler/SlotAvailabilityListener.java     |   2 +-
 .../scheduler/SlotSharingGroupAssignment.java   | 334 ++++++-----
 .../runtime/jobmanager/scheduler/SubSlot.java   |  75 ---
 .../jobmanager/web/JobManagerInfoServlet.java   | 553 ++++++++++++++++++
 .../jobmanager/web/JobmanagerInfoServlet.java   | 554 -------------------
 .../runtime/jobmanager/web/JsonFactory.java     |   4 +-
 .../runtime/jobmanager/web/WebInfoServer.java   |   2 +-
 .../profiling/impl/JobProfilingData.java        |   7 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   1 +
 .../ExecutionGraphDeploymentTest.java           |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   4 +-
 .../ExecutionStateProgressTest.java             |   4 +-
 .../ExecutionVertexCancelTest.java              |  22 +-
 .../ExecutionVertexDeploymentTest.java          |  18 +-
 .../ExecutionVertexSchedulingTest.java          |   8 +-
 .../runtime/instance/AllocatedSlotTest.java     |  26 +-
 .../flink/runtime/instance/InstanceTest.java    |  22 +-
 .../ScheduleWithCoLocationHintTest.java         | 136 ++---
 .../scheduler/SchedulerIsolatedTasksTest.java   |  44 +-
 .../scheduler/SchedulerSlotSharingTest.java     | 210 +++----
 .../jobmanager/scheduler/SharedSlotsTest.java   |  73 +--
 .../scheduler/SlotAllocationFutureTest.java     |  24 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |   2 +-
 .../jobmanager/TaskManagerFailsITCase.scala     |  39 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala |  46 +-
 39 files changed, 1881 insertions(+), 1501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
index cdaf289..dd2c063 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.RemoteAddress;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
@@ -114,7 +114,7 @@ public class PartitionInfo implements IOReadableWritable, 
Serializable {
 
        // 
------------------------------------------------------------------------
 
-       public static PartitionInfo fromEdge(ExecutionEdge edge, AllocatedSlot 
consumerSlot) {
+       public static PartitionInfo fromEdge(ExecutionEdge edge, SimpleSlot 
consumerSlot) {
                IntermediateResultPartition partition = edge.getSource();
                IntermediateResultPartitionID partitionId = 
partition.getPartitionId();
 
@@ -125,7 +125,7 @@ public class PartitionInfo implements IOReadableWritable, 
Serializable {
                RemoteAddress producerAddress = null;
                PartitionLocation producerLocation = PartitionLocation.UNKNOWN;
 
-               AllocatedSlot producerSlot = producer.getAssignedResource();
+               SimpleSlot producerSlot = producer.getAssignedResource();
                ExecutionState producerState = producer.getState();
 
                // The producer needs to be running, otherwise the consumer 
might request a partition,
@@ -145,7 +145,7 @@ public class PartitionInfo implements IOReadableWritable, 
Serializable {
                return new PartitionInfo(partitionId, producerExecutionId, 
producerLocation, producerAddress);
        }
 
-       public static PartitionInfo[] fromEdges(ExecutionEdge[] edges, 
AllocatedSlot consumerSlot) {
+       public static PartitionInfo[] fromEdges(ExecutionEdge[] edges, 
SimpleSlot consumerSlot) {
                // Every edge consumes a different result partition, which 
might be of
                // local, remote, or unknown location.
                PartitionInfo[] partitions = new PartitionInfo[edges.length];

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index a705231..e1a24c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -101,8 +101,8 @@ public class Execution implements Serializable {
 
 
        private volatile ExecutionState state = CREATED;
-
-       private volatile AllocatedSlot assignedResource;  // once assigned, 
never changes
+       
+       private volatile SimpleSlot assignedResource;  // once assigned, never 
changes
        
        private volatile Throwable failureCause;          // once assigned, 
never changes
        
@@ -141,7 +141,7 @@ public class Execution implements Serializable {
                return state;
        }
        
-       public AllocatedSlot getAssignedResource() {
+       public SimpleSlot getAssignedResource() {
                return assignedResource;
        }
        
@@ -185,7 +185,7 @@ public class Execution implements Serializable {
 
                // sanity check
                if (locationConstraint != null && sharingGroup == null) {
-                       throw new RuntimeException("Trying to schedule with 
co-location constraint but without slot sharing allowed.");
+                       throw new RuntimeException("Trying to schedule with 
co-location constraint but without slot sharing not allowed.");
                }
 
                if (transitionState(CREATED, SCHEDULED)) {
@@ -201,7 +201,7 @@ public class Execution implements Serializable {
 
                                future.setFutureAction(new 
SlotAllocationFutureAction() {
                                        @Override
-                                       public void slotAllocated(AllocatedSlot 
slot) {
+                                       public void slotAllocated(SimpleSlot 
slot) {
                                                try {
                                                        deployToSlot(slot);
                                                }
@@ -216,7 +216,7 @@ public class Execution implements Serializable {
                                });
                        }
                        else {
-                               AllocatedSlot slot = 
scheduler.scheduleImmediately(toSchedule);
+                               SimpleSlot slot = 
scheduler.scheduleImmediately(toSchedule);
                                try {
                                        deployToSlot(slot);
                                }
@@ -237,7 +237,7 @@ public class Execution implements Serializable {
                }
        }
 
-       public void deployToSlot(final AllocatedSlot slot) throws JobException {
+       public void deployToSlot(final SimpleSlot slot) throws JobException {
                // sanity checks
                if (slot == null) {
                        throw new NullPointerException();
@@ -406,7 +406,7 @@ public class Execution implements Serializable {
                                }
                        }
                        else if (consumerState == RUNNING) {
-                               AllocatedSlot consumerSlot = 
consumerVertex.getCurrentAssignedResource();
+                               SimpleSlot consumerSlot = 
consumerVertex.getCurrentAssignedResource();
                                ExecutionAttemptID consumerExecutionId = 
consumerVertex.getCurrentExecutionAttempt().getAttemptId();
 
                                PartitionInfo partitionInfo = 
PartitionInfo.fromEdge(edge, consumerSlot);
@@ -635,7 +635,7 @@ public class Execution implements Serializable {
        }
 
        private void sendCancelRpcCall() {
-               final AllocatedSlot slot = this.assignedResource;
+               final SimpleSlot slot = this.assignedResource;
                if (slot == null) {
                        return;
                }
@@ -662,7 +662,7 @@ public class Execution implements Serializable {
        }
 
        private void sendFailIntermediateResultPartitionsRPCCall() {
-               final AllocatedSlot slot = this.assignedResource;
+               final SimpleSlot slot = this.assignedResource;
                if (slot == null) {
                        return;
                }
@@ -680,7 +680,7 @@ public class Execution implements Serializable {
                }
        }
 
-       private boolean sendUpdateTaskRpcCall(final AllocatedSlot consumerSlot, 
final ExecutionAttemptID executionId, final IntermediateDataSetID resultId, 
final PartitionInfo partitionInfo) throws Exception {
+       private boolean sendUpdateTaskRpcCall(final SimpleSlot consumerSlot, 
final ExecutionAttemptID executionId, final IntermediateDataSetID resultId, 
final PartitionInfo partitionInfo) throws Exception {
                final Instance instance = consumerSlot.getInstance();
 
                final TaskManagerMessages.TaskOperationResult result = 
AkkaUtils.ask(

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 8812569..d3993bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import 
org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
@@ -25,7 +26,6 @@ import 
org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartitionInfo;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -178,7 +178,7 @@ public class ExecutionVertex implements Serializable {
                return currentExecution.getFailureCause();
        }
        
-       public AllocatedSlot getCurrentAssignedResource() {
+       public SimpleSlot getCurrentAssignedResource() {
                return currentExecution.getAssignedResource();
        }
        
@@ -304,7 +304,7 @@ public class ExecutionVertex implements Serializable {
                        ExecutionEdge[] sources = inputEdges[i];
                        if (sources != null) {
                                for (int k = 0; k < sources.length; k++) {
-                                       AllocatedSlot sourceSlot = 
sources[k].getSource().getProducer().getCurrentAssignedResource();
+                                       SimpleSlot sourceSlot = 
sources[k].getSource().getProducer().getCurrentAssignedResource();
                                        if (sourceSlot != null) {
                                                
locations.add(sourceSlot.getInstance());
                                                if (locations.size() > 
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
@@ -346,7 +346,7 @@ public class ExecutionVertex implements Serializable {
                return this.currentExecution.scheduleForExecution(scheduler, 
queued);
        }
 
-       public void deployToSlot(AllocatedSlot slot) throws JobException {
+       public void deployToSlot(SimpleSlot slot) throws JobException {
                this.currentExecution.deployToSlot(slot);
        }
 
@@ -397,7 +397,7 @@ public class ExecutionVertex implements Serializable {
                getExecutionGraph().notifyExecutionChange(getJobvertexId(), 
subTaskIndex, executionId, newState, error);
        }
        
-       TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID 
executionId, AllocatedSlot slot) {
+       TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID 
executionId, SimpleSlot slot) {
                // Produced intermediate results
                List<PartitionDeploymentDescriptor> producedPartitions = new 
ArrayList<PartitionDeploymentDescriptor>(resultPartitions.length);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
deleted file mode 100644
index f1481f3..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-
-/**
- * An allocated slot is the unit in which resources are allocated on instances.
- */
-public class AllocatedSlot implements Serializable {
-
-       static final long serialVersionUID = 42L;
-       
-       private static final AtomicIntegerFieldUpdater<AllocatedSlot> 
STATUS_UPDATER = 
-                       
AtomicIntegerFieldUpdater.newUpdater(AllocatedSlot.class, "status");
-       
-       private static final AtomicReferenceFieldUpdater<AllocatedSlot, 
Execution> VERTEX_UPDATER =
-                       
AtomicReferenceFieldUpdater.newUpdater(AllocatedSlot.class, Execution.class, 
"executedTask");
-       
-       private static final int ALLOCATED_AND_ALIVE = 0;               // 
tasks may be added and might be running
-       private static final int CANCELLED = 1;                                 
// no more tasks may run
-       private static final int RELEASED = 2;                                  
// has been given back to the instance
-
-       
-       /** The ID of the job this slice belongs to. */
-       private final JobID jobID;
-       
-       /** The instance on which the slot is allocated */
-       private final Instance instance;
-       
-       /** The number of the slot on which the task is deployed */
-       private final int slotNumber;
-       
-       /** Task being executed in the slot. Volatile to force a memory barrier 
and allow for correct double-checking */
-       private volatile Execution executedTask;
-       
-       /** The state of the vertex, only atomically updated */
-       private volatile int status = ALLOCATED_AND_ALIVE;
-       
-       private Locality locality = Locality.UNCONSTRAINED;
-       
-
-       public AllocatedSlot(JobID jobID, Instance instance, int slotNumber) {
-               if (jobID == null || instance == null || slotNumber < 0) {
-                       throw new IllegalArgumentException();
-               }
-               
-               this.jobID = jobID;
-               this.instance = instance;
-               this.slotNumber = slotNumber;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Returns the ID of the job this allocated slot belongs to.
-        * 
-        * @return the ID of the job this allocated slot belongs to
-        */
-       public JobID getJobID() {
-               return this.jobID;
-       }
-       
-       public Instance getInstance() {
-               return instance;
-       }
-       
-       public int getSlotNumber() {
-               return slotNumber;
-       }
-       
-       public Execution getExecutedVertex() {
-               return executedTask;
-       }
-       
-       public Locality getLocality() {
-               return locality;
-       }
-       
-       public void setLocality(Locality locality) {
-               this.locality = locality;
-       }
-       
-       public boolean setExecutedVertex(Execution executedVertex) {
-               if (executedVertex == null) {
-                       throw new NullPointerException();
-               }
-               
-               // check that we can actually run in this slot
-               if (status != ALLOCATED_AND_ALIVE) {
-                       return false;
-               }
-               
-               // atomically assign the vertex
-               if (!VERTEX_UPDATER.compareAndSet(this, null, executedVertex)) {
-                       return false;
-               }
-
-               // we need to do a double check that we were not cancelled in 
the meantime
-               if (status != ALLOCATED_AND_ALIVE) {
-                       this.executedTask = null;
-                       return false;
-               }
-               
-               return true;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Status and life cycle
-       // 
--------------------------------------------------------------------------------------------
-       
-       public boolean isAlive() {
-               return status == ALLOCATED_AND_ALIVE;
-       }
-       
-       public boolean isCanceled() {
-               return status != ALLOCATED_AND_ALIVE;
-       }
-       
-       public boolean isReleased() {
-               return status == RELEASED;
-       }
-       
-       
-       public void cancel() {
-               if (STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, 
CANCELLED)) {
-                       // kill all tasks currently running in this slot
-                       Execution exec = this.executedTask;
-                       if (exec != null && !exec.isFinished()) {
-                               exec.fail(new Exception("The slot in which the 
task was scheduled has been killed (probably loss of TaskManager)."));
-                       }
-               }
-       }
-       
-       public void releaseSlot() {
-               // cancel everything, if there is something. since this is 
atomically status based,
-               // it will not happen twice if another attempt happened before 
or concurrently
-               try {
-                       cancel();
-               } finally {
-                       this.instance.returnAllocatedSlot(this);
-               }
-       }
-       
-       protected boolean markReleased() {
-               return STATUS_UPDATER.compareAndSet(this, CANCELLED, RELEASED);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Utilities
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return instance.getId() + " (" + slotNumber + ") - " + 
getStateName(status);
-       }
-       
-       private static final String getStateName(int state) {
-               switch (state) {
-               case ALLOCATED_AND_ALIVE:
-                       return "ALLOCATED/ALIVE";
-               case CANCELLED:
-                       return "CANCELLED";
-               case RELEASED:
-                       return "RELEASED";
-               default:
-                       return "(unknown)";
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index abbbc34..4f9dc7f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -27,8 +27,10 @@ import java.util.Queue;
 import java.util.Set;
 
 import akka.actor.ActorRef;
+import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
+import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
 
 /**
  * An taskManager represents a resource a {@link 
org.apache.flink.runtime.taskmanager.TaskManager} runs on.
@@ -59,7 +61,7 @@ public class Instance implements Serializable {
        private transient final Queue<Integer> availableSlots;
        
        /** Allocated slots on this taskManager */
-       private final Set<AllocatedSlot> allocatedSlots = new 
HashSet<AllocatedSlot>();
+       private final Set<Slot> allocatedSlots = new HashSet<Slot>();
 
        
        /** A listener to be notified upon new slot availability */
@@ -121,20 +123,27 @@ public class Instance implements Serializable {
        }
 
        public void markDead() {
-               if (isDead) {
-                       return;
-               }
-               
-               isDead = true;
-               
                synchronized (instanceLock) {
-                       
+                       if (isDead) {
+                               return;
+                       }
+
+                       isDead = true;
+
                        // no more notifications for the slot releasing
                        this.slotAvailabilityListener = null;
-                       
-                       for (AllocatedSlot slot : allocatedSlots) {
-                               slot.releaseSlot();
-                       }
+               }
+
+               /*
+                * releaseSlot must not own the instanceLock in order to avoid 
dead locks where a slot
+                * owning the assignment group lock wants to give itself back 
to the instance which requires
+                * the instance lock
+                */
+               for (Slot slot : allocatedSlots) {
+                       slot.releaseSlot();
+               }
+
+               synchronized (instanceLock) {
                        allocatedSlots.clear();
                        availableSlots.clear();
                }
@@ -176,8 +185,12 @@ public class Instance implements Serializable {
        // 
--------------------------------------------------------------------------------------------
        // Resource allocation
        // 
--------------------------------------------------------------------------------------------
+
+       public SimpleSlot allocateSimpleSlot(JobID jobID) throws 
InstanceDiedException {
+               return allocateSimpleSlot(jobID, jobID);
+       }
        
-       public AllocatedSlot allocateSlot(JobID jobID) throws 
InstanceDiedException {
+       public SimpleSlot allocateSimpleSlot(JobID jobID, AbstractID groupID) 
throws InstanceDiedException {
                if (jobID == null) {
                        throw new IllegalArgumentException();
                }
@@ -191,15 +204,38 @@ public class Instance implements Serializable {
                        if (nextSlot == null) {
                                return null;
                        } else {
-                               AllocatedSlot slot = new AllocatedSlot(jobID, 
this, nextSlot);
+                               SimpleSlot slot = new SimpleSlot(jobID, this, 
nextSlot, null, groupID);
                                allocatedSlots.add(slot);
                                return slot;
                        }
                }
        }
-       
-       public boolean returnAllocatedSlot(AllocatedSlot slot) {
+
+       public SharedSlot allocateSharedSlot(JobID jobID, 
SlotSharingGroupAssignment sharingGroupAssignment, AbstractID groupID) throws
+       InstanceDiedException {
                // the slot needs to be in the returned to taskManager state
+               if (jobID == null) {
+                       throw new IllegalArgumentException();
+               }
+
+               synchronized (instanceLock) {
+                       if (isDead) {
+                               throw new InstanceDiedException(this);
+                       }
+
+                       Integer nextSlot = availableSlots.poll();
+                       if (nextSlot == null) {
+                               return null;
+                       } else {
+                               SharedSlot slot = new SharedSlot(jobID, this, 
nextSlot,
+                                               sharingGroupAssignment, null, 
groupID);
+                               allocatedSlots.add(slot);
+                               return slot;
+                       }
+               }
+       }
+
+       public boolean returnAllocatedSlot(Slot slot) {
                if (slot == null || slot.getInstance() != this) {
                        throw new IllegalArgumentException("Slot is null or 
belongs to the wrong taskManager.");
                }
@@ -231,14 +267,15 @@ public class Instance implements Serializable {
        }
        
        public void cancelAndReleaseAllSlots() {
+               List<Slot> copy = null;
+
                synchronized (instanceLock) {
                        // we need to do this copy because of concurrent 
modification exceptions
-                       List<AllocatedSlot> copy = new 
ArrayList<AllocatedSlot>(this.allocatedSlots);
+                       copy = new ArrayList<Slot>(this.allocatedSlots);
+               }
                        
-                       for (AllocatedSlot slot : copy) {
-                               slot.releaseSlot();
-                       }
-                       allocatedSlots.clear();
+               for (Slot slot : copy) {
+                       slot.releaseSlot();
                }
        }
 
@@ -293,6 +330,6 @@ public class Instance implements Serializable {
        @Override
        public String toString() {
                return instanceId + " @" + (taskManager != null ? 
taskManager.path() : "ActorRef.noSender") + " " +
-                               numberOfSlots + " slots";
+                               numberOfSlots + " slots" + " - " + hashCode();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
new file mode 100644
index 0000000..2efcf6c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class represents a shared slot. A shared slot can have multiple sub slots, which are
+ * either {@link org.apache.flink.runtime.instance.SimpleSlot} instances or nested shared slots
+ * (see {@link #allocateSubSlot} and {@link #allocateSharedSlot}). This allows scheduling
+ * multiple tasks simultaneously, enabling Flink's streaming capabilities.
+ *
+ * IMPORTANT: This class contains no synchronization. Thus, the caller has to guarantee proper
+ * synchronization. In the current implementation, all concurrently modifying operations are
+ * passed through a {@link SlotSharingGroupAssignment} object which is responsible for
+ * synchronization.
+ *
+ */
+public class SharedSlot extends Slot {
+
+       private final SlotSharingGroupAssignment assignmentGroup; // performs all releases; also handed to nested shared slots
+
+       private final Set<Slot> subSlots; // direct children only; plain HashSet, guarded by the caller
+
+       public SharedSlot(JobID jobID, Instance instance, int slotNumber, // parent == null marks a root slot (see Slot#getRoot)
+                                       SlotSharingGroupAssignment 
assignmentGroup, SharedSlot parent,
+                                       AbstractID groupID) {
+               super(jobID, instance, slotNumber, parent, groupID);
+
+               this.assignmentGroup = assignmentGroup;
+               this.subSlots = new HashSet<Slot>();
+       }
+
+       public Set<Slot> getSubSlots() { // returns the internal mutable set itself, not a copy
+               return subSlots;
+       }
+
+       /**
+        * Removes the given sub slot from this {@link org.apache.flink.runtime.instance.SharedSlot}.
+        * Should only be called through the
+        * {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment} attribute
+        * assignmentGroup, which is responsible for synchronization.
+        *
+        * @param slot slot to be removed from the set of sub slots.
+        * @return Number of remaining sub slots
+        * @throws IllegalArgumentException if the slot is not a sub slot of this shared slot
+        */
+       public int freeSubSlot(Slot slot){
+               if(!subSlots.remove(slot)){
+                       throw new IllegalArgumentException("Wrong shared slot 
for sub slot.");
+               }
+
+               return subSlots.size();
+       }
+
+       @Override
+       public int getNumberLeaves() { // recurses into nested shared slots; each simple slot counts as one
+               int result = 0;
+
+               for(Slot slot: subSlots){
+                       result += slot.getNumberLeaves();
+               }
+
+               return result;
+       }
+
+       @Override
+       public void cancel() {
+               // Guarantee that the operation is only executed once: markCancelled() only succeeds for the first caller
+               if (markCancelled()) {
+                       assignmentGroup.releaseSharedSlot(this);
+               }
+       }
+
+       /**
+        * Releases this shared slot by delegating to
+        * {@code SlotSharingGroupAssignment#releaseSharedSlot(SharedSlot)}. Unlike {@link #cancel()},
+        * the delegation is not guarded by {@code markCancelled()}, so it happens on every call.
+        *
+        * NOTE(review): the following steps are performed by the assignment group, whose
+        * implementation is not visible here; presumably it will:
+        *
+        * 1. Cancel and release all sub slots atomically with respect to the assignment group.
+        * 2. Set the state of the shared slot to be cancelled.
+        * 3. Dispose the shared slot (returning the slot to the instance).
+        *
+        * Afterwards the shared slot is presumably marked dead, which makes
+        * {@link #allocateSubSlot} and {@link #allocateSharedSlot} return null and thereby
+        * prevents further sub slot creation by the scheduler.
+        */
+       @Override
+       public void releaseSlot() {
+               assignmentGroup.releaseSharedSlot(this);
+       }
+
+       /**
+        * Creates a new {@link SimpleSlot} sub slot if this slot is not dead yet. This method should
+        * only be called from the assignment group instance to guarantee synchronization.
+        *
+        * The new sub slot uses the current number of sub slots as its slot number, this shared slot
+        * as its parent and {@code jID} as its group ID.
+        *
+        * @param jID id to identify tasks which can be deployed in this sub slot
+        * @return new sub slot if the shared slot is still alive, otherwise null
+        */
+       public SimpleSlot allocateSubSlot(AbstractID jID){
+               if(isDead()){
+                       return null;
+               } else {
+                       SimpleSlot slot = new SimpleSlot(jobID, instance, 
subSlots.size(), this, jID);
+                       subSlots.add(slot);
+
+                       return slot;
+               }
+       }
+
+       public SharedSlot allocateSharedSlot(AbstractID jID){ // like allocateSubSlot, but creates a nested SharedSlot in the same assignment group; null if dead
+               if(isDead()){
+                       return null;
+               } else {
+                       SharedSlot slot = new SharedSlot(jobID, instance, 
subSlots.size(), assignmentGroup, this, jID);
+                       subSlots.add(slot);
+
+                       return slot;
+               }
+       }
+
+       /**
+        * Disposes the given sub slot. This is delegated to the assignmentGroup
+        * ({@code SlotSharingGroupAssignment#releaseSimpleSlot}) in order to synchronize the
+        * operation. According to the original design, if the disposed slot was the last sub slot,
+        * the shared slot is then marked as cancelled and disposed/returned to the owning instance.
+        * NOTE(review): that logic lives in the assignment group, not in this class; confirm there.
+        *
+        * @param slot sub slot which shall be removed from the shared slot
+        */
+       public void disposeChild(SimpleSlot slot){
+               assignmentGroup.releaseSimpleSlot(slot);
+       }
+
+       @Override
+       public String toString() { // "Shared " followed by the base Slot description
+               return "Shared " + super.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
new file mode 100644
index 0000000..5b1af57
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * Class which represents a single slot on a machine or within a shared slot. If this slot is part
+ * of a {@link SharedSlot}, then its parent attribute references that shared slot. If not, then the
+ * parent attribute is null.
+ *
+ * IMPORTANT: Apart from the compare-and-set used to assign the executed task (see
+ * {@link #setExecutedVertex(Execution)}) and the atomic status transitions inherited from
+ * {@link Slot}, this class has no synchronization. Thus it has to be synchronized by the calling
+ * object.
+ */
+public class SimpleSlot extends Slot {
+
+       private static final AtomicReferenceFieldUpdater<SimpleSlot, Execution> 
VERTEX_UPDATER =
+                       
AtomicReferenceFieldUpdater.newUpdater(SimpleSlot.class, Execution.class, 
"executedTask");
+
+       /** Task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking in setExecutedVertex. */
+       private volatile Execution executedTask;
+
+       private Locality locality = Locality.UNCONSTRAINED; // plain field, not volatile
+
+       public SimpleSlot(JobID jobID, Instance instance, int slotNumber, 
SharedSlot parent, AbstractID groupID){
+               super(jobID, instance, slotNumber, parent, groupID); // parent == null: released directly to the instance
+       }
+
+       @Override
+       public int getNumberLeaves() { // a simple slot is always exactly one leaf
+               return 1;
+       }
+
+
+       public Execution getExecution() { // null if no task is assigned (or the assignment was rolled back)
+               return executedTask;
+       }
+
+       public Locality getLocality() {
+               return locality;
+       }
+
+       public void setLocality(Locality locality) {
+               this.locality = locality;
+       }
+
+       public boolean setExecutedVertex(Execution executedVertex) { // true iff the vertex was assigned while the slot is alive
+               if (executedVertex == null) {
+                       throw new NullPointerException();
+               }
+
+               // check that we can actually run in this slot (cheap early exit before the CAS)
+               if (status != ALLOCATED_AND_ALIVE) {
+                       return false;
+               }
+
+               // atomically assign the vertex; fails if another execution has already been assigned
+               if (!VERTEX_UPDATER.compareAndSet(this, null, executedVertex)) {
+                       return false;
+               }
+
+               // we need to do a double check that we were not cancelled in 
the meantime
+               if (status != ALLOCATED_AND_ALIVE) {
+                       this.executedTask = null; // roll back our own assignment
+                       return false;
+               }
+
+               return true;
+       }
+
+       @Override
+       public void cancel() { // fails an unfinished assigned task, only on the first successful cancellation
+               if (markCancelled()) {
+                       // kill all tasks currently running in this slot
+                       Execution exec = this.executedTask;
+                       if (exec != null && !exec.isFinished()) {
+                               exec.fail(new Exception("The slot in which the 
task was scheduled has been killed (probably loss of TaskManager)."));
+                       }
+               }
+       }
+
+       @Override
+       public void releaseSlot() { // cancels, then hands the slot back to its parent shared slot or to the instance
+               // cancel everything, if there is something. since this is 
atomically status based,
+               // it will not happen twice if another attempt happened before 
or concurrently
+               try {
+                       cancel();
+               } finally {
+                       if (getParent() != null) {
+                               // we have to ask our parent to dispose us (routed through its assignment group)
+                               getParent().disposeChild(this);
+                       } else {
+                               // we have to give back the slot to the owning 
instance
+                               instance.returnAllocatedSlot(this);
+                       }
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "SimpleSlot " + super.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
new file mode 100644
index 0000000..fb62c4c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.jobgraph.JobID;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * Base class for slots. A slot belongs to a job and an instance, may have a parent
+ * {@link SharedSlot}, and carries a life-cycle status (ALLOCATED_AND_ALIVE, then CANCELLED, then
+ * RELEASED) that is only changed atomically via compare-and-set. The additional "dead" and
+ * "disposed" flags are plain booleans and have to be synchronized by the caller.
+ */
+public abstract class Slot {
+       protected static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER =
+                       AtomicIntegerFieldUpdater.newUpdater(Slot.class, 
"status");
+
+       protected static final int ALLOCATED_AND_ALIVE = 0;             // 
tasks may be added and might be running
+       protected static final int CANCELLED = 1;                               
        // no more tasks may run
+       protected static final int RELEASED = 2;                                
        // has been given back to the instance
+
+       /** The ID of the job this slot belongs to. */
+       protected final JobID jobID;
+
+       /** The instance on which the slot is allocated */
+       protected final Instance instance;
+
+       /** The number of this slot (for sub slots: the parent's sub slot count at creation time) */
+       protected final int slotNumber;
+
+       /** The state of this slot, only updated atomically through STATUS_UPDATER */
+       protected volatile int status = ALLOCATED_AND_ALIVE;
+
+       /** Indicates whether this slot was marked dead by the system; not volatile, guarded by the caller */
+       private boolean dead = false;
+
+       private final AbstractID groupID; // identifies the tasks that may be deployed in this slot; not validated
+
+       private final SharedSlot parent; // null for a root slot (see getRoot)
+
+       private boolean disposed = false; // plain flag, guarded by the caller like "dead"
+
+
+       public Slot(JobID jobID, Instance instance, int slotNumber, SharedSlot 
parent, AbstractID groupID) {
+               if (jobID == null || instance == null || slotNumber < 0) { // parent and groupID may be null
+                       throw new IllegalArgumentException();
+               }
+
+               this.jobID = jobID;
+               this.instance = instance;
+               this.slotNumber = slotNumber;
+               this.parent = parent;
+               this.groupID = groupID;
+
+       }
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the ID of the job this allocated slot belongs to.
+        *
+        * @return the ID of the job this allocated slot belongs to
+        */
+       public JobID getJobID() {
+               return this.jobID;
+       }
+
+       public Instance getInstance() {
+               return instance;
+       }
+
+       public int getSlotNumber() {
+               return slotNumber;
+       }
+
+       public AbstractID getGroupID() {
+               return groupID;
+       }
+
+       public SharedSlot getParent() {
+               return parent;
+       }
+
+       public Slot getRoot() { // follows parent links up to the top-level slot
+               if(parent == null){
+                       return this;
+               } else {
+                       return parent.getRoot();
+               }
+       }
+
+       public abstract int getNumberLeaves(); // number of leaf (simple) slots in this slot's subtree
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Status and life cycle
+       // 
--------------------------------------------------------------------------------------------
+
+       public boolean isAlive() {
+               return status == ALLOCATED_AND_ALIVE;
+       }
+
+       public boolean isCanceled() { // also true once the slot has been released
+               return status != ALLOCATED_AND_ALIVE;
+       }
+
+       public boolean isReleased() {
+               return status == RELEASED;
+       }
+
+       public abstract void cancel();
+
+       public abstract void releaseSlot();
+
+       public boolean markReleased() { // CAS CANCELLED -> RELEASED; false unless the slot is currently CANCELLED
+               return STATUS_UPDATER.compareAndSet(this, CANCELLED, RELEASED);
+       }
+
+       public boolean markCancelled() { // CAS ALLOCATED_AND_ALIVE -> CANCELLED; true only for the first successful caller
+               return STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, 
CANCELLED);
+       }
+
+       /**
+        * Marks this slot to be dead. Should only be called through the
+        * {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment} attribute
+        * assignmentGroup, because the check-then-set below is not atomic and {@code dead} is not
+        * volatile. Being dead is tracked independently of the atomic {@code status} field.
+        *
+        * @return true if the slot had not been marked dead before this call
+        */
+       public boolean markDead() {
+               boolean result = !dead;
+
+               dead = true;
+
+               return result;
+       }
+
+       public boolean isDead() {
+               return dead;
+       }
+
+       public boolean markDisposed() { // like markDead: non-atomic check-then-set, caller must synchronize
+               boolean result = !disposed;
+
+               disposed = true;
+
+               return result;
+       }
+
+       public boolean isDisposed() {
+               return disposed;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return hierarchy() + " - " + instance.getId() + " - " + 
getStateName(status);
+       }
+
+       protected String hierarchy() { // "(n)" of this slot followed by those of its ancestors
+               return "(" + slotNumber + ")" + (getParent() != null ? 
getParent().hierarchy() : "");
+       }
+
+       private static final String getStateName(int state) { // human-readable status name for toString()
+               switch (state) {
+                       case ALLOCATED_AND_ALIVE:
+                               return "ALLOCATED/ALIVE";
+                       case CANCELLED:
+                               return "CANCELLED";
+                       case RELEASED:
+                               return "RELEASED";
+                       default:
+                               return "(unknown)";
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index 739ec09..8ef61b9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.instance.Instance;
 
 import com.google.common.base.Preconditions;
+import org.apache.flink.runtime.instance.SharedSlot;
 
 import java.io.Serializable;
 
@@ -46,7 +47,7 @@ public class CoLocationConstraint implements Serializable {
        
        public Instance getLocation() {
                if (sharedSlot != null) {
-                       return sharedSlot.getAllocatedSlot().getInstance();
+                       return sharedSlot.getInstance();
                } else {
                        throw new IllegalStateException("Not assigned");
                }
@@ -56,7 +57,7 @@ public class CoLocationConstraint implements Serializable {
                if (this.sharedSlot == sharedSlot) {
                        return;
                }
-               else if (this.sharedSlot == null || 
this.sharedSlot.isDisposed()) {
+               else if (this.sharedSlot == null || this.sharedSlot.isDead()) {
                        this.sharedSlot = sharedSlot;
                } else {
                        throw new IllegalStateException("Overriding shared slot 
that is still alive.");
@@ -68,12 +69,17 @@ public class CoLocationConstraint implements Serializable {
        }
        
        public boolean isUnassignedOrDisposed() {
-               return this.sharedSlot == null || this.sharedSlot.isDisposed();
+               return this.sharedSlot == null || this.sharedSlot.isDead();
        }
        
        public AbstractID getGroupId() {
                return this.group.getId();
        }
+
+       @Override
+       public String toString() {
+               return "CoLocation constraint id " + getGroupId() + " shared 
slot " + sharedSlot;
+       }
        
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index 730952b..93b4541 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -36,14 +36,19 @@ public class NoResourceAvailableException extends 
JobException {
                                + ". You can decrease the operator parallelism 
or increase the number of slots per TaskManager in the configuration.");
        }
        
-       public NoResourceAvailableException(int numInstances, int 
numSlotsTotal) {
-               super(String.format("%s Resources available to scheduler: 
Number of instances=%d, total number of slots=%d", 
-                               BASE_MESSAGE, numInstances, numSlotsTotal));
+       public NoResourceAvailableException(int numInstances, int 
numSlotsTotal, int availableSlots) {
+               super(String.format("%s Resources available to scheduler: 
Number of instances=%d, total number of slots=%d, available slots=%d",
+                               BASE_MESSAGE, numInstances, numSlotsTotal, 
availableSlots));
        }
        
-       NoResourceAvailableException(ScheduledUnit task, int numInstances, int 
numSlotsTotal) {
-               super(String.format("%s Task to schedule: < %s > in sharing 
group < %s >. Resources available to scheduler: Number of instances=%d, total 
number of slots=%d", 
-                               BASE_MESSAGE, task.getTaskToExecute(), 
task.getSlotSharingGroup(), numInstances, numSlotsTotal));
+       NoResourceAvailableException(ScheduledUnit task, int numInstances, int 
numSlotsTotal, int availableSlots) {
+               super(String.format("%s Task to schedule: < %s > with groupID < 
%s > in sharing group < %s >. Resources available to scheduler: Number of 
instances=%d, total number of slots=%d, available slots=%d",
+                               BASE_MESSAGE, task.getTaskToExecute(),
+                               task.getLocationConstraint() == null ? 
task.getTaskToExecute().getVertex().getJobvertexId() : 
task.getLocationConstraint().getGroupId(),
+                               task.getSlotSharingGroup(),
+                               numInstances,
+                               numSlotsTotal,
+                               availableSlots));
        }
 
        public NoResourceAvailableException(String message) {

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 6495aba..c237aa5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -29,11 +29,15 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import akka.dispatch.Futures;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.instance.SharedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
@@ -124,10 +128,10 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
        //  Scheduling
        // 
--------------------------------------------------------------------------------------------
        
-       public AllocatedSlot scheduleImmediately(ScheduledUnit task) throws 
NoResourceAvailableException {
+       public SimpleSlot scheduleImmediately(ScheduledUnit task) throws 
NoResourceAvailableException {
                Object ret = scheduleTask(task, false);
-               if (ret instanceof AllocatedSlot) {
-                       return (AllocatedSlot) ret;
+               if (ret instanceof SimpleSlot) {
+                       return (SimpleSlot) ret;
                }
                else {
                        throw new RuntimeException();
@@ -136,8 +140,8 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
        
        public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws 
NoResourceAvailableException {
                Object ret = scheduleTask(task, true);
-               if (ret instanceof AllocatedSlot) {
-                       return new SlotAllocationFuture((AllocatedSlot) ret);
+               if (ret instanceof SimpleSlot) {
+                       return new SlotAllocationFuture((SimpleSlot) ret);
                }
                if (ret instanceof SlotAllocationFuture) {
                        return (SlotAllocationFuture) ret;
@@ -148,7 +152,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
        }
        
        /**
-        * Returns either an {@link AllocatedSlot}, or an {@link 
SlotAllocationFuture}.
+        * Returns either an {@link 
org.apache.flink.runtime.instance.SimpleSlot}, or an {@link 
SlotAllocationFuture}.
         */
        private Object scheduleTask(ScheduledUnit task, boolean 
queueIfNoResource) throws NoResourceAvailableException {
                if (task == null) {
@@ -158,7 +162,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Scheduling task " + task);
                }
-               
+
                final ExecutionVertex vertex = 
task.getTaskToExecute().getVertex();
        
                synchronized (globalLock) {
@@ -176,15 +180,15 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                final CoLocationConstraint constraint = 
task.getLocationConstraint();
                                
                                // get a slot from the group, if the group has 
one for us (and can fulfill the constraint)
-                               SubSlot slotFromGroup;
+                               SimpleSlot slotFromGroup;
                                if (constraint == null) {
                                        slotFromGroup = 
assignment.getSlotForTask(vertex);
                                }
                                else {
                                        slotFromGroup = 
assignment.getSlotForTask(vertex, constraint);
                                }
-                               
-                               AllocatedSlot newSlot = null;
+
+                               SimpleSlot newSlot = null;
                                
                                // the following needs to make sure any 
allocated slot is released in case of an error
                                try {
@@ -202,15 +206,15 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                                        
vertex.getPreferredLocations() : 
Collections.singleton(constraint.getLocation());
                                        
                                        // get a new slot, since we could not 
place it into the group, or we could not place it locally
-                                       newSlot = getFreeSlotForTask(vertex, 
locations);
-                                       
-                                       SubSlot toUse;
+                                       newSlot = getFreeSubSlotForTask(vertex, 
locations, assignment, constraint);
+
+                                       SimpleSlot toUse;
                                        
                                        if (newSlot == null) {
                                                if (slotFromGroup == null) {
                                                        // both null
                                                        if (constraint == null 
|| constraint.isUnassigned()) {
-                                                               throw new 
NoResourceAvailableException(task, getNumberOfAvailableInstances(), 
getTotalNumberOfSlots());
+                                                               throw new 
NoResourceAvailableException(task, getNumberOfAvailableInstances(), 
getTotalNumberOfSlots(), getNumberOfAvailableSlots());
                                                        } else {
                                                                throw new 
NoResourceAvailableException("Could not allocate a slot on instance " + 
                                                                                
        constraint.getLocation() + ", as required by the co-location 
constraint.");
@@ -226,11 +230,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                                        
slotFromGroup.releaseSlot();
                                                }
                                                
-                                               if (constraint == null) {
-                                                       toUse = 
assignment.addNewSlotWithTask(newSlot, vertex);
-                                               } else {
-                                                       toUse = 
assignment.addNewSlotWithTask(newSlot, vertex, constraint);
-                                               }
+                                               toUse = newSlot;
                                        }
                                        else {
                                                // both are available and 
usable. neither is local
@@ -242,7 +242,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                        // if it was assigned before and the 
new one is not local, it is a fail
                                        if (constraint != null) {
                                                if (constraint.isUnassigned() 
|| toUse.getLocality() == Locality.LOCAL) {
-                                                       
constraint.setSharedSlot(toUse.getSharedSlot());
+                                                       
constraint.setSharedSlot(toUse.getParent());
                                                } else {
                                                        // the fail
                                                        throw new 
NoResourceAvailableException("Could not allocate a slot on instance " + 
@@ -270,7 +270,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                
                        // 2) === schedule without hints and sharing ===
                        
-                       AllocatedSlot slot = getFreeSlotForTask(vertex, 
vertex.getPreferredLocations());
+                       SimpleSlot slot = getFreeSlotForTask(vertex, 
vertex.getPreferredLocations());
                        if (slot != null) {
                                updateLocalityCounters(slot.getLocality());
                                return slot;
@@ -283,7 +283,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                        return future;
                                }
                                else {
-                                       throw new 
NoResourceAvailableException(getNumberOfAvailableInstances(), 
getTotalNumberOfSlots());
+                                       throw new 
NoResourceAvailableException(getNumberOfAvailableInstances(), 
getTotalNumberOfSlots(), getNumberOfAvailableSlots());
                                }
                        }
                }
@@ -297,69 +297,96 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
         * @param vertex The task to run. 
         * @return The instance to run the vertex on, it {@code null}, if no 
instance is available.
         */
-       protected AllocatedSlot getFreeSlotForTask(ExecutionVertex vertex, 
Iterable<Instance> requestedLocations) {
+       protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, 
Iterable<Instance> requestedLocations) {
                
                // we need potentially to loop multiple times, because there 
may be false positives
                // in the set-with-available-instances
                while (true) {
-                       if (this.instancesWithAvailableResources.isEmpty()) {
-                               // check if the asynchronous calls did not yet 
return the queues
-                               Instance queuedInstance = 
this.newlyAvailableInstances.poll();
-                               if (queuedInstance == null) {
-                                       return null;
-                               } else {
-                                       
this.instancesWithAvailableResources.add(queuedInstance);
+                       Pair<Instance, Locality> instanceLocalityPair = 
findInstance(requestedLocations);
+
+                       if(instanceLocalityPair == null){
+                               return null;
+                       }
+
+                       Instance instanceToUse = instanceLocalityPair.getLeft();
+                       Locality locality = instanceLocalityPair.getRight();
+
+                       if(LOG.isDebugEnabled()){
+                               if(locality == Locality.LOCAL){
+                                       LOG.debug("Local assignment: " + 
vertex.getSimpleName() + " --> " + instanceToUse);
+                               }else if(locality == Locality.NON_LOCAL){
+                                       LOG.debug("Non-local assignment: " + 
vertex.getSimpleName() + " --> " + instanceToUse);
+                               }else if(locality == Locality.UNCONSTRAINED) {
+                                       LOG.debug("Unconstrained assignment: " 
+ vertex.getSimpleName() + " --> " + instanceToUse);
                                }
                        }
-                       
-                       Iterator<Instance> locations = requestedLocations == 
null ? null : requestedLocations.iterator();
-                       
-                       Instance instanceToUse = null;
-                       Locality locality = Locality.UNCONSTRAINED;
-                       
-                       if (locations != null && locations.hasNext()) {
-                               // we have a locality preference
+
+                       try {
+                               SimpleSlot slot = 
instanceToUse.allocateSimpleSlot(vertex.getJobId(), vertex.getJobvertexId());
                                
-                               while (locations.hasNext()) {
-                                       Instance location = locations.next();
-                                       
-                                       if (location != null && 
this.instancesWithAvailableResources.remove(location)) {
-                                               instanceToUse = location;
-                                               locality = Locality.LOCAL;
-                                               
-                                               if (LOG.isDebugEnabled()) {
-                                                       LOG.debug("Local 
assignment: " + vertex.getSimpleName() + " --> " + location);
-                                               }
-                                               
-                                               break;
-                                       }
+                               // if the instance has further available slots, 
re-add it to the set of available resources.
+                               if (instanceToUse.hasResourcesAvailable()) {
+                                       
this.instancesWithAvailableResources.add(instanceToUse);
                                }
                                
-                               if (instanceToUse == null) {
-                                       instanceToUse = 
this.instancesWithAvailableResources.poll();
-                                       locality = Locality.NON_LOCAL;
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("Non-local 
assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
-                                       }
+                               if (slot != null) {
+                                       slot.setLocality(locality);
+                                       return slot;
                                }
                        }
-                       else {
-                               instanceToUse = 
this.instancesWithAvailableResources.poll();
-                               if (LOG.isDebugEnabled()) {
+                       catch (InstanceDiedException e) {
+                               // the instance died it has not yet been 
propagated to this scheduler
+                               // remove the instance from the set of 
available instances
+                               this.allInstances.remove(instanceToUse);
+                               
this.instancesWithAvailableResources.remove(instanceToUse);
+                       }
+                       
+                       // if we failed to get a slot, fall through the loop
+               }
+       }
+
+       protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex,
+                                                                               
        Iterable<Instance> requestedLocations,
+                                                                               
        SlotSharingGroupAssignment groupAssignment,
+                                                                               
        CoLocationConstraint constraint) {
+               // we need potentially to loop multiple times, because there 
may be false positives
+               // in the set-with-available-instances
+               while (true) {
+                       Pair<Instance, Locality> instanceLocalityPair = 
findInstance(requestedLocations);
+
+                       if(instanceLocalityPair == null){
+                               return null;
+                       }
+
+                       Instance instanceToUse = instanceLocalityPair.getLeft();
+                       Locality locality = instanceLocalityPair.getRight();
+
+                       if(LOG.isDebugEnabled()){
+                               if(locality == Locality.LOCAL){
+                                       LOG.debug("Local assignment: " + 
vertex.getSimpleName() + " --> " + instanceToUse);
+                               }else if(locality == Locality.NON_LOCAL){
+                                       LOG.debug("Non-local assignment: " + 
vertex.getSimpleName() + " --> " + instanceToUse);
+                               }else if(locality == Locality.UNCONSTRAINED) {
                                        LOG.debug("Unconstrained assignment: " 
+ vertex.getSimpleName() + " --> " + instanceToUse);
                                }
                        }
-                       
+
                        try {
-                               AllocatedSlot slot = 
instanceToUse.allocateSlot(vertex.getJobId());
-                               
+                               AbstractID groupID = constraint == null ? 
vertex.getJobvertexId() : constraint.getGroupId();
+
+                               // root SharedSlot
+                               SharedSlot sharedSlot = 
instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment, groupID);
+
+                               // If constraint != null, then slot nested in a 
SharedSlot nested in sharedSlot
+                               // If constraint == null, then slot nested in 
sharedSlot
+                               SimpleSlot slot = 
groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupID, 
constraint);
+
                                // if the instance has further available slots, 
re-add it to the set of available resources.
                                if (instanceToUse.hasResourcesAvailable()) {
                                        
this.instancesWithAvailableResources.add(instanceToUse);
                                }
-                               
+
                                if (slot != null) {
-                                       slot.setLocality(locality);
                                        return slot;
                                }
                        }
@@ -369,10 +396,61 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                this.allInstances.remove(instanceToUse);
                                
this.instancesWithAvailableResources.remove(instanceToUse);
                        }
-                       
+
                        // if we failed to get a slot, fall through the loop
                }
        }
+
+       /**
+        * NOTE: This method is not thread-safe, it needs to be synchronized by 
the caller.
+        *
+        * Tries to find a requested instance. If no such instance is available 
it will return a non-
+        * local instance. If no such instance exists (all slots occupied), 
then return null.
+        *
+        * @param requestedLocations
+        * @return
+        */
+       private Pair<Instance, Locality> findInstance(Iterable<Instance> 
requestedLocations){
+               if (this.instancesWithAvailableResources.isEmpty()) {
+                       // check if the asynchronous calls did not yet return 
the queues
+                       Instance queuedInstance = 
this.newlyAvailableInstances.poll();
+                       if (queuedInstance == null) {
+                               return null;
+                       } else {
+                               
this.instancesWithAvailableResources.add(queuedInstance);
+                       }
+               }
+
+               Iterator<Instance> locations = requestedLocations == null ? 
null : requestedLocations.iterator();
+
+               Instance instanceToUse = null;
+               Locality locality = Locality.UNCONSTRAINED;
+
+               if (locations != null && locations.hasNext()) {
+                       // we have a locality preference
+
+                       while (locations.hasNext()) {
+                               Instance location = locations.next();
+
+                               if (location != null && 
this.instancesWithAvailableResources.remove(location)) {
+                                       instanceToUse = location;
+                                       locality = Locality.LOCAL;
+
+                                       break;
+                               }
+                       }
+
+                       if (instanceToUse == null) {
+                               instanceToUse = 
this.instancesWithAvailableResources.poll();
+                               locality = Locality.NON_LOCAL;
+                       }
+               }
+               else {
+                       instanceToUse = 
this.instancesWithAvailableResources.poll();
+               }
+
+               return new ImmutablePair<Instance, Locality>(instanceToUse, 
locality);
+       }
        
        @Override
        public void newSlotAvailable(final Instance instance) {
@@ -386,7 +464,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                //                             (2) scheduler (to check whether 
to take a new task item
                // 
                // that leads with a high probability to deadlocks, when 
scheduling fast
-               
+
                this.newlyAvailableInstances.add(instance);
 
                Futures.future(new Callable<Object>() {
@@ -416,7 +494,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                ExecutionVertex vertex = 
task.getTaskToExecute().getVertex();
                                
                                try {
-                                       AllocatedSlot newSlot = 
instance.allocateSlot(vertex.getJobId());
+                                       SimpleSlot newSlot = 
instance.allocateSimpleSlot(vertex.getJobId(), vertex.getJobvertexId());
                                        if (newSlot != null) {
                                                
                                                // success, remove from the 
task queue and notify the future
@@ -524,7 +602,16 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
        // 
--------------------------------------------------------------------------------------------
 
        public int getNumberOfAvailableInstances() {
-               return allInstances.size();
+               int numberAvailableInstances = 0;
+               synchronized (this.globalLock) {
+                       for(Instance instance: allInstances){
+                               if(instance.isAlive()){
+                                       numberAvailableInstances++;
+                               }
+                       }
+               }
+
+               return numberAvailableInstances;
        }
        
        public int getNumberOfInstancesWithAvailableSlots() {

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
deleted file mode 100644
index 1ce8465..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-/**
- * 
- * NOTE: This class does no synchronization by itself and its mutating
- *       methods may only be called from within the synchronization scope of
- *       it associated SlotSharingGroupAssignment.
- */
-class SharedSlot implements Serializable {
-
-       static final long serialVersionUID = 42L;
-
-       private final AllocatedSlot allocatedSlot;
-       
-       private final SlotSharingGroupAssignment assignmentGroup;
-       
-       private final Set<SubSlot> subSlots;
-       
-       private int subSlotNumber;
-       
-       private volatile boolean disposed;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public SharedSlot(AllocatedSlot allocatedSlot, 
SlotSharingGroupAssignment assignmentGroup) {
-               if (allocatedSlot == null || assignmentGroup == null) {
-                       throw new NullPointerException();
-               }
-               
-               this.allocatedSlot = allocatedSlot;
-               this.assignmentGroup = assignmentGroup;
-               this.subSlots = new HashSet<SubSlot>();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       AllocatedSlot getAllocatedSlot() {
-               return this.allocatedSlot;
-       }
-       
-       boolean isDisposed() {
-               return disposed;
-       }
-       
-       int getNumberOfAllocatedSubSlots() {
-               return this.subSlots.size();
-       }
-       
-       SubSlot allocateSubSlot(JobVertexID jid) {
-               if (disposed) {
-                       return null;
-               } else {
-                       SubSlot ss = new SubSlot(this, subSlotNumber++, jid);
-                       this.subSlots.add(ss);
-                       return ss;
-               }
-       }
-       
-       void returnAllocatedSlot(SubSlot slot) {
-               if (!slot.isReleased()) {
-                       throw new IllegalArgumentException("SubSlot is not 
released.");
-               }
-               
-               this.assignmentGroup.releaseSubSlot(slot, this);
-       }
-       
-       int releaseSlot(SubSlot slot) {
-               if (!this.subSlots.remove(slot)) {
-                       throw new IllegalArgumentException("Wrong shared slot 
for subslot.");
-               }
-               return subSlots.size();
-       }
-       
-       void dispose() {
-               if (subSlots.isEmpty()) {
-                       disposed = true;
-                       this.allocatedSlot.releaseSlot();
-               } else {
-                       throw new IllegalStateException("Cannot dispose while 
subslots are still alive.");
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return "Shared " + allocatedSlot.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
index eb5f9fb..31bd341 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 
 public class SlotAllocationFuture {
        
        private final Object monitor = new Object();
        
-       private volatile AllocatedSlot slot;
+       private volatile SimpleSlot slot;
        
        private volatile SlotAllocationFutureAction action;
        
@@ -32,17 +32,17 @@ public class SlotAllocationFuture {
 
        public SlotAllocationFuture() {}
        
-       public SlotAllocationFuture(AllocatedSlot slot) {
+       public SlotAllocationFuture(SimpleSlot slot) {
                this.slot = slot;
        }
        
        // 
--------------------------------------------------------------------------------------------
        
-       public AllocatedSlot waitTillAllocated() throws InterruptedException {
+       public SimpleSlot waitTillAllocated() throws InterruptedException {
                return waitTillAllocated(0);
        }
        
-       public AllocatedSlot waitTillAllocated(long timeout) throws 
InterruptedException {
+       public SimpleSlot waitTillAllocated(long timeout) throws 
InterruptedException {
                synchronized (monitor) {
                        while (slot == null) {
                                monitor.wait(timeout);
@@ -66,7 +66,7 @@ public class SlotAllocationFuture {
                }
        }
        
-       public void setSlot(AllocatedSlot slot) {
+       public void setSlot(SimpleSlot slot) {
                if (slot == null) {
                        throw new NullPointerException();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
index 11137fd..f9d032f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 
 /**
  * An action that is invoked once a {@link SlotAllocationFuture} is triggered.
@@ -30,5 +30,5 @@ public interface SlotAllocationFutureAction {
         * 
         * @param slot The slot that has been allocated.
         */
-       void slotAllocated(AllocatedSlot slot);
+       void slotAllocated(SimpleSlot slot);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
index 639d2b7..f75f294 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 import org.apache.flink.runtime.instance.Instance;
 
 /**
- * A SlotAvailabilityListener can be notified when new {@link 
org.apache.flink.runtime.instance.AllocatedSlot}s become available
+ * A SlotAvailabilityListener can be notified when new {@link 
org.apache.flink.runtime.instance.AllocatedSlot2}s become available
  * on an {@link org.apache.flink.runtime.instance.Instance}.
  */
 public interface SlotAvailabilityListener {

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
index 7a0546f..70d4510 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
@@ -33,8 +33,10 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SharedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.slf4j.Logger;
 
@@ -42,86 +44,82 @@ import org.slf4j.Logger;
 public class SlotSharingGroupAssignment implements Serializable {
 
        static final long serialVersionUID = 42L;
-       
+
        private static final Logger LOG = Scheduler.LOG;
-       
+
        private transient final Object lock = new Object();
-       
+
        /** All slots currently allocated to this sharing group */
        private final Set<SharedSlot> allSlots = new 
LinkedHashSet<SharedSlot>();
-       
+
        /** The slots available per vertex type (jid), keyed by instance, to 
make them locatable */
        private final Map<AbstractID, Map<Instance, List<SharedSlot>>> 
availableSlotsPerJid = new LinkedHashMap<AbstractID, Map<Instance, 
List<SharedSlot>>>();
-       
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
-       
-       public SubSlot addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex 
vertex) {
-               JobVertexID id = vertex.getJobvertexId();
-               return addNewSlotWithTask(slot, id, id);
-       }
-       
-       public SubSlot addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex 
vertex, CoLocationConstraint constraint) {
-               AbstractID groupId = constraint.getGroupId();
-               return addNewSlotWithTask(slot, groupId, null);
-       }
-       
-       private SubSlot addNewSlotWithTask(AllocatedSlot slot, AbstractID 
groupId, JobVertexID vertexId) {
-               
-               final SharedSlot sharedSlot = new SharedSlot(slot, this);
-               final Instance location = slot.getInstance();
-               
+
+       public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot 
sharedSlot, Locality locality,
+                                                                               
                        AbstractID groupId, CoLocationConstraint constraint) {
+
+               final Instance location = sharedSlot.getInstance();
+
                synchronized (lock) {
                        // add to the total bookkeeping
                        allSlots.add(sharedSlot);
-                       
-                       // allocate us a sub slot to return
-                       SubSlot subslot = sharedSlot.allocateSubSlot(vertexId);
-                       
+
+                       SimpleSlot subSlot = null;
+
+                       if(constraint == null){
+                               // allocate us a sub slot to return
+                               subSlot = sharedSlot.allocateSubSlot(groupId);
+                       } else {
+                               // we need a colocation slot --> a SimpleSlot 
nested in a SharedSlot to host other colocated tasks
+                               SharedSlot constraintGroupSlot = 
sharedSlot.allocateSharedSlot(groupId);
+                               subSlot = 
constraintGroupSlot.allocateSubSlot(null);
+                       }
+
                        // preserve the locality information
-                       subslot.setLocality(slot.getLocality());
-                       
+                       subSlot.setLocality(locality);
+
                        boolean entryForNewJidExists = false;
-                       
+
                        // let the other vertex types know about this one as 
well
                        for (Map.Entry<AbstractID, Map<Instance, 
List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
-                               
+
                                if (entry.getKey().equals(groupId)) {
                                        entryForNewJidExists = true;
                                        continue;
                                }
-                               
+
                                Map<Instance, List<SharedSlot>> available = 
entry.getValue();
                                putIntoMultiMap(available, location, 
sharedSlot);
                        }
-                       
+
                        // make sure an empty entry exists for this group, if 
no other entry exists
                        if (!entryForNewJidExists) {
                                availableSlotsPerJid.put(groupId, new 
LinkedHashMap<Instance, List<SharedSlot>>());
                        }
-                       
-                       return subslot;
+
+                       return subSlot;
                }
        }
-       
+
        /**
         * Gets a slot suitable for the given task vertex. This method will 
prefer slots that are local
         * (with respect to {@link ExecutionVertex#getPreferredLocations()}), 
but will return non local
         * slots if no local slot is available. The method returns null, when 
no slot is available for the
         * given JobVertexID at all.
-        * 
+        *
         * @param vertex
-        * 
+        *
         * @return A task vertex for a task with the given JobVertexID, or 
null, if none is available.
         */
-       public SubSlot getSlotForTask(ExecutionVertex vertex) {
+       public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
                synchronized (lock) {
                        Pair<SharedSlot, Locality> p = 
getSlotForTaskInternal(vertex.getJobvertexId(), vertex, 
vertex.getPreferredLocations(), false);
-                       
+
                        if (p != null) {
                                SharedSlot ss = p.getLeft();
-                               SubSlot slot = 
ss.allocateSubSlot(vertex.getJobvertexId());
+                               SimpleSlot slot = 
ss.allocateSubSlot(vertex.getJobvertexId());
                                slot.setLocality(p.getRight());
                                return slot;
                        }
@@ -129,17 +127,17 @@ public class SlotSharingGroupAssignment implements 
Serializable {
                                return null;
                        }
                }
-               
+
        }
-       
-       public SubSlot getSlotForTask(ExecutionVertex vertex, 
CoLocationConstraint constraint) {
-               
+
+       public SimpleSlot getSlotForTask(ExecutionVertex vertex, 
CoLocationConstraint constraint) {
+
                synchronized (lock) {
                        SharedSlot shared = constraint.getSharedSlot();
-                       
-                       if (shared != null && !shared.isDisposed()) {
+
+                       if (shared != null && !shared.isDead()) {
                                // initialized and set
-                               SubSlot subslot = shared.allocateSubSlot(null);
+                               SimpleSlot subslot = 
shared.allocateSubSlot(null);
                                subslot.setLocality(Locality.LOCAL);
                                return subslot;
                        }
@@ -147,85 +145,92 @@ public class SlotSharingGroupAssignment implements 
Serializable {
                                // not initialized, grab a new slot. preferred 
locations are defined by the vertex
                                // we only associate the slot with the 
constraint, if it was a local match
                                Pair<SharedSlot, Locality> p = 
getSlotForTaskInternal(constraint.getGroupId(), vertex, 
vertex.getPreferredLocations(), false);
+
                                if (p == null) {
                                        return null;
                                } else {
                                        shared = p.getLeft();
                                        Locality l = p.getRight();
-                                       
-                                       SubSlot sub = 
shared.allocateSubSlot(null);
+
+                                       // we need a colocation slot --> 
SimpleSlot nested in a SharedSlot to host other colocated tasks
+                                       SharedSlot constraintGroupSlot = 
shared.allocateSharedSlot(constraint.getGroupId());
+                                       // Depth=3 => groupID==null
+                                       SimpleSlot sub = 
constraintGroupSlot.allocateSubSlot(null);
                                        sub.setLocality(l);
-                                       
+
                                        if (l != Locality.NON_LOCAL) {
-                                               
constraint.setSharedSlot(shared);
+                                               
constraint.setSharedSlot(constraintGroupSlot);
                                        }
                                        return sub;
                                }
                        }
                        else {
                                // disposed. get a new slot on the same instance
-                               Instance location = 
shared.getAllocatedSlot().getInstance();
+                               Instance location = shared.getInstance();
                                Pair<SharedSlot, Locality> p = 
getSlotForTaskInternal(constraint.getGroupId(), vertex, 
Collections.singleton(location), true);
+
                                if (p == null) {
                                        return null;
                                } else {
                                        shared = p.getLeft();
-                                       constraint.setSharedSlot(shared);
-                                       SubSlot subslot = 
shared.allocateSubSlot(null);
-                                       subslot.setLocality(Locality.LOCAL);
-                                       return subslot;
+                                       // we need colocation slot --> 
SimpleSlot nested in a SharedSlot to host other colocated tasks
+                                       SharedSlot constraintGroupSlot = 
shared.allocateSharedSlot(constraint.getGroupId());
+                                       
constraint.setSharedSlot(constraintGroupSlot);
+                                       SimpleSlot subSlot = 
constraintGroupSlot.allocateSubSlot(null);
+                                       subSlot.setLocality(Locality.LOCAL);
+                                       return subSlot;
                                }
                        }
                }
        }
-       
+
        /**
         * NOTE: This method is not synchronized by itself, needs to be 
synchronized externally.
-        * 
+        *
         * @return An allocated sub slot, or {@code null}, if no slot is 
available.
         */
        private Pair<SharedSlot, Locality> getSlotForTaskInternal(AbstractID 
groupId, ExecutionVertex vertex, Iterable<Instance> preferredLocations, boolean 
localOnly) {
+               Map<Instance, List<SharedSlot>> slotsForGroup = 
availableSlotsPerJid.get(groupId);
+
                if (allSlots.isEmpty()) {
                        return null;
                }
-               
-               Map<Instance, List<SharedSlot>> slotsForGroup = 
availableSlotsPerJid.get(groupId);
-               
+
                // get the available slots for the group
                if (slotsForGroup == null) {
                        // no task is yet scheduled for that group, so all 
slots are available
                        slotsForGroup = new LinkedHashMap<Instance, 
List<SharedSlot>>();
                        availableSlotsPerJid.put(groupId, slotsForGroup);
-                       
+
                        for (SharedSlot availableSlot : allSlots) {
-                               putIntoMultiMap(slotsForGroup, 
availableSlot.getAllocatedSlot().getInstance(), availableSlot);
+                               putIntoMultiMap(slotsForGroup, 
availableSlot.getInstance(), availableSlot);
                        }
                }
                else if (slotsForGroup.isEmpty()) {
                        return null;
                }
-               
+
                // check whether we can schedule the task to a preferred 
location
                boolean didNotGetPreferred = false;
-               
+
                if (preferredLocations != null) {
                        for (Instance location : preferredLocations) {
-                               
+
                                // set the flag that we failed a preferred 
location. If one will be found,
                                // we return early anyways and skip the flag 
evaluation
                                didNotGetPreferred = true;
-                               
+
                                SharedSlot slot = 
removeFromMultiMap(slotsForGroup, location);
-                               if (slot != null && !slot.isDisposed()) {
+                               if (slot != null && !slot.isDead()) {
                                        if (LOG.isDebugEnabled()) {
                                                LOG.debug("Local assignment in 
shared group : " + vertex + " --> " + slot);
                                        }
-                                       
+
                                        return new ImmutablePair<SharedSlot, 
Locality>(slot, Locality.LOCAL);
                                }
                        }
                }
-               
+
                // if we want only local assignments, exit now with a "not 
found" result
                if (didNotGetPreferred && localOnly) {
                        if (LOG.isDebugEnabled()) {
@@ -233,84 +238,153 @@ public class SlotSharingGroupAssignment implements 
Serializable {
                        }
                        return null;
                }
-               
+
                // schedule the task to any available location
                SharedSlot slot = pollFromMultiMap(slotsForGroup);
-               if (slot != null && !slot.isDisposed()) {
+               if (slot != null && !slot.isDead()) {
                        if (LOG.isDebugEnabled()) {
                                LOG.debug((didNotGetPreferred ? "Non-local" : 
"Unconstrained") + " assignment in shared group : " + vertex + " --> " + slot);
                        }
-                       
+
                        return new ImmutablePair<SharedSlot, Locality>(slot, 
didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
                }
                else {
                        return null;
                }
        }
-       
-       
-       void releaseSubSlot(SubSlot subslot, SharedSlot sharedSlot) {
-               
-               AbstractID groupId = subslot.getGroupId();
-               
+
+       /**
+        * Removes the shared slot from the assignment group.
+        *
+        * @param sharedSlot
+        */
+       private void removeSharedSlot(SharedSlot sharedSlot){
+               if (!allSlots.contains(sharedSlot)) {
+                       throw new IllegalArgumentException("Slot was not 
associated with this SlotSharingGroup before.");
+               }
+
+               allSlots.remove(sharedSlot);
+
+               Instance location = sharedSlot.getInstance();
+
+               for(Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> 
mapEntry: availableSlotsPerJid.entrySet()){
+                       Map<Instance, List<SharedSlot>> map = 
mapEntry.getValue();
+
+                       List<SharedSlot> list = map.get(location);
+
+                       if(list == null || !list.remove(sharedSlot)){
+                               throw new IllegalStateException("Bug: 
SharedSlot was not available to another vertex type that it was not allocated 
for before.");
+                       }
+
+                       if(list.isEmpty()){
+                               map.remove(location);
+                       }
+               }
+
+               sharedSlot.markCancelled();
+
+               returnAllocatedSlot(sharedSlot);
+       }
+
+       /**
+        * Releases the shared slot from the assignment group.
+        * @param sharedSlot The SharedSlot to be released
+        */
+       public void releaseSharedSlot(SharedSlot sharedSlot){
                synchronized (lock) {
+                       Set<Slot> subSlots = sharedSlot.getSubSlots();
 
-                       if (!allSlots.contains(sharedSlot)) {
-                               throw new IllegalArgumentException("Slot was 
not associated with this SlotSharingGroup before.");
+                       for(Slot subSlot: subSlots) {
+
+                               subSlot.markDisposed();
+
+                               if(subSlot instanceof SharedSlot){
+                                       releaseSharedSlot((SharedSlot) subSlot);
+                               }else if(subSlot instanceof SimpleSlot){
+                                       releaseSimpleSlot((SimpleSlot) subSlot);
+                               }
                        }
-                       
-                       int slotsRemaining = sharedSlot.releaseSlot(subslot);
-                       
-                       if (slotsRemaining == 0) {
-                               // this was the last sub slot. remove this from 
the availability list 
-                               // and trigger disposal
-                               try {
-                                       allSlots.remove(sharedSlot);
-                                       
-                                       Instance location = 
sharedSlot.getAllocatedSlot().getInstance();
-
-                                       if (groupId != null) {
-                                               for (Map.Entry<AbstractID, 
Map<Instance, List<SharedSlot>>> mapEntry : availableSlotsPerJid.entrySet()) {
-                                                       AbstractID id = 
mapEntry.getKey();
-                                                       
-                                                       // hack: we identify co 
location hint entries by the fact that they are keyed
-                                                       //       by an abstract 
id, rather than a job vertex id
-                                                       if (id.getClass() == 
AbstractID.class || id.equals(groupId)) {
-                                                               continue;
-                                                       }
-                                                       
-                                                       Map<Instance, 
List<SharedSlot>> map = mapEntry.getValue();
-                                                       List<SharedSlot> list = 
map.get(location);
-                                                       if (list == null || 
!list.remove(sharedSlot)) {
-                                                               throw new 
IllegalStateException("Bug: SharedSlot was not available to another vertex type 
that it was not allocated for before.");
-                                                       }
-                                                       if (list.isEmpty()) {
-                                                               
map.remove(location);
-                                                       }
-                                               }
+
+                       subSlots.clear();
+
+                       returnSlot(sharedSlot);
+               }
+       }
+
+       /**
+        * Releases the simple slot from the assignment group.
+        * @param simpleSlot The SimpleSlot to be released
+        */
+       public void releaseSimpleSlot(SimpleSlot simpleSlot){
+               synchronized (lock) {
+                       simpleSlot.cancel();
+
+                       returnSlot(simpleSlot);
+               }
+
+       }
+
+       /**
+        * Removes the given slot from the assignment group. If the slot is a 
root object, then it has
+        * to be a SharedSlot and it is removed from the availableSlotsPerJid 
field and the slot is
+        * returned to the instance. If the slot is a sub slot of the root 
slot, then this sub slot
+        * is marked available again for tasks of the same group. Otherwise, 
the slot is simply removed
+        * from its parent if it is not already marked as disposed. If a slot 
is already marked to be
+        * disposed, then the releasing was called from a parent slot which 
will take care of the
+        * disposal.
+        *
+        * IMPORTANT: The method is not synchronized. The caller is responsible 
for that.
+        *
+        * @param slot The slot to be returned.
+        */
+       private void returnSlot(Slot slot){
+               // each slot can only be returned once, if a slot is returned 
then it should no longer be used --> markDead
+               if(slot.markDead()) {
+                       // slot is a root slot
+                       if(slot.getParent() == null){
+                               // only SharedSlots are allowed to be root 
slots in a SlotSharingGroupAssignment
+                               if(slot instanceof SharedSlot){
+                                       removeSharedSlot((SharedSlot) slot);
+                               } else {
+                                       throw new IllegalStateException("Simple 
slot cannot be returned from SlotSharingGroupAssignment.");
+                               }
+                       } else {
+                               AbstractID groupID = slot.getGroupID();
+                               SharedSlot parent = slot.getParent();
+
+                               // Only colocation constraint slots (SimpleSlot 
nested in a SharedSlot nested in a SharedSlot) have a groupID==null
+                               // One can also say, all nested slots deeper 
than 2 have a groupID==null
+                               if(groupID != null){
+                                       if (!allSlots.contains(parent)) {
+                                               throw new 
IllegalArgumentException("Slot was not associated with this SlotSharingGroup 
before.");
+                                       }
+
+                                       // make the shared slot available to 
tasks within the group it available to
+                                       Map<Instance, List<SharedSlot>> 
slotsForJid = availableSlotsPerJid.get(groupID);
+
+                                       // sanity check
+                                       if (slotsForJid == null) {
+                                               throw new 
IllegalStateException("Trying to return a slot for group " + groupID +
+                                                               " when 
available slots indicated that all slots were available.");
                                        }
-                               } finally {
-                                       sharedSlot.dispose();
+
+                                       putIntoMultiMap(slotsForJid, 
parent.getInstance(), parent);
                                }
-                       }
-                       else if (groupId != null) {
-                               // make the shared slot available to tasks 
within the group it available to
-                               Map<Instance, List<SharedSlot>> slotsForJid = 
availableSlotsPerJid.get(groupId);
-                               
-                               // sanity check
-                               if (slotsForJid == null) {
-                                       throw new IllegalStateException("Trying 
to return a slot for group " + groupId + 
-                                                       " when available slots 
indicated that all slots were available.");
+
+                               // if no one else takes care of disposal, then 
remove the slot from the parent
+                               if(slot.markDisposed()) {
+                                       if (slot.getParent().freeSubSlot(slot) 
== 0) {
+                                               
releaseSharedSlot(slot.getParent());
+                                       }
                                }
-                               
-                               putIntoMultiMap(slotsForJid, 
sharedSlot.getAllocatedSlot().getInstance(), sharedSlot);
                        }
                }
        }
-       
-       
-       
-       
+
+       private void returnAllocatedSlot(SharedSlot slot){
+               slot.getInstance().returnAllocatedSlot(slot);
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  State
        // 
--------------------------------------------------------------------------------------------

Reply via email to