http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java deleted file mode 100644 index 92d457b..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.jobmanager.scheduler; - -import org.apache.flink.runtime.AbstractID; -import org.apache.flink.runtime.instance.AllocatedSlot; - -public class SubSlot extends AllocatedSlot { - - private static final long serialVersionUID = 1361615219044538497L; - - - private final SharedSlot sharedSlot; - - private final AbstractID groupId; - - private final int subSlotNumber; - - - public SubSlot(SharedSlot sharedSlot, int subSlotNumber, AbstractID groupId) { - super(sharedSlot.getAllocatedSlot().getJobID(), - sharedSlot.getAllocatedSlot().getInstance(), - sharedSlot.getAllocatedSlot().getSlotNumber()); - - this.sharedSlot = sharedSlot; - this.groupId = groupId; - this.subSlotNumber = subSlotNumber; - } - - // -------------------------------------------------------------------------------------------- - - public void releaseSlot() { - // cancel everything, if there is something. since this is atomically status based, - // it will not happen twice if another attempt happened before or concurrently - try { - cancel(); - } - finally { - if (markReleased()) { - this.sharedSlot.returnAllocatedSlot(this); - } - } - } - - public SharedSlot getSharedSlot() { - return this.sharedSlot; - } - - public AbstractID getGroupId() { - return groupId; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public String toString() { - return "SubSlot " + subSlotNumber + " (" + super.toString() + ')'; - } -}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.jobmanager.web;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import akka.actor.ActorRef;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
import org.apache.flink.runtime.messages.ArchiveMessages.RequestArchivedJobs$;
import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsResponse;
import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestTotalNumberOfSlots$;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager$;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestAccumulatorResults;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestJob;
import org.apache.flink.runtime.messages.JobManagerMessages.JobResponse;
import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.StringUtils;
import org.eclipse.jetty.io.EofException;
import scala.concurrent.duration.FiniteDuration;

/**
 * Servlet that answers HTTP GET requests with JSON describing running and archived jobs.
 *
 * <p>The view is selected by the request parameter {@code get}: {@code archive}, {@code job},
 * {@code groupvertex}, {@code taskmanagers}, {@code cancel}, {@code updates} or {@code version}.
 * Any other value (including a missing parameter) lists the currently running jobs.
 *
 * <p>NOTE(review): the JSON is assembled by string concatenation and job names, accumulator
 * names and accumulator values are written without JSON escaping. A value containing a
 * double quote or backslash will produce invalid JSON - consider escaping in a follow-up.
 */
public class JobManagerInfoServlet extends HttpServlet {

	private static final long serialVersionUID = 1L;

	private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoServlet.class);

	/** Underlying JobManager */
	private final ActorRef jobmanager;

	/** Actor that is asked for archived jobs */
	private final ActorRef archive;

	/** Timeout passed to every ask call */
	private final FiniteDuration timeout;


	/**
	 * @param jobmanager Actor that is asked about running jobs, slots, task managers,
	 *                   accumulators and job cancellation.
	 * @param archive Actor that is asked about archived jobs.
	 * @param timeout Timeout passed to every ask call.
	 */
	public JobManagerInfoServlet(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) {
		this.jobmanager = jobmanager;
		this.archive = archive;
		this.timeout = timeout;
	}


	/**
	 * Dispatches on the {@code get} request parameter and writes the matching JSON to the
	 * response. Any exception is answered with status 400 and the exception message as body.
	 */
	@Override
	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

		resp.setStatus(HttpServletResponse.SC_OK);
		resp.setContentType("application/json");

		// The selector does not change during the request, so it is read only once.
		final String get = req.getParameter("get");

		try {
			if ("archive".equals(get)) {
				List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(AkkaUtils
						.<ArchivedJobs>ask(archive, RequestArchivedJobs$.MODULE$, timeout)
						.asJavaCollection());

				writeJsonForArchive(resp.getWriter(), archivedJobs);
			}
			else if ("job".equals(get)) {
				String jobId = req.getParameter("job");
				JobResponse response = AkkaUtils.ask(archive,
						new RequestJob(JobID.fromHexString(jobId)), timeout);

				if (response instanceof JobFound) {
					ExecutionGraph archivedJob = ((JobFound) response).executionGraph();
					writeJsonForArchivedJob(resp.getWriter(), archivedJob);
				} else {
					LOG.warn("DoGet:job: Could not find job for job ID " + jobId);
				}
			}
			else if ("groupvertex".equals(get)) {
				String jobId = req.getParameter("job");
				String groupvertexId = req.getParameter("groupvertex");

				JobResponse response = AkkaUtils.ask(archive,
						new RequestJob(JobID.fromHexString(jobId)), timeout);

				if (response instanceof JobFound && groupvertexId != null) {
					ExecutionGraph archivedJob = ((JobFound) response).executionGraph();

					writeJsonForArchivedJobGroupvertex(resp.getWriter(), archivedJob,
							JobVertexID.fromHexString(groupvertexId));
				} else {
					LOG.warn("DoGet:groupvertex: Could not find job for job ID " + jobId);
				}
			}
			else if ("taskmanagers".equals(get)) {
				int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager,
						RequestNumberRegisteredTaskManager$.MODULE$, timeout);
				int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager,
						RequestTotalNumberOfSlots$.MODULE$, timeout);

				resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers + ", " +
						"\"slots\": " + numberOfRegisteredSlots + "}");
			}
			else if ("cancel".equals(get)) {
				String jobId = req.getParameter("job");
				// The cancellation response is not inspected; nothing is written to the client.
				AkkaUtils.<CancellationResponse>ask(jobmanager,
						new CancelJob(JobID.fromHexString(jobId)), timeout);
			}
			else if ("updates".equals(get)) {
				String jobId = req.getParameter("job");
				writeJsonUpdatesForJob(resp.getWriter(), JobID.fromHexString(jobId));
			}
			else if ("version".equals(get)) {
				writeJsonForVersion(resp.getWriter());
			}
			else {
				Iterable<ExecutionGraph> runningJobs = AkkaUtils.<RunningJobs>ask
						(jobmanager, RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
				writeJsonForJobs(resp.getWriter(), runningJobs);
			}

		} catch (Exception e) {
			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
			resp.getWriter().print(e.getMessage());
			if (LOG.isWarnEnabled()) {
				LOG.warn(StringUtils.stringifyException(e));
			}
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Shared serialization helpers
	// --------------------------------------------------------------------------------------------

	/**
	 * Writes the fields {@code jobid}, {@code jobname} and {@code status} of the given graph,
	 * each followed by a comma. The enclosing object must be opened and closed by the caller.
	 */
	private static void writeJobIdNameStatus(PrintWriter wrt, ExecutionGraph graph) {
		wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
		wrt.write("\"jobname\": \"" + graph.getJobName() + "\",");
		wrt.write("\"status\": \"" + graph.getState() + "\",");
	}

	/**
	 * Writes the {@code "groupvertices": [...]} member for the given graph, one entry per job
	 * vertex in topological order. No separator is written after the closing bracket.
	 */
	private static void writeGroupVertices(PrintWriter wrt, ExecutionGraph graph) {
		wrt.write("\"groupvertices\": [");
		boolean first = true;

		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
			// Write separator between json objects
			if (first) {
				first = false;
			} else {
				wrt.write(",");
			}

			wrt.write(JsonFactory.toJson(groupVertex));
		}
		wrt.write("]");
	}

	// --------------------------------------------------------------------------------------------
	//  Views
	// --------------------------------------------------------------------------------------------

	/**
	 * Writes a JSON array with one object (see {@link #writeJsonForJob}) per given graph.
	 *
	 * @param wrt The writer of the HTTP response.
	 * @param graphs The execution graphs to serialize.
	 */
	private void writeJsonForJobs(PrintWriter wrt, Iterable<ExecutionGraph> graphs) {
		try {
			wrt.write("[");

			Iterator<ExecutionGraph> it = graphs.iterator();
			// Loop Jobs
			while (it.hasNext()) {
				ExecutionGraph graph = it.next();

				writeJsonForJob(wrt, graph);

				// Write separator between json objects
				if (it.hasNext()) {
					wrt.write(",");
				}
			}
			wrt.write("]");

		} catch (EofException eof) { // Connection closed by client
			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
		} catch (IOException ioe) { // Connection closed by client
			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
		}
	}

	/**
	 * Writes one job as a JSON object with id, name, status, the timestamp of the current
	 * status and its group vertices.
	 *
	 * @param wrt The writer of the HTTP response.
	 * @param graph The execution graph to serialize.
	 * @throws IOException Declared for the callers' error handling.
	 */
	private void writeJsonForJob(PrintWriter wrt, ExecutionGraph graph) throws IOException {
		wrt.write("{");
		writeJobIdNameStatus(wrt, graph);
		wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState()) + ",");

		writeGroupVertices(wrt, graph);
		wrt.write("}");
	}

	/**
	 * Writes a JSON array of the given archived jobs, newest status timestamp first.
	 * The given list is sorted in place.
	 *
	 * @param wrt The writer of the HTTP response.
	 * @param graphs The archived jobs; reordered by this method.
	 */
	private void writeJsonForArchive(PrintWriter wrt, List<ExecutionGraph> graphs) {

		wrt.write("[");

		// sort jobs by time, newest first
		Collections.sort(graphs, new Comparator<ExecutionGraph>() {
			@Override
			public int compare(ExecutionGraph o1, ExecutionGraph o2) {
				long time1 = o1.getStatusTimestamp(o1.getState());
				long time2 = o2.getStatusTimestamp(o2.getState());

				// Equal timestamps must compare as 0. A comparator that never returns 0
				// breaks the Comparator contract and the sort may reject it at runtime.
				return time1 < time2 ? 1 : (time1 > time2 ? -1 : 0);
			}
		});

		// Loop Jobs
		for (int i = 0; i < graphs.size(); i++) {
			ExecutionGraph graph = graphs.get(i);

			wrt.write("{");
			writeJobIdNameStatus(wrt, graph);
			wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState()));
			wrt.write("}");

			// Write separator between json objects
			if (i != graphs.size() - 1) {
				wrt.write(",");
			}
		}
		wrt.write("]");
	}

	/**
	 * Writes details of one archived job as a single-element JSON array: status timestamps,
	 * failed nodes (only for failed jobs), group vertices, accumulators and, per group vertex,
	 * the earliest RUNNING and the latest FINISHED/CANCELED/FAILED timestamp of its members.
	 *
	 * <p>The {@code accumulators} and {@code groupverticetimes} members are always written, so
	 * the output stays well-formed when no accumulator results are found (the accumulator
	 * array is then empty).
	 *
	 * @param wrt The writer of the HTTP response.
	 * @param graph The archived job.
	 */
	private void writeJsonForArchivedJob(PrintWriter wrt, ExecutionGraph graph) {

		try {
			wrt.write("[");

			wrt.write("{");
			writeJobIdNameStatus(wrt, graph);
			// The "SCHEDULED" key carries the timestamp of JobStatus.CREATED.
			wrt.write("\"SCHEDULED\": " + graph.getStatusTimestamp(JobStatus.CREATED) + ",");
			wrt.write("\"RUNNING\": " + graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
			wrt.write("\"FINISHED\": " + graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
			wrt.write("\"FAILED\": " + graph.getStatusTimestamp(JobStatus.FAILED) + ",");
			wrt.write("\"CANCELED\": " + graph.getStatusTimestamp(JobStatus.CANCELED) + ",");

			if (graph.getState() == JobStatus.FAILED) {
				writeFailedNodes(wrt, graph);
				wrt.write(",");
			}

			writeGroupVertices(wrt, graph);
			wrt.write(",");

			// write accumulators
			AccumulatorResultsResponse response = AkkaUtils.ask(jobmanager,
					new RequestAccumulatorResults(graph.getJobID()), timeout);

			wrt.write("\n\"accumulators\": [");
			if (response instanceof AccumulatorResultsFound) {
				Map<String, Object> accMap = ((AccumulatorResultsFound) response).asJavaMap();

				int i = 0;
				for (Entry<String, Object> accumulator : accMap.entrySet()) {
					wrt.write("{ \"name\": \"" + accumulator.getKey() + " (" + accumulator.getValue().getClass().getName() + ")\","
							+ " \"value\": \"" + accumulator.getValue().toString() + "\"}\n");
					if (++i < accMap.size()) {
						wrt.write(",");
					}
				}
			} else {
				LOG.warn("Could not find accumulator results for job ID " + graph.getJobID());
			}
			wrt.write("],\n");

			writeGroupVertexTimes(wrt, graph);

			// close the job object and the surrounding array
			wrt.write("}");
			wrt.write("]");

		} catch (EofException eof) { // Connection closed by client
			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
		} catch (IOException ioe) { // Connection closed by client
			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
		}
	}

	/**
	 * Writes the {@code "failednodes": [...]} member: one entry per execution vertex in state
	 * FAILED that has an assigned slot or a failure cause. No separator is written after the
	 * closing bracket.
	 */
	private static void writeFailedNodes(PrintWriter wrt, ExecutionGraph graph) {
		wrt.write("\"failednodes\": [");
		boolean first = true;
		for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
			if (vertex.getExecutionState() != ExecutionState.FAILED) {
				continue;
			}

			SimpleSlot slot = vertex.getCurrentAssignedResource();
			Throwable failureCause = vertex.getFailureCause();
			if (slot == null && failureCause == null) {
				continue;
			}

			if (first) {
				first = false;
			} else {
				wrt.write(",");
			}
			wrt.write("{");
			wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot
					.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
			wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\"");
			wrt.write("}");
		}
		wrt.write("]");
	}

	/**
	 * Writes the {@code "groupverticetimes": {...}} member, including its closing brace.
	 * For every group vertex, STARTED is the smallest non-zero RUNNING timestamp of its task
	 * vertices ({@code Long.MAX_VALUE} if there is none) and ENDED is the largest
	 * FINISHED/CANCELED/FAILED timestamp (0 if there is none).
	 */
	private static void writeGroupVertexTimes(PrintWriter wrt, ExecutionGraph graph) {
		wrt.write("\"groupverticetimes\": {");
		boolean first = true;
		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {

			if (first) {
				first = false;
			} else {
				wrt.write(",");
			}

			// Calculate start and end time for groupvertex
			long started = Long.MAX_VALUE;
			long ended = 0;

			// Take earliest running state and latest endstate of groupmembers
			for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {

				long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
				if (running != 0 && running < started) {
					started = running;
				}

				// 'ended' starts at 0 and only grows, so a plain maximum is the same as
				// the guarded "non-zero and larger" updates.
				ended = Math.max(ended, vertex.getStateTimestamp(ExecutionState.FINISHED));
				ended = Math.max(ended, vertex.getStateTimestamp(ExecutionState.CANCELED));
				ended = Math.max(ended, vertex.getStateTimestamp(ExecutionState.FAILED));
			}

			wrt.write("\"" + groupVertex.getJobVertexId() + "\": {");
			wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\",");
			wrt.write("\"groupvertexname\": \"" + groupVertex + "\",");
			wrt.write("\"STARTED\": " + started + ",");
			wrt.write("\"ENDED\": " + ended);
			wrt.write("}");
		}
		wrt.write("}");
	}

	/**
	 * Writes the current state of the given job: the IDs of all running jobs, the current
	 * state and state timestamp of each execution vertex, and the current job status.
	 * If the job manager does not know the job, an empty vertex list and a single FINISHED
	 * job event stamped with the current time are written instead.
	 *
	 * @param wrt The writer of the HTTP response.
	 * @param jobId The ID of the job to report on.
	 */
	private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {

		try {
			Iterable<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jobmanager,
					RequestRunningJobs$.MODULE$, timeout).asJavaIterable();

			wrt.write("{");
			wrt.write("\"jobid\": \"" + jobId + "\",");
			wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\",");
			wrt.write("\"recentjobs\": [");

			boolean first = true;

			for (ExecutionGraph g : graphs) {
				if (first) {
					first = false;
				} else {
					wrt.write(",");
				}

				wrt.write("\"" + g.getJobID() + "\"");
			}

			wrt.write("],");

			JobResponse response = AkkaUtils.ask(jobmanager, new RequestJob(jobId), timeout);

			if (response instanceof JobFound) {
				ExecutionGraph graph = ((JobFound) response).executionGraph();

				wrt.write("\"vertexevents\": [");

				first = true;
				for (ExecutionVertex ev : graph.getAllExecutionVertices()) {
					if (first) {
						first = false;
					} else {
						wrt.write(",");
					}

					wrt.write("{");
					wrt.write("\"vertexid\": \"" + ev.getCurrentExecutionAttempt().getAttemptId()
							+ "\",");
					wrt.write("\"newstate\": \"" + ev.getExecutionState() + "\",");
					wrt.write("\"timestamp\": \"" + ev.getStateTimestamp(ev.getExecutionState())
							+ "\"");
					wrt.write("}");
				}

				wrt.write("],");

				wrt.write("\"jobevents\": [");

				wrt.write("{");
				wrt.write("\"newstate\": \"" + graph.getState() + "\",");
				wrt.write("\"timestamp\": \"" + graph.getStatusTimestamp(graph.getState()) + "\"");
				wrt.write("}");

				wrt.write("]");

				wrt.write("}");
			} else {
				wrt.write("\"vertexevents\": [],");
				wrt.write("\"jobevents\": [");
				wrt.write("{");
				wrt.write("\"newstate\": \"" + JobStatus.FINISHED + "\",");
				wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\"");
				wrt.write("}");
				wrt.write("]");
				wrt.write("}");
				LOG.warn("WriteJsonUpdatesForJob: Could not find job with job ID " + jobId);
			}
		} catch (EofException eof) { // Connection closed by client
			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
		} catch (IOException ioe) { // Connection closed by client
			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
		}
	}

	/**
	 * Writes the JSON of one job vertex of an archived job together with the state
	 * timestamps of execution vertices.
	 *
	 * <p>NOTE(review): the {@code verticetimes} member iterates over the task vertices of
	 * <em>all</em> job vertices of the graph, not only those of {@code vertexId}. This is kept
	 * as found - confirm against the consumer whether that is intended.
	 *
	 * @param wrt The writer of the HTTP response.
	 * @param graph The archived job.
	 * @param vertexId The ID of the job vertex written as {@code groupvertex}.
	 */
	private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, ExecutionGraph graph,
													JobVertexID vertexId) {
		ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);

		wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) + ",");

		wrt.write("\"verticetimes\": {");
		boolean first = true;
		for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {

			for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {

				Execution exec = vertex.getCurrentExecutionAttempt();

				if (first) {
					first = false;
				} else {
					wrt.write(",");
				}

				wrt.write("\"" + exec.getAttemptId() + "\": {");
				wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\",");
				wrt.write("\"vertexname\": \"" + vertex + "\",");
				wrt.write("\"CREATED\": " + vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
				wrt.write("\"SCHEDULED\": " + vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
				wrt.write("\"DEPLOYING\": " + vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
				wrt.write("\"RUNNING\": " + vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
				wrt.write("\"FINISHED\": " + vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
				wrt.write("\"CANCELING\": " + vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
				wrt.write("\"CANCELED\": " + vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
				wrt.write("\"FAILED\": " + vertex.getStateTimestamp(ExecutionState.FAILED) + "");
				wrt.write("}");
			}

		}
		wrt.write("}}");
	}

	/**
	 * Writes the version and the revision of Flink.
	 *
	 * @param wrt The writer of the HTTP response.
	 */
	private void writeJsonForVersion(PrintWriter wrt) {
		wrt.write("{");
		wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\",");
		wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\"");
		wrt.write("}");
	}
}
- */ - -package org.apache.flink.runtime.jobmanager.web; - -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import akka.actor.ActorRef; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs; -import org.apache.flink.runtime.messages.ArchiveMessages.RequestArchivedJobs$; -import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsResponse; -import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsFound; -import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestTotalNumberOfSlots$; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager$; -import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse; -import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestAccumulatorResults; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestJob; -import org.apache.flink.runtime.messages.JobManagerMessages.JobResponse; -import org.apache.flink.runtime.messages.JobManagerMessages.JobFound; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.AllocatedSlot; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.StringUtils; -import org.eclipse.jetty.io.EofException; -import scala.concurrent.duration.FiniteDuration; - - -public class JobmanagerInfoServlet extends HttpServlet { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(JobmanagerInfoServlet.class); - - /** Underlying JobManager */ - private final ActorRef jobmanager; - private final ActorRef archive; - private final FiniteDuration timeout; - - - public JobmanagerInfoServlet(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) { - this.jobmanager = jobmanager; - this.archive = archive; - this.timeout = timeout; - } - - - @Override - protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - - resp.setStatus(HttpServletResponse.SC_OK); - resp.setContentType("application/json"); - - try { - if("archive".equals(req.getParameter("get"))) { - List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(AkkaUtils - .<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout) - .asJavaCollection()); - - writeJsonForArchive(resp.getWriter(), archivedJobs); - } - else if("job".equals(req.getParameter("get"))) { - String jobId = req.getParameter("job"); - JobResponse response = AkkaUtils.ask(archive, - new RequestJob(JobID.fromHexString(jobId)), timeout); - - if(response instanceof JobFound){ - ExecutionGraph archivedJob = ((JobFound)response).executionGraph(); - writeJsonForArchivedJob(resp.getWriter(), archivedJob); - 
}else{ - LOG.warn("DoGet:job: Could not find job for job ID " + jobId); - } - } - else if("groupvertex".equals(req.getParameter("get"))) { - String jobId = req.getParameter("job"); - String groupvertexId = req.getParameter("groupvertex"); - - JobResponse response = AkkaUtils.ask(archive, - new RequestJob(JobID.fromHexString(jobId)), timeout); - - if(response instanceof JobFound && groupvertexId != null){ - ExecutionGraph archivedJob = ((JobFound)response).executionGraph(); - - writeJsonForArchivedJobGroupvertex(resp.getWriter(), archivedJob, - JobVertexID.fromHexString(groupvertexId)); - }else{ - LOG.warn("DoGet:groupvertex: Could not find job for job ID " + jobId); - } - } - else if("taskmanagers".equals(req.getParameter("get"))) { - int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager, - RequestNumberRegisteredTaskManager$.MODULE$, timeout); - int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager, - RequestTotalNumberOfSlots$.MODULE$, timeout); - - resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " + - "\"slots\": "+numberOfRegisteredSlots+"}"); - } - else if("cancel".equals(req.getParameter("get"))) { - String jobId = req.getParameter("job"); - AkkaUtils.<CancellationResponse>ask(jobmanager, - new CancelJob(JobID.fromHexString(jobId)), timeout); - } - else if("updates".equals(req.getParameter("get"))) { - String jobId = req.getParameter("job"); - writeJsonUpdatesForJob(resp.getWriter(), JobID.fromHexString(jobId)); - } else if ("version".equals(req.getParameter("get"))) { - writeJsonForVersion(resp.getWriter()); - } - else{ - Iterable<ExecutionGraph> runningJobs = AkkaUtils.<RunningJobs>ask - (jobmanager, RequestRunningJobs$.MODULE$, timeout).asJavaIterable(); - writeJsonForJobs(resp.getWriter(), runningJobs); - } - - } catch (Exception e) { - resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); - resp.getWriter().print(e.getMessage()); - if (LOG.isWarnEnabled()) { - LOG.warn(StringUtils.stringifyException(e)); - } - } - } 
- - /** - * Writes ManagementGraph as Json for all recent jobs - * - * @param wrt - * @param graphs - */ - private void writeJsonForJobs(PrintWriter wrt, Iterable<ExecutionGraph> graphs) { - try { - wrt.write("["); - - Iterator<ExecutionGraph> it = graphs.iterator(); - // Loop Jobs - while(it.hasNext()){ - ExecutionGraph graph = it.next(); - - writeJsonForJob(wrt, graph); - - //Write seperator between json objects - if(it.hasNext()) { - wrt.write(","); - } - } - wrt.write("]"); - - } catch (EofException eof) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, EofException"); - } catch (IOException ioe) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, IOException"); - } - } - - private void writeJsonForJob(PrintWriter wrt, ExecutionGraph graph) throws IOException { - //Serialize job to json - wrt.write("{"); - wrt.write("\"jobid\": \"" + graph.getJobID() + "\","); - wrt.write("\"jobname\": \"" + graph.getJobName()+"\","); - wrt.write("\"status\": \""+ graph.getState() + "\","); - wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState())+","); - - // Serialize ManagementGraph to json - wrt.write("\"groupvertices\": ["); - boolean first = true; - - for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { - //Write seperator between json objects - if(first) { - first = false; - } else { - wrt.write(","); } - - wrt.write(JsonFactory.toJson(groupVertex)); - } - wrt.write("]"); - wrt.write("}"); - - } - - /** - * Writes Json with a list of currently archived jobs, sorted by time - * - * @param wrt - * @param graphs - */ - private void writeJsonForArchive(PrintWriter wrt, List<ExecutionGraph> graphs) { - - wrt.write("["); - - // sort jobs by time - Collections.sort(graphs, new Comparator<ExecutionGraph>() { - @Override - public int compare(ExecutionGraph o1, ExecutionGraph o2) { - if(o1.getStatusTimestamp(o1.getState()) < 
o2.getStatusTimestamp(o2.getState())) { - return 1; - } else { - return -1; - } - } - - }); - - // Loop Jobs - for (int i = 0; i < graphs.size(); i++) { - ExecutionGraph graph = graphs.get(i); - - //Serialize job to json - wrt.write("{"); - wrt.write("\"jobid\": \"" + graph.getJobID() + "\","); - wrt.write("\"jobname\": \"" + graph.getJobName()+"\","); - wrt.write("\"status\": \""+ graph.getState() + "\","); - wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState())); - - wrt.write("}"); - - //Write seperator between json objects - if(i != graphs.size() - 1) { - wrt.write(","); - } - } - wrt.write("]"); - - } - - /** - * Writes infos about archived job in Json format, including groupvertices and groupverticetimes - * - * @param wrt - * @param graph - */ - private void writeJsonForArchivedJob(PrintWriter wrt, ExecutionGraph graph) { - - try { - - wrt.write("["); - - //Serialize job to json - wrt.write("{"); - wrt.write("\"jobid\": \"" + graph.getJobID() + "\","); - wrt.write("\"jobname\": \"" + graph.getJobName()+"\","); - wrt.write("\"status\": \""+ graph.getState() + "\","); - wrt.write("\"SCHEDULED\": "+ graph.getStatusTimestamp(JobStatus.CREATED) + ","); - wrt.write("\"RUNNING\": "+ graph.getStatusTimestamp(JobStatus.RUNNING) + ","); - wrt.write("\"FINISHED\": "+ graph.getStatusTimestamp(JobStatus.FINISHED) + ","); - wrt.write("\"FAILED\": "+ graph.getStatusTimestamp(JobStatus.FAILED) + ","); - wrt.write("\"CANCELED\": "+ graph.getStatusTimestamp(JobStatus.CANCELED) + ","); - - if (graph.getState() == JobStatus.FAILED) { - wrt.write("\"failednodes\": ["); - boolean first = true; - for (ExecutionVertex vertex : graph.getAllExecutionVertices()) { - if (vertex.getExecutionState() == ExecutionState.FAILED) { - AllocatedSlot slot = vertex.getCurrentAssignedResource(); - Throwable failureCause = vertex.getFailureCause(); - if (slot != null || failureCause != null) { - if (first) { - first = false; - } else { - wrt.write(","); - } - wrt.write("{"); - 
wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot - .getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\","); - wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\""); - wrt.write("}"); - } - } - } - wrt.write("],"); - } - - // Serialize ManagementGraph to json - wrt.write("\"groupvertices\": ["); - boolean first = true; - for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { - //Write seperator between json objects - if(first) { - first = false; - } else { - wrt.write(","); } - - wrt.write(JsonFactory.toJson(groupVertex)); - - } - wrt.write("],"); - - // write accumulators - AccumulatorResultsResponse response = AkkaUtils.ask(jobmanager, - new RequestAccumulatorResults(graph.getJobID()), timeout); - - if(response instanceof AccumulatorResultsFound){ - Map<String, Object> accMap = ((AccumulatorResultsFound)response).asJavaMap(); - - wrt.write("\n\"accumulators\": ["); - int i = 0; - for( Entry<String, Object> accumulator : accMap.entrySet()) { - wrt.write("{ \"name\": \""+accumulator.getKey()+" (" + accumulator.getValue().getClass().getName()+")\"," - + " \"value\": \""+accumulator.getValue().toString()+"\"}\n"); - if(++i < accMap.size()) { - wrt.write(","); - } - } - wrt.write("],\n"); - - wrt.write("\"groupverticetimes\": {"); - first = true; - for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { - - if(first) { - first = false; - } else { - wrt.write(","); } - - // Calculate start and end time for groupvertex - long started = Long.MAX_VALUE; - long ended = 0; - - // Take earliest running state and latest endstate of groupmembers - for (ExecutionVertex vertex : groupVertex.getTaskVertices()) { - - long running = vertex.getStateTimestamp(ExecutionState.RUNNING); - if (running != 0 && running < started) { - started = running; - } - - long finished = vertex.getStateTimestamp(ExecutionState.FINISHED); - long canceled = 
vertex.getStateTimestamp(ExecutionState.CANCELED); - long failed = vertex.getStateTimestamp(ExecutionState.FAILED); - - if(finished != 0 && finished > ended) { - ended = finished; - } - - if(canceled != 0 && canceled > ended) { - ended = canceled; - } - - if(failed != 0 && failed > ended) { - ended = failed; - } - - } - - wrt.write("\""+groupVertex.getJobVertexId()+"\": {"); - wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\","); - wrt.write("\"groupvertexname\": \"" + groupVertex + "\","); - wrt.write("\"STARTED\": "+ started + ","); - wrt.write("\"ENDED\": "+ ended); - wrt.write("}"); - - } - }else{ - LOG.warn("Could not find accumulator results for job ID " + graph.getJobID()); - } - - wrt.write("}"); - - wrt.write("}"); - - - wrt.write("]"); - - } catch (EofException eof) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, EofException"); - } catch (IOException ioe) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, IOException"); - } - - } - - - /** - * Writes all updates (events) for a given job since a given time - * - * @param wrt - * @param jobId - */ - private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) { - - try { - Iterable<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jobmanager, - RequestRunningJobs$.MODULE$, timeout).asJavaIterable(); - - //Serialize job to json - wrt.write("{"); - wrt.write("\"jobid\": \"" + jobId + "\","); - wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\","); - wrt.write("\"recentjobs\": ["); - - boolean first = true; - - for(ExecutionGraph g : graphs){ - if(first){ - first = false; - }else{ - wrt.write(","); - } - - wrt.write("\"" + g.getJobID() + "\""); - } - - wrt.write("],"); - - JobResponse response = AkkaUtils.ask(jobmanager, new RequestJob(jobId), timeout); - - if(response instanceof JobFound){ - ExecutionGraph graph = ((JobFound)response).executionGraph(); - - 
wrt.write("\"vertexevents\": ["); - - first = true; - for (ExecutionVertex ev : graph.getAllExecutionVertices()) { - if (first) { - first = false; - } else { - wrt.write(","); - } - - wrt.write("{"); - wrt.write("\"vertexid\": \"" + ev.getCurrentExecutionAttempt().getAttemptId() - + "\","); - wrt.write("\"newstate\": \"" + ev.getExecutionState() + "\","); - wrt.write("\"timestamp\": \"" + ev.getStateTimestamp(ev.getExecutionState()) - + "\""); - wrt.write("}"); - } - - wrt.write("],"); - - wrt.write("\"jobevents\": ["); - - wrt.write("{"); - wrt.write("\"newstate\": \"" + graph.getState() + "\","); - wrt.write("\"timestamp\": \"" + graph.getStatusTimestamp(graph.getState()) + "\""); - wrt.write("}"); - - wrt.write("]"); - - wrt.write("}"); - }else{ - wrt.write("\"vertexevents\": [],"); - wrt.write("\"jobevents\": ["); - wrt.write("{"); - wrt.write("\"newstate\": \"" + JobStatus.FINISHED + "\","); - wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\""); - wrt.write("}"); - wrt.write("]"); - wrt.write("}"); - LOG.warn("WriteJsonUpdatesForJob: Could not find job with job ID " + jobId); - } - } catch (EofException eof) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, EofException"); - } catch (IOException ioe) { // Connection closed by client - LOG.info("Info server for jobmanager: Connection closed by client, IOException"); - } - - } - - /** - * Writes info about one particular archived JobVertex in a job, including all member execution vertices, their times and statuses. 
- */ - private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, ExecutionGraph graph, - JobVertexID vertexId) { - ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId); - - // Serialize ManagementGraph to json - wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) + ","); - - wrt.write("\"verticetimes\": {"); - boolean first = true; - for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) { - - for (ExecutionVertex vertex : groupVertex.getTaskVertices()) { - - Execution exec = vertex.getCurrentExecutionAttempt(); - - if(first) { - first = false; - } else { - wrt.write(","); } - - wrt.write("\""+exec.getAttemptId() +"\": {"); - wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\","); - wrt.write("\"vertexname\": \"" + vertex + "\","); - wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ","); - wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ","); - wrt.write("\"DEPLOYING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ","); - wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ","); - wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ","); - wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ","); - wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ","); - wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + ""); - wrt.write("}"); - } - - } - wrt.write("}}"); - } - - /** - * Writes the version and the revision of Flink. 
- * - * @param wrt - */ - private void writeJsonForVersion(PrintWriter wrt) { - wrt.write("{"); - wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\","); - wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\""); - wrt.write("}"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java index 103c1be..8e46692 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.IntermediateResult; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.util.StringUtils; import java.util.HashMap; @@ -38,7 +38,7 @@ public class JsonFactory { json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\","); json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\","); - AllocatedSlot slot = vertex.getCurrentAssignedResource(); + SimpleSlot slot = vertex.getCurrentAssignedResource(); String instanceName = slot == null ? 
"(null)" : slot.getInstance() .getInstanceConnectionInfo().getFQDNHostname(); http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java index 733cf5e..71347ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java @@ -128,7 +128,7 @@ public class WebInfoServer { // ----- the handlers for the servlets ----- ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS); servletContext.setContextPath("/"); - servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager, + servletContext.addServlet(new ServletHolder(new JobManagerInfoServlet(jobmanager, archive, timeout)), "/jobsInfo"); servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo"); servletContext.addServlet(new ServletHolder(new SetupInfoServlet(config, jobmanager, timeout)), http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java index 3a53801..ce7db25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java @@ -26,12 +26,11 @@ import java.util.Set; 
import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData; import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent; - public class JobProfilingData { private final ExecutionGraph executionGraph; @@ -59,7 +58,7 @@ public class JobProfilingData { public boolean addIfInstanceIsAllocatedByJob(InternalInstanceProfilingData instanceProfilingData) { for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) { - AllocatedSlot slot = executionVertex.getCurrentAssignedResource(); + SimpleSlot slot = executionVertex.getCurrentAssignedResource(); if (slot != null && slot.getInstance().getPath().equals( instanceProfilingData.getInstancePath())) { @@ -76,7 +75,7 @@ public class JobProfilingData { final Set<Instance> tempSet = new HashSet<Instance>(); for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) { - AllocatedSlot slot = executionVertex.getCurrentAssignedResource(); + SimpleSlot slot = executionVertex.getCurrentAssignedResource(); if (slot != null) { tempSet.add(slot.getInstance()); } http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index c780589..532f7f8 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -437,6 
+437,7 @@ class JobManager(val configuration: Configuration) case Terminated(taskManager) => { log.info("Task manager {} terminated.", taskManager.path) + JobManager.LOG.warn(s"Task manager ${taskManager.path} terminated.") instanceManager.unregisterTaskManager(taskManager) context.unwatch(taskManager) } http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index f4c95c9..4ee28e6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -42,8 +42,8 @@ import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobID; @@ -120,7 +120,7 @@ public class ExecutionGraphDeploymentTest { final Instance instance = getInstance(simpleTaskManager); - final AllocatedSlot slot = instance.allocateSlot(jobId); + final SimpleSlot slot = instance.allocateSimpleSlot(jobId); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 21caca0..e8e1f7e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -33,11 +33,11 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -68,7 +68,7 @@ public class ExecutionGraphTestUtils { } } - public static void setVertexResource(ExecutionVertex vertex, AllocatedSlot slot) { + public static void setVertexResource(ExecutionVertex vertex, SimpleSlot slot) { try { Execution exec = vertex.getCurrentExecutionAttempt(); http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 4a8c69b..2f1af70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -30,7 +30,7 @@ import akka.actor.Props; import akka.testkit.JavaTestKit; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -76,7 +76,7 @@ public class ExecutionStateProgressTest { // mock resources and mock taskmanager ActorRef taskManager = system.actorOf(Props.create(SimpleAcknowledgingTaskManager.class)); for (ExecutionVertex ee : ejv.getTaskVertices()) { - AllocatedSlot slot = getInstance(taskManager).allocateSlot(jid); + SimpleSlot slot = getInstance(taskManager).allocateSimpleSlot(jid); ee.deployToSlot(slot); } http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index 3c1b174..ee89954 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -37,7 +37,7 @@ 
import akka.testkit.JavaTestKit; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -148,7 +148,7 @@ public class ExecutionVertexCancelTest { new TaskOperationResult(execId, false)))); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); @@ -223,7 +223,7 @@ public class ExecutionVertexCancelTest { TaskOperationResult(execId, false), new TaskOperationResult(execId, true)))); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); @@ -296,7 +296,7 @@ public class ExecutionVertexCancelTest { TaskOperationResult(execId, true)))); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); @@ -344,7 +344,7 @@ public class ExecutionVertexCancelTest { TaskOperationResult(execId, true)))); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); @@ -400,7 +400,7 @@ public class ExecutionVertexCancelTest { TaskOperationResult(execId, false)))); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new 
JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); @@ -441,7 +441,7 @@ public class ExecutionVertexCancelTest { CancelSequenceTaskManagerCreator())); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); @@ -487,7 +487,7 @@ public class ExecutionVertexCancelTest { ))); Instance instance = getInstance(taskManager); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); setVertexResource(vertex, slot); @@ -544,7 +544,7 @@ public class ExecutionVertexCancelTest { // the scheduler (or any caller) needs to know that the slot should be released try { Instance instance = getInstance(ActorRef.noSender()); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); fail("Method should throw an exception"); @@ -587,7 +587,7 @@ public class ExecutionVertexCancelTest { setVertexState(vertex, ExecutionState.CANCELING); Instance instance = getInstance(ActorRef.noSender()); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); fail("Method should throw an exception"); @@ -601,7 +601,7 @@ public class ExecutionVertexCancelTest { AkkaUtils.DEFAULT_TIMEOUT()); Instance instance = getInstance(ActorRef.noSender()); - AllocatedSlot slot = instance.allocateSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.CANCELING); 
http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 330292b..c0d1db8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -29,8 +29,8 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult; @@ -64,7 +64,7 @@ public class ExecutionVertexDeploymentTest { // mock taskmanager to simply accept the call Instance instance = getInstance(tm); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getExecutionVertex(jid); @@ -106,8 +106,8 @@ public class ExecutionVertexDeploymentTest { Props.create(SimpleAcknowledgingTaskManager.class)); final Instance instance = getInstance(simpleTaskManager); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); final ExecutionJobVertex ejv = getExecutionVertex(jid); @@ -151,7 +151,7 @@ public class 
ExecutionVertexDeploymentTest { final Instance instance = getInstance(simpleTaskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getExecutionVertex(jid); @@ -206,7 +206,7 @@ public class ExecutionVertexDeploymentTest { Props.create(SimpleFailingTaskManager.class)); final Instance instance = getInstance(simpleTaskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getExecutionVertex(jid); @@ -242,8 +242,8 @@ public class ExecutionVertexDeploymentTest { Props.create(SimpleFailingTaskManager.class)); final Instance instance = getInstance(simpleTaskManager); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], @@ -289,9 +289,8 @@ public class ExecutionVertexDeploymentTest { final TestActorRef simpleTaskManager = TestActorRef.create(system, Props.create(SimpleAcknowledgingTaskManager.class)); - final Instance instance = getInstance(simpleTaskManager); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getExecutionVertex(jid); @@ -343,8 +342,7 @@ public class ExecutionVertexDeploymentTest { TaskOperationResult(eid, false), new TaskOperationResult(eid, true)))); final Instance instance = getInstance(simpleTaskManager); - - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 24ac44b..8230433 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -30,8 +30,8 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; @@ -65,7 +65,7 @@ public class ExecutionVertexSchedulingTest { try { // a slot than cannot be deployed to final Instance instance = getInstance(ActorRef.noSender()); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); slot.cancel(); assertFalse(slot.isReleased()); @@ -96,7 +96,7 @@ public class ExecutionVertexSchedulingTest { try { // a slot than cannot be deployed to final Instance instance = getInstance(ActorRef.noSender()); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); slot.cancel(); assertFalse(slot.isReleased()); @@ -136,7 +136,7 @@ public class ExecutionVertexSchedulingTest { 
.SimpleAcknowledgingTaskManager.class)); final Instance instance = getInstance(tm); - final AllocatedSlot slot = instance.allocateSlot(new JobID()); + final SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java index 15cdefb..94bdef1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java @@ -36,7 +36,7 @@ public class AllocatedSlotTest { try { // cancel, then release { - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); assertTrue(slot.isAlive()); slot.cancel(); @@ -52,7 +52,7 @@ public class AllocatedSlotTest { // release immediately { - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); assertTrue(slot.isAlive()); slot.releaseSlot(); @@ -75,32 +75,32 @@ public class AllocatedSlotTest { // assign to alive slot { - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); assertTrue(slot.setExecutedVertex(ev)); - assertEquals(ev, slot.getExecutedVertex()); + assertEquals(ev, slot.getExecution()); // try to add another one assertFalse(slot.setExecutedVertex(ev_2)); - assertEquals(ev, slot.getExecutedVertex()); + assertEquals(ev, slot.getExecution()); } // assign to canceled slot { - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); slot.cancel(); assertFalse(slot.setExecutedVertex(ev)); - assertNull(slot.getExecutedVertex()); + 
assertNull(slot.getExecution()); } // assign to released { - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); slot.releaseSlot(); assertFalse(slot.setExecutedVertex(ev)); - assertNull(slot.getExecutedVertex()); + assertNull(slot.getExecution()); } } catch (Exception e) { @@ -114,9 +114,9 @@ public class AllocatedSlotTest { try { Execution ev = mock(Execution.class); - AllocatedSlot slot = getSlot(); + SimpleSlot slot = getSlot(); assertTrue(slot.setExecutedVertex(ev)); - assertEquals(ev, slot.getExecutedVertex()); + assertEquals(ev, slot.getExecution()); slot.cancel(); slot.releaseSlot(); @@ -130,12 +130,12 @@ public class AllocatedSlotTest { } } - public static AllocatedSlot getSlot() throws Exception { + public static SimpleSlot getSlot() throws Exception { HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); InetAddress address = InetAddress.getByName("127.0.0.1"); InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 1); - return instance.allocateSlot(new JobID()); + return instance.allocateSimpleSlot(new JobID()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java index a7820a3..47f7a2b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java @@ -45,10 +45,10 @@ public class InstanceTest { assertEquals(4, instance.getNumberOfAvailableSlots()); assertEquals(0, instance.getNumberOfAllocatedSlots()); - 
AllocatedSlot slot1 = instance.allocateSlot(new JobID()); - AllocatedSlot slot2 = instance.allocateSlot(new JobID()); - AllocatedSlot slot3 = instance.allocateSlot(new JobID()); - AllocatedSlot slot4 = instance.allocateSlot(new JobID()); + SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot4 = instance.allocateSimpleSlot(new JobID()); assertNotNull(slot1); assertNotNull(slot2); @@ -61,7 +61,7 @@ public class InstanceTest { slot3.getSlotNumber() + slot4.getSlotNumber()); // no more slots - assertNull(instance.allocateSlot(new JobID())); + assertNull(instance.allocateSimpleSlot(new JobID())); try { instance.returnAllocatedSlot(slot2); fail("instance accepted a non-cancelled slot."); @@ -109,9 +109,9 @@ public class InstanceTest { assertEquals(3, instance.getNumberOfAvailableSlots()); - AllocatedSlot slot1 = instance.allocateSlot(new JobID()); - AllocatedSlot slot2 = instance.allocateSlot(new JobID()); - AllocatedSlot slot3 = instance.allocateSlot(new JobID()); + SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID()); instance.markDead(); @@ -139,9 +139,9 @@ public class InstanceTest { assertEquals(3, instance.getNumberOfAvailableSlots()); - AllocatedSlot slot1 = instance.allocateSlot(new JobID()); - AllocatedSlot slot2 = instance.allocateSlot(new JobID()); - AllocatedSlot slot3 = instance.allocateSlot(new JobID()); + SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID()); instance.cancelAndReleaseAllSlots();
