http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java index 513dc08..1a7d868 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java @@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Request handler for the CANCEL request. @@ -36,7 +39,8 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler { private final Time timeout; - public JobCancellationHandler(Time timeout) { + public JobCancellationHandler(Executor executor, Time timeout) { + super(executor); this.timeout = Preconditions.checkNotNull(timeout); } @@ -46,19 +50,23 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { - try { - JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); - if (jobManagerGateway != null) { - jobManagerGateway.cancelJob(jobId, timeout); - return "{}"; - } - else { - throw new Exception("No connection to the leading JobManager."); - } - } - catch (Exception e) { - throw new Exception("Failed to cancel the job with id: " + pathParams.get("jobid") + e.getMessage(), e); - } + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + return CompletableFuture.supplyAsync( + () -> { + try { + JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); + if (jobManagerGateway != null) { + jobManagerGateway.cancelJob(jobId, timeout); + return "{}"; + } + else { + throw new Exception("No connection to the leading JobManager."); + } + } + catch (Exception e) { + throw new FlinkFutureException("Failed to cancel the job with id: " + pathParams.get("jobid"), e); + } + }, + executor); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java index 9b474aa..4e41447 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java @@ -24,12 +24,13 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.NotFoundException; -import org.apache.flink.util.FlinkException; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -140,48 +141,48 @@ public class JobCancellationWithSavepointHandlers { @Override @SuppressWarnings("unchecked") - public FullHttpResponse handleRequest( + public CompletableFuture<FullHttpResponse> handleRequest( Map<String, String> pathParams, Map<String, String> queryParams, - JobManagerGateway jobManagerGateway) throws Exception { + JobManagerGateway jobManagerGateway) { - try { - if (jobManagerGateway != null) { - JobID jobId = JobID.fromHexString(pathParams.get("jobid")); - final Optional<AccessExecutionGraph> optGraph; + if (jobManagerGateway != null) { + JobID jobId = JobID.fromHexString(pathParams.get("jobid")); + final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture; - try { - optGraph = currentGraphs.getExecutionGraph(jobId, jobManagerGateway); - } catch (Exception e) { - throw new FlinkException("Could not retrieve the execution with jobId " + jobId + " from the JobManager.", e); - } + graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway); - final AccessExecutionGraph graph = optGraph.orElseThrow( - () -> new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')); + return graphFuture.thenApplyAsync( + (Optional<AccessExecutionGraph> optGraph) -> { + final AccessExecutionGraph graph = optGraph.orElseThrow( + () -> new FlinkFutureException( + new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'))); - CheckpointCoordinator coord = graph.getCheckpointCoordinator(); - if (coord == null) { - throw new Exception("Cannot find CheckpointCoordinator for job."); - } + CheckpointCoordinator coord = graph.getCheckpointCoordinator(); + if (coord == null) { + throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job.")); + } - String targetDirectory = pathParams.get("targetDirectory"); - if (targetDirectory == null) { - if (defaultSavepointDirectory == null) { - throw new IllegalStateException("No savepoint directory configured. " + + String targetDirectory = pathParams.get("targetDirectory"); + if (targetDirectory == null) { + if (defaultSavepointDirectory == null) { + throw new IllegalStateException("No savepoint directory configured. " + "You can either specify a directory when triggering this savepoint or " + "configure a cluster-wide default via key '" + CoreOptions.SAVEPOINT_DIRECTORY.key() + "'."); - } else { - targetDirectory = defaultSavepointDirectory; + } else { + targetDirectory = defaultSavepointDirectory; + } } - } - return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout()); - } else { - throw new Exception("No connection to the leading JobManager."); - } - } catch (Exception e) { - throw new Exception("Failed to cancel the job: " + e.getMessage(), e); + try { + return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout()); + } catch (IOException e) { + throw new FlinkFutureException("Could not cancel job with savepoint.", e); + } + }, executor); + } else { + return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager.")); } } @@ -288,64 +289,63 @@ public class JobCancellationWithSavepointHandlers { @Override @SuppressWarnings("unchecked") - public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { - try { - if (jobManagerGateway != null) { - JobID jobId = JobID.fromHexString(pathParams.get("jobid")); - long requestId = Long.parseLong(pathParams.get("requestId")); + public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + JobID jobId = JobID.fromHexString(pathParams.get("jobid")); + long requestId = Long.parseLong(pathParams.get("requestId")); - synchronized (lock) { - Object result = completed.remove(requestId); - - if (result != null) { - // Add to recent history - recentlyCompleted.add(new Tuple2<>(requestId, result)); - if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) { - recentlyCompleted.remove(); - } + return CompletableFuture.supplyAsync( + () -> { + try { + synchronized (lock) { + Object result = completed.remove(requestId); + + if (result != null) { + // Add to recent history + recentlyCompleted.add(new Tuple2<>(requestId, result)); + if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) { + recentlyCompleted.remove(); + } - if (result.getClass() == String.class) { - String savepointPath = (String) result; - return createSuccessResponse(requestId, savepointPath); - } else { - Throwable cause = (Throwable) result; - return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage()); - } - } else { - // Check in-progress - Long inProgressRequestId = inProgress.get(jobId); - if (inProgressRequestId != null) { - // Sanity check - if (inProgressRequestId == requestId) { - return createInProgressResponse(requestId); + if (result.getClass() == String.class) { + String savepointPath = (String) result; + return createSuccessResponse(requestId, savepointPath); } else { - String msg = "Request ID does not belong to JobID"; - return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg); + Throwable cause = (Throwable) result; + return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage()); } - } - - // Check recent history - for (Tuple2<Long, Object> recent : recentlyCompleted) { - if (recent.f0 == requestId) { - if (recent.f1.getClass() == String.class) { - String savepointPath = (String) recent.f1; - return createSuccessResponse(requestId, savepointPath); + } else { + // Check in-progress + Long inProgressRequestId = inProgress.get(jobId); + if (inProgressRequestId != null) { + // Sanity check + if (inProgressRequestId == requestId) { + return createInProgressResponse(requestId); } else { - Throwable cause = (Throwable) recent.f1; - return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage()); + String msg = "Request ID does not belong to JobID"; + return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg); } } - } - return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID"); + // Check recent history + for (Tuple2<Long, Object> recent : recentlyCompleted) { + if (recent.f0 == requestId) { + if (recent.f1.getClass() == String.class) { + String savepointPath = (String) recent.f1; + return createSuccessResponse(requestId, savepointPath); + } else { + Throwable cause = (Throwable) recent.f1; + return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage()); + } + } + } + + return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID"); + } } + } catch (Exception e) { + throw new FlinkFutureException("Could not handle in progress request.", e); } - } else { - throw new Exception("No connection to the leading JobManager."); - } - } catch (Exception e) { - throw new Exception("Failed to cancel the job: " + e.getMessage(), e); - } + }); } private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java index 72cf8b7..0b15b37 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; @@ -31,6 +32,8 @@ import java.io.StringWriter; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Request handler that returns the execution config of a job. @@ -39,8 +42,8 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config"; - public JobConfigHandler(ExecutionGraphHolder executionGraphHolder) { - super(executionGraphHolder); + public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); } @Override @@ -49,8 +52,17 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { } @Override - public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { - return createJobConfigJson(graph); + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createJobConfigJson(graph); + } catch (IOException e) { + throw new FlinkFutureException("Could not write job config json.", e); + } + }, + executor); + } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java index 87ac7c3..8a50f87 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -39,6 +40,8 @@ import java.io.StringWriter; import java.util.ArrayList; import java.util.Collection; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Request handler that returns details about a job. This includes: @@ -57,8 +60,8 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { private final MetricFetcher fetcher; - public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { - super(executionGraphHolder); + public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + super(executionGraphHolder, executor); this.fetcher = fetcher; } @@ -68,8 +71,16 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { } @Override - public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { - return createJobDetailsJson(graph, fetcher); + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createJobDetailsJson(graph, fetcher); + } catch (IOException e) { + throw new FlinkFutureException("Could not create job details json.", e); + } + }, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java index e31299b..6ffd443 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; @@ -35,6 +36,8 @@ import java.io.StringWriter; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Request handler that returns the configuration of a job. @@ -45,8 +48,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; - public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) { - super(executionGraphHolder); + public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); } @Override @@ -55,8 +58,17 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { } @Override - public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { - return createJobExceptionsJson(graph); + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createJobExceptionsJson(graph); + } catch (IOException e) { + throw new FlinkFutureException("Could not create job exceptions json.", e); + } + }, + executor + ); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java index e2437e6..cb6d8c0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java @@ -19,12 +19,16 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import com.fasterxml.jackson.core.JsonGenerator; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Returns the Job Manager's configuration. @@ -35,7 +39,8 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler { private final Configuration config; - public JobManagerConfigHandler(Configuration config) { + public JobManagerConfigHandler(Executor executor, Configuration config) { + super(executor); this.config = config; } @@ -45,31 +50,38 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + return CompletableFuture.supplyAsync( + () -> { + try { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - gen.writeStartArray(); - for (String key : config.keySet()) { - gen.writeStartObject(); - gen.writeStringField("key", key); + gen.writeStartArray(); + for (String key : config.keySet()) { + gen.writeStartObject(); + gen.writeStringField("key", key); - // Mask key values which contain sensitive information - if (key.toLowerCase().contains("password")) { - String value = config.getString(key, null); - if (value != null) { - value = "******"; - } - gen.writeStringField("value", value); - } - else { - gen.writeStringField("value", config.getString(key, null)); - } - gen.writeEndObject(); - } - gen.writeEndArray(); + // Mask key values which contain sensitive information + if (key.toLowerCase().contains("password")) { + String value = config.getString(key, null); + if (value != null) { + value = "******"; + } + gen.writeStringField("value", value); + } else { + gen.writeStringField("value", config.getString(key, null)); + } + gen.writeEndObject(); + } + gen.writeEndArray(); - gen.close(); - return writer.toString(); + gen.close(); + return writer.toString(); + } catch (IOException e) { + throw new FlinkFutureException("Could not write configuration.", e); + } + }, + executor); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java index d17b6bb..b3a9dd5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java @@ -27,6 +27,8 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Request handler that returns the JSON program plan of a job graph. @@ -35,8 +37,8 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan"; - public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) { - super(executionGraphHolder); + public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); } @Override @@ -45,8 +47,8 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { } @Override - public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { - return graph.getJsonPlan(); + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.completedFuture(graph.getJsonPlan()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java index 3526734..f63403f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java @@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Request handler for the STOP request. @@ -36,7 +39,8 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler { private final Time timeout; - public JobStoppingHandler(Time timeout) { + public JobStoppingHandler(Executor executor, Time timeout) { + super(executor); this.timeout = Preconditions.checkNotNull(timeout); } @@ -46,19 +50,23 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { - try { - JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); - if (jobManagerGateway != null) { - jobManagerGateway.stopJob(jobId, timeout); - return "{}"; - } - else { - throw new Exception("No connection to the leading JobManager."); - } - } - catch (Exception e) { - throw new Exception("Failed to stop the job with id: " + pathParams.get("jobid") + e.getMessage(), e); - } + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + return CompletableFuture.supplyAsync( + () -> { + try { + JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); + if (jobManagerGateway != null) { + jobManagerGateway.stopJob(jobId, timeout); + return "{}"; + } + else { + throw new Exception("No connection to the leading JobManager."); + } + } + catch (Exception e) { + throw new FlinkFutureException("Failed to stop the job with id: " + pathParams.get("jobid") + '.', e); + } + }, + executor); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java index 8e90dfc..9c613ff 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; @@ -33,6 +34,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Request handler that returns the accummulators for a given vertex. @@ -41,8 +44,8 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators"; - public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { - super(executionGraphHolder); + public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); } @Override @@ -51,8 +54,17 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle } @Override - public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { - return createVertexAccumulatorsJson(jobVertex); + public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createVertexAccumulatorsJson(jobVertex); + } catch (IOException e) { + throw new FlinkFutureException("Could not create job vertex accumulators json.", e); + } + }, + executor); + } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java index cde8ca9..963153f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -27,8 +28,11 @@ import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats; import com.fasterxml.jackson.core.JsonGenerator; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import scala.Option; @@ -51,10 +55,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle public JobVertexBackPressureHandler( ExecutionGraphHolder executionGraphHolder, + Executor executor, BackPressureStatsTracker backPressureStatsTracker, int refreshInterval) { - super(executionGraphHolder); + super(executionGraphHolder, executor); this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker"); checkArgument(refreshInterval >= 0, "Negative timeout"); this.refreshInterval = refreshInterval; @@ -66,11 +71,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle } @Override - public String handleRequest( + public CompletableFuture<String> handleRequest( AccessExecutionJobVertex accessJobVertex, - Map<String, String> params) throws Exception { + Map<String, String> params) { if (accessJobVertex instanceof ArchivedExecutionJobVertex) { - return ""; + return CompletableFuture.completedFuture(""); } ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex; try (StringWriter writer = new StringWriter(); @@ -116,7 +121,9 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle gen.writeEndObject(); gen.close(); - return writer.toString(); + return CompletableFuture.completedFuture(writer.toString()); + } catch (IOException e) { + return FutureUtils.completedExceptionally(e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java index 7757fdd..bd1745c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -39,6 +40,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * A request handler that provides the details of a job vertex, including id, name, parallelism, @@ -50,8 +53,8 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { private final MetricFetcher fetcher; - public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { - super(executionGraphHolder); + public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + super(executionGraphHolder, executor); this.fetcher = fetcher; } @@ -61,8 +64,16 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { } @Override - public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { - return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher); + public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher); + } catch (IOException e) { + throw new FlinkFutureException("Could not write the vertex details json.", e); + } + }, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java index a612782..0827720 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -41,6 +42,8 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * A request handler that provides the details of a job vertex, including id, name, and the @@ -52,8 +55,8 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle private final MetricFetcher fetcher; - public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { - super(executionGraphHolder); + public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + super(executionGraphHolder, executor); this.fetcher = fetcher; } @@ -63,8 +66,16 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle } @Override - public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { - return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher); + public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher); + } catch (IOException e) { + throw new FlinkFutureException("Could not create TaskManager json.", e); + } + }, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java index 079be8f..8ca785f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java @@ -19,11 +19,11 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.webmonitor.NotFoundException; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; import java.util.Map; +import java.util.concurrent.CompletableFuture; /** * Base interface for all request handlers. @@ -44,13 +44,8 @@ public interface RequestHandler { * @param jobManagerGateway to talk to the JobManager. * * @return The full http response. - * - * @throws Exception Handlers may forward exceptions. Exceptions of type - * {@link NotFoundException} will cause a HTTP 404 - * response with the exception message, other exceptions will cause a HTTP 500 response - * with the exception stack trace. */ - FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception; + CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway); /** * Returns an array of REST URL's under which this handler can be registered. http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java index 28e9ddf..301b217 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java @@ -23,6 +23,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Request handler providing details about a single task execution attempt. @@ -31,8 +33,8 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum"; - public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { - super(executionGraphHolder, fetcher); + public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + super(executionGraphHolder, executor, fetcher); } @Override @@ -41,7 +43,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt } @Override - public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception { + public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) { return handleRequest(vertex.getCurrentExecutionAttempt(), params); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java index 171277f..3c0d1d9 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -35,6 +36,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Base class for request handlers whose response depends on a specific job vertex (defined @@ -44,8 +47,8 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators"; - public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { - super(executionGraphHolder); + public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); } @Override @@ -54,8 +57,16 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA } @Override - public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception { - return createAttemptAccumulatorsJson(execAttempt); + public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createAttemptAccumulatorsJson(execAttempt); + } catch (IOException e) { + throw new FlinkFutureException("Could not create accumulator json.", e); + } + }, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java index 37c0e50..ad836df 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; @@ -40,6 +41,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import static org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH; @@ -52,8 +55,8 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp private final MetricFetcher fetcher; - public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { - super(executionGraphHolder); + public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + super(executionGraphHolder, executor); this.fetcher = fetcher; } @@ -63,8 +66,16 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp } @Override - public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception { - return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher); + public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher); + } catch (IOException e) { + throw new FlinkFutureException("Could not create attempt details json.", e); + } + }, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java index 64bdfb4..8142548 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; @@ -35,6 +36,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Request handler that returns the accumulators for all subtasks of job vertex. @@ -43,8 +46,8 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators"; - public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { - super(executionGraphHolder); + public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); } @Override @@ -53,8 +56,16 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand } @Override - public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { - return createSubtasksAccumulatorsJson(jobVertex); + public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createSubtasksAccumulatorsJson(jobVertex); + } catch (IOException e) { + throw new FlinkFutureException("Could not create subtasks accumulator json.", e); + } + }, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java index ea88587..d766206 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -35,6 +36,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Request handler that returns the state transition timestamps for all subtasks, plus their @@ -44,8 +47,8 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { private static final String SUBTASK_TIMES_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasktimes"; - public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) { - super(executionGraphHolder); + public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); } @Override @@ -54,8 +57,16 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { } @Override - public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { - return createSubtaskTimesJson(jobVertex); + public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createSubtaskTimesJson(jobVertex); + } catch (IOException e) { + throw new FlinkFutureException("Could not write subtask time json.", e); + } + }, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java index a8ab7a3..9f83ed0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.JobManagerGateway; @@ -28,14 +30,14 @@ import org.apache.flink.util.StringUtils; import com.fasterxml.jackson.core.JsonGenerator; +import java.io.IOException; import java.io.StringWriter; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executor; import static java.util.Objects.requireNonNull; @@ -53,7 +55,8 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { private final MetricFetcher fetcher; - public TaskManagersHandler(Time timeout, MetricFetcher fetcher) { + public TaskManagersHandler(Executor executor, Time timeout, MetricFetcher fetcher) { + super(executor); this.timeout = requireNonNull(timeout); this.fetcher = fetcher; } @@ -64,134 +67,139 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { - try { - if (jobManagerGateway != null) { - // whether one task manager's metrics are requested, or all task manager, we - // return them in an array. This avoids unnecessary code complexity. - // If only one task manager is requested, we only fetch one task manager metrics. - final List<Instance> instances = new ArrayList<>(); - if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { - try { - InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY))); - CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout); - - Optional<Instance> instance = tmInstanceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - - instance.ifPresent(instances::add); - } - // this means the id string was invalid. Keep the list empty. - catch (IllegalArgumentException e){ - // do nothing. - } - } else { - CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout); - - Collection<Instance> tmInstances = tmInstancesFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - instances.addAll(tmInstances); - } + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + if (jobManagerGateway != null) { + // whether one task manager's metrics are requested, or all task manager, we + // return them in an array. This avoids unnecessary code complexity. + // If only one task manager is requested, we only fetch one task manager metrics. + if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { + InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY))); + CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout); + + return tmInstanceFuture.thenApplyAsync( + (Optional<Instance> optTaskManager) -> { + try { + return writeTaskManagersJson( + optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()), + pathParams); + } catch (IOException e) { + throw new FlinkFutureException("Could not write TaskManagers JSON.", e); + } + }, + executor); + } else { + CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout); + + return tmInstancesFuture.thenApplyAsync( + (Collection<Instance> taskManagers) -> { + try { + return writeTaskManagersJson(taskManagers, pathParams); + } catch (IOException e) { + throw new FlinkFutureException("Could not write TaskManagers JSON.", e); + } + }, + executor); + } + } + else { + return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager.")); + } + } - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartObject(); - gen.writeArrayFieldStart("taskmanagers"); - - for (Instance instance : instances) { - gen.writeStartObject(); - gen.writeStringField("id", instance.getId().toString()); - gen.writeStringField("path", instance.getTaskManagerGateway().getAddress()); - gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort()); - gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat()); - gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots()); - gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots()); - gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores()); - gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory()); - gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap()); - gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory()); - - // only send metrics when only one task manager requests them. - if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { - fetcher.update(); - MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString()); - if (metrics != null) { - gen.writeObjectFieldStart("metrics"); - long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); - long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); - long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); - - gen.writeNumberField("heapCommitted", heapCommitted); - gen.writeNumberField("heapUsed", heapUsed); - gen.writeNumberField("heapMax", heapTotal); - - long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); - long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); - long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); - - gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); - gen.writeNumberField("nonHeapUsed", nonHeapUsed); - gen.writeNumberField("nonHeapMax", nonHeapTotal); - - gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted); - gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed); - gen.writeNumberField("totalMax", heapTotal + nonHeapTotal); - - long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0")); - long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0")); - long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0")); - - gen.writeNumberField("directCount", directCount); - gen.writeNumberField("directUsed", directUsed); - gen.writeNumberField("directMax", directMax); - - long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0")); - long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0")); - long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0")); - - gen.writeNumberField("mappedCount", mappedCount); - gen.writeNumberField("mappedUsed", mappedUsed); - gen.writeNumberField("mappedMax", mappedMax); - - long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0")); - long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0")); - - gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable); - gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal); - - gen.writeArrayFieldStart("garbageCollectors"); - - for (String gcName : metrics.garbageCollectorNames) { - String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null); - String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null); - if (count != null && time != null) { - gen.writeStartObject(); - gen.writeStringField("name", gcName); - gen.writeNumberField("count", Long.valueOf(count)); - gen.writeNumberField("time", Long.valueOf(time)); - gen.writeEndObject(); - } - } - - gen.writeEndArray(); + private String writeTaskManagersJson(Collection<Instance> instances, Map<String, String> pathParams) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartObject(); + gen.writeArrayFieldStart("taskmanagers"); + + for (Instance instance : instances) { + gen.writeStartObject(); + gen.writeStringField("id", instance.getId().toString()); + gen.writeStringField("path", instance.getTaskManagerGateway().getAddress()); + gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort()); + gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat()); + gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots()); + gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots()); + gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores()); + gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory()); + gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap()); + gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory()); + + // only send metrics when only one task manager requests them. + if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { + fetcher.update(); + MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString()); + if (metrics != null) { + gen.writeObjectFieldStart("metrics"); + long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); + long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); + long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); + + gen.writeNumberField("heapCommitted", heapCommitted); + gen.writeNumberField("heapUsed", heapUsed); + gen.writeNumberField("heapMax", heapTotal); + + long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); + long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); + long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); + + gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); + gen.writeNumberField("nonHeapUsed", nonHeapUsed); + gen.writeNumberField("nonHeapMax", nonHeapTotal); + + gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted); + gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed); + gen.writeNumberField("totalMax", heapTotal + nonHeapTotal); + + long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0")); + long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0")); + long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0")); + + gen.writeNumberField("directCount", directCount); + gen.writeNumberField("directUsed", directUsed); + gen.writeNumberField("directMax", directMax); + + long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0")); + long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0")); + long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0")); + + gen.writeNumberField("mappedCount", mappedCount); + gen.writeNumberField("mappedUsed", mappedUsed); + gen.writeNumberField("mappedMax", mappedMax); + + long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0")); + long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0")); + + gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable); + gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal); + + gen.writeArrayFieldStart("garbageCollectors"); + + for (String gcName : metrics.garbageCollectorNames) { + String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null); + String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null); + if (count != null && time != null) { + gen.writeStartObject(); + gen.writeStringField("name", gcName); + gen.writeNumberField("count", Long.valueOf(count)); + gen.writeNumberField("time", Long.valueOf(time)); gen.writeEndObject(); } } + gen.writeEndArray(); gen.writeEndObject(); } - - gen.writeEndArray(); - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } - else { - throw new Exception("No connection to the leading JobManager."); } + + gen.writeEndObject(); } - catch (Exception e) { - throw new RuntimeException("Failed to fetch list of all task managers: " + e.getMessage(), e); - } + + gen.writeEndArray(); + gen.writeEndObject(); + + gen.close(); + return writer.toString(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java index d4c9b2a..3affd7c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; @@ -34,6 +35,8 @@ import java.io.StringWriter; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Handler that returns a job's snapshotting settings. @@ -42,8 +45,8 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config"; - public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) { - super(executionGraphHolder); + public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); } @Override @@ -52,8 +55,16 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle } @Override - public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { - return createCheckpointConfigJson(graph); + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createCheckpointConfigJson(graph); + } catch (IOException e) { + throw new FlinkFutureException("Could not create checkpoint config json.", e); + } + }, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java index 664744b..96cc3e0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; import org.apache.flink.runtime.checkpoint.TaskStateStats; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; @@ -40,6 +41,8 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Request handler that returns checkpoint stats for a single job vertex. @@ -50,8 +53,8 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest private final CheckpointStatsCache cache; - public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) { - super(executionGraphHolder); + public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) { + super(executionGraphHolder, executor); this.cache = cache; } @@ -61,30 +64,38 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest } @Override - public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { - long checkpointId = parseCheckpointId(params); - if (checkpointId == -1) { - return "{}"; - } - - CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); - if (snapshot == null) { - return "{}"; - } - - AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId); - - if (checkpoint != null) { - cache.tryAdd(checkpoint); - } else { - checkpoint = cache.tryGet(checkpointId); - - if (checkpoint == null) { - return "{}"; - } - } - - return createCheckpointDetailsJson(checkpoint); + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + long checkpointId = parseCheckpointId(params); + if (checkpointId == -1) { + return "{}"; + } + + CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); + if (snapshot == null) { + return "{}"; + } + + AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId); + + if (checkpoint != null) { + cache.tryAdd(checkpoint); + } else { + checkpoint = cache.tryGet(checkpointId); + + if (checkpoint == null) { + return "{}"; + } + } + + try { + return createCheckpointDetailsJson(checkpoint); + } catch (IOException e) { + throw new FlinkFutureException("Could not create checkpoint details json.", e); + } + }, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index d116c56..045248b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; import org.apache.flink.runtime.checkpoint.SubtaskStateStats; import org.apache.flink.runtime.checkpoint.TaskStateStats; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JobManagerGateway; @@ -43,6 +44,8 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -57,8 +60,8 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap private final CheckpointStatsCache cache; - public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) { - super(executionGraphHolder); + public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) { + super(executionGraphHolder, executor); this.cache = checkNotNull(cache); } @@ -68,28 +71,28 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap } @Override - public String handleJsonRequest( - Map<String, String> pathParams, - Map<String, String> queryParams, - JobManagerGateway jobManagerGateway) throws Exception { + public CompletableFuture<String> handleJsonRequest( + Map<String, String> pathParams, + Map<String, String> queryParams, + JobManagerGateway jobManagerGateway) { return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway); } @Override - public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params); if (checkpointId == -1) { - return "{}"; + return CompletableFuture.completedFuture("{}"); } JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params); if (vertexId == null) { - return "{}"; + return CompletableFuture.completedFuture("{}"); } CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); if (snapshot == null) { - return "{}"; + return CompletableFuture.completedFuture("{}"); } AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId); @@ -100,16 +103,20 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap checkpoint = cache.tryGet(checkpointId); if (checkpoint == null) { - return "{}"; + return CompletableFuture.completedFuture("{}"); } } TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId); if (taskStats == null) { - return "{}"; + return CompletableFuture.completedFuture("{}"); } - return createSubtaskCheckpointDetailsJson(checkpoint, taskStats); + try { + return CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint, taskStats)); + } catch (IOException e) { + return FutureUtils.completedExceptionally(e); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java index a86c5fd..a60aee0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary; import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; @@ -43,6 +44,8 @@ import java.io.StringWriter; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Handler that returns checkpoint statistics for a job. @@ -51,8 +54,8 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints"; - public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder) { - super(executionGraphHolder); + public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); } @Override @@ -61,8 +64,16 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler } @Override - public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { - return createCheckpointStatsJson(graph); + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createCheckpointStatsJson(graph); + } catch (IOException e) { + throw new FlinkFutureException("Could not create checkpoint stats json.", e); + } + }, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java index b95f2c4..cf286ce 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.metrics; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler; import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; @@ -28,6 +29,8 @@ import com.fasterxml.jackson.core.JsonGenerator; import java.io.IOException; import java.io.StringWriter; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * Abstract request handler that returns a list of all available metrics or the values for a set of metrics. @@ -43,17 +46,27 @@ import java.util.Map; public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler { private final MetricFetcher fetcher; - public AbstractMetricsHandler(MetricFetcher fetcher) { + public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor); this.fetcher = Preconditions.checkNotNull(fetcher); } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { - fetcher.update(); - String requestedMetricsList = queryParams.get("get"); - return requestedMetricsList != null - ? getMetricsValues(pathParams, requestedMetricsList) - : getAvailableMetricsList(pathParams); + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + return CompletableFuture.supplyAsync( + () -> { + fetcher.update(); + String requestedMetricsList = queryParams.get("get"); + try { + return requestedMetricsList != null + ? getMetricsValues(pathParams, requestedMetricsList) + : getAvailableMetricsList(pathParams); + } catch (IOException e) { + throw new FlinkFutureException("Could not retrieve metrics.", e); + } + }, + executor); + } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java index 7252d8a..2bd6683 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.metrics; import java.util.Map; +import java.util.concurrent.Executor; /** * Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics. @@ -35,8 +36,8 @@ public class JobManagerMetricsHandler extends AbstractMetricsHandler { private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics"; - public JobManagerMetricsHandler(MetricFetcher fetcher) { - super(fetcher); + public JobManagerMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor, fetcher); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java index a193457..e5e2500 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.metrics; import java.util.Map; +import java.util.concurrent.Executor; /** * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics. @@ -35,8 +36,8 @@ public class JobMetricsHandler extends AbstractMetricsHandler { public static final String PARAMETER_JOB_ID = "jobid"; private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics"; - public JobMetricsHandler(MetricFetcher fetcher) { - super(fetcher); + public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor, fetcher); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java index e893da4..1d2cd84 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.metrics; import java.util.Map; +import java.util.concurrent.Executor; /** * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics. @@ -35,8 +36,8 @@ public class JobVertexMetricsHandler extends AbstractMetricsHandler { public static final String PARAMETER_VERTEX_ID = "vertexid"; private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics"; - public JobVertexMetricsHandler(MetricFetcher fetcher) { - super(fetcher); + public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor, fetcher); } @Override