http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java deleted file mode 100644 index cb6d8c0..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Returns the Job Manager's configuration. - */ -public class JobManagerConfigHandler extends AbstractJsonRequestHandler { - - private static final String JOBMANAGER_CONFIG_REST_PATH = "/jobmanager/config"; - - private final Configuration config; - - public JobManagerConfigHandler(Executor executor, Configuration config) { - super(executor); - this.config = config; - } - - @Override - public String[] getPaths() { - return new String[]{JOBMANAGER_CONFIG_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { - return CompletableFuture.supplyAsync( - () -> { - try { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartArray(); - for (String key : config.keySet()) { - gen.writeStartObject(); - gen.writeStringField("key", key); - - // Mask key values which contain sensitive information - if (key.toLowerCase().contains("password")) { - String value = config.getString(key, null); - if (value != null) { - value = "******"; - } - gen.writeStringField("value", value); - } else { - gen.writeStringField("value", config.getString(key, null)); - } - gen.writeEndObject(); - } - gen.writeEndArray(); - - gen.close(); - return writer.toString(); - } catch (IOException e) { - throw new FlinkFutureException("Could not write configuration.", e); - } - }, - executor); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java deleted file mode 100644 index b3a9dd5..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler that returns the JSON program plan of a job graph. - */ -public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { - - private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan"; - - public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); - } - - @Override - public String[] getPaths() { - return new String[]{JOB_PLAN_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { - return CompletableFuture.completedFuture(graph.getJsonPlan()); - } - - /** - * Archivist for the JobPlanHandler. - */ - public static class JobPlanJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - String path = JOB_PLAN_REST_PATH - .replace(":jobid", graph.getJobID().toString()); - String json = graph.getJsonPlan(); - return Collections.singletonList(new ArchivedJson(path, json)); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java deleted file mode 100644 index f63403f..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; - -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler for the STOP request. - */ -public class JobStoppingHandler extends AbstractJsonRequestHandler { - - private static final String JOB_STOPPING_REST_PATH = "/jobs/:jobid/stop"; - private static final String JOB_STOPPING_YARN_REST_PATH = "/jobs/:jobid/yarn-stop"; - - private final Time timeout; - - public JobStoppingHandler(Executor executor, Time timeout) { - super(executor); - this.timeout = Preconditions.checkNotNull(timeout); - } - - @Override - public String[] getPaths() { - return new String[]{JOB_STOPPING_REST_PATH, JOB_STOPPING_YARN_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { - return CompletableFuture.supplyAsync( - () -> { - try { - JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); - if (jobManagerGateway != null) { - jobManagerGateway.stopJob(jobId, timeout); - return "{}"; - } - else { - throw new Exception("No connection to the leading JobManager."); - } - } - catch (Exception e) { - throw new FlinkFutureException("Failed to stop the job with id: " + pathParams.get("jobid") + '.', e); - } - }, - executor); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java deleted file mode 100644 index 9c613ff..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler that returns the accummulators for a given vertex. - */ -public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandler { - - private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators"; - - public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); - } - - @Override - public String[] getPaths() { - return new String[]{JOB_VERTEX_ACCUMULATORS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createVertexAccumulatorsJson(jobVertex); - } catch (IOException e) { - throw new FlinkFutureException("Could not create job vertex accumulators json.", e); - } - }, - executor); - - } - - /** - * Archivist for JobVertexAccumulatorsHandler. - */ - public static class JobVertexAccumulatorsJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - List<ArchivedJson> archive = new ArrayList<>(); - for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { - String json = createVertexAccumulatorsJson(task); - String path = JOB_VERTEX_ACCUMULATORS_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":vertexid", task.getJobVertexId().toString()); - archive.add(new ArchivedJson(path, json)); - } - return archive; - } - } - - public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified(); - - gen.writeStartObject(); - gen.writeStringField("id", jobVertex.getJobVertexId().toString()); - - gen.writeArrayFieldStart("user-accumulators"); - for (StringifiedAccumulatorResult acc : accs) { - gen.writeStartObject(); - gen.writeStringField("name", acc.getName()); - gen.writeStringField("type", acc.getType()); - gen.writeStringField("value", acc.getValue()); - gen.writeEndObject(); - } - gen.writeEndArray(); - - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java deleted file mode 100644 index 963153f..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -import scala.Option; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Request handler that returns back pressure stats for a single job vertex and - * all its sub tasks. - */ -public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandler { - - private static final String JOB_VERTEX_BACKPRESSURE_REST_PATH = "/jobs/:jobid/vertices/:vertexid/backpressure"; - - /** Back pressure stats tracker. */ - private final BackPressureStatsTracker backPressureStatsTracker; - - /** Time after which stats are considered outdated. */ - private final int refreshInterval; - - public JobVertexBackPressureHandler( - ExecutionGraphHolder executionGraphHolder, - Executor executor, - BackPressureStatsTracker backPressureStatsTracker, - int refreshInterval) { - - super(executionGraphHolder, executor); - this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker"); - checkArgument(refreshInterval >= 0, "Negative timeout"); - this.refreshInterval = refreshInterval; - } - - @Override - public String[] getPaths() { - return new String[]{JOB_VERTEX_BACKPRESSURE_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest( - AccessExecutionJobVertex accessJobVertex, - Map<String, String> params) { - if (accessJobVertex instanceof ArchivedExecutionJobVertex) { - return CompletableFuture.completedFuture(""); - } - ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex; - try (StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) { - - gen.writeStartObject(); - - Option<OperatorBackPressureStats> statsOption = backPressureStatsTracker - .getOperatorBackPressureStats(jobVertex); - - if (statsOption.isDefined()) { - OperatorBackPressureStats stats = statsOption.get(); - - // Check whether we need to refresh - if (refreshInterval <= System.currentTimeMillis() - stats.getEndTimestamp()) { - backPressureStatsTracker.triggerStackTraceSample(jobVertex); - gen.writeStringField("status", "deprecated"); - } else { - gen.writeStringField("status", "ok"); - } - - gen.writeStringField("backpressure-level", getBackPressureLevel(stats.getMaxBackPressureRatio())); - gen.writeNumberField("end-timestamp", stats.getEndTimestamp()); - - // Sub tasks - gen.writeArrayFieldStart("subtasks"); - int numSubTasks = stats.getNumberOfSubTasks(); - for (int i = 0; i < numSubTasks; i++) { - double ratio = stats.getBackPressureRatio(i); - - gen.writeStartObject(); - gen.writeNumberField("subtask", i); - gen.writeStringField("backpressure-level", getBackPressureLevel(ratio)); - gen.writeNumberField("ratio", ratio); - gen.writeEndObject(); - } - gen.writeEndArray(); - } else { - backPressureStatsTracker.triggerStackTraceSample(jobVertex); - gen.writeStringField("status", "deprecated"); - } - - gen.writeEndObject(); - gen.close(); - - return CompletableFuture.completedFuture(writer.toString()); - } catch (IOException e) { - return FutureUtils.completedExceptionally(e); - } - } - - /** - * Returns the back pressure level as a String. - * - * @param backPressureRatio Ratio of back pressures samples to total number of samples. - * - * @return Back pressure level ('no', 'low', or 'high') - */ - static String getBackPressureLevel(double backPressureRatio) { - if (backPressureRatio <= 0.10) { - return "ok"; - } else if (backPressureRatio <= 0.5) { - return "low"; - } else { - return "high"; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java deleted file mode 100644 index bd1745c..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; -import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; - -import com.fasterxml.jackson.core.JsonGenerator; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * A request handler that provides the details of a job vertex, including id, name, parallelism, - * and the runtime and metrics of all its subtasks. - */ -public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { - - private static final String JOB_VERTEX_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid"; - - private final MetricFetcher fetcher; - - public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { - super(executionGraphHolder, executor); - this.fetcher = fetcher; - } - - @Override - public String[] getPaths() { - return new String[]{JOB_VERTEX_DETAILS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher); - } catch (IOException e) { - throw new FlinkFutureException("Could not write the vertex details json.", e); - } - }, - executor); - } - - /** - * Archivist for the JobVertexDetailsHandler. - */ - public static class JobVertexDetailsJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - List<ArchivedJson> archive = new ArrayList<>(); - for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { - String json = createVertexDetailsJson(task, graph.getJobID().toString(), null); - String path = JOB_VERTEX_DETAILS_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":vertexid", task.getJobVertexId().toString()); - archive.add(new ArchivedJson(path, json)); - } - return archive; - } - } - - public static String createVertexDetailsJson( - AccessExecutionJobVertex jobVertex, - String jobID, - @Nullable MetricFetcher fetcher) throws IOException { - final long now = System.currentTimeMillis(); - - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartObject(); - - gen.writeStringField("id", jobVertex.getJobVertexId().toString()); - gen.writeStringField("name", jobVertex.getName()); - gen.writeNumberField("parallelism", jobVertex.getParallelism()); - gen.writeNumberField("now", now); - - gen.writeArrayFieldStart("subtasks"); - int num = 0; - for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { - final ExecutionState status = vertex.getExecutionState(); - - TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); - String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort(); - - long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING); - if (startTime == 0) { - startTime = -1; - } - long endTime = status.isTerminal() ? vertex.getStateTimestamp(status) : -1; - long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1; - - gen.writeStartObject(); - gen.writeNumberField("subtask", num); - gen.writeStringField("status", status.name()); - gen.writeNumberField("attempt", vertex.getCurrentExecutionAttempt().getAttemptNumber()); - gen.writeStringField("host", locationString); - gen.writeNumberField("start-time", startTime); - gen.writeNumberField("end-time", endTime); - gen.writeNumberField("duration", duration); - - MutableIOMetrics counts = new MutableIOMetrics(); - - counts.addIOMetrics( - vertex.getCurrentExecutionAttempt(), - fetcher, - jobID, - jobVertex.getJobVertexId().toString() - ); - - counts.writeIOMetricsAsJson(gen); - - gen.writeEndObject(); - - num++; - } - gen.writeEndArray(); - - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java deleted file mode 100644 index 0827720..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; -import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; - -import com.fasterxml.jackson.core.JsonGenerator; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * A request handler that provides the details of a job vertex, including id, name, and the - * runtime and metrics of all its subtasks aggregated by TaskManager. - */ -public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandler { - - private static final String JOB_VERTEX_TASKMANAGERS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/taskmanagers"; - - private final MetricFetcher fetcher; - - public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { - super(executionGraphHolder, executor); - this.fetcher = fetcher; - } - - @Override - public String[] getPaths() { - return new String[]{JOB_VERTEX_TASKMANAGERS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher); - } catch (IOException e) { - throw new FlinkFutureException("Could not create TaskManager json.", e); - } - }, - executor); - } - - /** - * Archivist for JobVertexTaskManagersHandler. - */ - public static class JobVertexTaskManagersJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - List<ArchivedJson> archive = new ArrayList<>(); - for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { - String json = createVertexDetailsByTaskManagerJson(task, graph.getJobID().toString(), null); - String path = JOB_VERTEX_TASKMANAGERS_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":vertexid", task.getJobVertexId().toString()); - archive.add(new ArchivedJson(path, json)); - } - return archive; - } - } - - public static String createVertexDetailsByTaskManagerJson( - AccessExecutionJobVertex jobVertex, - String jobID, - @Nullable MetricFetcher fetcher) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - // Build a map that groups tasks by TaskManager - Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>(); - - for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { - TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); - String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort(); - - List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager); - - if (vertices == null) { - vertices = new ArrayList<>(); - taskManagerVertices.put(taskManager, vertices); - } - - vertices.add(vertex); - } - - // Build JSON response - final long now = System.currentTimeMillis(); - - gen.writeStartObject(); - - gen.writeStringField("id", jobVertex.getJobVertexId().toString()); - gen.writeStringField("name", jobVertex.getName()); - gen.writeNumberField("now", now); - - gen.writeArrayFieldStart("taskmanagers"); - for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) { - String host = entry.getKey(); - List<AccessExecutionVertex> taskVertices = entry.getValue(); - - int[] tasksPerState = new int[ExecutionState.values().length]; - - long startTime = Long.MAX_VALUE; - long endTime = 0; - boolean allFinished = true; - - MutableIOMetrics counts = new MutableIOMetrics(); - - for (AccessExecutionVertex vertex : taskVertices) { - final ExecutionState state = vertex.getExecutionState(); - tasksPerState[state.ordinal()]++; - - // take the earliest start time - long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING); - if (started > 0) { - startTime = Math.min(startTime, started); - } - - allFinished &= state.isTerminal(); - endTime = Math.max(endTime, vertex.getStateTimestamp(state)); - - counts.addIOMetrics( - vertex.getCurrentExecutionAttempt(), - fetcher, - jobID, - jobVertex.getJobVertexId().toString()); - } - - long duration; - if (startTime < Long.MAX_VALUE) { - if (allFinished) { - duration = endTime - startTime; - } - else { - endTime = -1L; - duration = now - startTime; - } - } - else { - startTime = -1L; - endTime = -1L; - duration = -1L; - } - - ExecutionState jobVertexState = - ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size()); - - gen.writeStartObject(); - - gen.writeStringField("host", host); - gen.writeStringField("status", jobVertexState.name()); - - gen.writeNumberField("start-time", startTime); - gen.writeNumberField("end-time", endTime); - gen.writeNumberField("duration", duration); - - counts.writeIOMetricsAsJson(gen); - - gen.writeObjectFieldStart("status-counts"); - for (ExecutionState state : ExecutionState.values()) { - gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]); - } - gen.writeEndObject(); - - gen.writeEndObject(); - } - gen.writeEndArray(); - - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java deleted file mode 100644 index 4ce0baf..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -/** - * A holder for the singleton Jackson JSON factory. Since the Jackson's JSON factory object - * is a heavyweight object that is encouraged to be shared, we use a singleton instance across - * all request handlers. - */ -public class JsonFactory { - - /** The singleton Jackson JSON factory. */ - public static final com.fasterxml.jackson.core.JsonFactory JACKSON_FACTORY = - new com.fasterxml.jackson.core.JsonFactory(); - - // -------------------------------------------------------------------------------------------- - - private JsonFactory() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java deleted file mode 100644 index 8ca785f..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.jobmaster.JobManagerGateway; - -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; - -import java.util.Map; -import java.util.concurrent.CompletableFuture; - -/** - * Base interface for all request handlers. - * - * <p>Most handlers will want to use the {@link AbstractJsonRequestHandler} - * as a starting point, which produces a valid HTTP response. - */ -public interface RequestHandler { - - /** - * Core method that handles the request and generates the response. The method needs to - * respond with a full http response, including content-type, content-length, etc. - * - * <p>Exceptions may be throws and will be handled. - * - * @param pathParams The map of REST path parameters, decoded by the router. - * @param queryParams The map of query parameters. - * @param jobManagerGateway to talk to the JobManager. - * - * @return The full http response. - */ - CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway); - - /** - * Returns an array of REST URL's under which this handler can be registered. - * - * @return array containing REST URL's under which this handler can be registered. - */ - String[] getPaths(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java deleted file mode 100644 index bb61d16..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -/** - * Base class for request handler exceptions. - */ -public class RequestHandlerException extends Exception { - - private static final long serialVersionUID = 7570352908725875886L; - - public RequestHandlerException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java deleted file mode 100644 index 301b217..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; - -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler providing details about a single task execution attempt. - */ -public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler { - - public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum"; - - public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { - super(executionGraphHolder, executor, fetcher); - } - - @Override - public String[] getPaths() { - return new String[]{SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) { - return handleRequest(vertex.getCurrentExecutionAttempt(), params); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java deleted file mode 100644 index 3c0d1d9..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Base class for request handlers whose response depends on a specific job vertex (defined - * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter). - */ -public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptRequestHandler { - - private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators"; - - public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); - } - - @Override - public String[] getPaths() { - return new String[]{SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createAttemptAccumulatorsJson(execAttempt); - } catch (IOException e) { - throw new FlinkFutureException("Could not create accumulator json.", e); - } - }, - executor); - } - - /** - * Archivist for the SubtaskExecutionAttemptAccumulatorsHandler. - */ - public static class SubtaskExecutionAttemptAccumulatorsJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - List<ArchivedJson> archive = new ArrayList<>(); - for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { - for (AccessExecutionVertex subtask : task.getTaskVertices()) { - String curAttemptJson = createAttemptAccumulatorsJson(subtask.getCurrentExecutionAttempt()); - String curAttemptPath = SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":vertexid", task.getJobVertexId().toString()) - .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) - .replace(":attempt", String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber())); - - archive.add(new ArchivedJson(curAttemptPath, curAttemptJson)); - - for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) { - AccessExecution attempt = subtask.getPriorExecutionAttempt(x); - String json = createAttemptAccumulatorsJson(attempt); - String path = SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":vertexid", task.getJobVertexId().toString()) - .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) - .replace(":attempt", String.valueOf(attempt.getAttemptNumber())); - archive.add(new ArchivedJson(path, json)); - } - } - } - return archive; - } - } - - public static String createAttemptAccumulatorsJson(AccessExecution execAttempt) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified(); - - gen.writeStartObject(); - - gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex()); - gen.writeNumberField("attempt", execAttempt.getAttemptNumber()); - gen.writeStringField("id", execAttempt.getAttemptId().toString()); - - gen.writeArrayFieldStart("user-accumulators"); - for (StringifiedAccumulatorResult acc : accs) { - gen.writeStartObject(); - gen.writeStringField("name", acc.getName()); - gen.writeStringField("type", acc.getType()); - gen.writeStringField("value", acc.getValue()); - gen.writeEndObject(); - } - gen.writeEndArray(); - - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java deleted file mode 100644 index ad836df..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; -import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; - -import com.fasterxml.jackson.core.JsonGenerator; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -import static org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH; - -/** - * Request handler providing details about a single task execution attempt. - */ -public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptRequestHandler { - - private static final String SUBTASK_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt"; - - private final MetricFetcher fetcher; - - public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { - super(executionGraphHolder, executor); - this.fetcher = fetcher; - } - - @Override - public String[] getPaths() { - return new String[]{SUBTASK_ATTEMPT_DETAILS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher); - } catch (IOException e) { - throw new FlinkFutureException("Could not create attempt details json.", e); - } - }, - executor); - } - - /** - * Archivist for the SubtaskExecutionAttemptDetailsHandler. - */ - public static class SubtaskExecutionAttemptDetailsJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - List<ArchivedJson> archive = new ArrayList<>(); - for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { - for (AccessExecutionVertex subtask : task.getTaskVertices()) { - String curAttemptJson = createAttemptDetailsJson(subtask.getCurrentExecutionAttempt(), graph.getJobID().toString(), task.getJobVertexId().toString(), null); - String curAttemptPath1 = SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":vertexid", task.getJobVertexId().toString()) - .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())); - String curAttemptPath2 = SUBTASK_ATTEMPT_DETAILS_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":vertexid", task.getJobVertexId().toString()) - .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) - .replace(":attempt", String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber())); - - archive.add(new ArchivedJson(curAttemptPath1, curAttemptJson)); - archive.add(new ArchivedJson(curAttemptPath2, curAttemptJson)); - - for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) { - AccessExecution attempt = subtask.getPriorExecutionAttempt(x); - String json = createAttemptDetailsJson(attempt, graph.getJobID().toString(), task.getJobVertexId().toString(), null); - String path = SUBTASK_ATTEMPT_DETAILS_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":vertexid", task.getJobVertexId().toString()) - .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) - .replace(":attempt", String.valueOf(attempt.getAttemptNumber())); - archive.add(new ArchivedJson(path, json)); - } - } - } - return archive; - } - } - - public static String createAttemptDetailsJson( - AccessExecution execAttempt, - String jobID, - String vertexID, - @Nullable MetricFetcher fetcher) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - final ExecutionState status = execAttempt.getState(); - final long now = System.currentTimeMillis(); - - TaskManagerLocation location = execAttempt.getAssignedResourceLocation(); - String locationString = location == null ? "(unassigned)" : location.getHostname(); - - long startTime = execAttempt.getStateTimestamp(ExecutionState.DEPLOYING); - if (startTime == 0) { - startTime = -1; - } - long endTime = status.isTerminal() ? execAttempt.getStateTimestamp(status) : -1; - long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1; - - gen.writeStartObject(); - gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex()); - gen.writeStringField("status", status.name()); - gen.writeNumberField("attempt", execAttempt.getAttemptNumber()); - gen.writeStringField("host", locationString); - gen.writeNumberField("start-time", startTime); - gen.writeNumberField("end-time", endTime); - gen.writeNumberField("duration", duration); - - MutableIOMetrics counts = new MutableIOMetrics(); - - counts.addIOMetrics( - execAttempt, - fetcher, - jobID, - vertexID - ); - - counts.writeIOMetricsAsJson(gen); - - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java deleted file mode 100644 index 8142548..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler that returns the accumulators for all subtasks of job vertex. - */ -public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHandler { - - private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators"; - - public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); - } - - @Override - public String[] getPaths() { - return new String[]{SUBTASKS_ALL_ACCUMULATORS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createSubtasksAccumulatorsJson(jobVertex); - } catch (IOException e) { - throw new FlinkFutureException("Could not create subtasks accumulator json.", e); - } - }, - executor); - } - - /** - * Archivist for the SubtasksAllAccumulatorsHandler. - */ - public static class SubtasksAllAccumulatorsJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - List<ArchivedJson> archive = new ArrayList<>(); - for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { - String json = createSubtasksAccumulatorsJson(task); - String path = SUBTASKS_ALL_ACCUMULATORS_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":vertexid", task.getJobVertexId().toString()); - archive.add(new ArchivedJson(path, json)); - } - return archive; - } - } - - public static String createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartObject(); - gen.writeStringField("id", jobVertex.getJobVertexId().toString()); - gen.writeNumberField("parallelism", jobVertex.getParallelism()); - - gen.writeArrayFieldStart("subtasks"); - - int num = 0; - for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { - - TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); - String locationString = location == null ? "(unassigned)" : location.getHostname(); - - gen.writeStartObject(); - - gen.writeNumberField("subtask", num++); - gen.writeNumberField("attempt", vertex.getCurrentExecutionAttempt().getAttemptNumber()); - gen.writeStringField("host", locationString); - - StringifiedAccumulatorResult[] accs = vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified(); - gen.writeArrayFieldStart("user-accumulators"); - for (StringifiedAccumulatorResult acc : accs) { - gen.writeStartObject(); - gen.writeStringField("name", acc.getName()); - gen.writeStringField("type", acc.getType()); - gen.writeStringField("value", acc.getValue()); - gen.writeEndObject(); - } - gen.writeEndArray(); - - gen.writeEndObject(); - } - gen.writeEndArray(); - - gen.writeEndObject(); - gen.close(); - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java deleted file mode 100644 index d766206..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler that returns the state transition timestamps for all subtasks, plus their - * location and duration. - */ -public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { - - private static final String SUBTASK_TIMES_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasktimes"; - - public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); - } - - @Override - public String[] getPaths() { - return new String[]{SUBTASK_TIMES_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createSubtaskTimesJson(jobVertex); - } catch (IOException e) { - throw new FlinkFutureException("Could not write subtask time json.", e); - } - }, - executor); - } - - /** - * Archivist for the SubtasksTimesHandler. - */ - public static class SubtasksTimesJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - List<ArchivedJson> archive = new ArrayList<>(); - for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { - String json = createSubtaskTimesJson(task); - String path = SUBTASK_TIMES_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":vertexid", task.getJobVertexId().toString()); - archive.add(new ArchivedJson(path, json)); - } - return archive; - } - } - - public static String createSubtaskTimesJson(AccessExecutionJobVertex jobVertex) throws IOException { - final long now = System.currentTimeMillis(); - - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartObject(); - - gen.writeStringField("id", jobVertex.getJobVertexId().toString()); - gen.writeStringField("name", jobVertex.getName()); - gen.writeNumberField("now", now); - - gen.writeArrayFieldStart("subtasks"); - - int num = 0; - for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { - - long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps(); - ExecutionState status = vertex.getExecutionState(); - - long scheduledTime = timestamps[ExecutionState.SCHEDULED.ordinal()]; - - long start = scheduledTime > 0 ? scheduledTime : -1; - long end = status.isTerminal() ? timestamps[status.ordinal()] : now; - long duration = start >= 0 ? end - start : -1L; - - gen.writeStartObject(); - gen.writeNumberField("subtask", num++); - - TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); - String locationString = location == null ? "(unassigned)" : location.getHostname(); - gen.writeStringField("host", locationString); - - gen.writeNumberField("duration", duration); - - gen.writeObjectFieldStart("timestamps"); - for (ExecutionState state : ExecutionState.values()) { - gen.writeNumberField(state.name(), timestamps[state.ordinal()]); - } - gen.writeEndObject(); - - gen.writeEndObject(); - } - - gen.writeEndArray(); - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java deleted file mode 100644 index d53d2b1..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -/***************************************************************************** - * This code is based on the "HttpStaticFileServerHandler" from the - * Netty project's HTTP server example. - * - * See http://netty.io and - * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java - *****************************************************************************/ - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobCache; -import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.blob.BlobView; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.rest.handler.RedirectHandler; -import org.apache.flink.runtime.webmonitor.WebHandler; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; -import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile; -import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future; -import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.net.InetSocketAddress; -import java.nio.channels.FileChannel; -import java.util.HashMap; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; - -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Request handler that returns the TaskManager log/out files. - * - * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server - * example.</p> - */ [email protected] -public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> implements WebHandler { - private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLogHandler.class); - - private static final String TASKMANAGER_LOG_REST_PATH = "/taskmanagers/:taskmanagerid/log"; - private static final String TASKMANAGER_OUT_REST_PATH = "/taskmanagers/:taskmanagerid/stdout"; - - /** Keep track of last transmitted log, to clean up old ones. */ - private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>(); - private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>(); - - /** Keep track of request status, prevents multiple log requests for a single TM running concurrently. */ - private final ConcurrentHashMap<String, Boolean> lastRequestPending = new ConcurrentHashMap<>(); - private final Configuration config; - - /** Future of the blob cache. */ - private CompletableFuture<BlobCache> cache; - - /** Indicates which log file should be displayed. */ - private FileMode fileMode; - - private final Executor executor; - - private final BlobView blobView; - - /** Used to control whether this handler serves the .log or .out file. */ - public enum FileMode { - LOG, - STDOUT - } - - public TaskManagerLogHandler( - GatewayRetriever<JobManagerGateway> retriever, - Executor executor, - CompletableFuture<String> localJobManagerAddressPromise, - Time timeout, - FileMode fileMode, - Configuration config, - BlobView blobView) { - super(localJobManagerAddressPromise, retriever, timeout); - - this.executor = checkNotNull(executor); - this.config = config; - this.fileMode = fileMode; - - this.blobView = Preconditions.checkNotNull(blobView, "blobView"); - } - - @Override - public String[] getPaths() { - switch (fileMode) { - case LOG: - return new String[]{TASKMANAGER_LOG_REST_PATH}; - case STDOUT: - default: - return new String[]{TASKMANAGER_OUT_REST_PATH}; - } - } - - /** - * Response when running with leading JobManager. - */ - @Override - protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed routed, final JobManagerGateway jobManagerGateway) { - if (cache == null) { - CompletableFuture<Integer> blobPortFuture = jobManagerGateway.requestBlobServerPort(timeout); - cache = blobPortFuture.thenApplyAsync( - (Integer port) -> { - try { - return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView); - } catch (IOException e) { - throw new FlinkFutureException("Could not create BlobCache.", e); - } - }, - executor); - } - - final String taskManagerID = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY); - final HttpRequest request = routed.request(); - - //fetch TaskManager logs if no other process is currently doing it - if (lastRequestPending.putIfAbsent(taskManagerID, true) == null) { - try { - InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID)); - CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout); - - CompletableFuture<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose( - (Optional<Instance> optTMInstance) -> { - Instance taskManagerInstance = optTMInstance.orElseThrow( - () -> new FlinkFutureException("Could not find instance with " + instanceID + '.')); - switch (fileMode) { - case LOG: - return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout); - case STDOUT: - default: - return taskManagerInstance.getTaskManagerGateway().requestTaskManagerStdout(timeout); - } - } - ); - - CompletableFuture<String> logPathFuture = blobKeyFuture - .thenCombineAsync( - cache, - (blobKey, blobCache) -> { - //delete previous log file, if it is different than the current one - HashMap<String, BlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : lastSubmittedStdout; - if (lastSubmittedFile.containsKey(taskManagerID)) { - if (!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) { - try { - blobCache.deleteGlobal(lastSubmittedFile.get(taskManagerID)); - } catch (IOException e) { - throw new FlinkFutureException("Could not delete file for " + taskManagerID + '.', e); - } - lastSubmittedFile.put(taskManagerID, blobKey); - } - } else { - lastSubmittedFile.put(taskManagerID, blobKey); - } - try { - return blobCache.getFile(blobKey).getAbsolutePath(); - } catch (IOException e) { - throw new FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e); - } - }, - executor); - - logPathFuture.exceptionally( - failure -> { - display(ctx, request, "Fetching TaskManager log failed."); - LOG.error("Fetching TaskManager log failed.", failure); - lastRequestPending.remove(taskManagerID); - - return null; - }); - - logPathFuture.thenAccept( - filePath -> { - File file = new File(filePath); - final RandomAccessFile raf; - try { - raf = new RandomAccessFile(file, "r"); - } catch (FileNotFoundException e) { - display(ctx, request, "Displaying TaskManager log failed."); - LOG.error("Displaying TaskManager log failed.", e); - - return; - } - long fileLength; - try { - fileLength = raf.length(); - } catch (IOException ioe) { - display(ctx, request, "Displaying TaskManager log failed."); - LOG.error("Displaying TaskManager log failed.", ioe); - try { - raf.close(); - } catch (IOException e) { - LOG.error("Could not close random access file.", e); - } - - return; - } - final FileChannel fc = raf.getChannel(); - - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - response.headers().set(CONTENT_TYPE, "text/plain"); - - if (HttpHeaders.isKeepAlive(request)) { - response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - } - HttpHeaders.setContentLength(response, fileLength); - - // write the initial line and the header. - ctx.write(response); - - // write the content. - ChannelFuture lastContentFuture; - final GenericFutureListener<Future<? super Void>> completionListener = future -> { - lastRequestPending.remove(taskManagerID); - fc.close(); - raf.close(); - }; - if (ctx.pipeline().get(SslHandler.class) == null) { - ctx.write( - new DefaultFileRegion(fc, 0, fileLength), ctx.newProgressivePromise()) - .addListener(completionListener); - lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); - - } else { - try { - lastContentFuture = ctx.writeAndFlush( - new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), - ctx.newProgressivePromise()) - .addListener(completionListener); - } catch (IOException e) { - display(ctx, request, "Displaying TaskManager log failed."); - LOG.warn("Could not write http data.", e); - - return; - } - // HttpChunkedInput will write the end marker (LastHttpContent) for us. - } - - // close the connection, if no keep-alive is needed - if (!HttpHeaders.isKeepAlive(request)) { - lastContentFuture.addListener(ChannelFutureListener.CLOSE); - } - }); - } catch (Exception e) { - display(ctx, request, "Error: " + e.getMessage()); - LOG.error("Fetching TaskManager log failed.", e); - lastRequestPending.remove(taskManagerID); - } - } else { - display(ctx, request, "loading..."); - } - } - - private void display(ChannelHandlerContext ctx, HttpRequest request, String message) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - response.headers().set(CONTENT_TYPE, "text/plain"); - - if (HttpHeaders.isKeepAlive(request)) { - response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - } - - byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET); - - ByteBuf b = Unpooled.copiedBuffer(buf); - - HttpHeaders.setContentLength(response, buf.length); - - // write the initial line and the header. - ctx.write(response); - - ctx.write(b); - - ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); - - // close the connection, if no keep-alive is needed - if (!HttpHeaders.isKeepAlive(request)) { - lastContentFuture.addListener(ChannelFutureListener.CLOSE); - } - } -}
