http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java deleted file mode 100644 index 1ec3f9c..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;

import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
 * Base class for most request handlers. The handlers must produce a JSON response.
 *
 * <p>Subclasses implement {@link #handleJsonRequest(Map, Map, JobManagerGateway)}; this class
 * wraps the resulting JSON string into an HTTP 200 response with the matching content headers.
 */
public abstract class AbstractJsonRequestHandler implements RequestHandler {

	private static final Charset ENCODING = Charset.forName("UTF-8");

	/** Executor on which asynchronous response processing is run. */
	protected final Executor executor;

	/**
	 * Creates the handler.
	 *
	 * @param executor executor used for asynchronous processing, must not be null
	 */
	protected AbstractJsonRequestHandler(Executor executor) {
		this.executor = Preconditions.checkNotNull(executor);
	}

	/**
	 * Delegates to {@link #handleJsonRequest(Map, Map, JobManagerGateway)} and converts the
	 * JSON string into a full HTTP response (status 200, JSON content type, content length).
	 *
	 * @param pathParams The map of REST path parameters.
	 * @param queryParams The map of query parameters.
	 * @param jobManagerGateway to communicate with the JobManager.
	 * @return Future of the HTTP response; it fails if the JSON future fails.
	 */
	@Override
	public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
		CompletableFuture<String> resultFuture = handleJsonRequest(pathParams, queryParams, jobManagerGateway);

		// Run the conversion on the handler's own executor (as the sibling handlers do)
		// rather than on the JVM-wide common pool that the single-argument overload uses.
		return resultFuture.thenApplyAsync(
			(String result) -> {
				byte[] bytes = result.getBytes(ENCODING);

				DefaultFullHttpResponse response = new DefaultFullHttpResponse(
					HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));

				response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
				response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());

				return response;
			},
			executor);
	}

	/**
	 * Core method that handles the request and generates the response. The returned future
	 * must be completed with a valid JSON string.
	 *
	 * <p>The method declares no checked exceptions; failures are reported by completing the
	 * returned future exceptionally. NOTE(review): the previous documentation stated that
	 * {@link org.apache.flink.runtime.webmonitor.NotFoundException} results in an HTTP 404 and
	 * any other exception in an HTTP 500 response - that mapping is done outside this class,
	 * confirm against the caller.
	 *
	 * @param pathParams The map of REST path parameters, decoded by the router.
	 * @param queryParams The map of query parameters.
	 * @param jobManagerGateway to communicate with the JobManager.
	 *
	 * @return Future of the JSON string that becomes the HTTP response body.
	 */
	public abstract CompletableFuture<String> handleJsonRequest(
			Map<String, String> pathParams,
			Map<String, String> queryParams,
			JobManagerGateway jobManagerGateway);

}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java deleted file mode 100644 index 1b20673..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.util.FlinkException;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
 * Base class for request handlers whose response depends on a specific subtask execution attempt
 * (defined via the "attempt" parameter) of a specific subtask (defined via the
 * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
 * specific job (defined via the "jobid" parameter).
 */
public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler {

	public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
		super(executionGraphHolder, executor);
	}

	/**
	 * Resolves the "attempt" parameter to an execution attempt of the given vertex and
	 * forwards to {@link #handleRequest(AccessExecution, Map)}.
	 *
	 * @param vertex the subtask whose attempt is requested
	 * @param params request parameters; must contain a numeric "attempt" entry
	 * @return the future of the delegate, or an exceptionally completed future if the
	 *         parameter is missing, not a number, out of range, or the prior attempt is
	 *         no longer available
	 */
	@Override
	public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
		final String attemptNumberString = params.get("attempt");
		if (attemptNumberString == null) {
			return FutureUtils.completedExceptionally(new FlinkException("Attempt number parameter missing"));
		}

		final int attempt;
		try {
			attempt = Integer.parseInt(attemptNumberString);
		}
		catch (NumberFormatException e) {
			// keep the cause, consistent with the subtask-number handling in the parent class
			return FutureUtils.completedExceptionally(new FlinkException("Invalid attempt number parameter", e));
		}

		final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
		if (attempt == currentAttempt.getAttemptNumber()) {
			return handleRequest(currentAttempt, params);
		}
		else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
			// an earlier attempt: the vertex may return null for it
			AccessExecution exec = vertex.getPriorExecutionAttempt(attempt);

			if (exec != null) {
				return handleRequest(exec, params);
			} else {
				return FutureUtils.completedExceptionally(new RequestHandlerException("Execution for attempt " + attempt +
					" has already been deleted."));
			}
		}
		else {
			return FutureUtils.completedExceptionally(new FlinkException("Attempt does not exist: " + attempt));
		}
	}

	/**
	 * Handles the request for the resolved execution attempt.
	 *
	 * @param execAttempt the execution attempt selected by the "attempt" parameter
	 * @param params the request parameters
	 * @return future of the JSON response
	 */
	public abstract CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java deleted file mode 100644 index ab85034..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.util.FlinkException;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
 * Base class for request handlers that answer for one subtask of a job vertex. The subtask is
 * selected by the "subtasknum" parameter; the job vertex ("vertexid") and the job ("jobid")
 * are resolved further up the handler hierarchy.
 */
public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler {

	public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
		super(executionGraphHolder, executor);
	}

	/**
	 * Looks up the subtask named by "subtasknum" in the given job vertex and hands it to
	 * {@link #handleRequest(AccessExecutionVertex, Map)}. A missing, non-numeric or
	 * out-of-range value yields an exceptionally completed future.
	 */
	@Override
	public final CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
		final String rawIndex = params.get("subtasknum");
		if (rawIndex == null) {
			return FutureUtils.completedExceptionally(new FlinkException("Subtask number parameter missing"));
		}

		final int index;
		try {
			index = Integer.parseInt(rawIndex);
		}
		catch (NumberFormatException nfe) {
			return FutureUtils.completedExceptionally(new FlinkException("Invalid subtask number parameter", nfe));
		}

		// valid indices are 0 (inclusive) up to the vertex parallelism (exclusive)
		final boolean inRange = index >= 0 && index < jobVertex.getParallelism();
		if (!inRange) {
			return FutureUtils.completedExceptionally(new FlinkException("subtask does not exist: " + index));
		}

		return handleRequest(jobVertex.getTaskVertices()[index], params);
	}

	/**
	 * Handles the request for the resolved subtask.
	 *
	 * @param vertex the subtask selected by the "subtasknum" parameter
	 * @param params the request parameters
	 * @return future of the JSON response
	 */
	public abstract CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java deleted file mode 100644 index 17db2e8..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.FlinkException;

import com.fasterxml.jackson.core.JsonGenerator;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Responder that returns the status of the Flink cluster, such as how many
 * TaskManagers are currently connected, and how many jobs are running.
 */
public class ClusterOverviewHandler extends AbstractJsonRequestHandler {

	private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";

	private static final String version = EnvironmentInformation.getVersion();

	private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;

	/** Timeout passed to the status overview request. */
	private final Time timeout;

	public ClusterOverviewHandler(Executor executor, Time timeout) {
		super(executor);
		this.timeout = checkNotNull(timeout);
	}

	@Override
	public String[] getPaths() {
		return new String[]{CLUSTER_OVERVIEW_REST_PATH};
	}

	/**
	 * Requests the status overview from the JobManager and renders it as JSON.
	 *
	 * @param pathParams unused, the overview takes no parameters
	 * @param queryParams unused, the overview takes no parameters
	 * @param jobManagerGateway gateway to the leading JobManager, may be null
	 * @return future of the JSON overview; completed exceptionally with a
	 *         {@link FlinkException} if there is no gateway or the request cannot be issued
	 */
	@Override
	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
		// we need no parameters, get all requests
		if (jobManagerGateway == null) {
			// fail directly instead of throwing an exception only to catch it again
			return FutureUtils.completedExceptionally(
				new FlinkException(
					"Failed to fetch the cluster overview.",
					new FlinkException("No connection to the leading JobManager.")));
		}

		try {
			return jobManagerGateway
				.requestStatusOverview(timeout)
				.thenApplyAsync(ClusterOverviewHandler::createOverviewJson, executor);
		}
		catch (Exception e) {
			return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch the cluster overview.", e));
		}
	}

	/**
	 * Renders the given overview as a JSON object.
	 *
	 * @throws FlinkFutureException if the JSON cannot be written
	 */
	private static String createOverviewJson(StatusOverview overview) {
		StringWriter writer = new StringWriter();
		try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
			gen.writeStartObject();
			gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
			gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
			gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
			gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
			gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
			gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
			gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
			gen.writeStringField("flink-version", version);
			if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
				gen.writeStringField("flink-commit", commitID);
			}
			gen.writeEndObject();
		} catch (IOException exception) {
			throw new FlinkFutureException("Could not write cluster overview.", exception);
		}

		// read the writer only after the generator has been closed (and thereby flushed)
		return writer.toString();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java deleted file mode 100644 index 34898e7..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.configuration.ConfigConstants;

import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;

/**
 * Responder that returns a constant String.
 *
 * <p>The text is encoded once in the constructor; every request is answered with the same
 * bytes as a {@code text/plain} HTTP 200 response.
 */
@ChannelHandler.Sharable
public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {

	/** The response body, encoded once with {@link ConfigConstants#DEFAULT_CHARSET}. */
	private final byte[] encodedText;

	/**
	 * Creates a handler that always answers with the given text.
	 *
	 * @param text the constant response body
	 */
	public ConstantTextHandler(String text) {
		this.encodedText = text.getBytes(ConfigConstants.DEFAULT_CHARSET);
	}

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
		HttpResponse response = new DefaultFullHttpResponse(
			HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText));

		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length);
		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");

		KeepAliveWrite.flush(ctx, routed.request(), response);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java deleted file mode 100644 index acf1cd0..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;

import com.fasterxml.jackson.core.JsonGenerator;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

/**
 * Responder that returns the JobIDs of all jobs known to the given gateway, grouped into
 * running, finished, cancelled and failed jobs.
 */
public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {

	private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";

	/** Timeout for the jobs overview request and for waiting on its result. */
	private final Time timeout;

	public CurrentJobIdsHandler(Executor executor, Time timeout) {
		super(executor);
		this.timeout = requireNonNull(timeout);
	}

	@Override
	public String[] getPaths() {
		return new String[]{CURRENT_JOB_IDS_REST_PATH};
	}

	/**
	 * Fetches the jobs overview on the handler's executor and renders the job IDs as JSON.
	 *
	 * @param pathParams unused, the listing takes no parameters
	 * @param queryParams unused, the listing takes no parameters
	 * @param jobManagerGateway gateway to the leading JobManager, may be null
	 * @return future of the JSON listing; it fails with a {@link FlinkFutureException} if
	 *         there is no gateway, the request fails or times out, or the JSON cannot be written
	 */
	@Override
	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
		// we need no parameters, get all requests
		return CompletableFuture.supplyAsync(() -> createJobIdsJson(jobManagerGateway), executor);
	}

	/** Blocks for the jobs overview (bounded by the timeout) and serializes it. */
	private String createJobIdsJson(JobManagerGateway jobManagerGateway) {
		if (jobManagerGateway == null) {
			throw new FlinkFutureException(
				"Failed to fetch list of all running jobs.",
				new Exception("No connection to the leading JobManager."));
		}

		try {
			CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
			JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);

			StringWriter writer = new StringWriter();
			try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
				gen.writeStartObject();
				writeJobIdArray(gen, "jobs-running", overview.getJobsRunningOrPending());
				writeJobIdArray(gen, "jobs-finished", overview.getJobsFinished());
				writeJobIdArray(gen, "jobs-cancelled", overview.getJobsCancelled());
				writeJobIdArray(gen, "jobs-failed", overview.getJobsFailed());
				gen.writeEndObject();
			}

			// read the writer only after the generator has been closed (and thereby flushed)
			return writer.toString();
		}
		catch (InterruptedException e) {
			// restore the interrupt flag so the executor thread can observe it
			Thread.currentThread().interrupt();
			throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
		}
		catch (Exception e) {
			throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
		}
	}

	/** Writes one JSON array field whose elements are the string forms of the given IDs. */
	private static void writeJobIdArray(JsonGenerator gen, String fieldName, Iterable<JobID> jobIds) throws IOException {
		gen.writeArrayFieldStart(fieldName);
		for (JobID jid : jobIds) {
			gen.writeString(jid.toString());
		}
		gen.writeEndArray();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java deleted file mode 100644 index a5b116c..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;

import com.fasterxml.jackson.core.JsonGenerator;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Request handler that returns a summary of the job status.
 *
 * <p>Depending on the two constructor flags the handler serves all jobs, only the running
 * jobs, or only the completed jobs, each under its own REST path.
 */
public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {

	private static final String ALL_JOBS_REST_PATH = "/joboverview";
	private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
	private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";

	/** Timeout passed to the job details request. */
	private final Time timeout;

	private final boolean includeRunningJobs;
	private final boolean includeFinishedJobs;

	public CurrentJobsOverviewHandler(
			Executor executor,
			Time timeout,
			boolean includeRunningJobs,
			boolean includeFinishedJobs) {

		super(executor);
		this.timeout = checkNotNull(timeout);
		this.includeRunningJobs = includeRunningJobs;
		this.includeFinishedJobs = includeFinishedJobs;
	}

	/**
	 * Returns the single REST path matching the configured flags. Any configuration without
	 * running jobs maps to the completed-jobs path.
	 */
	@Override
	public String[] getPaths() {
		if (includeRunningJobs && includeFinishedJobs) {
			return new String[]{ALL_JOBS_REST_PATH};
		}
		if (includeRunningJobs) {
			return new String[]{RUNNING_JOBS_REST_PATH};
		} else {
			return new String[]{COMPLETED_JOBS_REST_PATH};
		}
	}

	/**
	 * Requests the job details from the JobManager and renders them as JSON.
	 *
	 * @param pathParams unused
	 * @param queryParams unused
	 * @param jobManagerGateway gateway to the leading JobManager, may be null
	 * @return future of the JSON overview, or an exceptionally completed future if there is
	 *         no gateway
	 */
	@Override
	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
		if (jobManagerGateway == null) {
			return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
		}

		CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);

		return jobDetailsFuture.thenApplyAsync(this::createOverviewJson, executor);
	}

	/**
	 * Serializes the job details. With both flags set the result has a "running" and a
	 * "finished" array, otherwise a single "jobs" array.
	 *
	 * @throws FlinkFutureException if the JSON cannot be written
	 */
	private String createOverviewJson(MultipleJobsDetails result) {
		final long now = System.currentTimeMillis();

		StringWriter writer = new StringWriter();
		// try-with-resources, as in the archivist below, so the generator is closed on failure too
		try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
			gen.writeStartObject();

			if (includeRunningJobs && includeFinishedJobs) {
				gen.writeArrayFieldStart("running");
				for (JobDetails detail : result.getRunningJobs()) {
					writeJobDetailOverviewAsJson(detail, gen, now);
				}
				gen.writeEndArray();

				gen.writeArrayFieldStart("finished");
				for (JobDetails detail : result.getFinishedJobs()) {
					writeJobDetailOverviewAsJson(detail, gen, now);
				}
				gen.writeEndArray();
			} else {
				gen.writeArrayFieldStart("jobs");
				for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
					writeJobDetailOverviewAsJson(detail, gen, now);
				}
				gen.writeEndArray();
			}

			gen.writeEndObject();
		} catch (IOException e) {
			throw new FlinkFutureException("Could not write current jobs overview json.", e);
		}

		// read the writer only after the generator has been closed (and thereby flushed)
		return writer.toString();
	}

	/**
	 * Archivist for the CurrentJobsOverviewHandler.
	 */
	public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist {

		/**
		 * Archives the graph as a single "finished" entry (with an empty "running" array)
		 * under the all-jobs path.
		 */
		@Override
		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
			StringWriter writer = new StringWriter();
			try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
				gen.writeStartObject();
				gen.writeArrayFieldStart("running");
				gen.writeEndArray();
				gen.writeArrayFieldStart("finished");
				writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
				gen.writeEndArray();
				gen.writeEndObject();
			}
			String json = writer.toString();
			String path = ALL_JOBS_REST_PATH;
			return Collections.singleton(new ArchivedJson(path, json));
		}
	}

	/**
	 * Writes one job as a JSON object: id, name, state, timing fields and a "tasks" object
	 * with per-state counts.
	 *
	 * @param details the job to write
	 * @param gen the generator to write to; it is not closed by this method
	 * @param now timestamp used as the end of the duration when the job's end time is not positive
	 * @throws IOException if the generator fails to write
	 */
	public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException {
		gen.writeStartObject();

		gen.writeStringField("jid", details.getJobId().toString());
		gen.writeStringField("name", details.getJobName());
		gen.writeStringField("state", details.getStatus().name());

		gen.writeNumberField("start-time", details.getStartTime());
		gen.writeNumberField("end-time", details.getEndTime());
		gen.writeNumberField("duration", (details.getEndTime() <= 0 ? now : details.getEndTime()) - details.getStartTime());
		gen.writeNumberField("last-modification", details.getLastUpdateTime());

		gen.writeObjectFieldStart("tasks");
		gen.writeNumberField("total", details.getNumTasks());

		// the array is indexed by ExecutionState ordinal
		final int[] perState = details.getNumVerticesPerExecutionState();
		gen.writeNumberField("pending", perState[ExecutionState.CREATED.ordinal()] +
			perState[ExecutionState.SCHEDULED.ordinal()] +
			perState[ExecutionState.DEPLOYING.ordinal()]);
		gen.writeNumberField("running", perState[ExecutionState.RUNNING.ordinal()]);
		gen.writeNumberField("finished", perState[ExecutionState.FINISHED.ordinal()]);
		gen.writeNumberField("canceling", perState[ExecutionState.CANCELING.ordinal()]);
		gen.writeNumberField("canceled", perState[ExecutionState.CANCELED.ordinal()]);
		gen.writeNumberField("failed", perState[ExecutionState.FAILED.ordinal()]);
		gen.writeEndObject();

		gen.writeEndObject();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java deleted file mode 100644 index 39984b1..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.util.EnvironmentInformation; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Map; -import java.util.TimeZone; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Responder that returns the parameters that define how the asynchronous requests - * against this web server should behave. It defines for example the refresh interval, - * and time zone of the server timestamps. 
- */ -public class DashboardConfigHandler extends AbstractJsonRequestHandler { - - private static final String DASHBOARD_CONFIG_REST_PATH = "/config"; - - private final String configString; - - public DashboardConfigHandler(Executor executor, long refreshInterval) { - super(executor); - try { - this.configString = createConfigJson(refreshInterval); - } - catch (Exception e) { - // should never happen - throw new RuntimeException(e.getMessage(), e); - } - } - - @Override - public String[] getPaths() { - return new String[]{DASHBOARD_CONFIG_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { - return CompletableFuture.completedFuture(configString); - } - - public static String createConfigJson(long refreshInterval) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - TimeZone timeZone = TimeZone.getDefault(); - String timeZoneName = timeZone.getDisplayName(); - long timeZoneOffset = timeZone.getRawOffset(); - - gen.writeStartObject(); - gen.writeNumberField("refresh-interval", refreshInterval); - gen.writeNumberField("timezone-offset", timeZoneOffset); - gen.writeStringField("timezone-name", timeZoneName); - gen.writeStringField("flink-version", EnvironmentInformation.getVersion()); - - EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation(); - if (revision != null) { - gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate); - } - - gen.writeEndObject(); - - gen.close(); - - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java index 978432b..c95cc32 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler; import java.util.Map; import java.util.concurrent.CompletableFuture; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java index 0b0d32e..760c836 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java @@ -35,6 +35,8 @@ import org.apache.flink.optimizer.plan.StreamingPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler; +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.util.ExceptionUtils; import com.fasterxml.jackson.core.JsonGenerator; 
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java index d9df1d4..04f663d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler; +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java index 95281a4..4248dd4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.runtime.concurrent.FlinkFutureException; import 
org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler; +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler; import com.fasterxml.jackson.core.JsonGenerator; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java index b117b3d..4d79492 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 7ada0b4..16a1565 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.util.Preconditions; import com.fasterxml.jackson.core.JsonGenerator; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java index 61b3f58..9a0bac4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler; import java.io.File; import java.util.Map; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java deleted file mode 100644 index 4dede3a..0000000 --- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler that returns the aggregated user accumulators of a job. 
- */ -public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler { - - private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators"; - - public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); - } - - @Override - public String[] getPaths() { - return new String[]{JOB_ACCUMULATORS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createJobAccumulatorsJson(graph); - } catch (IOException e) { - throw new FlinkFutureException("Could not create job accumulators json.", e); - } - }, - executor); - } - - /** - * Archivist for the JobAccumulatorsHandler. - */ - public static class JobAccumulatorsJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - String json = createJobAccumulatorsJson(graph); - String path = JOB_ACCUMULATORS_REST_PATH - .replace(":jobid", graph.getJobID().toString()); - return Collections.singletonList(new ArchivedJson(path, json)); - } - } - - public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified(); - - gen.writeStartObject(); - - gen.writeArrayFieldStart("job-accumulators"); - // empty for now - gen.writeEndArray(); - - gen.writeArrayFieldStart("user-task-accumulators"); - for (StringifiedAccumulatorResult acc : allAccumulators) { - gen.writeStartObject(); - gen.writeStringField("name", acc.getName()); - gen.writeStringField("type", acc.getType()); - gen.writeStringField("value", acc.getValue()); - gen.writeEndObject(); - } - 
gen.writeEndArray(); - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java deleted file mode 100644 index 1a7d868..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; - -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler for the CANCEL request. - */ -public class JobCancellationHandler extends AbstractJsonRequestHandler { - - private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel"; - private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel"; - - private final Time timeout; - - public JobCancellationHandler(Executor executor, Time timeout) { - super(executor); - this.timeout = Preconditions.checkNotNull(timeout); - } - - @Override - public String[] getPaths() { - return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { - return CompletableFuture.supplyAsync( - () -> { - try { - JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); - if (jobManagerGateway != null) { - jobManagerGateway.cancelJob(jobId, timeout); - return "{}"; - } - else { - throw new Exception("No connection to the leading JobManager."); - } - } - catch (Exception e) { - throw new FlinkFutureException("Failed to cancel the job with id: " + pathParams.get("jobid"), e); - } - }, - executor); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java ---------------------------------------------------------------------- 
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java deleted file mode 100644 index 4e41447..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.NotFoundException; - -import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; - -import com.fasterxml.jackson.core.JsonGenerator; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.io.StringWriter; -import java.nio.charset.Charset; -import java.util.ArrayDeque; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Request handler for {@link CancelJobWithSavepoint} messages. 
- */ -public class JobCancellationWithSavepointHandlers { - - private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = "/jobs/:jobid/cancel-with-savepoint"; - private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory"; - - /** URL for in-progress cancellations. */ - private static final String CANCELLATION_IN_PROGRESS_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId"; - - /** Encodings for String. */ - private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET; - - /** Shared lock between Trigger and In-Progress handlers. */ - private final Object lock = new Object(); - - /** In-Progress requests. */ - private final Map<JobID, Long> inProgress = new HashMap<>(); - - /** Succeeded/failed request. Either String or Throwable. */ - private final Map<Long, Object> completed = new HashMap<>(); - - /** Atomic request counter. */ - private long requestCounter; - - /** Handler for trigger requests. */ - private final TriggerHandler triggerHandler; - - /** Handler for in-progress requests. */ - private final InProgressHandler inProgressHandler; - - /** Default savepoint directory. 
*/ - private final String defaultSavepointDirectory; - - public JobCancellationWithSavepointHandlers( - ExecutionGraphHolder currentGraphs, - Executor executor) { - this(currentGraphs, executor, null); - } - - public JobCancellationWithSavepointHandlers( - ExecutionGraphHolder currentGraphs, - Executor executor, - @Nullable String defaultSavepointDirectory) { - - this.triggerHandler = new TriggerHandler(currentGraphs, executor); - this.inProgressHandler = new InProgressHandler(); - this.defaultSavepointDirectory = defaultSavepointDirectory; - } - - public TriggerHandler getTriggerHandler() { - return triggerHandler; - } - - public InProgressHandler getInProgressHandler() { - return inProgressHandler; - } - - // ------------------------------------------------------------------------ - // New requests - // ------------------------------------------------------------------------ - - /** - * Handler for triggering a {@link CancelJobWithSavepoint} message. - */ - class TriggerHandler implements RequestHandler { - - /** Current execution graphs. */ - private final ExecutionGraphHolder currentGraphs; - - /** Execution context for futures. 
*/ - private final Executor executor; - - public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) { - this.currentGraphs = checkNotNull(currentGraphs); - this.executor = checkNotNull(executor); - } - - @Override - public String[] getPaths() { - return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH}; - } - - @Override - @SuppressWarnings("unchecked") - public CompletableFuture<FullHttpResponse> handleRequest( - Map<String, String> pathParams, - Map<String, String> queryParams, - JobManagerGateway jobManagerGateway) { - - if (jobManagerGateway != null) { - JobID jobId = JobID.fromHexString(pathParams.get("jobid")); - final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture; - - graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway); - - return graphFuture.thenApplyAsync( - (Optional<AccessExecutionGraph> optGraph) -> { - final AccessExecutionGraph graph = optGraph.orElseThrow( - () -> new FlinkFutureException( - new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'))); - - CheckpointCoordinator coord = graph.getCheckpointCoordinator(); - if (coord == null) { - throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job.")); - } - - String targetDirectory = pathParams.get("targetDirectory"); - if (targetDirectory == null) { - if (defaultSavepointDirectory == null) { - throw new IllegalStateException("No savepoint directory configured. 
" + - "You can either specify a directory when triggering this savepoint or " + - "configure a cluster-wide default via key '" + - CoreOptions.SAVEPOINT_DIRECTORY.key() + "'."); - } else { - targetDirectory = defaultSavepointDirectory; - } - } - - try { - return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout()); - } catch (IOException e) { - throw new FlinkFutureException("Could not cancel job with savepoint.", e); - } - }, executor); - } else { - return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager.")); - } - } - - @SuppressWarnings("unchecked") - private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException { - // Check whether a request exists - final long requestId; - final boolean isNewRequest; - synchronized (lock) { - if (inProgress.containsKey(jobId)) { - requestId = inProgress.get(jobId); - isNewRequest = false; - } else { - requestId = ++requestCounter; - inProgress.put(jobId, requestId); - isNewRequest = true; - } - } - - if (isNewRequest) { - boolean success = false; - - try { - // Trigger cancellation - CompletableFuture<String> cancelJobFuture = jobManagerGateway - .cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout)); - - cancelJobFuture.whenCompleteAsync( - (String path, Throwable throwable) -> { - try { - if (throwable != null) { - completed.put(requestId, throwable); - } else { - completed.put(requestId, path); - } - } finally { - inProgress.remove(jobId); - } - }, executor); - - success = true; - } finally { - synchronized (lock) { - if (!success) { - inProgress.remove(jobId); - } - } - } - } - - // In-progress location - String location = CANCELLATION_IN_PROGRESS_REST_PATH - .replace(":jobid", jobId.toString()) - .replace(":requestId", Long.toString(requestId)); - - // Accepted response - StringWriter writer = new StringWriter(); 
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - gen.writeStartObject(); - gen.writeStringField("status", "accepted"); - gen.writeNumberField("request-id", requestId); - gen.writeStringField("location", location); - gen.writeEndObject(); - gen.close(); - - String json = writer.toString(); - byte[] bytes = json.getBytes(ENCODING); - - DefaultFullHttpResponse response = new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, - HttpResponseStatus.ACCEPTED, - Unpooled.wrappedBuffer(bytes)); - - response.headers().set(HttpHeaders.Names.LOCATION, location); - - response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name()); - response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); - - FullHttpResponse accepted = response; - - return accepted; - } - } - - // ------------------------------------------------------------------------ - // In-progress requests - // ------------------------------------------------------------------------ - - /** - * Handler for in-progress cancel with savepoint operations. - */ - class InProgressHandler implements RequestHandler { - - /** The number of recent checkpoints whose IDs are remembered. */ - private static final int NUM_GHOST_REQUEST_IDS = 16; - - /** Remember some recently completed. 
*/ - private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS); - - /** - * Returns the single REST path declared for this handler ({@code CANCELLATION_IN_PROGRESS_REST_PATH}). - */ - @Override - public String[] getPaths() { - return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH}; - } - - /** - * Reports the state of the request identified by the {@code jobid} and {@code requestId} path parameters. - * - * <p>The lookup runs asynchronously while holding {@code lock}. A result found in {@code completed} is removed from it, appended to the {@code recentlyCompleted} history (trimmed to {@code NUM_GHOST_REQUEST_IDS} entries) and reported. Otherwise {@code inProgress} and then the recent history are consulted; IDs found nowhere produce a {@code BAD_REQUEST} failure response. - * - * <p>A stored result of type {@code String} is used as the savepoint path of a success response; any other stored result is cast to {@code Throwable} and reported as an {@code INTERNAL_SERVER_ERROR} failure. Exceptions raised during the lookup are rethrown wrapped in a {@code FlinkFutureException}. The {@code queryParams} and {@code jobManagerGateway} arguments are not read. - */ - @Override - @SuppressWarnings("unchecked") - public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { - JobID jobId = JobID.fromHexString(pathParams.get("jobid")); - long requestId = Long.parseLong(pathParams.get("requestId")); - - return CompletableFuture.supplyAsync( - () -> { - try { - synchronized (lock) { - Object result = completed.remove(requestId); - - if (result != null) { - // Add to recent history - recentlyCompleted.add(new Tuple2<>(requestId, result)); - if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) { - recentlyCompleted.remove(); - } - - if (result.getClass() == String.class) { - String savepointPath = (String) result; - return createSuccessResponse(requestId, savepointPath); - } else { - Throwable cause = (Throwable) result; - return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage()); - } - } else { - // Check in-progress - Long inProgressRequestId = inProgress.get(jobId); - if (inProgressRequestId != null) { - // Sanity check - if (inProgressRequestId == requestId) { - return createInProgressResponse(requestId); - } else { - String msg = "Request ID does not belong to JobID"; - return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg); - } - } - - // Check recent history - for (Tuple2<Long, Object> recent : recentlyCompleted) { - if (recent.f0 == requestId) { - if (recent.f1.getClass() == String.class) { - String savepointPath = (String) recent.f1; - return createSuccessResponse(requestId, savepointPath); - } else { - Throwable cause = (Throwable) recent.f1; - return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId,
cause.getMessage()); - } - } - } - - return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID"); - } - } - } catch (Exception e) { - throw new FlinkFutureException("Could not handle in progress request.", e); - } - }); - } - - /** - * Builds a {@code CREATED} response whose JSON body holds the status {@code "success"}, the request ID and the savepoint path. - * - * @throws IOException if writing the JSON body fails - */ - private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - gen.writeStartObject(); - - gen.writeStringField("status", "success"); - gen.writeNumberField("request-id", requestId); - gen.writeStringField("savepoint-path", savepointPath); - - gen.writeEndObject(); - gen.close(); - - String json = writer.toString(); - byte[] bytes = json.getBytes(ENCODING); - - DefaultFullHttpResponse response = new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, - HttpResponseStatus.CREATED, - Unpooled.wrappedBuffer(bytes)); - - response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name()); - response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); - - return response; - } - - /** - * Builds an {@code ACCEPTED} response whose JSON body holds the status {@code "in-progress"} and the request ID. - * - * @throws IOException if writing the JSON body fails - */ - private FullHttpResponse createInProgressResponse(long requestId) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - gen.writeStartObject(); - - gen.writeStringField("status", "in-progress"); - gen.writeNumberField("request-id", requestId); - - gen.writeEndObject(); - gen.close(); - - String json = writer.toString(); - byte[] bytes = json.getBytes(ENCODING); - - DefaultFullHttpResponse response = new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, - HttpResponseStatus.ACCEPTED, - Unpooled.wrappedBuffer(bytes)); - - response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name()); - response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); - -
return response; - } - - /** - * Builds a response with the given status code whose JSON body holds the status {@code "failed"}, the request ID and the error message as {@code cause}. - * - * @throws IOException if writing the JSON body fails - */ - private FullHttpResponse createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - gen.writeStartObject(); - - gen.writeStringField("status", "failed"); - gen.writeNumberField("request-id", requestId); - gen.writeStringField("cause", errMsg); - - gen.writeEndObject(); - gen.close(); - - String json = writer.toString(); - byte[] bytes = json.getBytes(ENCODING); - - DefaultFullHttpResponse response = new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, - code, - Unpooled.wrappedBuffer(bytes)); - - response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name()); - response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); - - return response; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java deleted file mode 100644 index 0b15b37..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.ArchivedExecutionConfig; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler that returns the execution config of a job.
- */ -public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { - - private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config"; - - public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); - } - - /** - * Returns the single REST path declared for this handler ({@code /jobs/:jobid/config}). - */ - @Override - public String[] getPaths() { - return new String[]{JOB_CONFIG_REST_PATH}; - } - - /** - * Renders the job config JSON for the given graph asynchronously on {@code executor}. An {@link IOException} raised while writing is rethrown wrapped in a {@link FlinkFutureException}. The {@code params} argument is not read. - */ - @Override - public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createJobConfigJson(graph); - } catch (IOException e) { - throw new FlinkFutureException("Could not write job config json.", e); - } - }, - executor); - - } - - /** - * Archivist for the JobConfigHandler. - */ - public static class JobConfigJsonArchivist implements JsonArchivist { - - /** - * Produces a single archive entry: the job config JSON, stored under the REST path with {@code :jobid} replaced by the job ID of the graph. - */ - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - String json = createJobConfigJson(graph); - String path = JOB_CONFIG_REST_PATH - .replace(":jobid", graph.getJobID().toString()); - return Collections.singletonList(new ArchivedJson(path, json)); - } - } - - /** - * Writes the job ID and job name of the graph as JSON. When the graph has an archived execution config, an {@code execution-config} object is added with the execution mode, restart strategy description, parallelism and object-reuse flag, plus a {@code user-config} object holding the global job parameters when those are non-null. - * - * @param graph execution graph to describe - * @return the generated JSON string - * @throws IOException if the JSON generator fails - */ - public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartObject(); - gen.writeStringField("jid", graph.getJobID().toString()); - gen.writeStringField("name", graph.getJobName()); - - final ArchivedExecutionConfig summary = graph.getArchivedExecutionConfig(); - - if (summary != null) { - gen.writeObjectFieldStart("execution-config"); - - gen.writeStringField("execution-mode", summary.getExecutionMode()); - - gen.writeStringField("restart-strategy", summary.getRestartStrategyDescription()); - gen.writeNumberField("job-parallelism", summary.getParallelism()); - gen.writeBooleanField("object-reuse-mode",
summary.getObjectReuseEnabled()); - - Map<String, String> ucVals = summary.getGlobalJobParameters(); - if (ucVals != null) { - gen.writeObjectFieldStart("user-config"); - - for (Map.Entry<String, String> ucVal : ucVals.entrySet()) { - gen.writeStringField(ucVal.getKey(), ucVal.getValue()); - } - - gen.writeEndObject(); - } - - gen.writeEndObject(); - } - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java deleted file mode 100644 index 8a50f87..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; -import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; - -import com.fasterxml.jackson.core.JsonGenerator; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler that returns details about a job.
This includes: - * <ul> - * <li>Dataflow plan</li> - * <li>id, name, and current status</li> - * <li>start time, end time, duration</li> - * <li>number of job vertices in each state (pending, running, finished, failed)</li> - * <li>info about job vertices, including runtime, status, I/O bytes and records, subtasks in each status</li> - * </ul> - */ -public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { - - private static final String JOB_DETAILS_REST_PATH = "/jobs/:jobid"; - private static final String JOB_DETAILS_VERTICES_REST_PATH = "/jobs/:jobid/vertices"; - - private final MetricFetcher fetcher; - - public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { - super(executionGraphHolder, executor); - this.fetcher = fetcher; - } - - /** - * Returns the two REST paths declared for this handler ({@code /jobs/:jobid} and {@code /jobs/:jobid/vertices}). - */ - @Override - public String[] getPaths() { - return new String[]{JOB_DETAILS_REST_PATH, JOB_DETAILS_VERTICES_REST_PATH}; - } - - /** - * Renders the job details JSON for the given graph asynchronously on {@code executor}, passing the handler's {@code fetcher}. An {@link IOException} raised while writing is rethrown wrapped in a {@link FlinkFutureException}. The {@code params} argument is not read. - */ - @Override - public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createJobDetailsJson(graph, fetcher); - } catch (IOException e) { - throw new FlinkFutureException("Could not create job details json.", e); - } - }, - executor); - } - - /** - * Archivist for the JobDetailsHandler.
- */ - public static class JobDetailsJsonArchivist implements JsonArchivist { - - /** - * Produces two archive entries holding the same job details JSON, one per REST path of the handler with {@code :jobid} replaced by the job ID of the graph. The JSON is created without a metric fetcher ({@code null} is passed). - */ - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - String json = createJobDetailsJson(graph, null); - String path1 = JOB_DETAILS_REST_PATH - .replace(":jobid", graph.getJobID().toString()); - String path2 = JOB_DETAILS_VERTICES_REST_PATH - .replace(":jobid", graph.getJobID().toString()); - Collection<ArchivedJson> archives = new ArrayList<>(); - archives.add(new ArchivedJson(path1, json)); - archives.add(new ArchivedJson(path2, json)); - return archives; - } - } - - /** - * Renders the details of the given graph as JSON: basic job info, start time, end time and duration, the timestamp of every job status, one entry per job vertex (aggregate state, times, number of subtasks per execution state and summed I/O metrics), the number of job vertices per execution state, and the JSON plan. An end time of {@code -1} is written for a job that is not in a globally terminal state; its duration is then measured up to the current time. - * - * @param graph execution graph to describe - * @param fetcher metric fetcher handed to {@link MutableIOMetrics#addIOMetrics}; may be {@code null} - * @return the generated JSON string - * @throws IOException if the JSON generator fails - */ - public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException { - final StringWriter writer = new StringWriter(); - final JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - final long now = System.currentTimeMillis(); - - gen.writeStartObject(); - - // basic info - gen.writeStringField("jid", graph.getJobID().toString()); - gen.writeStringField("name", graph.getJobName()); - gen.writeBooleanField("isStoppable", graph.isStoppable()); - gen.writeStringField("state", graph.getState().name()); - - // times and duration - final long jobStartTime = graph.getStatusTimestamp(JobStatus.CREATED); - final long jobEndTime = graph.getState().isGloballyTerminalState() ? - graph.getStatusTimestamp(graph.getState()) : -1L; - gen.writeNumberField("start-time", jobStartTime); - gen.writeNumberField("end-time", jobEndTime); - gen.writeNumberField("duration", (jobEndTime > 0 ?
jobEndTime : now) - jobStartTime); - gen.writeNumberField("now", now); - - // timestamps - gen.writeObjectFieldStart("timestamps"); - for (JobStatus status : JobStatus.values()) { - gen.writeNumberField(status.name(), graph.getStatusTimestamp(status)); - } - gen.writeEndObject(); - - // job vertices - int[] jobVerticesPerState = new int[ExecutionState.values().length]; - gen.writeArrayFieldStart("vertices"); - - for (AccessExecutionJobVertex ejv : graph.getVerticesTopologically()) { - int[] tasksPerState = new int[ExecutionState.values().length]; - long startTime = Long.MAX_VALUE; - long endTime = 0; - boolean allFinished = true; - - for (AccessExecutionVertex vertex : ejv.getTaskVertices()) { - final ExecutionState state = vertex.getExecutionState(); - tasksPerState[state.ordinal()]++; - - // take the earliest start time - long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING); - if (started > 0) { - startTime = Math.min(startTime, started); - } - - allFinished &= state.isTerminal(); - endTime = Math.max(endTime, vertex.getStateTimestamp(state)); - } - - /* startTime is still Long.MAX_VALUE when no subtask reported a DEPLOYING timestamp greater than 0; all three values are then written as -1 */ - long duration; - if (startTime < Long.MAX_VALUE) { - if (allFinished) { - duration = endTime - startTime; - } - else { - endTime = -1L; - duration = now - startTime; - } - } - else { - startTime = -1L; - endTime = -1L; - duration = -1L; - } - - ExecutionState jobVertexState = - ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism()); - jobVerticesPerState[jobVertexState.ordinal()]++; - - gen.writeStartObject(); - gen.writeStringField("id", ejv.getJobVertexId().toString()); - gen.writeStringField("name", ejv.getName()); - gen.writeNumberField("parallelism", ejv.getParallelism()); - gen.writeStringField("status", jobVertexState.name()); - - gen.writeNumberField("start-time", startTime); - gen.writeNumberField("end-time", endTime); - gen.writeNumberField("duration", duration); - - gen.writeObjectFieldStart("tasks"); - for (ExecutionState state : ExecutionState.values()) { -
gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]); - } - gen.writeEndObject(); - - MutableIOMetrics counts = new MutableIOMetrics(); - - for (AccessExecutionVertex vertex : ejv.getTaskVertices()) { - counts.addIOMetrics( - vertex.getCurrentExecutionAttempt(), - fetcher, - graph.getJobID().toString(), - ejv.getJobVertexId().toString()); - } - - counts.writeIOMetricsAsJson(gen); - - gen.writeEndObject(); - } - gen.writeEndArray(); - - gen.writeObjectFieldStart("status-counts"); - for (ExecutionState state : ExecutionState.values()) { - gen.writeNumberField(state.name(), jobVerticesPerState[state.ordinal()]); - } - gen.writeEndObject(); - - gen.writeFieldName("plan"); - gen.writeRawValue(graph.getJsonPlan()); - - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java deleted file mode 100644 index 6ffd443..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.executiongraph.ErrorInfo; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.util.ExceptionUtils; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler that returns the exceptions of a job: the root failure cause and the failure causes of individual tasks.
- */ -public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { - - private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions"; - - /** Maximum number of task exceptions written to {@code all-exceptions}; further ones only set the {@code truncated} flag. */ - static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; - - public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); - } - - /** - * Returns the single REST path declared for this handler ({@code /jobs/:jobid/exceptions}). - */ - @Override - public String[] getPaths() { - return new String[]{JOB_EXCEPTIONS_REST_PATH}; - } - - /** - * Renders the job exceptions JSON for the given graph asynchronously on {@code executor}. An {@link IOException} raised while writing is rethrown wrapped in a {@link FlinkFutureException}. The {@code params} argument is not read. - */ - @Override - public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createJobExceptionsJson(graph); - } catch (IOException e) { - throw new FlinkFutureException("Could not create job exceptions json.", e); - } - }, - executor - ); - } - - /** - * Archivist for the JobExceptionsHandler. - */ - public static class JobExceptionsJsonArchivist implements JsonArchivist { - - /** - * Produces a single archive entry: the job exceptions JSON, stored under the REST path with {@code :jobid} replaced by the job ID of the graph. - */ - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - String json = createJobExceptionsJson(graph); - String path = JOB_EXCEPTIONS_REST_PATH - .replace(":jobid", graph.getJobID().toString()); - return Collections.singletonList(new ArchivedJson(path, json)); - } - } - - /** - * Renders the failure information of the given graph as JSON. The root failure cause and its timestamp are written when a cause is present and is not the stringified null exception. The {@code all-exceptions} array then lists the failure cause, task name, location and timestamp of each execution vertex that has one. At most {@link #MAX_NUMBER_EXCEPTION_TO_REPORT} entries are written; if another failed vertex is found after that, iteration stops and {@code truncated} is written as {@code true}. - * - * @param graph execution graph to describe - * @return the generated JSON string - * @throws IOException if the JSON generator fails - */ - public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartObject(); - - // most important is the root failure cause - ErrorInfo rootException = graph.getFailureCause(); - if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { - gen.writeStringField("root-exception", rootException.getExceptionAsString()); - gen.writeNumberField("timestamp", rootException.getTimestamp()); - } - - // we additionally collect all exceptions (up to a limit) that occurred in the
individual tasks - gen.writeArrayFieldStart("all-exceptions"); - - int numExceptionsSoFar = 0; - boolean truncated = false; - - for (AccessExecutionVertex task : graph.getAllExecutionVertices()) { - String t = task.getFailureCauseAsString(); - if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { - if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) { - truncated = true; - break; - } - - TaskManagerLocation location = task.getCurrentAssignedResourceLocation(); - String locationString = location != null ? - location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)"; - - gen.writeStartObject(); - gen.writeStringField("exception", t); - gen.writeStringField("task", task.getTaskNameWithSubtaskIndex()); - gen.writeStringField("location", locationString); - /* a FAILED timestamp of 0 is written as -1 */ - long timestamp = task.getStateTimestamp(ExecutionState.FAILED); - gen.writeNumberField("timestamp", timestamp == 0 ? -1 : timestamp); - gen.writeEndObject(); - numExceptionsSoFar++; - } - } - gen.writeEndArray(); - - gen.writeBooleanField("truncated", truncated); - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } -}
