http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
deleted file mode 100644
index 92d457b..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.AbstractID;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-
-public class SubSlot extends AllocatedSlot {
-
-       private static final long serialVersionUID = 1361615219044538497L;
-       
-
-       private final SharedSlot sharedSlot;
-       
-       private final AbstractID groupId;
-       
-       private final int subSlotNumber;
-       
-       
-       public SubSlot(SharedSlot sharedSlot, int subSlotNumber, AbstractID 
groupId) {
-               super(sharedSlot.getAllocatedSlot().getJobID(),
-                               sharedSlot.getAllocatedSlot().getInstance(),
-                               sharedSlot.getAllocatedSlot().getSlotNumber());
-               
-               this.sharedSlot = sharedSlot;
-               this.groupId = groupId;
-               this.subSlotNumber = subSlotNumber;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public void releaseSlot() {
-               // cancel everything, if there is something. since this is 
atomically status based,
-               // it will not happen twice if another attempt happened before 
or concurrently
-               try {
-                       cancel();
-               }
-               finally {
-                       if (markReleased()) {
-                               this.sharedSlot.returnAllocatedSlot(this);
-                       }
-               }
-       }
-       
-       public SharedSlot getSharedSlot() {
-               return this.sharedSlot;
-       }
-       
-       public AbstractID getGroupId() {
-               return groupId;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return "SubSlot " + subSlotNumber + " (" + super.toString() + 
')';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
new file mode 100644
index 0000000..1492ae1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.web;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
+import org.apache.flink.runtime.messages.ArchiveMessages.RequestArchivedJobs$;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsResponse;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsFound;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestTotalNumberOfSlots$;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager$;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestAccumulatorResults;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.JobResponse;
+import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.StringUtils;
+import org.eclipse.jetty.io.EofException;
+import scala.concurrent.duration.FiniteDuration;
+
+public class JobManagerInfoServlet extends HttpServlet {
+       
+       private static final long serialVersionUID = 1L;
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerInfoServlet.class);
+       
+       /** Underlying JobManager */
+       private final ActorRef jobmanager;
+       private final ActorRef archive;
+       private final FiniteDuration timeout;
+       
+       
+       public JobManagerInfoServlet(ActorRef jobmanager, ActorRef archive, 
FiniteDuration timeout) {
+               this.jobmanager = jobmanager;
+               this.archive = archive;
+               this.timeout = timeout;
+       }
+       
+       
+       @Override
+       protected void doGet(HttpServletRequest req, HttpServletResponse resp) 
throws ServletException, IOException {
+                       
+               resp.setStatus(HttpServletResponse.SC_OK);
+               resp.setContentType("application/json");
+               
+               try {
+                       if("archive".equals(req.getParameter("get"))) {
+                               List<ExecutionGraph> archivedJobs = new 
ArrayList<ExecutionGraph>(AkkaUtils
+                                               
.<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout)
+                                               .asJavaCollection());
+
+                               writeJsonForArchive(resp.getWriter(), 
archivedJobs);
+                       }
+                       else if("job".equals(req.getParameter("get"))) {
+                               String jobId = req.getParameter("job");
+                               JobResponse response = AkkaUtils.ask(archive,
+                                               new 
RequestJob(JobID.fromHexString(jobId)), timeout);
+
+                               if(response instanceof JobFound){
+                                       ExecutionGraph archivedJob = 
((JobFound)response).executionGraph();
+                                       
writeJsonForArchivedJob(resp.getWriter(), archivedJob);
+                               }else{
+                                       LOG.warn("DoGet:job: Could not find job 
for job ID " + jobId);
+                               }
+                       }
+                       else if("groupvertex".equals(req.getParameter("get"))) {
+                               String jobId = req.getParameter("job");
+                               String groupvertexId = 
req.getParameter("groupvertex");
+
+                               JobResponse response = AkkaUtils.ask(archive,
+                                               new 
RequestJob(JobID.fromHexString(jobId)), timeout);
+
+                               if(response instanceof JobFound && 
groupvertexId != null){
+                                       ExecutionGraph archivedJob = 
((JobFound)response).executionGraph();
+
+                                       
writeJsonForArchivedJobGroupvertex(resp.getWriter(), archivedJob,
+                                                       
JobVertexID.fromHexString(groupvertexId));
+                               }else{
+                                       LOG.warn("DoGet:groupvertex: Could not 
find job for job ID " + jobId);
+                               }
+                       }
+                       else if("taskmanagers".equals(req.getParameter("get"))) 
{
+                               int numberOfTaskManagers = 
AkkaUtils.<Integer>ask(jobmanager,
+                                               
RequestNumberRegisteredTaskManager$.MODULE$, timeout);
+                               int numberOfRegisteredSlots = 
AkkaUtils.<Integer>ask(jobmanager,
+                                               
RequestTotalNumberOfSlots$.MODULE$, timeout);
+
+                               resp.getWriter().write("{\"taskmanagers\": " + 
numberOfTaskManagers +", " +
+                                               "\"slots\": 
"+numberOfRegisteredSlots+"}");
+                       }
+                       else if("cancel".equals(req.getParameter("get"))) {
+                               String jobId = req.getParameter("job");
+                               AkkaUtils.<CancellationResponse>ask(jobmanager,
+                                               new 
CancelJob(JobID.fromHexString(jobId)), timeout);
+                       }
+                       else if("updates".equals(req.getParameter("get"))) {
+                               String jobId = req.getParameter("job");
+                               writeJsonUpdatesForJob(resp.getWriter(), 
JobID.fromHexString(jobId));
+                       } else if ("version".equals(req.getParameter("get"))) {
+                               writeJsonForVersion(resp.getWriter());
+                       }
+                       else{
+                               Iterable<ExecutionGraph> runningJobs = 
AkkaUtils.<RunningJobs>ask
+                                               (jobmanager, 
RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
+                               writeJsonForJobs(resp.getWriter(), runningJobs);
+                       }
+                       
+               } catch (Exception e) {
+                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+                       resp.getWriter().print(e.getMessage());
+                       if (LOG.isWarnEnabled()) {
+                               LOG.warn(StringUtils.stringifyException(e));
+                       }
+               }
+       }
+       
+       /**
+        * Writes ManagementGraph as Json for all recent jobs
+        * 
+        * @param wrt
+        * @param graphs
+        */
+       private void writeJsonForJobs(PrintWriter wrt, Iterable<ExecutionGraph> 
graphs) {
+               try {
+                       wrt.write("[");
+
+                       Iterator<ExecutionGraph> it = graphs.iterator();
+                       // Loop Jobs
+                       while(it.hasNext()){
+                               ExecutionGraph graph = it.next();
+       
+                               writeJsonForJob(wrt, graph);
+       
+                               //Write seperator between json objects
+                               if(it.hasNext()) {
+                                       wrt.write(",");
+                               }
+                       }
+                       wrt.write("]");
+               
+               } catch (EofException eof) { // Connection closed by client
+                       LOG.info("Info server for jobmanager: Connection closed 
by client, EofException");
+               } catch (IOException ioe) { // Connection closed by client      
+                       LOG.info("Info server for jobmanager: Connection closed 
by client, IOException");
+               } 
+       }
+       
+       private void writeJsonForJob(PrintWriter wrt, ExecutionGraph graph) 
throws IOException {
+               //Serialize job to json
+               wrt.write("{");
+               wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
+               wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
+               wrt.write("\"status\": \""+ graph.getState() + "\",");
+               wrt.write("\"time\": " + 
graph.getStatusTimestamp(graph.getState())+",");
+               
+               // Serialize ManagementGraph to json
+               wrt.write("\"groupvertices\": [");
+               boolean first = true;
+               
+               for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
+                       //Write seperator between json objects
+                       if(first) {
+                               first = false;
+                       } else {
+                               wrt.write(","); }
+                       
+                       wrt.write(JsonFactory.toJson(groupVertex));
+               }
+               wrt.write("]");
+               wrt.write("}");
+                       
+       }
+       
+       /**
+        * Writes Json with a list of currently archived jobs, sorted by time
+        * 
+        * @param wrt
+        * @param graphs
+        */
+       private void writeJsonForArchive(PrintWriter wrt, List<ExecutionGraph> 
graphs) {
+               
+               wrt.write("[");
+               
+               // sort jobs by time
+               Collections.sort(graphs,  new Comparator<ExecutionGraph>() {
+                       @Override
+                       public int compare(ExecutionGraph o1, ExecutionGraph 
o2) {
+                               if(o1.getStatusTimestamp(o1.getState()) < 
o2.getStatusTimestamp(o2.getState())) {
+                                       return 1;
+                               } else {
+                                       return -1;
+                               }
+                       }
+                       
+               });
+               
+               // Loop Jobs
+               for (int i = 0; i < graphs.size(); i++) {
+                       ExecutionGraph graph = graphs.get(i);
+                       
+                       //Serialize job to json
+                       wrt.write("{");
+                       wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
+                       wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
+                       wrt.write("\"status\": \""+ graph.getState() + "\",");
+                       wrt.write("\"time\": " + 
graph.getStatusTimestamp(graph.getState()));
+                       
+                       wrt.write("}");
+                       
+                       //Write seperator between json objects
+                       if(i != graphs.size() - 1) {
+                               wrt.write(",");
+                       }
+               }
+               wrt.write("]");
+               
+       }
+       
+       /**
+        * Writes infos about archived job in Json format, including 
groupvertices and groupverticetimes
+        * 
+        * @param wrt
+        * @param graph
+        */
+       private void writeJsonForArchivedJob(PrintWriter wrt, ExecutionGraph 
graph) {
+               
+               try {
+               
+                       wrt.write("[");
+               
+                       //Serialize job to json
+                       wrt.write("{");
+                       wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
+                       wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
+                       wrt.write("\"status\": \""+ graph.getState() + "\",");
+                       wrt.write("\"SCHEDULED\": "+ 
graph.getStatusTimestamp(JobStatus.CREATED) + ",");
+                       wrt.write("\"RUNNING\": "+ 
graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
+                       wrt.write("\"FINISHED\": "+ 
graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
+                       wrt.write("\"FAILED\": "+ 
graph.getStatusTimestamp(JobStatus.FAILED) + ",");
+                       wrt.write("\"CANCELED\": "+ 
graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
+
+                       if (graph.getState() == JobStatus.FAILED) {
+                               wrt.write("\"failednodes\": [");
+                               boolean first = true;
+                               for (ExecutionVertex vertex : 
graph.getAllExecutionVertices()) {
+                                       if (vertex.getExecutionState() == 
ExecutionState.FAILED) {
+                                               SimpleSlot slot = 
vertex.getCurrentAssignedResource();
+                                               Throwable failureCause = 
vertex.getFailureCause();
+                                               if (slot != null || 
failureCause != null) {
+                                                       if (first) {
+                                                               first = false;
+                                                       } else {
+                                                               wrt.write(",");
+                                                       }
+                                                       wrt.write("{");
+                                                       wrt.write("\"node\": 
\"" + (slot == null ? "(none)" : slot
+                                                                       
.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
+                                                       wrt.write("\"message\": 
\"" + (failureCause == null ? "" : 
StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + 
"\"");
+                                                       wrt.write("}");
+                                               }
+                                       }
+                               }
+                               wrt.write("],");
+                       }
+
+                       // Serialize ManagementGraph to json
+                       wrt.write("\"groupvertices\": [");
+                       boolean first = true;
+                       for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
+                               //Write seperator between json objects
+                               if(first) {
+                                       first = false;
+                               } else {
+                                       wrt.write(","); }
+                               
+                               wrt.write(JsonFactory.toJson(groupVertex));
+                               
+                       }
+                       wrt.write("],");
+                       
+                       // write accumulators
+                       AccumulatorResultsResponse response = 
AkkaUtils.ask(jobmanager,
+                                       new 
RequestAccumulatorResults(graph.getJobID()), timeout);
+
+                       if(response instanceof AccumulatorResultsFound){
+                               Map<String, Object> accMap = 
((AccumulatorResultsFound)response).asJavaMap();
+
+                               wrt.write("\n\"accumulators\": [");
+                               int i = 0;
+                               for( Entry<String, Object> accumulator : 
accMap.entrySet()) {
+                                       wrt.write("{ \"name\": 
\""+accumulator.getKey()+" (" + 
accumulator.getValue().getClass().getName()+")\","
+                                                       + " \"value\": 
\""+accumulator.getValue().toString()+"\"}\n");
+                                       if(++i < accMap.size()) {
+                                               wrt.write(",");
+                                       }
+                               }
+                               wrt.write("],\n");
+
+                               wrt.write("\"groupverticetimes\": {");
+                               first = true;
+                               for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
+
+                                       if(first) {
+                                               first = false;
+                                       } else {
+                                               wrt.write(","); }
+
+                                       // Calculate start and end time for 
groupvertex
+                                       long started = Long.MAX_VALUE;
+                                       long ended = 0;
+
+                                       // Take earliest running state and 
latest endstate of groupmembers
+                                       for (ExecutionVertex vertex : 
groupVertex.getTaskVertices()) {
+
+                                               long running = 
vertex.getStateTimestamp(ExecutionState.RUNNING);
+                                               if (running != 0 && running < 
started) {
+                                                       started = running;
+                                               }
+
+                                               long finished = 
vertex.getStateTimestamp(ExecutionState.FINISHED);
+                                               long canceled = 
vertex.getStateTimestamp(ExecutionState.CANCELED);
+                                               long failed = 
vertex.getStateTimestamp(ExecutionState.FAILED);
+
+                                               if(finished != 0 && finished > 
ended) {
+                                                       ended = finished;
+                                               }
+
+                                               if(canceled != 0 && canceled > 
ended) {
+                                                       ended = canceled;
+                                               }
+
+                                               if(failed != 0 && failed > 
ended) {
+                                                       ended = failed;
+                                               }
+
+                                       }
+
+                                       
wrt.write("\""+groupVertex.getJobVertexId()+"\": {");
+                                       wrt.write("\"groupvertexid\": \"" + 
groupVertex.getJobVertexId() + "\",");
+                                       wrt.write("\"groupvertexname\": \"" + 
groupVertex + "\",");
+                                       wrt.write("\"STARTED\": "+ started + 
",");
+                                       wrt.write("\"ENDED\": "+ ended);
+                                       wrt.write("}");
+
+                               }
+                       }else{
+                               LOG.warn("Could not find accumulator results 
for job ID " + graph.getJobID());
+                       }
+
+                       wrt.write("}");
+                       
+                       wrt.write("}");
+                       
+                       
+               wrt.write("]");
+               
+               } catch (EofException eof) { // Connection closed by client
+                       LOG.info("Info server for jobmanager: Connection closed 
by client, EofException");
+               } catch (IOException ioe) { // Connection closed by client      
+                       LOG.info("Info server for jobmanager: Connection closed 
by client, IOException");
+               } 
+               
+       }
+       
+       
+       /**
+        * Writes all updates (events) for a given job since a given time
+        * 
+        * @param wrt
+        * @param jobId
+        */
+       private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
+               
+               try {
+                       Iterable<ExecutionGraph> graphs = 
AkkaUtils.<RunningJobs>ask(jobmanager,
+                                       RequestRunningJobs$.MODULE$, 
timeout).asJavaIterable();
+                       
+                       //Serialize job to json
+                       wrt.write("{");
+                       wrt.write("\"jobid\": \"" + jobId + "\",");
+                       wrt.write("\"timestamp\": \"" + 
System.currentTimeMillis() + "\",");
+                       wrt.write("\"recentjobs\": [");
+
+                       boolean first = true;
+
+                       for(ExecutionGraph g : graphs){
+                               if(first){
+                                       first = false;
+                               }else{
+                                       wrt.write(",");
+                               }
+
+                               wrt.write("\"" + g.getJobID() + "\"");
+                       }
+
+                       wrt.write("],");
+
+                       JobResponse response = AkkaUtils.ask(jobmanager, new 
RequestJob(jobId), timeout);
+
+                       if(response instanceof JobFound){
+                               ExecutionGraph graph = 
((JobFound)response).executionGraph();
+
+                               wrt.write("\"vertexevents\": [");
+
+                               first = true;
+                               for (ExecutionVertex ev : 
graph.getAllExecutionVertices()) {
+                                       if (first) {
+                                               first = false;
+                                       } else {
+                                               wrt.write(",");
+                                       }
+
+                                       wrt.write("{");
+                                       wrt.write("\"vertexid\": \"" + 
ev.getCurrentExecutionAttempt().getAttemptId()
+                                                       + "\",");
+                                       wrt.write("\"newstate\": \"" + 
ev.getExecutionState() + "\",");
+                                       wrt.write("\"timestamp\": \"" + 
ev.getStateTimestamp(ev.getExecutionState())
+                                                       + "\"");
+                                       wrt.write("}");
+                               }
+
+                               wrt.write("],");
+
+                               wrt.write("\"jobevents\": [");
+
+                               wrt.write("{");
+                               wrt.write("\"newstate\": \"" + graph.getState() 
+ "\",");
+                               wrt.write("\"timestamp\": \"" + 
graph.getStatusTimestamp(graph.getState()) + "\"");
+                               wrt.write("}");
+
+                               wrt.write("]");
+
+                               wrt.write("}");
+                       }else{
+                               wrt.write("\"vertexevents\": [],");
+                               wrt.write("\"jobevents\": [");
+                               wrt.write("{");
+                               wrt.write("\"newstate\": \"" + 
JobStatus.FINISHED + "\",");
+                               wrt.write("\"timestamp\": \"" + 
System.currentTimeMillis() + "\"");
+                               wrt.write("}");
+                               wrt.write("]");
+                               wrt.write("}");
+                               LOG.warn("WriteJsonUpdatesForJob: Could not 
find job with job ID " + jobId);
+                       }
+               } catch (EofException eof) { // Connection closed by client
+                       LOG.info("Info server for jobmanager: Connection closed 
by client, EofException");
+               } catch (IOException ioe) { // Connection closed by client      
+                       LOG.info("Info server for jobmanager: Connection closed 
by client, IOException");
+               } 
+               
+       }
+       
+       /**
+        * Writes info about one particular archived JobVertex in a job, 
including all member execution vertices, their times and statuses.
+        */
+       private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, 
ExecutionGraph graph,
+                                                                               
                        JobVertexID vertexId) {
+               ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
+
+               // Serialize ManagementGraph to json
+               wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) 
+ ",");
+
+               wrt.write("\"verticetimes\": {");
+               boolean first = true;
+               for (ExecutionJobVertex groupVertex : 
graph.getAllVertices().values()) {
+
+                       for (ExecutionVertex vertex : 
groupVertex.getTaskVertices()) {
+
+                               Execution exec = 
vertex.getCurrentExecutionAttempt();
+
+                               if(first) {
+                                       first = false;
+                               } else {
+                                       wrt.write(","); }
+
+                               wrt.write("\""+exec.getAttemptId() +"\": {");
+                               wrt.write("\"vertexid\": \"" + 
exec.getAttemptId() + "\",");
+                               wrt.write("\"vertexname\": \"" + vertex + 
"\",");
+                               wrt.write("\"CREATED\": "+ 
vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
+                               wrt.write("\"SCHEDULED\": "+ 
vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
+                               wrt.write("\"DEPLOYING\": "+ 
vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
+                               wrt.write("\"RUNNING\": "+ 
vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
+                               wrt.write("\"FINISHED\": "+ 
vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
+                               wrt.write("\"CANCELING\": "+ 
vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
+                               wrt.write("\"CANCELED\": "+ 
vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
+                               wrt.write("\"FAILED\": "+ 
vertex.getStateTimestamp(ExecutionState.FAILED) + "");
+                               wrt.write("}");
+                       }
+
+               }
+               wrt.write("}}");
+       }
+       
+       /**
+        * Writes the version and the revision of Flink.
+        * 
+        * @param wrt
+        */
+       private void writeJsonForVersion(PrintWriter wrt) {
+               wrt.write("{");
+               wrt.write("\"version\": \"" + 
EnvironmentInformation.getVersion() + "\",");
+               wrt.write("\"revision\": \"" + 
EnvironmentInformation.getRevisionInformation().commitId + "\"");
+               wrt.write("}");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
deleted file mode 100644
index b842a9b..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ /dev/null
@@ -1,554 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
-import org.apache.flink.runtime.messages.ArchiveMessages.RequestArchivedJobs$;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsResponse;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsFound;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestTotalNumberOfSlots$;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager$;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestAccumulatorResults;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobResponse;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.StringUtils;
-import org.eclipse.jetty.io.EofException;
-import scala.concurrent.duration.FiniteDuration;
-
-
-public class JobmanagerInfoServlet extends HttpServlet {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(JobmanagerInfoServlet.class);
-       
-       /** Underlying JobManager */
-       private final ActorRef jobmanager;
-       private final ActorRef archive;
-       private final FiniteDuration timeout;
-       
-       
-       public JobmanagerInfoServlet(ActorRef jobmanager, ActorRef archive, 
FiniteDuration timeout) {
-               this.jobmanager = jobmanager;
-               this.archive = archive;
-               this.timeout = timeout;
-       }
-       
-       
-       @Override
-       protected void doGet(HttpServletRequest req, HttpServletResponse resp) 
throws ServletException, IOException {
-                       
-               resp.setStatus(HttpServletResponse.SC_OK);
-               resp.setContentType("application/json");
-               
-               try {
-                       if("archive".equals(req.getParameter("get"))) {
-                               List<ExecutionGraph> archivedJobs = new 
ArrayList<ExecutionGraph>(AkkaUtils
-                                               
.<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout)
-                                               .asJavaCollection());
-
-                               writeJsonForArchive(resp.getWriter(), 
archivedJobs);
-                       }
-                       else if("job".equals(req.getParameter("get"))) {
-                               String jobId = req.getParameter("job");
-                               JobResponse response = AkkaUtils.ask(archive,
-                                               new 
RequestJob(JobID.fromHexString(jobId)), timeout);
-
-                               if(response instanceof JobFound){
-                                       ExecutionGraph archivedJob = 
((JobFound)response).executionGraph();
-                                       
writeJsonForArchivedJob(resp.getWriter(), archivedJob);
-                               }else{
-                                       LOG.warn("DoGet:job: Could not find job 
for job ID " + jobId);
-                               }
-                       }
-                       else if("groupvertex".equals(req.getParameter("get"))) {
-                               String jobId = req.getParameter("job");
-                               String groupvertexId = 
req.getParameter("groupvertex");
-
-                               JobResponse response = AkkaUtils.ask(archive,
-                                               new 
RequestJob(JobID.fromHexString(jobId)), timeout);
-
-                               if(response instanceof JobFound && 
groupvertexId != null){
-                                       ExecutionGraph archivedJob = 
((JobFound)response).executionGraph();
-
-                                       
writeJsonForArchivedJobGroupvertex(resp.getWriter(), archivedJob,
-                                                       
JobVertexID.fromHexString(groupvertexId));
-                               }else{
-                                       LOG.warn("DoGet:groupvertex: Could not 
find job for job ID " + jobId);
-                               }
-                       }
-                       else if("taskmanagers".equals(req.getParameter("get"))) 
{
-                               int numberOfTaskManagers = 
AkkaUtils.<Integer>ask(jobmanager,
-                                               
RequestNumberRegisteredTaskManager$.MODULE$, timeout);
-                               int numberOfRegisteredSlots = 
AkkaUtils.<Integer>ask(jobmanager,
-                                               
RequestTotalNumberOfSlots$.MODULE$, timeout);
-
-                               resp.getWriter().write("{\"taskmanagers\": " + 
numberOfTaskManagers +", " +
-                                               "\"slots\": 
"+numberOfRegisteredSlots+"}");
-                       }
-                       else if("cancel".equals(req.getParameter("get"))) {
-                               String jobId = req.getParameter("job");
-                               AkkaUtils.<CancellationResponse>ask(jobmanager,
-                                               new 
CancelJob(JobID.fromHexString(jobId)), timeout);
-                       }
-                       else if("updates".equals(req.getParameter("get"))) {
-                               String jobId = req.getParameter("job");
-                               writeJsonUpdatesForJob(resp.getWriter(), 
JobID.fromHexString(jobId));
-                       } else if ("version".equals(req.getParameter("get"))) {
-                               writeJsonForVersion(resp.getWriter());
-                       }
-                       else{
-                               Iterable<ExecutionGraph> runningJobs = 
AkkaUtils.<RunningJobs>ask
-                                               (jobmanager, 
RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
-                               writeJsonForJobs(resp.getWriter(), runningJobs);
-                       }
-                       
-               } catch (Exception e) {
-                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-                       resp.getWriter().print(e.getMessage());
-                       if (LOG.isWarnEnabled()) {
-                               LOG.warn(StringUtils.stringifyException(e));
-                       }
-               }
-       }
-       
-       /**
-        * Writes ManagementGraph as Json for all recent jobs
-        * 
-        * @param wrt
-        * @param graphs
-        */
-       private void writeJsonForJobs(PrintWriter wrt, Iterable<ExecutionGraph> 
graphs) {
-               try {
-                       wrt.write("[");
-
-                       Iterator<ExecutionGraph> it = graphs.iterator();
-                       // Loop Jobs
-                       while(it.hasNext()){
-                               ExecutionGraph graph = it.next();
-       
-                               writeJsonForJob(wrt, graph);
-       
-                               //Write seperator between json objects
-                               if(it.hasNext()) {
-                                       wrt.write(",");
-                               }
-                       }
-                       wrt.write("]");
-               
-               } catch (EofException eof) { // Connection closed by client
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, EofException");
-               } catch (IOException ioe) { // Connection closed by client      
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, IOException");
-               } 
-       }
-       
-       private void writeJsonForJob(PrintWriter wrt, ExecutionGraph graph) 
throws IOException {
-               //Serialize job to json
-               wrt.write("{");
-               wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-               wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
-               wrt.write("\"status\": \""+ graph.getState() + "\",");
-               wrt.write("\"time\": " + 
graph.getStatusTimestamp(graph.getState())+",");
-               
-               // Serialize ManagementGraph to json
-               wrt.write("\"groupvertices\": [");
-               boolean first = true;
-               
-               for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
-                       //Write seperator between json objects
-                       if(first) {
-                               first = false;
-                       } else {
-                               wrt.write(","); }
-                       
-                       wrt.write(JsonFactory.toJson(groupVertex));
-               }
-               wrt.write("]");
-               wrt.write("}");
-                       
-       }
-       
-       /**
-        * Writes Json with a list of currently archived jobs, sorted by time
-        * 
-        * @param wrt
-        * @param graphs
-        */
-       private void writeJsonForArchive(PrintWriter wrt, List<ExecutionGraph> 
graphs) {
-               
-               wrt.write("[");
-               
-               // sort jobs by time
-               Collections.sort(graphs,  new Comparator<ExecutionGraph>() {
-                       @Override
-                       public int compare(ExecutionGraph o1, ExecutionGraph 
o2) {
-                               if(o1.getStatusTimestamp(o1.getState()) < 
o2.getStatusTimestamp(o2.getState())) {
-                                       return 1;
-                               } else {
-                                       return -1;
-                               }
-                       }
-                       
-               });
-               
-               // Loop Jobs
-               for (int i = 0; i < graphs.size(); i++) {
-                       ExecutionGraph graph = graphs.get(i);
-                       
-                       //Serialize job to json
-                       wrt.write("{");
-                       wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-                       wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
-                       wrt.write("\"status\": \""+ graph.getState() + "\",");
-                       wrt.write("\"time\": " + 
graph.getStatusTimestamp(graph.getState()));
-                       
-                       wrt.write("}");
-                       
-                       //Write seperator between json objects
-                       if(i != graphs.size() - 1) {
-                               wrt.write(",");
-                       }
-               }
-               wrt.write("]");
-               
-       }
-       
-       /**
-        * Writes infos about archived job in Json format, including 
groupvertices and groupverticetimes
-        * 
-        * @param wrt
-        * @param graph
-        */
-       private void writeJsonForArchivedJob(PrintWriter wrt, ExecutionGraph 
graph) {
-               
-               try {
-               
-                       wrt.write("[");
-               
-                       //Serialize job to json
-                       wrt.write("{");
-                       wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-                       wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
-                       wrt.write("\"status\": \""+ graph.getState() + "\",");
-                       wrt.write("\"SCHEDULED\": "+ 
graph.getStatusTimestamp(JobStatus.CREATED) + ",");
-                       wrt.write("\"RUNNING\": "+ 
graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
-                       wrt.write("\"FINISHED\": "+ 
graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
-                       wrt.write("\"FAILED\": "+ 
graph.getStatusTimestamp(JobStatus.FAILED) + ",");
-                       wrt.write("\"CANCELED\": "+ 
graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
-
-                       if (graph.getState() == JobStatus.FAILED) {
-                               wrt.write("\"failednodes\": [");
-                               boolean first = true;
-                               for (ExecutionVertex vertex : 
graph.getAllExecutionVertices()) {
-                                       if (vertex.getExecutionState() == 
ExecutionState.FAILED) {
-                                               AllocatedSlot slot = 
vertex.getCurrentAssignedResource();
-                                               Throwable failureCause = 
vertex.getFailureCause();
-                                               if (slot != null || 
failureCause != null) {
-                                                       if (first) {
-                                                               first = false;
-                                                       } else {
-                                                               wrt.write(",");
-                                                       }
-                                                       wrt.write("{");
-                                                       wrt.write("\"node\": 
\"" + (slot == null ? "(none)" : slot
-                                                                       
.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
-                                                       wrt.write("\"message\": 
\"" + (failureCause == null ? "" : 
StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + 
"\"");
-                                                       wrt.write("}");
-                                               }
-                                       }
-                               }
-                               wrt.write("],");
-                       }
-
-                       // Serialize ManagementGraph to json
-                       wrt.write("\"groupvertices\": [");
-                       boolean first = true;
-                       for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
-                               //Write seperator between json objects
-                               if(first) {
-                                       first = false;
-                               } else {
-                                       wrt.write(","); }
-                               
-                               wrt.write(JsonFactory.toJson(groupVertex));
-                               
-                       }
-                       wrt.write("],");
-                       
-                       // write accumulators
-                       AccumulatorResultsResponse response = 
AkkaUtils.ask(jobmanager,
-                                       new 
RequestAccumulatorResults(graph.getJobID()), timeout);
-
-                       if(response instanceof AccumulatorResultsFound){
-                               Map<String, Object> accMap = 
((AccumulatorResultsFound)response).asJavaMap();
-
-                               wrt.write("\n\"accumulators\": [");
-                               int i = 0;
-                               for( Entry<String, Object> accumulator : 
accMap.entrySet()) {
-                                       wrt.write("{ \"name\": 
\""+accumulator.getKey()+" (" + 
accumulator.getValue().getClass().getName()+")\","
-                                                       + " \"value\": 
\""+accumulator.getValue().toString()+"\"}\n");
-                                       if(++i < accMap.size()) {
-                                               wrt.write(",");
-                                       }
-                               }
-                               wrt.write("],\n");
-
-                               wrt.write("\"groupverticetimes\": {");
-                               first = true;
-                               for (ExecutionJobVertex groupVertex : 
graph.getVerticesTopologically()) {
-
-                                       if(first) {
-                                               first = false;
-                                       } else {
-                                               wrt.write(","); }
-
-                                       // Calculate start and end time for 
groupvertex
-                                       long started = Long.MAX_VALUE;
-                                       long ended = 0;
-
-                                       // Take earliest running state and 
latest endstate of groupmembers
-                                       for (ExecutionVertex vertex : 
groupVertex.getTaskVertices()) {
-
-                                               long running = 
vertex.getStateTimestamp(ExecutionState.RUNNING);
-                                               if (running != 0 && running < 
started) {
-                                                       started = running;
-                                               }
-
-                                               long finished = 
vertex.getStateTimestamp(ExecutionState.FINISHED);
-                                               long canceled = 
vertex.getStateTimestamp(ExecutionState.CANCELED);
-                                               long failed = 
vertex.getStateTimestamp(ExecutionState.FAILED);
-
-                                               if(finished != 0 && finished > 
ended) {
-                                                       ended = finished;
-                                               }
-
-                                               if(canceled != 0 && canceled > 
ended) {
-                                                       ended = canceled;
-                                               }
-
-                                               if(failed != 0 && failed > 
ended) {
-                                                       ended = failed;
-                                               }
-
-                                       }
-
-                                       
wrt.write("\""+groupVertex.getJobVertexId()+"\": {");
-                                       wrt.write("\"groupvertexid\": \"" + 
groupVertex.getJobVertexId() + "\",");
-                                       wrt.write("\"groupvertexname\": \"" + 
groupVertex + "\",");
-                                       wrt.write("\"STARTED\": "+ started + 
",");
-                                       wrt.write("\"ENDED\": "+ ended);
-                                       wrt.write("}");
-
-                               }
-                       }else{
-                               LOG.warn("Could not find accumulator results 
for job ID " + graph.getJobID());
-                       }
-
-                       wrt.write("}");
-                       
-                       wrt.write("}");
-                       
-                       
-               wrt.write("]");
-               
-               } catch (EofException eof) { // Connection closed by client
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, EofException");
-               } catch (IOException ioe) { // Connection closed by client      
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, IOException");
-               } 
-               
-       }
-       
-       
-       /**
-        * Writes all updates (events) for a given job since a given time
-        * 
-        * @param wrt
-        * @param jobId
-        */
-       private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
-               
-               try {
-                       Iterable<ExecutionGraph> graphs = 
AkkaUtils.<RunningJobs>ask(jobmanager,
-                                       RequestRunningJobs$.MODULE$, 
timeout).asJavaIterable();
-                       
-                       //Serialize job to json
-                       wrt.write("{");
-                       wrt.write("\"jobid\": \"" + jobId + "\",");
-                       wrt.write("\"timestamp\": \"" + 
System.currentTimeMillis() + "\",");
-                       wrt.write("\"recentjobs\": [");
-
-                       boolean first = true;
-
-                       for(ExecutionGraph g : graphs){
-                               if(first){
-                                       first = false;
-                               }else{
-                                       wrt.write(",");
-                               }
-
-                               wrt.write("\"" + g.getJobID() + "\"");
-                       }
-
-                       wrt.write("],");
-
-                       JobResponse response = AkkaUtils.ask(jobmanager, new 
RequestJob(jobId), timeout);
-
-                       if(response instanceof JobFound){
-                               ExecutionGraph graph = 
((JobFound)response).executionGraph();
-
-                               wrt.write("\"vertexevents\": [");
-
-                               first = true;
-                               for (ExecutionVertex ev : 
graph.getAllExecutionVertices()) {
-                                       if (first) {
-                                               first = false;
-                                       } else {
-                                               wrt.write(",");
-                                       }
-
-                                       wrt.write("{");
-                                       wrt.write("\"vertexid\": \"" + 
ev.getCurrentExecutionAttempt().getAttemptId()
-                                                       + "\",");
-                                       wrt.write("\"newstate\": \"" + 
ev.getExecutionState() + "\",");
-                                       wrt.write("\"timestamp\": \"" + 
ev.getStateTimestamp(ev.getExecutionState())
-                                                       + "\"");
-                                       wrt.write("}");
-                               }
-
-                               wrt.write("],");
-
-                               wrt.write("\"jobevents\": [");
-
-                               wrt.write("{");
-                               wrt.write("\"newstate\": \"" + graph.getState() 
+ "\",");
-                               wrt.write("\"timestamp\": \"" + 
graph.getStatusTimestamp(graph.getState()) + "\"");
-                               wrt.write("}");
-
-                               wrt.write("]");
-
-                               wrt.write("}");
-                       }else{
-                               wrt.write("\"vertexevents\": [],");
-                               wrt.write("\"jobevents\": [");
-                               wrt.write("{");
-                               wrt.write("\"newstate\": \"" + 
JobStatus.FINISHED + "\",");
-                               wrt.write("\"timestamp\": \"" + 
System.currentTimeMillis() + "\"");
-                               wrt.write("}");
-                               wrt.write("]");
-                               wrt.write("}");
-                               LOG.warn("WriteJsonUpdatesForJob: Could not 
find job with job ID " + jobId);
-                       }
-               } catch (EofException eof) { // Connection closed by client
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, EofException");
-               } catch (IOException ioe) { // Connection closed by client      
-                       LOG.info("Info server for jobmanager: Connection closed 
by client, IOException");
-               } 
-               
-       }
-       
-       /**
-        * Writes info about one particular archived JobVertex in a job, 
including all member execution vertices, their times and statuses.
-        */
-       private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, 
ExecutionGraph graph,
-                                                                               
                        JobVertexID vertexId) {
-               ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
-
-               // Serialize ManagementGraph to json
-               wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) 
+ ",");
-
-               wrt.write("\"verticetimes\": {");
-               boolean first = true;
-               for (ExecutionJobVertex groupVertex : 
graph.getAllVertices().values()) {
-
-                       for (ExecutionVertex vertex : 
groupVertex.getTaskVertices()) {
-
-                               Execution exec = 
vertex.getCurrentExecutionAttempt();
-
-                               if(first) {
-                                       first = false;
-                               } else {
-                                       wrt.write(","); }
-
-                               wrt.write("\""+exec.getAttemptId() +"\": {");
-                               wrt.write("\"vertexid\": \"" + 
exec.getAttemptId() + "\",");
-                               wrt.write("\"vertexname\": \"" + vertex + 
"\",");
-                               wrt.write("\"CREATED\": "+ 
vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
-                               wrt.write("\"SCHEDULED\": "+ 
vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
-                               wrt.write("\"DEPLOYING\": "+ 
vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
-                               wrt.write("\"RUNNING\": "+ 
vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
-                               wrt.write("\"FINISHED\": "+ 
vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
-                               wrt.write("\"CANCELING\": "+ 
vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
-                               wrt.write("\"CANCELED\": "+ 
vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
-                               wrt.write("\"FAILED\": "+ 
vertex.getStateTimestamp(ExecutionState.FAILED) + "");
-                               wrt.write("}");
-                       }
-
-               }
-               wrt.write("}}");
-       }
-       
-       /**
-        * Writes the version and the revision of Flink.
-        * 
-        * @param wrt
-        */
-       private void writeJsonForVersion(PrintWriter wrt) {
-               wrt.write("{");
-               wrt.write("\"version\": \"" + 
EnvironmentInformation.getVersion() + "\",");
-               wrt.write("\"revision\": \"" + 
EnvironmentInformation.getRevisionInformation().commitId + "\"");
-               wrt.write("}");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
index 103c1be..8e46692 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.util.StringUtils;
 
 import java.util.HashMap;
@@ -38,7 +38,7 @@ public class JsonFactory {
                json.append("\"vertexname\": \"" + 
StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
                json.append("\"vertexstatus\": \"" + vertex.getExecutionState() 
+ "\",");
                
-               AllocatedSlot slot = vertex.getCurrentAssignedResource();
+               SimpleSlot slot = vertex.getCurrentAssignedResource();
                String instanceName = slot == null ? "(null)" : 
slot.getInstance()
                                .getInstanceConnectionInfo().getFQDNHostname();
                

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 733cf5e..71347ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -128,7 +128,7 @@ public class WebInfoServer {
                // ----- the handlers for the servlets -----
                ServletContextHandler servletContext = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
                servletContext.setContextPath("/");
-               servletContext.addServlet(new ServletHolder(new 
JobmanagerInfoServlet(jobmanager,
+               servletContext.addServlet(new ServletHolder(new 
JobManagerInfoServlet(jobmanager,
                                archive, timeout)), "/jobsInfo");
                servletContext.addServlet(new ServletHolder(new 
LogfileInfoServlet(logDirFiles)), "/logInfo");
                servletContext.addServlet(new ServletHolder(new 
SetupInfoServlet(config, jobmanager, timeout)),

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
index 3a53801..ce7db25 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
@@ -26,12 +26,11 @@ import java.util.Set;
 
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import 
org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
 import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent;
 
-
 public class JobProfilingData {
 
        private final ExecutionGraph executionGraph;
@@ -59,7 +58,7 @@ public class JobProfilingData {
        public boolean 
addIfInstanceIsAllocatedByJob(InternalInstanceProfilingData 
instanceProfilingData) {
 
                for (ExecutionVertex executionVertex : 
this.executionGraph.getAllExecutionVertices()) {
-                       AllocatedSlot slot = 
executionVertex.getCurrentAssignedResource();
+                       SimpleSlot slot = 
executionVertex.getCurrentAssignedResource();
                        if (slot != null && slot.getInstance().getPath().equals(
                                        
instanceProfilingData.getInstancePath()))
                        {
@@ -76,7 +75,7 @@ public class JobProfilingData {
                final Set<Instance> tempSet = new HashSet<Instance>();
                
                for (ExecutionVertex executionVertex : 
this.executionGraph.getAllExecutionVertices()) {
-                       AllocatedSlot slot = 
executionVertex.getCurrentAssignedResource();
+                       SimpleSlot slot = 
executionVertex.getCurrentAssignedResource();
                        if (slot != null) {
                                tempSet.add(slot.getInstance());
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c780589..532f7f8 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -437,6 +437,7 @@ class JobManager(val configuration: Configuration)
 
     case Terminated(taskManager) => {
       log.info("Task manager {} terminated.", taskManager.path)
+      JobManager.LOG.warn(s"Task manager ${taskManager.path} terminated.")
       instanceManager.unregisterTaskManager(taskManager)
       context.unwatch(taskManager)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index f4c95c9..4ee28e6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -42,8 +42,8 @@ import 
org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor
 import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -120,7 +120,7 @@ public class ExecutionGraphDeploymentTest {
 
                        final Instance instance = 
getInstance(simpleTaskManager);
 
-                       final AllocatedSlot slot = instance.allocateSlot(jobId);
+                       final SimpleSlot slot = 
instance.allocateSimpleSlot(jobId);
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 21caca0..e8e1f7e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -33,11 +33,11 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -68,7 +68,7 @@ public class ExecutionGraphTestUtils {
                }
        }
        
-       public static void setVertexResource(ExecutionVertex vertex, 
AllocatedSlot slot) {
+       public static void setVertexResource(ExecutionVertex vertex, SimpleSlot 
slot) {
                try {
                        Execution exec = vertex.getCurrentExecutionAttempt();
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 4a8c69b..2f1af70 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -30,7 +30,7 @@ import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -76,7 +76,7 @@ public class ExecutionStateProgressTest {
                        // mock resources and mock taskmanager
                        ActorRef taskManager = 
system.actorOf(Props.create(SimpleAcknowledgingTaskManager.class));
                        for (ExecutionVertex ee : ejv.getTaskVertices()) {
-                               AllocatedSlot slot = 
getInstance(taskManager).allocateSlot(jid);
+                               SimpleSlot slot = 
getInstance(taskManager).allocateSimpleSlot(jid);
                                ee.deployToSlot(slot);
                        }
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 3c1b174..ee89954 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -37,7 +37,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -148,7 +148,7 @@ public class ExecutionVertexCancelTest {
                                                new TaskOperationResult(execId, 
false))));
 
                                Instance instance = getInstance(taskManager);
-                               AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                                vertex.deployToSlot(slot);
 
@@ -223,7 +223,7 @@ public class ExecutionVertexCancelTest {
                                                        
TaskOperationResult(execId, false), new TaskOperationResult(execId, true))));
 
                                        Instance instance = 
getInstance(taskManager);
-                                       AllocatedSlot slot = 
instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                                        vertex.deployToSlot(slot);
 
@@ -296,7 +296,7 @@ public class ExecutionVertexCancelTest {
                                                                        
TaskOperationResult(execId, true))));
 
                                        Instance instance = 
getInstance(taskManager);
-                                       AllocatedSlot slot = 
instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                                        setVertexState(vertex, 
ExecutionState.RUNNING);
                                        setVertexResource(vertex, slot);
@@ -344,7 +344,7 @@ public class ExecutionVertexCancelTest {
                                                        
TaskOperationResult(execId, true))));
 
                                        Instance instance = 
getInstance(taskManager);
-                                       AllocatedSlot slot = 
instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                                        setVertexState(vertex, 
ExecutionState.RUNNING);
                                        setVertexResource(vertex, slot);
@@ -400,7 +400,7 @@ public class ExecutionVertexCancelTest {
                                                        
TaskOperationResult(execId, false))));
 
                                        Instance instance = 
getInstance(taskManager);
-                                       AllocatedSlot slot = 
instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                                        setVertexState(vertex, 
ExecutionState.RUNNING);
                                        setVertexResource(vertex, slot);
@@ -441,7 +441,7 @@ public class ExecutionVertexCancelTest {
                                                        
CancelSequenceTaskManagerCreator()));
 
                                        Instance instance = 
getInstance(taskManager);
-                                       AllocatedSlot slot = 
instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                                        setVertexState(vertex, 
ExecutionState.RUNNING);
                                        setVertexResource(vertex, slot);
@@ -487,7 +487,7 @@ public class ExecutionVertexCancelTest {
                                                        )));
 
                                        Instance instance = 
getInstance(taskManager);
-                                       AllocatedSlot slot = 
instance.allocateSlot(new JobID());
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                                        setVertexState(vertex, 
ExecutionState.RUNNING);
                                        setVertexResource(vertex, slot);
@@ -544,7 +544,7 @@ public class ExecutionVertexCancelTest {
                        // the scheduler (or any caller) needs to know that the 
slot should be released
                        try {
                                Instance instance = 
getInstance(ActorRef.noSender());
-                               AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                               SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
                                
                                vertex.deployToSlot(slot);
                                fail("Method should throw an exception");
@@ -587,7 +587,7 @@ public class ExecutionVertexCancelTest {
                                setVertexState(vertex, 
ExecutionState.CANCELING);
                                
                                Instance instance = 
getInstance(ActorRef.noSender());
-                               AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                               SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
                                
                                vertex.deployToSlot(slot);
                                fail("Method should throw an exception");
@@ -601,7 +601,7 @@ public class ExecutionVertexCancelTest {
                                                AkkaUtils.DEFAULT_TIMEOUT());
                                
                                Instance instance = 
getInstance(ActorRef.noSender());
-                               AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                               SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
                                
                                setVertexResource(vertex, slot);
                                setVertexState(vertex, 
ExecutionState.CANCELING);

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 330292b..c0d1db8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -29,8 +29,8 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
@@ -64,7 +64,7 @@ public class ExecutionVertexDeploymentTest {
                        // mock taskmanager to simply accept the call
                        Instance instance = getInstance(tm);
 
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        
@@ -106,8 +106,8 @@ public class ExecutionVertexDeploymentTest {
                                        
Props.create(SimpleAcknowledgingTaskManager.class));
                        
                        final Instance instance = 
getInstance(simpleTaskManager);
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        
@@ -151,7 +151,7 @@ public class ExecutionVertexDeploymentTest {
                        
                        final Instance instance = 
getInstance(simpleTaskManager);
 
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        
@@ -206,7 +206,7 @@ public class ExecutionVertexDeploymentTest {
                                        
Props.create(SimpleFailingTaskManager.class));
                        
                        final Instance instance = 
getInstance(simpleTaskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        
@@ -242,8 +242,8 @@ public class ExecutionVertexDeploymentTest {
                                        
Props.create(SimpleFailingTaskManager.class));
                        
                        final Instance instance = 
getInstance(simpleTaskManager);
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
 
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
@@ -289,9 +289,8 @@ public class ExecutionVertexDeploymentTest {
 
                        final TestActorRef simpleTaskManager = 
TestActorRef.create(system,
                                        
Props.create(SimpleAcknowledgingTaskManager.class));
-
                        final Instance instance = 
getInstance(simpleTaskManager);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
@@ -343,8 +342,7 @@ public class ExecutionVertexDeploymentTest {
                                        TaskOperationResult(eid, false), new 
TaskOperationResult(eid, true))));
 
                        final Instance instance = 
getInstance(simpleTaskManager);
-
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 24ac44b..8230433 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -30,8 +30,8 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -65,7 +65,7 @@ public class ExecutionVertexSchedulingTest {
                try {
                        // a slot than cannot be deployed to
                        final Instance instance = 
getInstance(ActorRef.noSender());
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        slot.cancel();
                        assertFalse(slot.isReleased());
                        
@@ -96,7 +96,7 @@ public class ExecutionVertexSchedulingTest {
                try {
                        // a slot than cannot be deployed to
                        final Instance instance = 
getInstance(ActorRef.noSender());
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        slot.cancel();
                        assertFalse(slot.isReleased());
                        
@@ -136,7 +136,7 @@ public class ExecutionVertexSchedulingTest {
                                        .SimpleAcknowledgingTaskManager.class));
 
                        final Instance instance = getInstance(tm);
-                       final AllocatedSlot slot = instance.allocateSlot(new 
JobID());
+                       final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
                        
                        final ExecutionJobVertex ejv = getExecutionVertex(new 
JobVertexID());
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
index 15cdefb..94bdef1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
@@ -36,7 +36,7 @@ public class AllocatedSlotTest {
                try {
                        // cancel, then release
                        {
-                               AllocatedSlot slot = getSlot();
+                               SimpleSlot slot = getSlot();
                                assertTrue(slot.isAlive());
                                
                                slot.cancel();
@@ -52,7 +52,7 @@ public class AllocatedSlotTest {
                        
                        // release immediately
                        {
-                               AllocatedSlot slot = getSlot();
+                               SimpleSlot slot = getSlot();
                                assertTrue(slot.isAlive());
                                
                                slot.releaseSlot();
@@ -75,32 +75,32 @@ public class AllocatedSlotTest {
                        
                        // assign to alive slot
                        {
-                               AllocatedSlot slot = getSlot();
+                               SimpleSlot slot = getSlot();
                                
                                assertTrue(slot.setExecutedVertex(ev));
-                               assertEquals(ev, slot.getExecutedVertex());
+                               assertEquals(ev, slot.getExecution());
                                
                                // try to add another one
                                assertFalse(slot.setExecutedVertex(ev_2));
-                               assertEquals(ev, slot.getExecutedVertex());
+                               assertEquals(ev, slot.getExecution());
                        }
                        
                        // assign to canceled slot
                        {
-                               AllocatedSlot slot = getSlot();
+                               SimpleSlot slot = getSlot();
                                slot.cancel();
                                
                                assertFalse(slot.setExecutedVertex(ev));
-                               assertNull(slot.getExecutedVertex());
+                               assertNull(slot.getExecution());
                        }
                        
                        // assign to released
                        {
-                               AllocatedSlot slot = getSlot();
+                               SimpleSlot slot = getSlot();
                                slot.releaseSlot();
                                
                                assertFalse(slot.setExecutedVertex(ev));
-                               assertNull(slot.getExecutedVertex());
+                               assertNull(slot.getExecution());
                        }
                }
                catch (Exception e) {
@@ -114,9 +114,9 @@ public class AllocatedSlotTest {
                try {
                        Execution ev = mock(Execution.class);
                        
-                       AllocatedSlot slot = getSlot();
+                       SimpleSlot slot = getSlot();
                        assertTrue(slot.setExecutedVertex(ev));
-                       assertEquals(ev, slot.getExecutedVertex());
+                       assertEquals(ev, slot.getExecution());
                        
                        slot.cancel();
                        slot.releaseSlot();
@@ -130,12 +130,12 @@ public class AllocatedSlotTest {
                }
        }
        
-       public static AllocatedSlot getSlot() throws Exception {
+       public static SimpleSlot getSlot() throws Exception {
                HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
                InetAddress address = InetAddress.getByName("127.0.0.1");
                InstanceConnectionInfo connection = new 
InstanceConnectionInfo(address, 10001);
                
                Instance instance = new Instance(ActorRef.noSender(), 
connection, new InstanceID(), hardwareDescription, 1);
-               return instance.allocateSlot(new JobID());
+               return instance.allocateSimpleSlot(new JobID());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index a7820a3..47f7a2b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -45,10 +45,10 @@ public class InstanceTest {
                        assertEquals(4, instance.getNumberOfAvailableSlots());
                        assertEquals(0, instance.getNumberOfAllocatedSlots());
                        
-                       AllocatedSlot slot1 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot2 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot3 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot4 = instance.allocateSlot(new 
JobID());
+                       SimpleSlot slot1 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot2 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot3 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot4 = instance.allocateSimpleSlot(new 
JobID());
                        
                        assertNotNull(slot1);
                        assertNotNull(slot2);
@@ -61,7 +61,7 @@ public class InstanceTest {
                                        slot3.getSlotNumber() + 
slot4.getSlotNumber());
                        
                        // no more slots
-                       assertNull(instance.allocateSlot(new JobID()));
+                       assertNull(instance.allocateSimpleSlot(new JobID()));
                        try {
                                instance.returnAllocatedSlot(slot2);
                                fail("instance accepted a non-cancelled slot.");
@@ -109,9 +109,9 @@ public class InstanceTest {
                        
                        assertEquals(3, instance.getNumberOfAvailableSlots());
                        
-                       AllocatedSlot slot1 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot2 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot3 = instance.allocateSlot(new 
JobID());
+                       SimpleSlot slot1 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot2 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot3 = instance.allocateSimpleSlot(new 
JobID());
                        
                        instance.markDead();
                        
@@ -139,9 +139,9 @@ public class InstanceTest {
                        
                        assertEquals(3, instance.getNumberOfAvailableSlots());
                        
-                       AllocatedSlot slot1 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot2 = instance.allocateSlot(new 
JobID());
-                       AllocatedSlot slot3 = instance.allocateSlot(new 
JobID());
+                       SimpleSlot slot1 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot2 = instance.allocateSimpleSlot(new 
JobID());
+                       SimpleSlot slot3 = instance.allocateSimpleSlot(new 
JobID());
                        
                        instance.cancelAndReleaseAllSlots();
                        

Reply via email to