http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java new file mode 100644 index 0000000..58fda14 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +/** + * Marker interface for web handlers which can describe their paths. + */ +public interface WebHandler { + + /** + * Returns an array of REST URL's under which this handler can be registered. + * + * @return array containing REST URL's under which this handler can be registered. + */ + String[] getPaths(); +}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java new file mode 100644 index 0000000..e214a36 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.NotFoundException; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Base class for request handlers whose response depends on an ExecutionGraph + * that can be retrieved via "jobid" parameter. + */ +public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonRequestHandler { + + private final ExecutionGraphHolder executionGraphHolder; + + public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executor); + this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder); + } + + @Override + public CompletableFuture<String> handleJsonRequest( + Map<String, String> pathParams, + Map<String, String> queryParams, + JobManagerGateway jobManagerGateway) { + String jidString = pathParams.get("jobid"); + if (jidString == null) { + throw new RuntimeException("JobId parameter missing"); + } + + JobID jid; + try { + jid = JobID.fromHexString(jidString); + } + catch (Exception e) { + return FutureUtils.completedExceptionally(new FlinkException("Invalid JobID string '" + jidString + "'", e)); + } + + final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway); + + return graphFuture.thenComposeAsync( + (Optional<AccessExecutionGraph> optGraph) -> { + if (optGraph.isPresent()) { + return handleRequest(optGraph.get(), pathParams); + } else { + throw new FlinkFutureException(new NotFoundException("Could not find job with jobId " + jid + '.')); + } + }, executor); + } + + public abstract CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params); +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java new file mode 100644 index 0000000..e2e4484 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Base class for request handlers whose response depends on a specific job vertex (defined + * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter). + */ +public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler { + + public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public final CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + final JobVertexID vid = parseJobVertexId(params); + + final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid); + if (jobVertex == null) { + throw new IllegalArgumentException("No vertex with ID '" + vid + "' exists."); + } + + return handleRequest(jobVertex, params); + } + + /** + * Returns the job vertex ID parsed from the provided parameters. + * + * @param params Path parameters + * @return Parsed job vertex ID or <code>null</code> if not available. + */ + public static JobVertexID parseJobVertexId(Map<String, String> params) { + String jobVertexIdParam = params.get("vertexid"); + if (jobVertexIdParam == null) { + return null; + } + + try { + return JobVertexID.fromHexString(jobVertexIdParam); + } catch (RuntimeException ignored) { + return null; + } + } + + public abstract CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params); +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java new file mode 100644 index 0000000..43c4af3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.NotFoundException; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; + +import java.nio.charset.Charset; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Base class for most request handlers. The handlers must produce a JSON response. + */ +public abstract class AbstractJsonRequestHandler implements RequestHandler { + + private static final Charset ENCODING = Charset.forName("UTF-8"); + + protected final Executor executor; + + protected AbstractJsonRequestHandler(Executor executor) { + this.executor = Preconditions.checkNotNull(executor); + } + + @Override + public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + CompletableFuture<String> resultFuture = handleJsonRequest(pathParams, queryParams, jobManagerGateway); + + return resultFuture.thenApplyAsync( + (String result) -> { + byte[] bytes = result.getBytes(ENCODING); + + DefaultFullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes)); + + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name()); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + return response; + }); + } + + /** + * Core method that handles the request and generates the response. The method needs to + * respond with a valid JSON string. Exceptions may be thrown and will be handled. + * + * @param pathParams The map of REST path parameters, decoded by the router. + * @param queryParams The map of query parameters. + * @param jobManagerGateway to communicate with the JobManager. + * + * @return The JSON string that is the HTTP response. + * + * @throws Exception Handlers may forward exceptions. Exceptions of type + * {@link NotFoundException} will cause a HTTP 404 + * response with the exception message, other exceptions will cause a HTTP 500 response + * with the exception stack trace. + */ + public abstract CompletableFuture<String> handleJsonRequest( + Map<String, String> pathParams, + Map<String, String> queryParams, + JobManagerGateway jobManagerGateway); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java new file mode 100644 index 0000000..ec277d8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.util.FlinkException; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Base class for request handlers whose response depends on a specific subtask execution attempt + * (defined via the "attempt" parameter) of a specific subtask (defined via the + * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a + * specific job, defined via (defined voa the "jobid" parameter). + */ +public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler { + + public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) { + final String attemptNumberString = params.get("attempt"); + if (attemptNumberString == null) { + return FutureUtils.completedExceptionally(new FlinkException("Attempt number parameter missing")); + } + + final int attempt; + try { + attempt = Integer.parseInt(attemptNumberString); + } + catch (NumberFormatException e) { + return FutureUtils.completedExceptionally(new FlinkException("Invalid attempt number parameter")); + } + + final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt(); + if (attempt == currentAttempt.getAttemptNumber()) { + return handleRequest(currentAttempt, params); + } + else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) { + AccessExecution exec = vertex.getPriorExecutionAttempt(attempt); + + if (exec != null) { + return handleRequest(exec, params); + } else { + return FutureUtils.completedExceptionally(new RequestHandlerException("Execution for attempt " + attempt + + " has already been deleted.")); + } + } + else { + return FutureUtils.completedExceptionally(new FlinkException("Attempt does not exist: " + attempt)); + } + } + + public abstract CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params); +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java new file mode 100644 index 0000000..d69038a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.util.FlinkException; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Base class for request handlers whose response depends on a specific subtask (defined via the + * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a + * specific job, defined via (defined voa the "jobid" parameter). + */ +public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler { + + public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public final CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { + final String subtaskNumberString = params.get("subtasknum"); + if (subtaskNumberString == null) { + return FutureUtils.completedExceptionally(new FlinkException("Subtask number parameter missing")); + } + + final int subtask; + try { + subtask = Integer.parseInt(subtaskNumberString); + } + catch (NumberFormatException e) { + return FutureUtils.completedExceptionally(new FlinkException("Invalid subtask number parameter", e)); + } + + if (subtask < 0 || subtask >= jobVertex.getParallelism()) { + return FutureUtils.completedExceptionally(new FlinkException("subtask does not exist: " + subtask)); + } + + final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask]; + return handleRequest(vertex, params); + } + + public abstract CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params); +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java new file mode 100644 index 0000000..db13633 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.messages.webmonitor.StatusOverview; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.FlinkException; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Responder that returns the status of the Flink cluster, such as how many + * TaskManagers are currently connected, and how many jobs are running. + */ +public class ClusterOverviewHandler extends AbstractJsonRequestHandler { + + private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview"; + + private static final String version = EnvironmentInformation.getVersion(); + + private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId; + + private final Time timeout; + + public ClusterOverviewHandler(Executor executor, Time timeout) { + super(executor); + this.timeout = checkNotNull(timeout); + } + + @Override + public String[] getPaths() { + return new String[]{CLUSTER_OVERVIEW_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + // we need no parameters, get all requests + try { + if (jobManagerGateway != null) { + CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout); + + return overviewFuture.thenApplyAsync( + (StatusOverview overview) -> { + StringWriter writer = new StringWriter(); + try { + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartObject(); + gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected()); + gen.writeNumberField("slots-total", overview.getNumSlotsTotal()); + gen.writeNumberField("slots-available", overview.getNumSlotsAvailable()); + gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending()); + gen.writeNumberField("jobs-finished", overview.getNumJobsFinished()); + gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled()); + gen.writeNumberField("jobs-failed", overview.getNumJobsFailed()); + gen.writeStringField("flink-version", version); + if (!commitID.equals(EnvironmentInformation.UNKNOWN)) { + gen.writeStringField("flink-commit", commitID); + } + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } catch (IOException exception) { + throw new FlinkFutureException("Could not write cluster overview.", exception); + } + }, + executor); + } else { + throw new Exception("No connection to the leading JobManager."); + } + } + catch (Exception e) { + return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java new file mode 100644 index 0000000..57214f0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.configuration.ConfigConstants; + +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; + +/** + * Responder that returns a constant String. + */ [email protected] +public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> { + + private final byte[] encodedText; + + public ConstantTextHandler(String text) { + this.encodedText = text.getBytes(ConfigConstants.DEFAULT_CHARSET); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception { + HttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText)); + + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); + + KeepAliveWrite.flush(ctx, routed.request(), response); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java new file mode 100644 index 0000000..07d9707 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.StringWriter; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * Responder that returns with a list of all JobIDs of jobs found at the target actor. + * May serve the IDs of current jobs, or past jobs, depending on whether this handler is + * given the JobManager or Archive Actor Reference. + */ +public class CurrentJobIdsHandler extends AbstractJsonRequestHandler { + + private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs"; + + private final Time timeout; + + public CurrentJobIdsHandler(Executor executor, Time timeout) { + super(executor); + this.timeout = requireNonNull(timeout); + } + + @Override + public String[] getPaths() { + return new String[]{CURRENT_JOB_IDS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + return CompletableFuture.supplyAsync( + () -> { + // we need no parameters, get all requests + try { + if (jobManagerGateway != null) { + CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout); + JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartObject(); + + gen.writeArrayFieldStart("jobs-running"); + for (JobID jid : overview.getJobsRunningOrPending()) { + gen.writeString(jid.toString()); + } + gen.writeEndArray(); + + gen.writeArrayFieldStart("jobs-finished"); + for (JobID jid : overview.getJobsFinished()) { + gen.writeString(jid.toString()); + } + gen.writeEndArray(); + + gen.writeArrayFieldStart("jobs-cancelled"); + for (JobID jid : overview.getJobsCancelled()) { + gen.writeString(jid.toString()); + } + gen.writeEndArray(); + + gen.writeArrayFieldStart("jobs-failed"); + for (JobID jid : overview.getJobsFailed()) { + gen.writeString(jid.toString()); + } + gen.writeEndArray(); + + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } + else { + throw new Exception("No connection to the leading JobManager."); + } + } + catch (Exception e) { + throw new FlinkFutureException("Failed to fetch list of all running jobs.", e); + } + }, + executor); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java new file mode 100644 index 0000000..6f85320 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Request handler that returns a summary of the job status. + */ +public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler { + + private static final String ALL_JOBS_REST_PATH = "/joboverview"; + private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running"; + private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed"; + + private final Time timeout; + + private final boolean includeRunningJobs; + private final boolean includeFinishedJobs; + + public CurrentJobsOverviewHandler( + Executor executor, + Time timeout, + boolean includeRunningJobs, + boolean includeFinishedJobs) { + + super(executor); + this.timeout = checkNotNull(timeout); + this.includeRunningJobs = includeRunningJobs; + this.includeFinishedJobs = includeFinishedJobs; + } + + @Override + public String[] getPaths() { + if (includeRunningJobs && includeFinishedJobs) { + return new String[]{ALL_JOBS_REST_PATH}; + } + if (includeRunningJobs) { + return new String[]{RUNNING_JOBS_REST_PATH}; + } else { + return new String[]{COMPLETED_JOBS_REST_PATH}; + } + } + + @Override + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + if (jobManagerGateway != null) { + CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout); + + return jobDetailsFuture.thenApplyAsync( + (MultipleJobsDetails result) -> { + final long now = System.currentTimeMillis(); + + StringWriter writer = new StringWriter(); + try { + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + gen.writeStartObject(); + + if (includeRunningJobs && includeFinishedJobs) { + gen.writeArrayFieldStart("running"); + for (JobDetails detail : result.getRunningJobs()) { + writeJobDetailOverviewAsJson(detail, gen, now); + } + gen.writeEndArray(); + + gen.writeArrayFieldStart("finished"); + for (JobDetails detail : result.getFinishedJobs()) { + writeJobDetailOverviewAsJson(detail, gen, now); + } + gen.writeEndArray(); + } else { + gen.writeArrayFieldStart("jobs"); + for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) { + writeJobDetailOverviewAsJson(detail, gen, now); + } + gen.writeEndArray(); + } + + gen.writeEndObject(); + gen.close(); + return writer.toString(); + } catch (IOException e) { + throw new FlinkFutureException("Could not write current jobs overview json.", e); + } + }, + executor); + } + else { + return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager.")); + } + } + + /** + * Archivist for the CurrentJobsOverviewHandler. + */ + public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + StringWriter writer = new StringWriter(); + try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) { + gen.writeStartObject(); + gen.writeArrayFieldStart("running"); + gen.writeEndArray(); + gen.writeArrayFieldStart("finished"); + writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis()); + gen.writeEndArray(); + gen.writeEndObject(); + } + String json = writer.toString(); + String path = ALL_JOBS_REST_PATH; + return Collections.singleton(new ArchivedJson(path, json)); + } + } + + public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException { + gen.writeStartObject(); + + gen.writeStringField("jid", details.getJobId().toString()); + gen.writeStringField("name", details.getJobName()); + gen.writeStringField("state", details.getStatus().name()); + + gen.writeNumberField("start-time", details.getStartTime()); + gen.writeNumberField("end-time", details.getEndTime()); + gen.writeNumberField("duration", (details.getEndTime() <= 0 ? now : details.getEndTime()) - details.getStartTime()); + gen.writeNumberField("last-modification", details.getLastUpdateTime()); + + gen.writeObjectFieldStart("tasks"); + gen.writeNumberField("total", details.getNumTasks()); + + final int[] perState = details.getNumVerticesPerExecutionState(); + gen.writeNumberField("pending", perState[ExecutionState.CREATED.ordinal()] + + perState[ExecutionState.SCHEDULED.ordinal()] + + perState[ExecutionState.DEPLOYING.ordinal()]); + gen.writeNumberField("running", perState[ExecutionState.RUNNING.ordinal()]); + gen.writeNumberField("finished", perState[ExecutionState.FINISHED.ordinal()]); + gen.writeNumberField("canceling", perState[ExecutionState.CANCELING.ordinal()]); + gen.writeNumberField("canceled", perState[ExecutionState.CANCELED.ordinal()]); + gen.writeNumberField("failed", perState[ExecutionState.FAILED.ordinal()]); + gen.writeEndObject(); + + gen.writeEndObject(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java new file mode 100644 index 0000000..e8854f4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.util.EnvironmentInformation; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Responder that returns the parameters that define how the asynchronous requests + * against this web server should behave. It defines for example the refresh interval, + * and time zone of the server timestamps. + */ +public class DashboardConfigHandler extends AbstractJsonRequestHandler { + + private static final String DASHBOARD_CONFIG_REST_PATH = "/config"; + + private final String configString; + + public DashboardConfigHandler(Executor executor, long refreshInterval) { + super(executor); + try { + this.configString = createConfigJson(refreshInterval); + } + catch (Exception e) { + // should never happen + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public String[] getPaths() { + return new String[]{DASHBOARD_CONFIG_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + return CompletableFuture.completedFuture(configString); + } + + public static String createConfigJson(long refreshInterval) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + TimeZone timeZone = TimeZone.getDefault(); + String timeZoneName = timeZone.getDisplayName(); + long timeZoneOffset = timeZone.getRawOffset(); + + gen.writeStartObject(); + gen.writeNumberField("refresh-interval", refreshInterval); + gen.writeNumberField("timezone-offset", timeZoneOffset); + gen.writeStringField("timezone-name", timeZoneName); + gen.writeStringField("flink-version", EnvironmentInformation.getVersion()); + + EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation(); + if (revision != null) { + gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate); + } + + gen.writeEndObject(); + + gen.close(); + + return writer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java new file mode 100644 index 0000000..8a47e50 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.WeakHashMap; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive. + * + * <p>The holder will cache the ExecutionGraph behind a weak reference, which will be cleared + * at some point once no one else is pointing to the ExecutionGraph. + * Note that while the holder runs in the same JVM as the JobManager or Archive, the reference should + * stay valid. + */ +public class ExecutionGraphHolder { + + private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class); + + private final Time timeout; + + private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>(); + + public ExecutionGraphHolder(Time timeout) { + this.timeout = checkNotNull(timeout); + } + + /** + * Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or + * {@link Optional#empty()} if it cannot be found. + * + * @param jid jobID of the execution graph to be retrieved + * @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph + */ + public CompletableFuture<Optional<AccessExecutionGraph>> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) { + AccessExecutionGraph cached = cache.get(jid); + if (cached != null) { + if (cached.getState() == JobStatus.SUSPENDED) { + cache.remove(jid); + } else { + return CompletableFuture.completedFuture(Optional.of(cached)); + } + } + + CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout); + + executionGraphFuture.thenAcceptAsync( + optExecutionGraph -> + optExecutionGraph.ifPresent(executionGraph -> cache.put(jid, executionGraph))); + + return executionGraphFuture; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java new file mode 100644 index 0000000..0a3b050 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns the aggregated user accumulators of a job. + */ +public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler { + + private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators"; + + public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public String[] getPaths() { + return new String[]{JOB_ACCUMULATORS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createJobAccumulatorsJson(graph); + } catch (IOException e) { + throw new FlinkFutureException("Could not create job accumulators json.", e); + } + }, + executor); + } + + /** + * Archivist for the JobAccumulatorsHandler. + */ + public static class JobAccumulatorsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String json = createJobAccumulatorsJson(graph); + String path = JOB_ACCUMULATORS_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } + } + + public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified(); + + gen.writeStartObject(); + + gen.writeArrayFieldStart("job-accumulators"); + // empty for now + gen.writeEndArray(); + + gen.writeArrayFieldStart("user-task-accumulators"); + for (StringifiedAccumulatorResult acc : allAccumulators) { + gen.writeStartObject(); + gen.writeStringField("name", acc.getName()); + gen.writeStringField("type", acc.getType()); + gen.writeStringField("value", acc.getValue()); + gen.writeEndObject(); + } + gen.writeEndArray(); + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java new file mode 100644 index 0000000..a194f30 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler for the CANCEL request. + */ +public class JobCancellationHandler extends AbstractJsonRequestHandler { + + private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel"; + private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel"; + + private final Time timeout; + + public JobCancellationHandler(Executor executor, Time timeout) { + super(executor); + this.timeout = Preconditions.checkNotNull(timeout); + } + + @Override + public String[] getPaths() { + return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + return CompletableFuture.supplyAsync( + () -> { + try { + JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); + if (jobManagerGateway != null) { + jobManagerGateway.cancelJob(jobId, timeout); + return "{}"; + } + else { + throw new Exception("No connection to the leading JobManager."); + } + } + catch (Exception e) { + throw new FlinkFutureException("Failed to cancel the job with id: " + pathParams.get("jobid"), e); + } + }, + executor); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java new file mode 100644 index 0000000..23e94f5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint; +import org.apache.flink.runtime.rest.NotFoundException; + +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; + +import com.fasterxml.jackson.core.JsonGenerator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.StringWriter; +import java.nio.charset.Charset; +import java.util.ArrayDeque; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Request handler for {@link CancelJobWithSavepoint} messages. + */ +public class JobCancellationWithSavepointHandlers { + + private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = "/jobs/:jobid/cancel-with-savepoint"; + private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory"; + + /** URL for in-progress cancellations. */ + private static final String CANCELLATION_IN_PROGRESS_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId"; + + /** Encodings for String. */ + private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET; + + /** Shared lock between Trigger and In-Progress handlers. */ + private final Object lock = new Object(); + + /** In-Progress requests. */ + private final Map<JobID, Long> inProgress = new HashMap<>(); + + /** Succeeded/failed request. Either String or Throwable. */ + private final Map<Long, Object> completed = new HashMap<>(); + + /** Atomic request counter. */ + private long requestCounter; + + /** Handler for trigger requests. */ + private final TriggerHandler triggerHandler; + + /** Handler for in-progress requests. */ + private final InProgressHandler inProgressHandler; + + /** Default savepoint directory. */ + private final String defaultSavepointDirectory; + + public JobCancellationWithSavepointHandlers( + ExecutionGraphHolder currentGraphs, + Executor executor) { + this(currentGraphs, executor, null); + } + + public JobCancellationWithSavepointHandlers( + ExecutionGraphHolder currentGraphs, + Executor executor, + @Nullable String defaultSavepointDirectory) { + + this.triggerHandler = new TriggerHandler(currentGraphs, executor); + this.inProgressHandler = new InProgressHandler(); + this.defaultSavepointDirectory = defaultSavepointDirectory; + } + + public TriggerHandler getTriggerHandler() { + return triggerHandler; + } + + public InProgressHandler getInProgressHandler() { + return inProgressHandler; + } + + // ------------------------------------------------------------------------ + // New requests + // ------------------------------------------------------------------------ + + /** + * Handler for triggering a {@link CancelJobWithSavepoint} message. + */ + class TriggerHandler implements RequestHandler { + + /** Current execution graphs. */ + private final ExecutionGraphHolder currentGraphs; + + /** Execution context for futures. */ + private final Executor executor; + + public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) { + this.currentGraphs = checkNotNull(currentGraphs); + this.executor = checkNotNull(executor); + } + + @Override + public String[] getPaths() { + return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH}; + } + + @Override + @SuppressWarnings("unchecked") + public CompletableFuture<FullHttpResponse> handleRequest( + Map<String, String> pathParams, + Map<String, String> queryParams, + JobManagerGateway jobManagerGateway) { + + if (jobManagerGateway != null) { + JobID jobId = JobID.fromHexString(pathParams.get("jobid")); + final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture; + + graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway); + + return graphFuture.thenApplyAsync( + (Optional<AccessExecutionGraph> optGraph) -> { + final AccessExecutionGraph graph = optGraph.orElseThrow( + () -> new FlinkFutureException( + new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'))); + + CheckpointCoordinator coord = graph.getCheckpointCoordinator(); + if (coord == null) { + throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job.")); + } + + String targetDirectory = pathParams.get("targetDirectory"); + if (targetDirectory == null) { + if (defaultSavepointDirectory == null) { + throw new IllegalStateException("No savepoint directory configured. " + + "You can either specify a directory when triggering this savepoint or " + + "configure a cluster-wide default via key '" + + CoreOptions.SAVEPOINT_DIRECTORY.key() + "'."); + } else { + targetDirectory = defaultSavepointDirectory; + } + } + + try { + return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout()); + } catch (IOException e) { + throw new FlinkFutureException("Could not cancel job with savepoint.", e); + } + }, executor); + } else { + return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager.")); + } + } + + @SuppressWarnings("unchecked") + private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException { + // Check whether a request exists + final long requestId; + final boolean isNewRequest; + synchronized (lock) { + if (inProgress.containsKey(jobId)) { + requestId = inProgress.get(jobId); + isNewRequest = false; + } else { + requestId = ++requestCounter; + inProgress.put(jobId, requestId); + isNewRequest = true; + } + } + + if (isNewRequest) { + boolean success = false; + + try { + // Trigger cancellation + CompletableFuture<String> cancelJobFuture = jobManagerGateway + .cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout)); + + cancelJobFuture.whenCompleteAsync( + (String path, Throwable throwable) -> { + try { + if (throwable != null) { + completed.put(requestId, throwable); + } else { + completed.put(requestId, path); + } + } finally { + inProgress.remove(jobId); + } + }, executor); + + success = true; + } finally { + synchronized (lock) { + if (!success) { + inProgress.remove(jobId); + } + } + } + } + + // In-progress location + String location = CANCELLATION_IN_PROGRESS_REST_PATH + .replace(":jobid", jobId.toString()) + .replace(":requestId", Long.toString(requestId)); + + // Accepted response + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + gen.writeStartObject(); + gen.writeStringField("status", "accepted"); + gen.writeNumberField("request-id", requestId); + gen.writeStringField("location", location); + gen.writeEndObject(); + gen.close(); + + String json = writer.toString(); + byte[] bytes = json.getBytes(ENCODING); + + DefaultFullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.ACCEPTED, + Unpooled.wrappedBuffer(bytes)); + + response.headers().set(HttpHeaders.Names.LOCATION, location); + + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name()); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + FullHttpResponse accepted = response; + + return accepted; + } + } + + // ------------------------------------------------------------------------ + // In-progress requests + // ------------------------------------------------------------------------ + + /** + * Handler for in-progress cancel with savepoint operations. + */ + class InProgressHandler implements RequestHandler { + + /** The number of recent checkpoints whose IDs are remembered. */ + private static final int NUM_GHOST_REQUEST_IDS = 16; + + /** Remember some recently completed. */ + private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS); + + @Override + public String[] getPaths() { + return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH}; + } + + @Override + @SuppressWarnings("unchecked") + public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + JobID jobId = JobID.fromHexString(pathParams.get("jobid")); + long requestId = Long.parseLong(pathParams.get("requestId")); + + return CompletableFuture.supplyAsync( + () -> { + try { + synchronized (lock) { + Object result = completed.remove(requestId); + + if (result != null) { + // Add to recent history + recentlyCompleted.add(new Tuple2<>(requestId, result)); + if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) { + recentlyCompleted.remove(); + } + + if (result.getClass() == String.class) { + String savepointPath = (String) result; + return createSuccessResponse(requestId, savepointPath); + } else { + Throwable cause = (Throwable) result; + return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage()); + } + } else { + // Check in-progress + Long inProgressRequestId = inProgress.get(jobId); + if (inProgressRequestId != null) { + // Sanity check + if (inProgressRequestId == requestId) { + return createInProgressResponse(requestId); + } else { + String msg = "Request ID does not belong to JobID"; + return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg); + } + } + + // Check recent history + for (Tuple2<Long, Object> recent : recentlyCompleted) { + if (recent.f0 == requestId) { + if (recent.f1.getClass() == String.class) { + String savepointPath = (String) recent.f1; + return createSuccessResponse(requestId, savepointPath); + } else { + Throwable cause = (Throwable) recent.f1; + return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage()); + } + } + } + + return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID"); + } + } + } catch (Exception e) { + throw new FlinkFutureException("Could not handle in progress request.", e); + } + }); + } + + private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + gen.writeStartObject(); + + gen.writeStringField("status", "success"); + gen.writeNumberField("request-id", requestId); + gen.writeStringField("savepoint-path", savepointPath); + + gen.writeEndObject(); + gen.close(); + + String json = writer.toString(); + byte[] bytes = json.getBytes(ENCODING); + + DefaultFullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.CREATED, + Unpooled.wrappedBuffer(bytes)); + + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name()); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + return response; + } + + private FullHttpResponse createInProgressResponse(long requestId) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + gen.writeStartObject(); + + gen.writeStringField("status", "in-progress"); + gen.writeNumberField("request-id", requestId); + + gen.writeEndObject(); + gen.close(); + + String json = writer.toString(); + byte[] bytes = json.getBytes(ENCODING); + + DefaultFullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.ACCEPTED, + Unpooled.wrappedBuffer(bytes)); + + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name()); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + return response; + } + + private FullHttpResponse createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + gen.writeStartObject(); + + gen.writeStringField("status", "failed"); + gen.writeNumberField("request-id", requestId); + gen.writeStringField("cause", errMsg); + + gen.writeEndObject(); + gen.close(); + + String json = writer.toString(); + byte[] bytes = json.getBytes(ENCODING); + + DefaultFullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + code, + Unpooled.wrappedBuffer(bytes)); + + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name()); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + return response; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java new file mode 100644 index 0000000..bb1cf8f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns the execution config of a job. + */ +public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { + + private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config"; + + public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public String[] getPaths() { + return new String[]{JOB_CONFIG_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createJobConfigJson(graph); + } catch (IOException e) { + throw new FlinkFutureException("Could not write job config json.", e); + } + }, + executor); + + } + + /** + * Archivist for the JobConfigHandler. + */ + public static class JobConfigJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String json = createJobConfigJson(graph); + String path = JOB_CONFIG_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } + } + + public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartObject(); + gen.writeStringField("jid", graph.getJobID().toString()); + gen.writeStringField("name", graph.getJobName()); + + final ArchivedExecutionConfig summary = graph.getArchivedExecutionConfig(); + + if (summary != null) { + gen.writeObjectFieldStart("execution-config"); + + gen.writeStringField("execution-mode", summary.getExecutionMode()); + + gen.writeStringField("restart-strategy", summary.getRestartStrategyDescription()); + gen.writeNumberField("job-parallelism", summary.getParallelism()); + gen.writeBooleanField("object-reuse-mode", summary.getObjectReuseEnabled()); + + Map<String, String> ucVals = summary.getGlobalJobParameters(); + if (ucVals != null) { + gen.writeObjectFieldStart("user-config"); + + for (Map.Entry<String, String> ucVal : ucVals.entrySet()) { + gen.writeStringField(ucVal.getKey(), ucVal.getValue()); + } + + gen.writeEndObject(); + } + + gen.writeEndObject(); + } + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java new file mode 100644 index 0000000..dd6aee8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns details about a job. This includes: + * <ul> + * <li>Dataflow plan</li> + * <li>id, name, and current status</li> + * <li>start time, end time, duration</li> + * <li>number of job vertices in each state (pending, running, finished, failed)</li> + * <li>info about job vertices, including runtime, status, I/O bytes and records, subtasks in each status</li> + * </ul> + */ +public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { + + private static final String JOB_DETAILS_REST_PATH = "/jobs/:jobid"; + private static final String JOB_DETAILS_VERTICES_REST_PATH = "/jobs/:jobid/vertices"; + + private final MetricFetcher fetcher; + + public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + super(executionGraphHolder, executor); + this.fetcher = fetcher; + } + + @Override + public String[] getPaths() { + return new String[]{JOB_DETAILS_REST_PATH, JOB_DETAILS_VERTICES_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createJobDetailsJson(graph, fetcher); + } catch (IOException e) { + throw new FlinkFutureException("Could not create job details json.", e); + } + }, + executor); + } + + /** + * Archivist for the JobDetailsHandler. + */ + public static class JobDetailsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String json = createJobDetailsJson(graph, null); + String path1 = JOB_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + String path2 = JOB_DETAILS_VERTICES_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + Collection<ArchivedJson> archives = new ArrayList<>(); + archives.add(new ArchivedJson(path1, json)); + archives.add(new ArchivedJson(path2, json)); + return archives; + } + } + + public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException { + final StringWriter writer = new StringWriter(); + final JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + final long now = System.currentTimeMillis(); + + gen.writeStartObject(); + + // basic info + gen.writeStringField("jid", graph.getJobID().toString()); + gen.writeStringField("name", graph.getJobName()); + gen.writeBooleanField("isStoppable", graph.isStoppable()); + gen.writeStringField("state", graph.getState().name()); + + // times and duration + final long jobStartTime = graph.getStatusTimestamp(JobStatus.CREATED); + final long jobEndTime = graph.getState().isGloballyTerminalState() ? + graph.getStatusTimestamp(graph.getState()) : -1L; + gen.writeNumberField("start-time", jobStartTime); + gen.writeNumberField("end-time", jobEndTime); + gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : now) - jobStartTime); + gen.writeNumberField("now", now); + + // timestamps + gen.writeObjectFieldStart("timestamps"); + for (JobStatus status : JobStatus.values()) { + gen.writeNumberField(status.name(), graph.getStatusTimestamp(status)); + } + gen.writeEndObject(); + + // job vertices + int[] jobVerticesPerState = new int[ExecutionState.values().length]; + gen.writeArrayFieldStart("vertices"); + + for (AccessExecutionJobVertex ejv : graph.getVerticesTopologically()) { + int[] tasksPerState = new int[ExecutionState.values().length]; + long startTime = Long.MAX_VALUE; + long endTime = 0; + boolean allFinished = true; + + for (AccessExecutionVertex vertex : ejv.getTaskVertices()) { + final ExecutionState state = vertex.getExecutionState(); + tasksPerState[state.ordinal()]++; + + // take the earliest start time + long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING); + if (started > 0) { + startTime = Math.min(startTime, started); + } + + allFinished &= state.isTerminal(); + endTime = Math.max(endTime, vertex.getStateTimestamp(state)); + } + + long duration; + if (startTime < Long.MAX_VALUE) { + if (allFinished) { + duration = endTime - startTime; + } + else { + endTime = -1L; + duration = now - startTime; + } + } + else { + startTime = -1L; + endTime = -1L; + duration = -1L; + } + + ExecutionState jobVertexState = + ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism()); + jobVerticesPerState[jobVertexState.ordinal()]++; + + gen.writeStartObject(); + gen.writeStringField("id", ejv.getJobVertexId().toString()); + gen.writeStringField("name", ejv.getName()); + gen.writeNumberField("parallelism", ejv.getParallelism()); + gen.writeStringField("status", jobVertexState.name()); + + gen.writeNumberField("start-time", startTime); + gen.writeNumberField("end-time", endTime); + gen.writeNumberField("duration", duration); + + gen.writeObjectFieldStart("tasks"); + for (ExecutionState state : ExecutionState.values()) { + gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]); + } + gen.writeEndObject(); + + MutableIOMetrics counts = new MutableIOMetrics(); + + for (AccessExecutionVertex vertex : ejv.getTaskVertices()) { + counts.addIOMetrics( + vertex.getCurrentExecutionAttempt(), + fetcher, + graph.getJobID().toString(), + ejv.getJobVertexId().toString()); + } + + counts.writeIOMetricsAsJson(gen); + + gen.writeEndObject(); + } + gen.writeEndArray(); + + gen.writeObjectFieldStart("status-counts"); + for (ExecutionState state : ExecutionState.values()) { + gen.writeNumberField(state.name(), jobVerticesPerState[state.ordinal()]); + } + gen.writeEndObject(); + + gen.writeFieldName("plan"); + gen.writeRawValue(graph.getJsonPlan()); + + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } +}
