http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java index ec8516d..745a110 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java @@ -47,10 +47,10 @@ public class JarUploadHandler extends AbstractJsonRequestHandler { Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { - + String tempFilePath = queryParams.get("filepath"); String filename = queryParams.get("filename"); - + File tempFile; if (tempFilePath != null && (tempFile = new File(tempFilePath)).exists()) { if (!tempFile.getName().endsWith(".jar")) { @@ -58,7 +58,7 @@ public class JarUploadHandler extends AbstractJsonRequestHandler { tempFile.delete(); return "{\"error\": \"Only Jar files are allowed.\"}"; } - + String filenameWithUUID = UUID.randomUUID() + "_" + filename; File newFile = new File(jarDir, filenameWithUUID); if (tempFile.renameTo(newFile)) { @@ -70,7 +70,7 @@ public class JarUploadHandler extends AbstractJsonRequestHandler { tempFile.delete(); } } - + return "{\"error\": \"Failed to upload the file.\"}"; } }
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java index c403aa2..163e583 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java @@ -18,13 +18,14 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import com.fasterxml.jackson.core.JsonGenerator; + import java.io.IOException; import java.io.StringWriter; import java.util.Collection; @@ -37,7 +38,7 @@ import java.util.Map; public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler { private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators"; - + public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @@ -52,6 +53,9 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler return createJobAccumulatorsJson(graph); } + /** + * Archivist for the JobAccumulatorsHandler. + */ public static class JobAccumulatorsJsonArchivist implements JsonArchivist { @Override @@ -65,7 +69,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified(); @@ -74,7 +78,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler gen.writeArrayFieldStart("job-accumulators"); // empty for now gen.writeEndArray(); - + gen.writeArrayFieldStart("user-task-accumulators"); for (StringifiedAccumulatorResult acc : allAccumulators) { gen.writeStartObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java index f5d6853..3f7b824 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java @@ -18,14 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import akka.dispatch.OnComplete; -import com.fasterxml.jackson.core.JsonGenerator; -import io.netty.buffer.Unpooled; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpVersion; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; @@ -36,11 +28,18 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoi import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; + +import akka.dispatch.OnComplete; +import com.fasterxml.jackson.core.JsonGenerator; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; import javax.annotation.Nullable; + import java.io.IOException; import java.io.StringWriter; import java.nio.charset.Charset; @@ -48,6 +47,10 @@ import java.util.ArrayDeque; import java.util.HashMap; import java.util.Map; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -67,13 +70,13 @@ public class JobCancellationWithSavepointHandlers { /** Shared lock between Trigger and In-Progress handlers. */ private final Object lock = new Object(); - /** In-Progress requests */ + /** In-Progress requests. */ private final Map<JobID, Long> inProgress = new HashMap<>(); /** Succeeded/failed request. Either String or Throwable. */ private final Map<Long, Object> completed = new HashMap<>(); - /** Atomic request counter */ + /** Atomic request counter. */ private long requestCounter; /** Handler for trigger requests. */ @@ -244,7 +247,7 @@ public class JobCancellationWithSavepointHandlers { // Accepted response StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); gen.writeStringField("status", "accepted"); gen.writeNumberField("request-id", requestId); @@ -283,7 +286,7 @@ public class JobCancellationWithSavepointHandlers { /** The number of recent checkpoints whose IDs are remembered. */ private static final int NUM_GHOST_REQUEST_IDS = 16; - /** Remember some recently completed */ + /** Remember some recently completed. */ private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS); @Override @@ -324,7 +327,7 @@ public class JobCancellationWithSavepointHandlers { if (inProgressRequestId == requestId) { return createInProgressResponse(requestId); } else { - String msg= "Request ID does not belong to JobID"; + String msg = "Request ID does not belong to JobID"; return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg); } } @@ -355,7 +358,7 @@ public class JobCancellationWithSavepointHandlers { private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); gen.writeStringField("status", "success"); @@ -381,7 +384,7 @@ public class JobCancellationWithSavepointHandlers { private FullHttpResponse createInProgressResponse(long requestId) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); gen.writeStringField("status", "in-progress"); @@ -406,7 +409,7 @@ public class JobCancellationWithSavepointHandlers { private FullHttpResponse createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); gen.writeStringField("status", "failed"); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java index 2b96456..72cf8b7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java @@ -18,19 +18,19 @@ package org.apache.flink.runtime.webmonitor.handlers; -import java.io.IOException; -import java.io.StringWriter; -import java.util.Map; - -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; import java.util.Collection; import java.util.Collections; +import java.util.Map; /** * Request handler that returns the execution config of a job. @@ -53,6 +53,9 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { return createJobConfigJson(graph); } + /** + * Archivist for the JobConfigHandler. + */ public static class JobConfigJsonArchivist implements JsonArchivist { @Override @@ -66,7 +69,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); gen.writeStringField("jid", graph.getJobID().toString()); @@ -86,7 +89,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { Map<String, String> ucVals = summary.getGlobalJobParameters(); if (ucVals != null) { gen.writeObjectFieldStart("user-config"); - + for (Map.Entry<String, String> ucVal : ucVals.entrySet()) { gen.writeStringField(ucVal.getKey(), ucVal.getValue()); } @@ -97,7 +100,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { gen.writeEndObject(); } gen.writeEndObject(); - + gen.close(); return writer.toString(); } http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java index 37a1c19..87ac7c3 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; - import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -32,7 +30,10 @@ import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; +import com.fasterxml.jackson.core.JsonGenerator; + import javax.annotation.Nullable; + import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; @@ -40,7 +41,7 @@ import java.util.Collection; import java.util.Map; /** - * Request handler that returns details about a job, including: + * Request handler that returns details about a job. This includes: * <ul> * <li>Dataflow plan</li> * <li>id, name, and current status</li> @@ -71,6 +72,9 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { return createJobDetailsJson(graph, fetcher); } + /** + * Archivist for the JobDetailsHandler. + */ public static class JobDetailsJsonArchivist implements JsonArchivist { @Override @@ -89,18 +93,18 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException { final StringWriter writer = new StringWriter(); - final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + final JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); final long now = System.currentTimeMillis(); - + gen.writeStartObject(); - + // basic info gen.writeStringField("jid", graph.getJobID().toString()); gen.writeStringField("name", graph.getJobName()); gen.writeBooleanField("isStoppable", graph.isStoppable()); gen.writeStringField("state", graph.getState().name()); - + // times and duration final long jobStartTime = graph.getStatusTimestamp(JobStatus.CREATED); final long jobEndTime = graph.getState().isGloballyTerminalState() ? @@ -109,14 +113,14 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { gen.writeNumberField("end-time", jobEndTime); gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : now) - jobStartTime); gen.writeNumberField("now", now); - + // timestamps gen.writeObjectFieldStart("timestamps"); for (JobStatus status : JobStatus.values()) { gen.writeNumberField(status.name(), graph.getStatusTimestamp(status)); } gen.writeEndObject(); - + // job vertices int[] jobVerticesPerState = new int[ExecutionState.values().length]; gen.writeArrayFieldStart("vertices"); @@ -126,7 +130,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { long startTime = Long.MAX_VALUE; long endTime = 0; boolean allFinished = true; - + for (AccessExecutionVertex vertex : ejv.getTaskVertices()) { final ExecutionState state = vertex.getExecutionState(); tasksPerState[state.ordinal()]++; @@ -136,11 +140,11 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { if (started > 0) { startTime = Math.min(startTime, started); } - + allFinished &= state.isTerminal(); endTime = Math.max(endTime, vertex.getStateTimestamp(state)); } - + long duration; if (startTime < Long.MAX_VALUE) { if (allFinished) { @@ -156,8 +160,8 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { endTime = -1L; duration = -1L; } - - ExecutionState jobVertexState = + + ExecutionState jobVertexState = ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism()); jobVerticesPerState[jobVertexState.ordinal()]++; @@ -170,13 +174,13 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { gen.writeNumberField("start-time", startTime); gen.writeNumberField("end-time", endTime); gen.writeNumberField("duration", duration); - + gen.writeObjectFieldStart("tasks"); for (ExecutionState state : ExecutionState.values()) { gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]); } gen.writeEndObject(); - + MutableIOMetrics counts = new MutableIOMetrics(); for (AccessExecutionVertex vertex : ejv.getTaskVertices()) { @@ -188,7 +192,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { } counts.writeIOMetricsAsJson(gen); - + gen.writeEndObject(); } gen.writeEndArray(); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java index 81cdc83..181b270 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -27,6 +26,8 @@ import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.ExceptionUtils; +import com.fasterxml.jackson.core.JsonGenerator; + import java.io.IOException; import java.io.StringWriter; import java.util.Collection; @@ -41,7 +42,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions"; static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; - + public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @@ -56,6 +57,9 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { return createJobExceptionsJson(graph); } + /** + * Archivist for the JobExceptionsHandler. + */ public static class JobExceptionsJsonArchivist implements JsonArchivist { @Override @@ -69,10 +73,10 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); - + // most important is the root failure cause String rootException = graph.getFailureCauseAsString(); if (rootException != null && !rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { @@ -84,7 +88,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { int numExceptionsSoFar = 0; boolean truncated = false; - + for (AccessExecutionVertex task : graph.getAllExecutionVertices()) { String t = task.getFailureCauseAsString(); if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java index 5fcf010..d1aeea4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java @@ -18,10 +18,11 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.instance.ActorGateway; +import com.fasterxml.jackson.core.JsonGenerator; + import java.io.StringWriter; import java.util.Map; @@ -46,7 +47,7 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler { @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartArray(); for (String key : config.keySet()) { @@ -54,9 +55,9 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler { gen.writeStringField("key", key); // Mask key values which contain sensitive information - if(key.toLowerCase().contains("password")) { + if (key.toLowerCase().contains("password")) { String value = config.getString(key, null); - if(value != null) { + if (value != null) { value = "******"; } gen.writeStringField("value", value); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java index 885d04e..d17b6bb 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java @@ -34,7 +34,7 @@ import java.util.Map; public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan"; - + public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @@ -49,6 +49,9 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { return graph.getJsonPlan(); } + /** + * Archivist for the JobPlanHandler. + */ public static class JobPlanJsonArchivist implements JsonArchivist { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java index 2532a1e..8e90dfc 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; - import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -27,6 +25,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import com.fasterxml.jackson.core.JsonGenerator; + import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; @@ -34,11 +34,13 @@ import java.util.Collection; import java.util.List; import java.util.Map; - +/** + * Request handler that returns the accummulators for a given vertex. + */ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandler { private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators"; - + public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @@ -53,6 +55,9 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle return createVertexAccumulatorsJson(jobVertex); } + /** + * Archivist for JobVertexAccumulatorsHandler. + */ public static class JobVertexAccumulatorsJsonArchivist implements JsonArchivist { @Override @@ -71,13 +76,13 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified(); gen.writeStartObject(); gen.writeStringField("id", jobVertex.getJobVertexId().toString()); - + gen.writeArrayFieldStart("user-accumulators"); for (StringifiedAccumulatorResult acc : accs) { gen.writeStartObject(); @@ -87,7 +92,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle gen.writeEndObject(); } gen.writeEndArray(); - + gen.writeEndObject(); gen.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java index 52167e1..cde8ca9 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java @@ -18,18 +18,20 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats; -import scala.Option; + +import com.fasterxml.jackson.core.JsonGenerator; import java.io.StringWriter; import java.util.Map; +import scala.Option; + import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -72,7 +74,7 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle } ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex; try (StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) { + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) { gen.writeStartObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java index d9a1131..7757fdd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; - import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -31,7 +29,10 @@ import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; +import com.fasterxml.jackson.core.JsonGenerator; + import javax.annotation.Nullable; + import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; @@ -45,7 +46,7 @@ import java.util.Map; */ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { - private static String JOB_VERTEX_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid"; + private static final String JOB_VERTEX_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid"; private final MetricFetcher fetcher; @@ -64,6 +65,9 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher); } + /** + * Archivist for the JobVertexDetailsHandler. + */ public static class JobVertexDetailsJsonArchivist implements JsonArchivist { @Override @@ -85,9 +89,9 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { String jobID, @Nullable MetricFetcher fetcher) throws IOException { final long now = System.currentTimeMillis(); - + StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); @@ -100,7 +104,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { int num = 0; for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { final ExecutionState status = vertex.getExecutionState(); - + TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort(); @@ -110,7 +114,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { } long endTime = status.isTerminal() ? vertex.getStateTimestamp(status) : -1; long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1; - + gen.writeStartObject(); gen.writeNumberField("subtask", num); gen.writeStringField("status", status.name()); @@ -130,13 +134,13 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { ); counts.writeIOMetricsAsJson(gen); - + gen.writeEndObject(); - + num++; } gen.writeEndArray(); - + gen.writeEndObject(); gen.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java index 3878722..a612782 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -31,7 +30,10 @@ import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; +import com.fasterxml.jackson.core.JsonGenerator; + import javax.annotation.Nullable; + import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; @@ -65,6 +67,9 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher); } + /** + * Archivist for JobVertexTaskManagersHandler. + */ public static class JobVertexTaskManagersJsonArchivist implements JsonArchivist { @Override @@ -86,7 +91,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle String jobID, @Nullable MetricFetcher fetcher) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); // Build a map that groups tasks by TaskManager Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>(); @@ -108,7 +113,6 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle // Build JSON response final long now = System.currentTimeMillis(); - gen.writeStartObject(); gen.writeStringField("id", jobVertex.getJobVertexId().toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java index e886532..4ce0baf 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java @@ -26,11 +26,10 @@ package org.apache.flink.runtime.webmonitor.handlers; public class JsonFactory { /** The singleton Jackson JSON factory. */ - public static final com.fasterxml.jackson.core.JsonFactory jacksonFactory = + public static final com.fasterxml.jackson.core.JsonFactory JACKSON_FACTORY = new com.fasterxml.jackson.core.JsonFactory(); - + // -------------------------------------------------------------------------------------------- - - /** Don't instantiate */ + private JsonFactory() {} } http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java index b6246e6..66c30af 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java @@ -18,9 +18,10 @@ package org.apache.flink.runtime.webmonitor.handlers; -import io.netty.handler.codec.http.FullHttpResponse; import org.apache.flink.runtime.instance.ActorGateway; +import io.netty.handler.codec.http.FullHttpResponse; + import java.util.Map; /** @@ -36,13 +37,13 @@ public interface RequestHandler { * respond with a full http response, including content-type, content-length, etc. * * <p>Exceptions may be throws and will be handled. - * + * * @param pathParams The map of REST path parameters, decoded by the router. * @param queryParams The map of query parameters. * @param jobManager The JobManager actor. * * @return The full http response. - * + * * @throws Exception Handlers may forward exceptions. Exceptions of type * {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404 * response with the exception message, other exceptions will cause a HTTP 500 response http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java index 4cf5f0f..28e9ddf 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java @@ -30,7 +30,7 @@ import java.util.Map; public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler { public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum"; - + public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { super(executionGraphHolder, fetcher); } http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java index 9026a22..171277f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; @@ -28,6 +27,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import com.fasterxml.jackson.core.JsonGenerator; + import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; @@ -37,12 +38,12 @@ import java.util.Map; /** * Base class for request handlers whose response depends on a specific job vertex (defined - * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter). + * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter). */ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptRequestHandler { private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators"; - + public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @@ -56,7 +57,10 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception { return createAttemptAccumulatorsJson(execAttempt); } - + + /** + * Archivist for the SubtaskExecutionAttemptAccumulatorsHandler. + */ public static class SubtaskExecutionAttemptAccumulatorsJsonArchivist implements JsonArchivist { @Override @@ -91,8 +95,8 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA public static String createAttemptAccumulatorsJson(AccessExecution execAttempt) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); - + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified(); gen.writeStartObject(); @@ -100,7 +104,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex()); gen.writeNumberField("attempt", execAttempt.getAttemptNumber()); gen.writeStringField("id", execAttempt.getAttemptId().toString()); - + gen.writeArrayFieldStart("user-accumulators"); for (StringifiedAccumulatorResult acc : accs) { gen.writeStartObject(); @@ -110,9 +114,9 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA gen.writeEndObject(); } gen.writeEndArray(); - + gen.writeEndObject(); - + gen.close(); return writer.toString(); } http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java index 078f54a..37c0e50 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; - import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; @@ -32,7 +30,10 @@ import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; +import com.fasterxml.jackson.core.JsonGenerator; + import javax.annotation.Nullable; + import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; @@ -66,6 +67,9 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher); } + /** + * Archivist for the SubtaskExecutionAttemptDetailsHandler. + */ public static class SubtaskExecutionAttemptDetailsJsonArchivist implements JsonArchivist { @Override @@ -83,7 +87,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp .replace(":vertexid", task.getJobVertexId().toString()) .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) .replace(":attempt", String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber())); - + archive.add(new ArchivedJson(curAttemptPath1, curAttemptJson)); archive.add(new ArchivedJson(curAttemptPath2, curAttemptJson)); @@ -109,7 +113,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp String vertexID, @Nullable MetricFetcher fetcher) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); final ExecutionState status = execAttempt.getState(); final long now = System.currentTimeMillis(); @@ -141,7 +145,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp jobID, vertexID ); - + counts.writeIOMetricsAsJson(gen); gen.writeEndObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java index 6c3bc18..64bdfb4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; - import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -29,6 +27,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import com.fasterxml.jackson.core.JsonGenerator; + import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; @@ -42,7 +42,7 @@ import java.util.Map; public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHandler { private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators"; - + public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @@ -57,6 +57,9 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand return createSubtasksAccumulatorsJson(jobVertex); } + /** + * Archivist for the SubtasksAllAccumulatorsHandler. + */ public static class SubtasksAllAccumulatorsJsonArchivist implements JsonArchivist { @Override @@ -75,22 +78,22 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand public static String createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); gen.writeStringField("id", jobVertex.getJobVertexId().toString()); gen.writeNumberField("parallelism", jobVertex.getParallelism()); gen.writeArrayFieldStart("subtasks"); - + int num = 0; for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); String locationString = location == null ? "(unassigned)" : location.getHostname(); - + gen.writeStartObject(); - + gen.writeNumberField("subtask", num++); gen.writeNumberField("attempt", vertex.getCurrentExecutionAttempt().getAttemptNumber()); gen.writeStringField("host", locationString); @@ -105,7 +108,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand gen.writeEndObject(); } gen.writeEndArray(); - + gen.writeEndObject(); } gen.writeEndArray(); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java index adefa80..ea88587 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; - import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -29,6 +27,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import com.fasterxml.jackson.core.JsonGenerator; + import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; @@ -43,7 +43,7 @@ import java.util.Map; public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { private static final String SUBTASK_TIMES_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasktimes"; - + public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @@ -58,6 +58,9 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { return createSubtaskTimesJson(jobVertex); } + /** + * Archivist for the SubtasksTimesHandler. + */ public static class SubtasksTimesJsonArchivist implements JsonArchivist { @Override @@ -78,28 +81,28 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { final long now = System.currentTimeMillis(); StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); gen.writeStringField("id", jobVertex.getJobVertexId().toString()); gen.writeStringField("name", jobVertex.getName()); gen.writeNumberField("now", now); - + gen.writeArrayFieldStart("subtasks"); int num = 0; for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { - + long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps(); ExecutionState status = vertex.getExecutionState(); long scheduledTime = timestamps[ExecutionState.SCHEDULED.ordinal()]; - + long start = scheduledTime > 0 ? scheduledTime : -1; long end = status.isTerminal() ? timestamps[status.ordinal()] : now; long duration = start >= 0 ? end - start : -1L; - + gen.writeStartObject(); gen.writeNumberField("subtask", num++); @@ -108,13 +111,13 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { gen.writeStringField("host", locationString); gen.writeNumberField("duration", duration); - + gen.writeObjectFieldStart("timestamps"); for (ExecutionState state : ExecutionState.values()) { gen.writeNumberField(state.name(), timestamps[state.ordinal()]); } gen.writeEndObject(); - + gen.writeEndObject(); } http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java index 53ee336..1084623 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java @@ -26,24 +26,6 @@ package org.apache.flink.runtime.webmonitor.handlers; * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java *****************************************************************************/ -import akka.dispatch.Mapper; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.DefaultFileRegion; -import io.netty.handler.codec.http.DefaultHttpResponse; -import io.netty.handler.codec.http.HttpChunkedInput; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.codec.http.router.Routed; -import io.netty.handler.ssl.SslHandler; -import io.netty.handler.stream.ChunkedFile; -import io.netty.util.concurrent.GenericFutureListener; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; @@ -65,12 +47,27 @@ import org.apache.flink.runtime.webmonitor.JobManagerRetriever; import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; + +import akka.dispatch.Mapper; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.netty.util.concurrent.GenericFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; -import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; import java.io.File; import java.io.FileNotFoundException; @@ -81,6 +78,11 @@ import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; +import scala.Option; +import scala.concurrent.ExecutionContextExecutor; +import scala.concurrent.duration.FiniteDuration; +import scala.reflect.ClassTag$; + import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; import static io.netty.handler.codec.http.HttpResponseStatus.OK; @@ -100,19 +102,19 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { private static final String TASKMANAGER_LOG_REST_PATH = "/taskmanagers/:taskmanagerid/log"; private static final String TASKMANAGER_OUT_REST_PATH = "/taskmanagers/:taskmanagerid/stdout"; - /** Keep track of last transmitted log, to clean up old ones */ + /** Keep track of last transmitted log, to clean up old ones. */ private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>(); private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>(); - /** Keep track of request status, prevents multiple log requests for a single TM running concurrently */ + /** Keep track of request status, prevents multiple log requests for a single TM running concurrently. */ private final ConcurrentHashMap<String, Boolean> lastRequestPending = new ConcurrentHashMap<>(); private final Configuration config; - /** Future of the blob cache */ + /** Future of the blob cache. */ private Future<BlobCache> cache; - /** Indicates which log file should be displayed; true indicates .log, false indicates .out */ - private boolean serveLogFile; + /** Indicates which log file should be displayed. */ + private FileMode fileMode; private final ExecutionContextExecutor executor; @@ -120,6 +122,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { private final BlobView blobView; + /** Used to control whether this handler serves the .log or .out file. */ public enum FileMode { LOG, STDOUT @@ -138,14 +141,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { this.executor = checkNotNull(executor); this.config = config; - switch (fileMode) { - case LOG: - serveLogFile = true; - break; - case STDOUT: - serveLogFile = false; - break; - } + this.fileMode = fileMode; this.blobView = Preconditions.checkNotNull(blobView, "blobView"); @@ -154,10 +150,12 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { @Override public String[] getPaths() { - if (serveLogFile) { - return new String[]{TASKMANAGER_LOG_REST_PATH}; - } else { - return new String[]{TASKMANAGER_OUT_REST_PATH}; + switch (fileMode) { + case LOG: + return new String[]{TASKMANAGER_LOG_REST_PATH}; + case STDOUT: + default: + return new String[]{TASKMANAGER_OUT_REST_PATH}; } } @@ -199,10 +197,12 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { public Future<BlobKey> apply(JobManagerMessages.TaskManagerInstance value) { Instance taskManager = value.instance().get(); - if (serveLogFile) { - return taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout); - } else { - return taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout); + switch (fileMode) { + case LOG: + return taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout); + case STDOUT: + default: + return taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout); } } }); @@ -223,7 +223,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { final BlobCache blobCache = value.f1; //delete previous log file, if it is different than the current one - HashMap<String, BlobKey> lastSubmittedFile = serveLogFile ? lastSubmittedLog : lastSubmittedStdout; + HashMap<String, BlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : lastSubmittedStdout; if (lastSubmittedFile.containsKey(taskManagerID)) { if (!blobKey.equals(lastSubmittedFile.get(taskManagerID))) { try { http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java index a23e983..6ad490e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; @@ -28,28 +27,34 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.metrics.MetricStore; import org.apache.flink.util.StringUtils; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; + +import com.fasterxml.jackson.core.JsonGenerator; import java.io.StringWriter; import java.util.ArrayList; import java.util.List; import java.util.Map; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + import static java.util.Objects.requireNonNull; +/** + * A request handler that provides an overview over all taskmanagers or details for a single one. + */ public class TaskManagersHandler extends AbstractJsonRequestHandler { private static final String TASKMANAGERS_REST_PATH = "/taskmanagers"; private static final String TASKMANAGER_DETAILS_REST_PATH = "/taskmanagers/:taskmanagerid"; public static final String TASK_MANAGER_ID_KEY = "taskmanagerid"; - + private final FiniteDuration timeout; private final MetricFetcher fetcher; - + public TaskManagersHandler(FiniteDuration timeout, MetricFetcher fetcher) { this.timeout = requireNonNull(timeout); this.fetcher = fetcher; @@ -88,7 +93,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { } StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); gen.writeArrayFieldStart("taskmanagers"); @@ -112,17 +117,17 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString()); if (metrics != null) { gen.writeObjectFieldStart("metrics"); - long heapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); - long heapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); - long heapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); + long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); + long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); + long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); gen.writeNumberField("heapCommitted", heapCommitted); gen.writeNumberField("heapUsed", heapUsed); gen.writeNumberField("heapMax", heapTotal); - long nonHeapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); - long nonHeapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); - long nonHeapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); + long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); + long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); + long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); gen.writeNumberField("nonHeapUsed", nonHeapUsed); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java index 7914c29..d4c9b2a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; @@ -28,6 +27,8 @@ import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import com.fasterxml.jackson.core.JsonGenerator; + import java.io.IOException; import java.io.StringWriter; import java.util.Collection; @@ -55,6 +56,9 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle return createCheckpointConfigJson(graph); } + /** + * Archivist for the CheckpointConfigHandler. + */ public static class CheckpointConfigJsonArchivist implements JsonArchivist { @Override @@ -68,7 +72,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); JobCheckpointingSettings settings = graph.getJobCheckpointingSettings(); if (settings == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java index 35d529a..9bbe8a7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java @@ -18,9 +18,10 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; + import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import javax.annotation.Nullable; http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java index 16fd9bd..664744b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; @@ -32,6 +31,8 @@ import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import com.fasterxml.jackson.core.JsonGenerator; + import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; @@ -86,6 +87,9 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest return createCheckpointDetailsJson(checkpoint); } + /** + * Archivist for the CheckpointStatsDetails. + */ public static class CheckpointStatsDetailsJsonArchivist implements JsonArchivist { @Override @@ -109,7 +113,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); gen.writeNumberField("id", checkpoint.getCheckpointId()); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index bb39b2c..f96e0c2 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; @@ -35,6 +34,8 @@ import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import com.fasterxml.jackson.core.JsonGenerator; + import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; @@ -107,10 +108,13 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap if (taskStats == null) { return "{}"; } - + return createSubtaskCheckpointDetailsJson(checkpoint, taskStats); } + /** + * Archivist for the CheckpointStatsDetailsSubtasksHandler. + */ public static class CheckpointStatsDetailsSubtasksJsonArchivist implements JsonArchivist { @Override @@ -137,7 +141,7 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); gen.writeStartObject(); // Overview http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java index f004888..a86c5fd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts; import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; @@ -35,7 +34,10 @@ import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import com.fasterxml.jackson.core.JsonGenerator; + import javax.annotation.Nullable; + import java.io.IOException; import java.io.StringWriter; import java.util.Collection; @@ -63,6 +65,9 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler return createCheckpointStatsJson(graph); } + /** + * Archivist for the CheckpointStatsJsonHandler. + */ public static class CheckpointStatsJsonArchivist implements JsonArchivist { @Override @@ -76,7 +81,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); if (snapshot == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 3337370..d86bfb2 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.webmonitor.history; -import io.netty.handler.codec.http.router.Router; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; @@ -34,10 +34,13 @@ import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; + +import io.netty.handler.codec.http.router.Router; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; + import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -53,15 +56,15 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * The HistoryServer provides a WebInterface and REST API to retrieve information about finished jobs for which * the JobManager may have already shut down. - * - * The HistoryServer regularly checks a set of directories for job archives created by the {@link FsJobArchivist} and + * + * <p>The HistoryServer regularly checks a set of directories for job archives created by the {@link FsJobArchivist} and * caches these in a local directory. See {@link HistoryServerArchiveFetcher}. - * - * All configuration options are defined in{@link HistoryServerOptions}. - * - * The WebInterface only displays the "Completed Jobs" page. - * - * The REST API is limited to + * + * <p>All configuration options are defined in{@link HistoryServerOptions}. + * + * <p>The WebInterface only displays the "Completed Jobs" page. + * + * <p>The REST API is limited to * <ul> * <li>/config</li> * <li>/joboverview</li> @@ -110,7 +113,7 @@ public class HistoryServer { }); System.exit(0); } catch (UndeclaredThrowableException ute) { - Throwable cause = ute. getUndeclaredThrowable(); + Throwable cause = ute.getUndeclaredThrowable(); LOG.error("Failed to run HistoryServer.", cause); cause.printStackTrace(); System.exit(1); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index 0ff9e02..0fc4314 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -15,14 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.runtime.webmonitor.history; -import static org.apache.flink.util.Preconditions.checkNotNull; +package org.apache.flink.runtime.webmonitor.history; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.HistoryServerOptions; import org.apache.flink.core.fs.FileStatus; @@ -32,6 +27,11 @@ import org.apache.flink.runtime.history.FsJobArchivist; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; import org.apache.flink.util.FileUtils; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,12 +49,14 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * This class is used by the {@link HistoryServer} to fetch the job archives that are located at * {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are polled in regular intervals, defined * by {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}. - * - * The archives are downloaded and expanded into a file structure analog to the REST API defined in the WebRuntimeMonitor. + * + * <p>The archives are downloaded and expanded into a file structure analog to the REST API defined in the WebRuntimeMonitor. */ class HistoryServerArchiveFetcher { @@ -174,7 +176,7 @@ class HistoryServerArchiveFetcher { } java.nio.file.Path targetPath = target.toPath(); - + // We overwrite existing files since this may be another attempt at fetching this archive. // Existing files may be incomplete/corrupt. if (Files.exists(targetPath)) { @@ -224,10 +226,10 @@ class HistoryServerArchiveFetcher { * This method replicates the JSON response that would be given by the {@link CurrentJobsOverviewHandler} when * listing both running and finished jobs. * - * Every job archive contains a joboverview.json file containing the same structure. Since jobs are archived on + * <p>Every job archive contains a joboverview.json file containing the same structure. Since jobs are archived on * their own however the list of finished jobs only contains a single job. * - * For the display in the HistoryServer WebFrontend we have to combine these overviews. + * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews. */ private static void updateJobOverview(File webDir) { File webOverviewDir = new File(webDir, "overviews"); http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java index ba0e2d2..c14f3d8 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java @@ -26,6 +26,8 @@ package org.apache.flink.runtime.webmonitor.history; * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java *****************************************************************************/ +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; + import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -41,7 +43,6 @@ import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.router.Routed; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedFile; -import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,10 +71,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * Simple file server handler used by the {@link HistoryServer} that serves requests to web frontend's static files, * such as HTML, CSS, JS or JSON files. * - * This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server * example. - * - * This class is a copy of the {@link StaticFileServerHandler}. The differences are that the request path is + * + * <p>This class is a copy of the {@link StaticFileServerHandler}. The differences are that the request path is * modified to end on ".json" if it does not have a filename extension; when "index.html" is requested we load * "index_hs.html" instead to inject the modified HistoryServer WebInterface and that the caching of the "/joboverview" * page is prevented. @@ -81,12 +82,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @ChannelHandler.Sharable public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<Routed> { - /** Default logger, if none is specified */ + /** Default logger, if none is specified. */ private static final Logger LOG = LoggerFactory.getLogger(HistoryServerStaticFileServerHandler.class); // ------------------------------------------------------------------------ - /** The path in which the static documents are */ + /** The path in which the static documents are. */ private final File rootPath; public HistoryServerStaticFileServerHandler(File rootPath) throws IOException {
