[FLINK-1415] [runtime] Clean up archiving of ExecutionGraphs

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ae0dc2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ae0dc2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ae0dc2d

Branch: refs/heads/master
Commit: 8ae0dc2d768aecfa3129df553f43d827792b65d7
Parents: db1b8b9
Author: Stephan Ewen <[email protected]>
Authored: Wed Feb 4 13:40:13 2015 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Feb 5 12:17:15 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 35 ++++++++---
 .../runtime/executiongraph/ExecutionGraph.java  | 51 ++++++++++++----
 .../executiongraph/ExecutionJobVertex.java      | 41 +++++++++++--
 .../runtime/executiongraph/ExecutionVertex.java | 50 ++++++++++++----
 .../apache/flink/runtime/instance/Instance.java | 62 ++++++++++----------
 .../flink/runtime/instance/SharedSlot.java      |  4 +-
 .../flink/runtime/instance/SimpleSlot.java      |  6 +-
 .../org/apache/flink/runtime/instance/Slot.java | 32 ++++++----
 .../scheduler/CoLocationConstraint.java         |  6 +-
 .../scheduler/NoResourceAvailableException.java |  9 ++-
 .../runtime/jobmanager/scheduler/Scheduler.java |  1 -
 .../scheduler/SlotAvailabilityListener.java     |  4 +-
 .../jobmanager/web/JobManagerInfoServlet.java   | 27 ++++-----
 .../runtime/jobmanager/web/JsonFactory.java     |  7 +--
 .../runtime/jobmanager/web/WebInfoServer.java   |  6 --
 .../flink/runtime/jobmanager/JobManager.scala   |  5 +-
 .../runtime/jobmanager/MemoryArchivist.scala    | 14 ++---
 .../runtime/messages/ArchiveMessages.scala      |  9 +++
 .../runtime/messages/JobmanagerMessages.scala   | 31 ++++++++++
 .../test/javaApiOperators/DataSinkITCase.java   |  2 +-
 20 files changed, 275 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e1a24c4..27977c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -22,12 +22,14 @@ import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.PartitionInfo;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
@@ -40,6 +42,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
+
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -78,7 +81,7 @@ import static 
org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
  */
 public class Execution implements Serializable {
 
-       static final long serialVersionUID = 42L;
+       private static final long serialVersionUID = 42L;
 
        private static final AtomicReferenceFieldUpdater<Execution, 
ExecutionState> STATE_UPDATER =
                        AtomicReferenceFieldUpdater.newUpdater(Execution.class, 
ExecutionState.class, "state");
@@ -97,22 +100,25 @@ public class Execution implements Serializable {
 
        private final int attemptNumber;
 
-       public FiniteDuration timeout;
+       private final FiniteDuration timeout;
 
 
        private volatile ExecutionState state = CREATED;
        
-       private volatile SimpleSlot assignedResource;  // once assigned, never 
changes
+       private volatile SimpleSlot assignedResource;     // once assigned, 
never changes until the execution is archived
        
        private volatile Throwable failureCause;          // once assigned, 
never changes
        
+       private volatile InstanceConnectionInfo assignedResourceLocation; // 
for the archived execution
+       
        // 
--------------------------------------------------------------------------------------------
        
-       public Execution(ExecutionVertex vertex, int attemptNumber, long 
startTimestamp,
-                                       FiniteDuration timeout) {
+       public Execution(ExecutionVertex vertex, int attemptNumber, long 
startTimestamp, FiniteDuration timeout) {
+               checkArgument(attemptNumber >= 0);
+               
                this.vertex = checkNotNull(vertex);
                this.attemptId = new ExecutionAttemptID();
-               checkArgument(attemptNumber >= 0);
+               
                this.attemptNumber = attemptNumber;
 
                this.stateTimestamps = new long[ExecutionState.values().length];
@@ -145,6 +151,10 @@ public class Execution implements Serializable {
                return assignedResource;
        }
        
+       public InstanceConnectionInfo getAssignedResourceLocation() {
+               return assignedResourceLocation;
+       }
+       
        public Throwable getFailureCause() {
                return failureCause;
        }
@@ -161,6 +171,16 @@ public class Execution implements Serializable {
                return state == FINISHED || state == FAILED || state == 
CANCELED;
        }
        
+       /**
+        * This method cleans fields that are irrelevant for the archived 
execution attempt.
+        */
+       public void prepareForArchiving() {
+               if (assignedResource != null && assignedResource.isAlive()) {
+                       throw new IllegalStateException("Cannot archive 
Execution while the assigned resource is still running.");
+               }
+               assignedResource = null;
+       }
+       
        // 
--------------------------------------------------------------------------------------------
        //  Actions
        // 
--------------------------------------------------------------------------------------------
@@ -267,6 +287,7 @@ public class Execution implements Serializable {
                                throw new JobException("Could not assign the 
ExecutionVertex to the slot " + slot);
                        }
                        this.assignedResource = slot;
+                       this.assignedResourceLocation = 
slot.getInstance().getInstanceConnectionInfo();
 
                        // race double check, did we fail/cancel and do we need 
to release the slot?
                        if (this.state != DEPLOYING) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 54c5d3a..3f857e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorRef;
+
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -36,6 +37,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -54,7 +56,7 @@ import static akka.dispatch.Futures.future;
 
 public class ExecutionGraph implements Serializable {
 
-       static final long serialVersionUID = 42L;
+       private static final long serialVersionUID = 42L;
 
        private static final AtomicReferenceFieldUpdater<ExecutionGraph, 
JobStatus> STATE_UPDATER =
                        
AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, 
"state");
@@ -71,10 +73,10 @@ public class ExecutionGraph implements Serializable {
        private final String jobName;
 
        /** The job configuration that was originally attached to the JobGraph. 
*/
-       private transient final Configuration jobConfiguration;
+       private final Configuration jobConfiguration;
 
        /** The classloader of the user code. */
-       private final ClassLoader userClassLoader;
+       private ClassLoader userClassLoader;
 
        /** All job vertices that are part of this graph */
        private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
@@ -83,20 +85,20 @@ public class ExecutionGraph implements Serializable {
        private final List<ExecutionJobVertex> verticesInCreationOrder;
 
        /** All intermediate results that are part of this graph */
-       private transient final ConcurrentHashMap<IntermediateDataSetID, 
IntermediateResult> intermediateResults;
+       private final ConcurrentHashMap<IntermediateDataSetID, 
IntermediateResult> intermediateResults;
 
        /** The currently executed tasks, for callbacks */
-       private transient final ConcurrentHashMap<ExecutionAttemptID, 
Execution> currentExecutions;
+       private final ConcurrentHashMap<ExecutionAttemptID, Execution> 
currentExecutions;
 
-       private transient final List<BlobKey> requiredJarFiles;
+       private final List<BlobKey> requiredJarFiles;
 
-       private transient final List<ActorRef> jobStatusListenerActors;
+       private final List<ActorRef> jobStatusListenerActors;
 
-       private transient final List<ActorRef> executionListenerActors;
+       private final List<ActorRef> executionListenerActors;
 
        private final long[] stateTimestamps;
 
-       private transient final Object progressLock = new Object();
+       private final Object progressLock = new Object();
 
        private int nextVertexToFinish;
 
@@ -110,12 +112,13 @@ public class ExecutionGraph implements Serializable {
 
        private volatile Throwable failureCause;
 
-       private transient Scheduler scheduler;
+       private Scheduler scheduler;
 
        private boolean allowQueuedScheduling = true;
 
        private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
+       
        public ExecutionGraph(JobID jobId, String jobName, Configuration 
jobConfig, FiniteDuration timeout) {
                this(jobId, jobName, jobConfig, timeout, new 
ArrayList<BlobKey>());
        }
@@ -598,8 +601,11 @@ public class ExecutionGraph implements Serializable {
        public void restart() {
                try {
                        if (state == JobStatus.FAILED) {
-                               transitionState(JobStatus.FAILED, 
JobStatus.RESTARTING);
+                               if (!transitionState(JobStatus.FAILED, 
JobStatus.RESTARTING)) {
+                                       throw new 
IllegalStateException("Execution Graph left the state FAILED while trying to 
restart.");
+                               }
                        }
+                       
                        synchronized (progressLock) {
                                if (state != JobStatus.RESTARTING) {
                                        throw new IllegalStateException("Can 
only restart job from state restarting.");
@@ -627,4 +633,27 @@ public class ExecutionGraph implements Serializable {
                        fail(t);
                }
        }
+       
+       /**
+        * This method cleans fields that are irrelevant for the archived 
execution graph.
+        */
+       public void prepareForArchiving() {
+               if (!state.isTerminalState()) {
+                       throw new IllegalStateException("Can only archive the 
job from a terminal state");
+               }
+               
+               userClassLoader = null;
+               
+               for (ExecutionJobVertex vertex : verticesInCreationOrder) {
+                       vertex.prepareForArchiving();
+               }
+               
+               intermediateResults.clear();
+               currentExecutions.clear();
+               requiredJarFiles.clear();
+               jobStatusListenerActors.clear();
+               executionListenerActors.clear();
+               
+               scheduler = null;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7eaa1e6..e2febc7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -42,12 +42,13 @@ import scala.concurrent.duration.FiniteDuration;
 
 
 public class ExecutionJobVertex implements Serializable {
-       static final long serialVersionUID = 42L;
+       
+       private static final long serialVersionUID = 42L;
        
        /** Use the same log for all ExecutionGraph classes */
        private static final Logger LOG = ExecutionGraph.LOG;
        
-       private transient final Object stateMonitor = new Object();
+       private final Object stateMonitor = new Object();
        
        private final ExecutionGraph graph;
        
@@ -55,9 +56,9 @@ public class ExecutionJobVertex implements Serializable {
        
        private final ExecutionVertex[] taskVertices;
 
-       private transient final IntermediateResult[] producedDataSets;
+       private IntermediateResult[] producedDataSets;
        
-       private transient final List<IntermediateResult> inputs;
+       private final List<IntermediateResult> inputs;
        
        private final int parallelism;
        
@@ -71,7 +72,7 @@ public class ExecutionJobVertex implements Serializable {
        
        private final InputSplit[] inputSplits;
        
-       private transient InputSplitAssigner splitAssigner;
+       private InputSplitAssigner splitAssigner;
        
        
        public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex 
jobVertex,
@@ -308,6 +309,36 @@ public class ExecutionJobVertex implements Serializable {
                }
        }
        
+       /**
+        * This method cleans fields that are irrelevant for the archived 
job vertex.
+        */
+       public void prepareForArchiving() {
+               
+               for (ExecutionVertex vertex : taskVertices) {
+                       vertex.prepareForArchiving();
+               }
+               
+               // clear intermediate results
+               inputs.clear();
+               producedDataSets = null;
+               
+               // reset shared groups
+               if (slotSharingGroup != null) {
+                       slotSharingGroup.clearTaskAssignment();
+               }
+               if (coLocationGroup != null) {
+                       coLocationGroup.resetConstraints();
+               }
+               
+               // reset splits and split assigner
+               splitAssigner = null;
+               if (inputSplits != null) {
+                       for (int i = 0; i < inputSplits.length; i++) {
+                               inputSplits[i] = null;
+                       }
+               }
+       }
+       
        
//---------------------------------------------------------------------------------------------
        //  Notifications
        
//---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index d3993bb..1092f89 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -37,6 +38,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.slf4j.Logger;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -56,9 +58,8 @@ import static 
org.apache.flink.runtime.execution.ExecutionState.FINISHED;
  */
 public class ExecutionVertex implements Serializable {
 
-       static final long serialVersionUID = 42L;
+       private static final long serialVersionUID = 42L;
 
-       @SuppressWarnings("unused")
        private static final Logger LOG = ExecutionGraph.LOG;
        
        private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;
@@ -67,9 +68,9 @@ public class ExecutionVertex implements Serializable {
        
        private final ExecutionJobVertex jobVertex;
        
-       private transient final IntermediateResultPartition[] resultPartitions;
+       private IntermediateResultPartition[] resultPartitions;
        
-       private transient ExecutionEdge[][] inputEdges;
+       private ExecutionEdge[][] inputEdges;
        
        private final int subTaskIndex;
        
@@ -182,6 +183,10 @@ public class ExecutionVertex implements Serializable {
                return currentExecution.getAssignedResource();
        }
        
+       public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
+               return currentExecution.getAssignedResourceLocation();
+       }
+       
        public ExecutionGraph getExecutionGraph() {
                return this.jobVertex.getGraph();
        }
@@ -322,6 +327,11 @@ public class ExecutionVertex implements Serializable {
        // 
--------------------------------------------------------------------------------------------
 
        public void resetForNewExecution() {
+               
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Resetting exection vertex {} for new 
execution.", getSimpleName());
+               }
+               
                synchronized (priorExecutions) {
                        Execution execution = currentExecution;
                        ExecutionState state = execution.getState();
@@ -369,6 +379,31 @@ public class ExecutionVertex implements Serializable {
 
                return 
currentExecution.scheduleOrUpdateConsumers(partition.getConsumers());
        }
+       
+       /**
+        * This method cleans fields that are irrelevant for the archived 
execution vertex.
+        */
+       public void prepareForArchiving() {
+               Execution execution = currentExecution;
+               ExecutionState state = execution.getState();
+
+               // sanity check
+               if (!(state == FINISHED || state == CANCELED || state == 
FAILED)) {
+                       throw new IllegalStateException("Cannot archive 
ExecutionVertex that is not in a finished state.");
+               }
+               
+               // prepare the current execution for archiving
+               execution.prepareForArchiving();
+               
+               // prepare previous executions for archiving
+               for (Execution exec : priorExecutions) {
+                       exec.prepareForArchiving();
+               }
+               
+               // clear the unnecessary fields in this class
+               this.resultPartitions = null;
+               this.inputEdges = null;
+       }
 
        // 
--------------------------------------------------------------------------------------------
        //   Notifications from the Execution Attempt
@@ -447,13 +482,6 @@ public class ExecutionVertex implements Serializable {
                return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + 
'/' + getTotalNumberOfParallelSubtasks() + ')';
        }
 
-       /*
-        * Clears all Edges of this ExecutionVertex
-        */
-       public void clearExecutionEdges() {
-               inputEdges = null;
-       }
-
        @Override
        public String toString() {
                return getSimpleName();

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 4f9dc7f..a5a9263 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import java.io.Serializable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -33,17 +32,16 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
 import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
 
 /**
- * An taskManager represents a resource a {@link 
org.apache.flink.runtime.taskmanager.TaskManager} runs on.
+ * An instance represents a {@link 
org.apache.flink.runtime.taskmanager.TaskManager}
+ * registered at a JobManager and ready to receive work.
  */
-public class Instance implements Serializable {
-
-       static final long serialVersionUID = 42L;
+public class Instance {
        
        /** The lock on which to synchronize allocations and failure state 
changes */
-       private transient final Object instanceLock = new Object();
+       private final Object instanceLock = new Object();
        
        /** The actor ref to the task manager represented by this taskManager. 
*/
-       private transient final ActorRef taskManager;
+       private final ActorRef taskManager;
 
        /** The instance connection information for the data transfer. */
        private final InstanceConnectionInfo connectionInfo;
@@ -58,28 +56,27 @@ public class Instance implements Serializable {
        private final int numberOfSlots;
 
        /** A list of available slot positions */
-       private transient final Queue<Integer> availableSlots;
+       private final Queue<Integer> availableSlots;
        
        /** Allocated slots on this taskManager */
        private final Set<Slot> allocatedSlots = new HashSet<Slot>();
 
-       
        /** A listener to be notified upon new slot availability */
-       private transient SlotAvailabilityListener slotAvailabilityListener;
+       private SlotAvailabilityListener slotAvailabilityListener;
        
-       /**
-        * Time when last heat beat has been received from the task manager 
running on this taskManager.
-        */
+       /** Time when the last heartbeat has been received from the task manager 
running on this taskManager. */
        private volatile long lastReceivedHeartBeat = 
System.currentTimeMillis();
        
+       /** Flag marking the instance as alive or as dead. */
        private volatile boolean isDead;
 
        // 
--------------------------------------------------------------------------------------------
        
        /**
-        * Constructs an abstract taskManager object.
+        * Constructs an instance reflecting a registered TaskManager.
         * 
         * @param taskManager The actor reference of the represented task 
manager.
+        * @param connectionInfo The remote connection where the task manager 
receives requests.
         * @param id The id under which the taskManager is registered.
         * @param resources The resources available on the machine.
         * @param numberOfSlots The number of task slots offered by this 
taskManager.
@@ -123,15 +120,23 @@ public class Instance implements Serializable {
        }
 
        public void markDead() {
+               
+               // create a copy of the slots to avoid concurrent modification 
exceptions
+               List<Slot> slots;
+               
                synchronized (instanceLock) {
                        if (isDead) {
                                return;
                        }
-
                        isDead = true;
 
                        // no more notifications for the slot releasing
                        this.slotAvailabilityListener = null;
+                       
+                       slots = new ArrayList<Slot>(allocatedSlots);
+                       
+                       allocatedSlots.clear();
+                       availableSlots.clear();
                }
 
                /*
@@ -139,14 +144,9 @@ public class Instance implements Serializable {
                 * owning the assignment group lock wants to give itself back 
to the instance which requires
                 * the instance lock
                 */
-               for (Slot slot : allocatedSlots) {
+               for (Slot slot : slots) {
                        slot.releaseSlot();
                }
-
-               synchronized (instanceLock) {
-                       allocatedSlots.clear();
-                       availableSlots.clear();
-               }
        }
 
 
@@ -185,9 +185,9 @@ public class Instance implements Serializable {
        // 
--------------------------------------------------------------------------------------------
        // Resource allocation
        // 
--------------------------------------------------------------------------------------------
-
+       
        public SimpleSlot allocateSimpleSlot(JobID jobID) throws 
InstanceDiedException {
-               return allocateSimpleSlot(jobID, jobID);
+               return allocateSimpleSlot(jobID, new AbstractID());
        }
        
        public SimpleSlot allocateSimpleSlot(JobID jobID, AbstractID groupID) 
throws InstanceDiedException {
@@ -211,8 +211,9 @@ public class Instance implements Serializable {
                }
        }
 
-       public SharedSlot allocateSharedSlot(JobID jobID, 
SlotSharingGroupAssignment sharingGroupAssignment, AbstractID groupID) throws
-       InstanceDiedException {
+       public SharedSlot allocateSharedSlot(JobID jobID, 
SlotSharingGroupAssignment sharingGroupAssignment, AbstractID groupID)
+                       throws InstanceDiedException
+       {
                // the slot needs to be in the returned to taskManager state
                if (jobID == null) {
                        throw new IllegalArgumentException();
@@ -227,8 +228,7 @@ public class Instance implements Serializable {
                        if (nextSlot == null) {
                                return null;
                        } else {
-                               SharedSlot slot = new SharedSlot(jobID, this, 
nextSlot,
-                                               sharingGroupAssignment, null, 
groupID);
+                               SharedSlot slot = new SharedSlot(jobID, this, 
nextSlot, sharingGroupAssignment, null, groupID);
                                allocatedSlots.add(slot);
                                return slot;
                        }
@@ -267,10 +267,10 @@ public class Instance implements Serializable {
        }
        
        public void cancelAndReleaseAllSlots() {
-               List<Slot> copy = null;
-
+               
+               // we need to do this copy because of concurrent modification 
exceptions
+               List<Slot> copy;
                synchronized (instanceLock) {
-                       // we need to do this copy because of concurrent 
modification exceptions
                        copy = new ArrayList<Slot>(this.allocatedSlots);
                }
                        
@@ -329,7 +329,7 @@ public class Instance implements Serializable {
        
        @Override
        public String toString() {
-               return instanceId + " @" + (taskManager != null ? 
taskManager.path() : "ActorRef.noSender") + " " +
+               return instanceId + " @ " + (taskManager != null ? 
taskManager.path() : "ActorRef.noSender") + " - " +
                                numberOfSlots + " slots" + " - " + hashCode();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index 2efcf6c..576db22 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -117,7 +117,7 @@ public class SharedSlot extends Slot {
                if(isDead()){
                        return null;
                } else {
-                       SimpleSlot slot = new SimpleSlot(jobID, instance, 
subSlots.size(), this, jID);
+                       SimpleSlot slot = new SimpleSlot(getJobID(), 
getInstance(), subSlots.size(), this, jID);
                        subSlots.add(slot);
 
                        return slot;
@@ -128,7 +128,7 @@ public class SharedSlot extends Slot {
                if(isDead()){
                        return null;
                } else {
-                       SharedSlot slot = new SharedSlot(jobID, instance, 
subSlots.size(), assignmentGroup, this, jID);
+                       SharedSlot slot = new SharedSlot(getJobID(), 
getInstance(), subSlots.size(), assignmentGroup, this, jID);
                        subSlots.add(slot);
 
                        return slot;

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 5b1af57..1d42f1d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -71,7 +71,7 @@ public class SimpleSlot extends Slot {
                }
 
                // check that we can actually run in this slot
-               if (status != ALLOCATED_AND_ALIVE) {
+               if (getStatus() != ALLOCATED_AND_ALIVE) {
                        return false;
                }
 
@@ -81,7 +81,7 @@ public class SimpleSlot extends Slot {
                }
 
                // we need to do a double check that we were not cancelled in the meantime
-               if (status != ALLOCATED_AND_ALIVE) {
+               if (getStatus() != ALLOCATED_AND_ALIVE) {
                        this.executedTask = null;
                        return false;
                }
@@ -112,7 +112,7 @@ public class SimpleSlot extends Slot {
                                getParent().disposeChild(this);
                        } else {
                                // we have to give back the slot to the owning instance
-                               instance.returnAllocatedSlot(this);
+                               getInstance().returnAllocatedSlot(this);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index fb62c4c..2cf727c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -24,10 +24,12 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 /**
- * Base class for slots.
+ * Base class for task slots. TaskManagers offer one or more task slots, they define how many
+ * parallel tasks or task groups a TaskManager executes.
  */
 public abstract class Slot {
-       protected static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER =
+       
+       private static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER =
                        AtomicIntegerFieldUpdater.newUpdater(Slot.class, 
"status");
 
        protected static final int ALLOCATED_AND_ALIVE = 0;             // 
tasks may be added and might be running
@@ -35,25 +37,27 @@ public abstract class Slot {
        protected static final int RELEASED = 2;                                
        // has been given back to the instance
 
        /** The ID of the job this slice belongs to. */
-       protected final JobID jobID;
+       private final JobID jobID;
 
+       /** The id of the group that this slot is allocated to */
+       private final AbstractID groupID;
+       
        /** The instance on which the slot is allocated */
-       protected final Instance instance;
+       private final Instance instance;
+       
+       /** The parent of this slot in the hierarchy, or null, if this is the parent */
+       private final SharedSlot parent;
 
        /** The number of the slot on which the task is deployed */
-       protected final int slotNumber;
+       private final int slotNumber;
 
        /** The state of the vertex, only atomically updated */
-       protected volatile int status = ALLOCATED_AND_ALIVE;
+       private volatile int status = ALLOCATED_AND_ALIVE;
 
        /** Indicates whether this slot was marked dead by the system */
-       private boolean dead = false;
+       private volatile boolean dead = false;
 
-       private final AbstractID groupID;
-
-       private final SharedSlot parent;
-
-       private boolean disposed = false;
+       private volatile boolean disposed = false;
 
 
        public Slot(JobID jobID, Instance instance, int slotNumber, SharedSlot 
parent, AbstractID groupID) {
@@ -102,6 +106,10 @@ public abstract class Slot {
                        return parent.getRoot();
                }
        }
+       
+       public int getStatus() {
+               return status;
+       }
 
        public abstract int getNumberLeaves();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index 8ef61b9..902b5a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -24,11 +24,7 @@ import org.apache.flink.runtime.instance.Instance;
 import com.google.common.base.Preconditions;
 import org.apache.flink.runtime.instance.SharedSlot;
 
-import java.io.Serializable;
-
-public class CoLocationConstraint implements Serializable {
-
-       static final long serialVersionUID = 42L;
+public class CoLocationConstraint {
        
        private final CoLocationGroup group;
        

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index 93b4541..2b86c43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -54,7 +54,9 @@ public class NoResourceAvailableException extends JobException {
        public NoResourceAvailableException(String message) {
                super(message);
        }
-
+       
+       // --------------------------------------------------------------------------------------------
+       
        @Override
        public boolean equals(Object obj){
                if(obj == null){
@@ -67,4 +69,9 @@ public class NoResourceAvailableException extends 
JobException {
                        return 
getMessage().equals(((NoResourceAvailableException)obj).getMessage());
                }
        }
+       
+       @Override
+       public int hashCode() {
+               return getMessage().hashCode();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index c237aa5..1ad7a50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -408,7 +408,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
         * local instance. If no such instance exists (all slots occupied), then return null.
         *
         * @param requestedLocations
-        * @return
         */
        private Pair<Instance, Locality> findInstance(Iterable<Instance> 
requestedLocations){
                if (this.instancesWithAvailableResources.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
index f75f294..0f02748 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 import org.apache.flink.runtime.instance.Instance;
 
 /**
- * A SlotAvailabilityListener can be notified when new {@link org.apache.flink.runtime.instance.AllocatedSlot2}s become available
- * on an {@link org.apache.flink.runtime.instance.Instance}.
+ * A SlotAvailabilityListener can be notified when new
+ * {@link org.apache.flink.runtime.instance.Slot}s become available on an {@link Instance}.
  */
 public interface SlotAvailabilityListener {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index 1492ae1..6de3b39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -34,16 +34,15 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import akka.actor.ActorRef;
+
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
-import org.apache.flink.runtime.messages.ArchiveMessages.RequestArchivedJobs$;
+import org.apache.flink.runtime.messages.ArchiveMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsResponse;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsFound;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestTotalNumberOfSlots$;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager$;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestAccumulatorResults;
@@ -64,6 +63,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.StringUtils;
 import org.eclipse.jetty.io.EofException;
+
 import scala.concurrent.duration.FiniteDuration;
 
 public class JobManagerInfoServlet extends HttpServlet {
@@ -94,7 +94,7 @@ public class JobManagerInfoServlet extends HttpServlet {
                try {
                        if("archive".equals(req.getParameter("get"))) {
                                List<ExecutionGraph> archivedJobs = new 
ArrayList<ExecutionGraph>(AkkaUtils
-                                               
.<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout)
+                                               .<ArchivedJobs>ask(archive, 
ArchiveMessages.getRequestArchivedJobs(), timeout)
                                                .asJavaCollection());
 
                                writeJsonForArchive(resp.getWriter(), 
archivedJobs);
@@ -129,9 +129,9 @@ public class JobManagerInfoServlet extends HttpServlet {
                        }
                        else if("taskmanagers".equals(req.getParameter("get"))) 
{
                                int numberOfTaskManagers = 
AkkaUtils.<Integer>ask(jobmanager,
-                                               
RequestNumberRegisteredTaskManager$.MODULE$, timeout);
+                                               
JobManagerMessages.getRequestNumberRegisteredTaskManager(), timeout);
                                int numberOfRegisteredSlots = 
AkkaUtils.<Integer>ask(jobmanager,
-                                               
RequestTotalNumberOfSlots$.MODULE$, timeout);
+                                               
JobManagerMessages.getRequestTotalNumberOfSlots(), timeout);
 
                                resp.getWriter().write("{\"taskmanagers\": " + 
numberOfTaskManagers +", " +
                                                "\"slots\": 
"+numberOfRegisteredSlots+"}");
@@ -149,7 +149,7 @@ public class JobManagerInfoServlet extends HttpServlet {
                        }
                        else{
                                Iterable<ExecutionGraph> runningJobs = 
AkkaUtils.<RunningJobs>ask
-                                               (jobmanager, 
RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
+                                               (jobmanager, 
JobManagerMessages.getRequestRunningJobs(), timeout).asJavaIterable();
                                writeJsonForJobs(resp.getWriter(), runningJobs);
                        }
                        
@@ -292,17 +292,16 @@ public class JobManagerInfoServlet extends HttpServlet {
                                boolean first = true;
                                for (ExecutionVertex vertex : 
graph.getAllExecutionVertices()) {
                                        if (vertex.getExecutionState() == 
ExecutionState.FAILED) {
-                                               SimpleSlot slot = 
vertex.getCurrentAssignedResource();
+                                               InstanceConnectionInfo location 
= vertex.getCurrentAssignedResourceLocation();
                                                Throwable failureCause = 
vertex.getFailureCause();
-                                               if (slot != null || 
failureCause != null) {
+                                               if (location != null || 
failureCause != null) {
                                                        if (first) {
                                                                first = false;
                                                        } else {
                                                                wrt.write(",");
                                                        }
                                                        wrt.write("{");
-                                                       wrt.write("\"node\": 
\"" + (slot == null ? "(none)" : slot
-                                                                       
.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
+                                                       wrt.write("\"node\": 
\"" + (location == null ? "(none)" : location.getFQDNHostname()) + "\",");
                                                        wrt.write("\"message\": 
\"" + (failureCause == null ? "" : 
StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + 
"\"");
                                                        wrt.write("}");
                                                }
@@ -421,7 +420,7 @@ public class JobManagerInfoServlet extends HttpServlet {
                
                try {
                        Iterable<ExecutionGraph> graphs = 
AkkaUtils.<RunningJobs>ask(jobmanager,
-                                       RequestRunningJobs$.MODULE$, 
timeout).asJavaIterable();
+                                       
ArchiveMessages.getRequestArchivedJobs(), timeout).asJavaIterable();
                        
                        //Serialize job to json
                        wrt.write("{");

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
index 8e46692..89e55d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.util.StringUtils;
 
 import java.util.HashMap;
@@ -38,9 +38,8 @@ public class JsonFactory {
                json.append("\"vertexname\": \"" + 
StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
                json.append("\"vertexstatus\": \"" + vertex.getExecutionState() 
+ "\",");
                
-               SimpleSlot slot = vertex.getCurrentAssignedResource();
-               String instanceName = slot == null ? "(null)" : 
slot.getInstance()
-                               .getInstanceConnectionInfo().getFQDNHostname();
+               InstanceConnectionInfo location = 
vertex.getCurrentAssignedResourceLocation();
+               String instanceName = location == null ? "(null)" : 
location.getFQDNHostname();
                
                json.append("\"vertexinstancename\": \"" + instanceName + "\"");
                json.append("}");

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 71347ee..6f3720e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -64,11 +64,6 @@ public class WebInfoServer {
        private final Server server;
 
        /**
-        * Timeout for akka requests
-        */
-       private final FiniteDuration timeout;
-
-       /**
         * Port for info server
         */
        private int port;
@@ -92,7 +87,6 @@ public class WebInfoServer {
                
                this.port = 
config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
                                
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
-               this.timeout = timeout;
 
                // get base path of Flink installation
                final String basePath = 
config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 532f7f8..79b9a74 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -460,7 +460,10 @@ class JobManager(val configuration: Configuration)
    */
   private def removeJob(jobID: JobID): Unit = {
     currentJobs.remove(jobID) match {
-      case Some((eg, _)) => archive ! ArchiveExecutionGraph(jobID, eg)
+      case Some((eg, _)) => {
+        eg.prepareForArchiving()
+        archive ! ArchiveExecutionGraph(jobID, eg)
+      }
       case None =>
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 5ca8fb3..28e960d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -42,13 +42,7 @@ ActorLogging {
       // wrap graph inside a soft reference
       graphs.update(jobID, new SoftReference(graph))
 
-      // clear all execution edges of the graph
-      val iter = graph.getAllExecutionVertices().iterator()
-      while (iter.hasNext) {
-        iter.next().clearExecutionEdges()
-      }
-
-      cleanup(jobID)
+      trimHistory()
     }
 
     case RequestArchivedJobs => {
@@ -89,17 +83,17 @@ ActorLogging {
       case Some(graph) => graph
       case None => null
     }
+    case None => null
   }
 
   /**
    * Remove old ExecutionGraphs belonging to a jobID
    * * if more than max_entries are in the queue.
-   * @param jobID
    */
-  private def cleanup(jobID: JobID): Unit = {
+  private def trimHistory(): Unit = {
     while (graphs.size > max_entries) {
       // get first graph inserted
-      val (jobID, value) = graphs.iterator.next()
+      val (jobID, value) = graphs.head
       graphs.remove(jobID)
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index 03a5351..884bc2a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobID
  * This object contains the archive specific messages.
  */
 object ArchiveMessages {
+  
   case class ArchiveExecutionGraph(jobID: JobID, graph: ExecutionGraph)
 
   /**
@@ -47,4 +48,12 @@ object ArchiveMessages {
       jobs.asJavaCollection
     }
   }
+  
+  // --------------------------------------------------------------------------
+  // Utility methods to allow simpler case object access from Java
+  // --------------------------------------------------------------------------
+  
+  def getRequestArchivedJobs() : AnyRef = {
+    RequestArchivedJobs
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
index fbacbd2..7ce013b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
@@ -326,4 +326,35 @@ object JobManagerMessages {
 
   case object JobManagerStatusAlive extends JobManagerStatus
 
+    // --------------------------------------------------------------------------
+  // Utility methods to allow simpler case object access from Java
+  // --------------------------------------------------------------------------
+  
+  def getRequestNumberRegisteredTaskManager() : AnyRef = {
+    RequestNumberRegisteredTaskManager
+  }
+  
+  def getRequestTotalNumberOfSlots() : AnyRef = {
+    RequestTotalNumberOfSlots
+  }
+  
+  def getRequestBlobManagerPort() : AnyRef = {
+    RequestBlobManagerPort
+  }
+  
+  def getRequestRunningJobs() : AnyRef = {
+    RequestRunningJobs
+  }
+  
+  def getRequestRegisteredTaskManagers() : AnyRef = {
+    RequestRegisteredTaskManagers
+  }
+  
+  def getRequestJobManagerStatus() : AnyRef = {
+    RequestJobManagerStatus
+  }
+  
+  def getJobManagerStatusAlive() : AnyRef = {
+    JobManagerStatusAlive
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
index ca9dc16..d8663e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
@@ -41,7 +41,7 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests for data sinks
  */
-
+@SuppressWarnings("serial")
 @RunWith(Parameterized.class)
 public class DataSinkITCase extends MultipleProgramsTestBase {
 

Reply via email to