http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java new file mode 100644 index 0000000..3236062 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.ExceptionUtils; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns the exceptions of a job. + */ +public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { + + private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions"; + + static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; + + public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public String[] getPaths() { + return new String[]{JOB_EXCEPTIONS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createJobExceptionsJson(graph); + } catch (IOException e) { + throw new FlinkFutureException("Could not create job exceptions json.", e); + } + }, + executor + ); + } + + /** + * Archivist for the JobExceptionsHandler.
+ */ + public static class JobExceptionsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String json = createJobExceptionsJson(graph); + String path = JOB_EXCEPTIONS_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } + } + + public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartObject(); + + // most important is the root failure cause + ErrorInfo rootException = graph.getFailureCause(); + if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { + gen.writeStringField("root-exception", rootException.getExceptionAsString()); + gen.writeNumberField("timestamp", rootException.getTimestamp()); + } + + // we additionally collect all exceptions (up to a limit) that occurred in the individual tasks + gen.writeArrayFieldStart("all-exceptions"); + + int numExceptionsSoFar = 0; + boolean truncated = false; + + for (AccessExecutionVertex task : graph.getAllExecutionVertices()) { + String t = task.getFailureCauseAsString(); + if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { + if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) { + truncated = true; + break; + } + + TaskManagerLocation location = task.getCurrentAssignedResourceLocation(); + String locationString = location != null ? + location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)"; + + gen.writeStartObject(); + gen.writeStringField("exception", t); + gen.writeStringField("task", task.getTaskNameWithSubtaskIndex()); + gen.writeStringField("location", locationString); + long timestamp = task.getStateTimestamp(ExecutionState.FAILED); + gen.writeNumberField("timestamp", timestamp == 0 ? -1 : timestamp); + gen.writeEndObject(); + numExceptionsSoFar++; + } + } + gen.writeEndArray(); + + gen.writeBooleanField("truncated", truncated); + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } +}
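For orientation, here is a minimal client-side sketch of the JSON shape that createJobExceptionsJson emits. The field names ("root-exception", "all-exceptions", "truncated") mirror the generator calls above; the sample payload and the class name JobExceptionsJsonSketch are illustrative only, not part of this commit:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JobExceptionsJsonSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative payload using the same field names the handler writes.
            String json = "{"
                + "\"root-exception\":\"java.lang.RuntimeException: boom\","
                + "\"timestamp\":1500000000000,"
                + "\"all-exceptions\":[{"
                + "\"exception\":\"java.lang.RuntimeException: boom\","
                + "\"task\":\"Map (1/4)\","
                + "\"location\":\"host1:42424\","
                + "\"timestamp\":1500000000000}],"
                + "\"truncated\":false}";

            JsonNode root = new ObjectMapper().readTree(json);
            System.out.println("root cause: " + root.path("root-exception").asText());
            for (JsonNode failure : root.path("all-exceptions")) {
                System.out.println(failure.path("task").asText()
                    + " @ " + failure.path("location").asText());
            }
            // The handler caps "all-exceptions" at MAX_NUMBER_EXCEPTION_TO_REPORT (20)
            // and sets "truncated" to true when further task failures were skipped.
            System.out.println("truncated: " + root.path("truncated").asBoolean());
        }
    }

Note that a task's "timestamp" is written as -1 when no FAILED state timestamp is available, so consumers should treat -1 as "unknown".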
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java new file mode 100644 index 0000000..364af91 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Returns the Job Manager's configuration. 
+ */ +public class JobManagerConfigHandler extends AbstractJsonRequestHandler { + + private static final String JOBMANAGER_CONFIG_REST_PATH = "/jobmanager/config"; + + private final Configuration config; + + public JobManagerConfigHandler(Executor executor, Configuration config) { + super(executor); + this.config = config; + } + + @Override + public String[] getPaths() { + return new String[]{JOBMANAGER_CONFIG_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + return CompletableFuture.supplyAsync( + () -> { + try { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartArray(); + for (String key : config.keySet()) { + gen.writeStartObject(); + gen.writeStringField("key", key); + + // Mask key values which contain sensitive information + if (key.toLowerCase().contains("password")) { + String value = config.getString(key, null); + if (value != null) { + value = "******"; + } + gen.writeStringField("value", value); + } else { + gen.writeStringField("value", config.getString(key, null)); + } + gen.writeEndObject(); + } + gen.writeEndArray(); + + gen.close(); + return writer.toString(); + } catch (IOException e) { + throw new FlinkFutureException("Could not write configuration.", e); + } + }, + executor); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java new file mode 100644 index 0000000..d9db1ff --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns the JSON program plan of a job graph. 
+ */ +public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { + + private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan"; + + public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public String[] getPaths() { + return new String[]{JOB_PLAN_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.completedFuture(graph.getJsonPlan()); + } + + /** + * Archivist for the JobPlanHandler. + */ + public static class JobPlanJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String path = JOB_PLAN_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + String json = graph.getJsonPlan(); + return Collections.singletonList(new ArchivedJson(path, json)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java new file mode 100644 index 0000000..cc41a1c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler for the STOP request. 
+ */ +public class JobStoppingHandler extends AbstractJsonRequestHandler { + + private static final String JOB_STOPPING_REST_PATH = "/jobs/:jobid/stop"; + private static final String JOB_STOPPING_YARN_REST_PATH = "/jobs/:jobid/yarn-stop"; + + private final Time timeout; + + public JobStoppingHandler(Executor executor, Time timeout) { + super(executor); + this.timeout = Preconditions.checkNotNull(timeout); + } + + @Override + public String[] getPaths() { + return new String[]{JOB_STOPPING_REST_PATH, JOB_STOPPING_YARN_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + return CompletableFuture.supplyAsync( + () -> { + try { + JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); + if (jobManagerGateway != null) { + jobManagerGateway.stopJob(jobId, timeout); + return "{}"; + } + else { + throw new Exception("No connection to the leading JobManager."); + } + } + catch (Exception e) { + throw new FlinkFutureException("Failed to stop the job with id: " + pathParams.get("jobid") + '.', e); + } + }, + executor); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java new file mode 100644 index 0000000..9830ab4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns the accumulators for a given vertex.
+ */ +public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandler { + + private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators"; + + public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_ACCUMULATORS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createVertexAccumulatorsJson(jobVertex); + } catch (IOException e) { + throw new FlinkFutureException("Could not create job vertex accumulators json.", e); + } + }, + executor); + + } + + /** + * Archivist for JobVertexAccumulatorsHandler. + */ + public static class JobVertexAccumulatorsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + String json = createVertexAccumulatorsJson(task); + String path = JOB_VERTEX_ACCUMULATORS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + } + + public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified(); + + gen.writeStartObject(); + gen.writeStringField("id", jobVertex.getJobVertexId().toString()); + + gen.writeArrayFieldStart("user-accumulators"); + for (StringifiedAccumulatorResult acc : accs) { + gen.writeStartObject(); + gen.writeStringField("name", acc.getName()); + gen.writeStringField("type", acc.getType()); + gen.writeStringField("value", acc.getValue()); + gen.writeEndObject(); + } + gen.writeEndArray(); + + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java new file mode 100644 index 0000000..59bfc0b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Request handler that returns back pressure stats for a single job vertex and + * all its sub tasks. + */ +public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandler { + + private static final String JOB_VERTEX_BACKPRESSURE_REST_PATH = "/jobs/:jobid/vertices/:vertexid/backpressure"; + + /** Back pressure stats tracker. */ + private final BackPressureStatsTracker backPressureStatsTracker; + + /** Time after which stats are considered outdated. 
*/ + private final int refreshInterval; + + public JobVertexBackPressureHandler( + ExecutionGraphHolder executionGraphHolder, + Executor executor, + BackPressureStatsTracker backPressureStatsTracker, + int refreshInterval) { + + super(executionGraphHolder, executor); + this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker"); + checkArgument(refreshInterval >= 0, "Negative timeout"); + this.refreshInterval = refreshInterval; + } + + @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_BACKPRESSURE_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest( + AccessExecutionJobVertex accessJobVertex, + Map<String, String> params) { + if (accessJobVertex instanceof ArchivedExecutionJobVertex) { + return CompletableFuture.completedFuture(""); + } + ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex; + try (StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) { + + gen.writeStartObject(); + + Optional<OperatorBackPressureStats> statsOption = backPressureStatsTracker + .getOperatorBackPressureStats(jobVertex); + + if (statsOption.isPresent()) { + OperatorBackPressureStats stats = statsOption.get(); + + // Check whether we need to refresh + if (refreshInterval <= System.currentTimeMillis() - stats.getEndTimestamp()) { + backPressureStatsTracker.triggerStackTraceSample(jobVertex); + gen.writeStringField("status", "deprecated"); + } else { + gen.writeStringField("status", "ok"); + } + + gen.writeStringField("backpressure-level", getBackPressureLevel(stats.getMaxBackPressureRatio())); + gen.writeNumberField("end-timestamp", stats.getEndTimestamp()); + + // Sub tasks + gen.writeArrayFieldStart("subtasks"); + int numSubTasks = stats.getNumberOfSubTasks(); + for (int i = 0; i < numSubTasks; i++) { + double ratio = stats.getBackPressureRatio(i); + + gen.writeStartObject(); + gen.writeNumberField("subtask", i); + gen.writeStringField("backpressure-level", getBackPressureLevel(ratio)); + gen.writeNumberField("ratio", ratio); + gen.writeEndObject(); + } + gen.writeEndArray(); + } else { + backPressureStatsTracker.triggerStackTraceSample(jobVertex); + gen.writeStringField("status", "deprecated"); + } + + gen.writeEndObject(); + gen.close(); + + return CompletableFuture.completedFuture(writer.toString()); + } catch (IOException e) { + return FutureUtils.completedExceptionally(e); + } + } + + /** + * Returns the back pressure level as a String. + * + * @param backPressureRatio Ratio of back pressure samples to total number of samples.
+ * + * @return Back pressure level ('ok', 'low', or 'high') + */ + static String getBackPressureLevel(double backPressureRatio) { + if (backPressureRatio <= 0.10) { + return "ok"; + } else if (backPressureRatio <= 0.5) { + return "low"; + } else { + return "high"; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java new file mode 100644 index 0000000..3f0c77c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * A request handler that provides the details of a job vertex, including id, name, parallelism, + * and the runtime and metrics of all its subtasks.
+ */ +public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { + + private static final String JOB_VERTEX_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid"; + + private final MetricFetcher fetcher; + + public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + super(executionGraphHolder, executor); + this.fetcher = fetcher; + } + + @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_DETAILS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher); + } catch (IOException e) { + throw new FlinkFutureException("Could not write the vertex details json.", e); + } + }, + executor); + } + + /** + * Archivist for the JobVertexDetailsHandler. + */ + public static class JobVertexDetailsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + String json = createVertexDetailsJson(task, graph.getJobID().toString(), null); + String path = JOB_VERTEX_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + } + + public static String createVertexDetailsJson( + AccessExecutionJobVertex jobVertex, + String jobID, + @Nullable MetricFetcher fetcher) throws IOException { + final long now = System.currentTimeMillis(); + + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartObject(); + + gen.writeStringField("id", jobVertex.getJobVertexId().toString()); + gen.writeStringField("name", jobVertex.getName()); + gen.writeNumberField("parallelism", jobVertex.getParallelism()); + gen.writeNumberField("now", now); + + gen.writeArrayFieldStart("subtasks"); + int num = 0; + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { + final ExecutionState status = vertex.getExecutionState(); + + TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); + String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort(); + + long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING); + if (startTime == 0) { + startTime = -1; + } + long endTime = status.isTerminal() ? vertex.getStateTimestamp(status) : -1; + long duration = startTime > 0 ? ((endTime > 0 ? 
endTime : now) - startTime) : -1; + + gen.writeStartObject(); + gen.writeNumberField("subtask", num); + gen.writeStringField("status", status.name()); + gen.writeNumberField("attempt", vertex.getCurrentExecutionAttempt().getAttemptNumber()); + gen.writeStringField("host", locationString); + gen.writeNumberField("start-time", startTime); + gen.writeNumberField("end-time", endTime); + gen.writeNumberField("duration", duration); + + MutableIOMetrics counts = new MutableIOMetrics(); + + counts.addIOMetrics( + vertex.getCurrentExecutionAttempt(), + fetcher, + jobID, + jobVertex.getJobVertexId().toString() + ); + + counts.writeIOMetricsAsJson(gen); + + gen.writeEndObject(); + + num++; + } + gen.writeEndArray(); + + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java new file mode 100644 index 0000000..fa4ab67 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * A request handler that provides the details of a job vertex, including id, name, and the + * runtime and metrics of all its subtasks aggregated by TaskManager. + */ +public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandler { + + private static final String JOB_VERTEX_TASKMANAGERS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/taskmanagers"; + + private final MetricFetcher fetcher; + + public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + super(executionGraphHolder, executor); + this.fetcher = fetcher; + } + + @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_TASKMANAGERS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher); + } catch (IOException e) { + throw new FlinkFutureException("Could not create TaskManager json.", e); + } + }, + executor); + } + + /** + * Archivist for JobVertexTaskManagersHandler. 
+ */ + public static class JobVertexTaskManagersJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + String json = createVertexDetailsByTaskManagerJson(task, graph.getJobID().toString(), null); + String path = JOB_VERTEX_TASKMANAGERS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + } + + public static String createVertexDetailsByTaskManagerJson( + AccessExecutionJobVertex jobVertex, + String jobID, + @Nullable MetricFetcher fetcher) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + // Build a map that groups tasks by TaskManager + Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>(); + + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { + TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); + String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort(); + + List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager); + + if (vertices == null) { + vertices = new ArrayList<>(); + taskManagerVertices.put(taskManager, vertices); + } + + vertices.add(vertex); + } + + // Build JSON response + final long now = System.currentTimeMillis(); + + gen.writeStartObject(); + + gen.writeStringField("id", jobVertex.getJobVertexId().toString()); + gen.writeStringField("name", jobVertex.getName()); + gen.writeNumberField("now", now); + + gen.writeArrayFieldStart("taskmanagers"); + for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) { + String host = entry.getKey(); + List<AccessExecutionVertex> taskVertices = entry.getValue(); + + int[] tasksPerState = new int[ExecutionState.values().length]; + + long startTime = Long.MAX_VALUE; + long endTime = 0; + boolean allFinished = true; + + MutableIOMetrics counts = new MutableIOMetrics(); + + for (AccessExecutionVertex vertex : taskVertices) { + final ExecutionState state = vertex.getExecutionState(); + tasksPerState[state.ordinal()]++; + + // take the earliest start time + long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING); + if (started > 0) { + startTime = Math.min(startTime, started); + } + + allFinished &= state.isTerminal(); + endTime = Math.max(endTime, vertex.getStateTimestamp(state)); + + counts.addIOMetrics( + vertex.getCurrentExecutionAttempt(), + fetcher, + jobID, + jobVertex.getJobVertexId().toString()); + } + + long duration; + if (startTime < Long.MAX_VALUE) { + if (allFinished) { + duration = endTime - startTime; + } + else { + endTime = -1L; + duration = now - startTime; + } + } + else { + startTime = -1L; + endTime = -1L; + duration = -1L; + } + + ExecutionState jobVertexState = + ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size()); + + gen.writeStartObject(); + + gen.writeStringField("host", host); + gen.writeStringField("status", jobVertexState.name()); + + gen.writeNumberField("start-time", startTime); + gen.writeNumberField("end-time", endTime); + gen.writeNumberField("duration", duration); + + counts.writeIOMetricsAsJson(gen); + + gen.writeObjectFieldStart("status-counts"); 
+ for (ExecutionState state : ExecutionState.values()) { + gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]); + } + gen.writeEndObject(); + + gen.writeEndObject(); + } + gen.writeEndArray(); + + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JsonFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JsonFactory.java new file mode 100644 index 0000000..e0629f3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JsonFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +/** + * A holder for the singleton Jackson JSON factory. Since Jackson's JSON factory + * is a heavyweight object that is encouraged to be shared, we use a singleton instance across + * all request handlers. + */ +public class JsonFactory { + + /** The singleton Jackson JSON factory. */ + public static final com.fasterxml.jackson.core.JsonFactory JACKSON_FACTORY = + new com.fasterxml.jackson.core.JsonFactory(); + + // -------------------------------------------------------------------------------------------- + + private JsonFactory() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandler.java new file mode 100644 index 0000000..d9fc899 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandler.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.jobmaster.JobManagerGateway; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Base interface for all request handlers. + * + * <p>Most handlers will want to use the {@link AbstractJsonRequestHandler} + * as a starting point, which produces a valid HTTP response. + */ +public interface RequestHandler { + + /** + * Core method that handles the request and generates the response. The method needs to + * respond with a full HTTP response, including content-type, content-length, etc. + * + * <p>Exceptions may be thrown and will be handled. + * + * @param pathParams The map of REST path parameters, decoded by the router. + * @param queryParams The map of query parameters. + * @param jobManagerGateway to talk to the JobManager. + * + * @return The full HTTP response. + */ + CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway); + + /** + * Returns an array of REST URLs under which this handler can be registered. + * + * @return array containing REST URLs under which this handler can be registered. + */ + String[] getPaths(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandlerException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandlerException.java new file mode 100644 index 0000000..0dad905 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandlerException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +/** + * Base class for request handler exceptions.
+ */ +public class RequestHandlerException extends Exception { + + private static final long serialVersionUID = 7570352908725875886L; + + public RequestHandlerException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java new file mode 100644 index 0000000..ff4fb46 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler providing details about a single task execution attempt. 
+ */ +public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler { + + public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum"; + + public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + super(executionGraphHolder, executor, fetcher); + } + + @Override + public String[] getPaths() { + return new String[]{SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) { + return handleRequest(vertex.getCurrentExecutionAttempt(), params); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java new file mode 100644 index 0000000..be4fe0b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns the accumulators for a single execution attempt of a subtask, defined + * via the "jobid", "vertexid", "subtasknum", and "attempt" parameters.
+ */ +public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptRequestHandler { + + private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators"; + + public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public String[] getPaths() { + return new String[]{SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createAttemptAccumulatorsJson(execAttempt); + } catch (IOException e) { + throw new FlinkFutureException("Could not create accumulator json.", e); + } + }, + executor); + } + + /** + * Archivist for the SubtaskExecutionAttemptAccumulatorsHandler. + */ + public static class SubtaskExecutionAttemptAccumulatorsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + for (AccessExecutionVertex subtask : task.getTaskVertices()) { + String curAttemptJson = createAttemptAccumulatorsJson(subtask.getCurrentExecutionAttempt()); + String curAttemptPath = SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()) + .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(":attempt", String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber())); + + archive.add(new ArchivedJson(curAttemptPath, curAttemptJson)); + + for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) { + AccessExecution attempt = subtask.getPriorExecutionAttempt(x); + String json = createAttemptAccumulatorsJson(attempt); + String path = SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()) + .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(":attempt", String.valueOf(attempt.getAttemptNumber())); + archive.add(new ArchivedJson(path, json)); + } + } + } + return archive; + } + } + + public static String createAttemptAccumulatorsJson(AccessExecution execAttempt) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified(); + + gen.writeStartObject(); + + gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex()); + gen.writeNumberField("attempt", execAttempt.getAttemptNumber()); + gen.writeStringField("id", execAttempt.getAttemptId().toString()); + + gen.writeArrayFieldStart("user-accumulators"); + for (StringifiedAccumulatorResult acc : accs) { + gen.writeStartObject(); + gen.writeStringField("name", acc.getName()); + gen.writeStringField("type", acc.getType()); + gen.writeStringField("value", acc.getValue()); + gen.writeEndObject(); + } + gen.writeEndArray(); + + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } +} 
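A side note on the JsonArchivist implementations in these handlers: each one materializes its REST path by plain String.replace on the ':'-prefixed placeholders. A minimal standalone sketch of that pattern (the template mirrors SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH above; the class name and sample IDs are made up for illustration):

    public class ArchivedPathSketch {

        // Mirrors SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH from the handler above.
        private static final String TEMPLATE =
            "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";

        static String materialize(String jobId, String vertexId, int subtask, int attempt) {
            return TEMPLATE
                .replace(":jobid", jobId)
                .replace(":vertexid", vertexId)
                .replace(":subtasknum", String.valueOf(subtask))
                .replace(":attempt", String.valueOf(attempt));
        }

        public static void main(String[] args) {
            // -> /jobs/1b5af9.../vertices/90c7d2.../subtasks/3/attempts/1/accumulators
            System.out.println(materialize(
                "1b5af9c4d6e8f0a2b4c6d8e0f2a4b6c8",
                "90c7d2e4f6a8b0c2d4e6f8a0b2c4d6e8",
                3,
                1));
        }
    }

Because the replacement is purely textual, the archivists emit one ArchivedJson entry per attempt, with the current attempt stored under its own attempt number alongside all prior attempts.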
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java new file mode 100644 index 0000000..83a8793 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH; + +/** + * Request handler providing details about a single task execution attempt. 
+ */ +public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptRequestHandler { + + private static final String SUBTASK_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt"; + + private final MetricFetcher fetcher; + + public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + super(executionGraphHolder, executor); + this.fetcher = fetcher; + } + + @Override + public String[] getPaths() { + return new String[]{SUBTASK_ATTEMPT_DETAILS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher); + } catch (IOException e) { + throw new FlinkFutureException("Could not create attempt details json.", e); + } + }, + executor); + } + + /** + * Archivist for the SubtaskExecutionAttemptDetailsHandler. + */ + public static class SubtaskExecutionAttemptDetailsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + for (AccessExecutionVertex subtask : task.getTaskVertices()) { + String curAttemptJson = createAttemptDetailsJson(subtask.getCurrentExecutionAttempt(), graph.getJobID().toString(), task.getJobVertexId().toString(), null); + String curAttemptPath1 = SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()) + .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())); + String curAttemptPath2 = SUBTASK_ATTEMPT_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()) + .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(":attempt", String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber())); + + archive.add(new ArchivedJson(curAttemptPath1, curAttemptJson)); + archive.add(new ArchivedJson(curAttemptPath2, curAttemptJson)); + + for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) { + AccessExecution attempt = subtask.getPriorExecutionAttempt(x); + String json = createAttemptDetailsJson(attempt, graph.getJobID().toString(), task.getJobVertexId().toString(), null); + String path = SUBTASK_ATTEMPT_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()) + .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(":attempt", String.valueOf(attempt.getAttemptNumber())); + archive.add(new ArchivedJson(path, json)); + } + } + } + return archive; + } + } + + public static String createAttemptDetailsJson( + AccessExecution execAttempt, + String jobID, + String vertexID, + @Nullable MetricFetcher fetcher) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + final ExecutionState status = execAttempt.getState(); + final long now = System.currentTimeMillis(); + + TaskManagerLocation location = execAttempt.getAssignedResourceLocation(); + String locationString = location == 
null ? "(unassigned)" : location.getHostname(); + + long startTime = execAttempt.getStateTimestamp(ExecutionState.DEPLOYING); + if (startTime == 0) { + startTime = -1; + } + long endTime = status.isTerminal() ? execAttempt.getStateTimestamp(status) : -1; + long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1; + + gen.writeStartObject(); + gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex()); + gen.writeStringField("status", status.name()); + gen.writeNumberField("attempt", execAttempt.getAttemptNumber()); + gen.writeStringField("host", locationString); + gen.writeNumberField("start-time", startTime); + gen.writeNumberField("end-time", endTime); + gen.writeNumberField("duration", duration); + + MutableIOMetrics counts = new MutableIOMetrics(); + + counts.addIOMetrics( + execAttempt, + fetcher, + jobID, + vertexID + ); + + counts.writeIOMetricsAsJson(gen); + + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java new file mode 100644 index 0000000..6d0757d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns the accumulators for all subtasks of job vertex. 
+ */ +public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHandler { + + private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators"; + + public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public String[] getPaths() { + return new String[]{SUBTASKS_ALL_ACCUMULATORS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createSubtasksAccumulatorsJson(jobVertex); + } catch (IOException e) { + throw new FlinkFutureException("Could not create subtasks accumulator json.", e); + } + }, + executor); + } + + /** + * Archivist for the SubtasksAllAccumulatorsHandler. + */ + public static class SubtasksAllAccumulatorsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + String json = createSubtasksAccumulatorsJson(task); + String path = SUBTASKS_ALL_ACCUMULATORS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + } + + public static String createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartObject(); + gen.writeStringField("id", jobVertex.getJobVertexId().toString()); + gen.writeNumberField("parallelism", jobVertex.getParallelism()); + + gen.writeArrayFieldStart("subtasks"); + + int num = 0; + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { + + TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); + String locationString = location == null ? 
"(unassigned)" : location.getHostname(); + + gen.writeStartObject(); + + gen.writeNumberField("subtask", num++); + gen.writeNumberField("attempt", vertex.getCurrentExecutionAttempt().getAttemptNumber()); + gen.writeStringField("host", locationString); + + StringifiedAccumulatorResult[] accs = vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified(); + gen.writeArrayFieldStart("user-accumulators"); + for (StringifiedAccumulatorResult acc : accs) { + gen.writeStartObject(); + gen.writeStringField("name", acc.getName()); + gen.writeStringField("type", acc.getType()); + gen.writeStringField("value", acc.getValue()); + gen.writeEndObject(); + } + gen.writeEndArray(); + + gen.writeEndObject(); + } + gen.writeEndArray(); + + gen.writeEndObject(); + gen.close(); + return writer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java new file mode 100644 index 0000000..13fdc16 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns the state transition timestamps for all subtasks, plus their + * location and duration. 
+ */ +public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { + + private static final String SUBTASK_TIMES_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasktimes"; + + public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public String[] getPaths() { + return new String[]{SUBTASK_TIMES_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createSubtaskTimesJson(jobVertex); + } catch (IOException e) { + throw new FlinkFutureException("Could not write subtask time json.", e); + } + }, + executor); + } + + /** + * Archivist for the SubtasksTimesHandler. + */ + public static class SubtasksTimesJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + String json = createSubtaskTimesJson(task); + String path = SUBTASK_TIMES_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + } + + public static String createSubtaskTimesJson(AccessExecutionJobVertex jobVertex) throws IOException { + final long now = System.currentTimeMillis(); + + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartObject(); + + gen.writeStringField("id", jobVertex.getJobVertexId().toString()); + gen.writeStringField("name", jobVertex.getName()); + gen.writeNumberField("now", now); + + gen.writeArrayFieldStart("subtasks"); + + int num = 0; + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { + + long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps(); + ExecutionState status = vertex.getExecutionState(); + + long scheduledTime = timestamps[ExecutionState.SCHEDULED.ordinal()]; + + long start = scheduledTime > 0 ? scheduledTime : -1; + long end = status.isTerminal() ? timestamps[status.ordinal()] : now; + long duration = start >= 0 ? end - start : -1L; + + gen.writeStartObject(); + gen.writeNumberField("subtask", num++); + + TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); + String locationString = location == null ? 
"(unassigned)" : location.getHostname(); + gen.writeStringField("host", locationString); + + gen.writeNumberField("duration", duration); + + gen.writeObjectFieldStart("timestamps"); + for (ExecutionState state : ExecutionState.values()) { + gen.writeNumberField(state.name(), timestamps[state.ordinal()]); + } + gen.writeEndObject(); + + gen.writeEndObject(); + } + + gen.writeEndArray(); + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java new file mode 100644 index 0000000..718657e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +/***************************************************************************** + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. 
+ * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + *****************************************************************************/ + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobCache; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.blob.BlobView; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.RedirectHandler; +import org.apache.flink.runtime.rest.handler.WebHandler; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; +import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile; +import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future; +import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.nio.channels.FileChannel; +import java.util.HashMap; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Request handler that returns the TaskManager log/out files. 
+ * + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example.</p> + */ +@ChannelHandler.Sharable +public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> implements WebHandler { + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLogHandler.class); + + private static final String TASKMANAGER_LOG_REST_PATH = "/taskmanagers/:taskmanagerid/log"; + private static final String TASKMANAGER_OUT_REST_PATH = "/taskmanagers/:taskmanagerid/stdout"; + + /** Keep track of last transmitted log, to clean up old ones. */ + private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>(); + private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>(); + + /** Keep track of request status; prevents multiple log requests for a single TM from running concurrently. */ + private final ConcurrentHashMap<String, Boolean> lastRequestPending = new ConcurrentHashMap<>(); + private final Configuration config; + + /** Future of the blob cache. */ + private CompletableFuture<BlobCache> cache; + + /** Indicates which log file should be displayed. */ + private FileMode fileMode; + + private final Executor executor; + + private final BlobView blobView; + + /** Used to control whether this handler serves the .log or .out file. */ + public enum FileMode { + LOG, + STDOUT + } + + public TaskManagerLogHandler( + GatewayRetriever<JobManagerGateway> retriever, + Executor executor, + CompletableFuture<String> localJobManagerAddressPromise, + Time timeout, + FileMode fileMode, + Configuration config, + BlobView blobView) { + super(localJobManagerAddressPromise, retriever, timeout); + + this.executor = checkNotNull(executor); + this.config = config; + this.fileMode = fileMode; + + this.blobView = Preconditions.checkNotNull(blobView, "blobView"); + } + + @Override + public String[] getPaths() { + switch (fileMode) { + case LOG: + return new String[]{TASKMANAGER_LOG_REST_PATH}; + case STDOUT: + default: + return new String[]{TASKMANAGER_OUT_REST_PATH}; + } + } + + /** + * Response when running with the leading JobManager.
+ */ + @Override + protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed routed, final JobManagerGateway jobManagerGateway) { + if (cache == null) { + CompletableFuture<Integer> blobPortFuture = jobManagerGateway.requestBlobServerPort(timeout); + cache = blobPortFuture.thenApplyAsync( + (Integer port) -> { + try { + return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView); + } catch (IOException e) { + throw new FlinkFutureException("Could not create BlobCache.", e); + } + }, + executor); + } + + final String taskManagerID = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY); + final HttpRequest request = routed.request(); + + //fetch TaskManager logs if no other process is currently doing it + if (lastRequestPending.putIfAbsent(taskManagerID, true) == null) { + try { + InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID)); + CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout); + + CompletableFuture<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose( + (Optional<Instance> optTMInstance) -> { + Instance taskManagerInstance = optTMInstance.orElseThrow( + () -> new FlinkFutureException("Could not find instance with " + instanceID + '.')); + switch (fileMode) { + case LOG: + return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout); + case STDOUT: + default: + return taskManagerInstance.getTaskManagerGateway().requestTaskManagerStdout(timeout); + } + } + ); + + CompletableFuture<String> logPathFuture = blobKeyFuture + .thenCombineAsync( + cache, + (blobKey, blobCache) -> { + //delete previous log file, if it is different than the current one + HashMap<String, BlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? 
lastSubmittedLog : lastSubmittedStdout; + if (lastSubmittedFile.containsKey(taskManagerID)) { + if (!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) { + try { + blobCache.deleteGlobal(lastSubmittedFile.get(taskManagerID)); + } catch (IOException e) { + throw new FlinkFutureException("Could not delete file for " + taskManagerID + '.', e); + } + lastSubmittedFile.put(taskManagerID, blobKey); + } + } else { + lastSubmittedFile.put(taskManagerID, blobKey); + } + try { + return blobCache.getFile(blobKey).getAbsolutePath(); + } catch (IOException e) { + throw new FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e); + } + }, + executor); + + logPathFuture.exceptionally( + failure -> { + display(ctx, request, "Fetching TaskManager log failed."); + LOG.error("Fetching TaskManager log failed.", failure); + lastRequestPending.remove(taskManagerID); + + return null; + }); + + logPathFuture.thenAccept( + filePath -> { + File file = new File(filePath); + final RandomAccessFile raf; + try { + raf = new RandomAccessFile(file, "r"); + } catch (FileNotFoundException e) { + display(ctx, request, "Displaying TaskManager log failed."); + LOG.error("Displaying TaskManager log failed.", e); + + return; + } + long fileLength; + try { + fileLength = raf.length(); + } catch (IOException ioe) { + display(ctx, request, "Displaying TaskManager log failed."); + LOG.error("Displaying TaskManager log failed.", ioe); + try { + raf.close(); + } catch (IOException e) { + LOG.error("Could not close random access file.", e); + } + + return; + } + final FileChannel fc = raf.getChannel(); + + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + response.headers().set(CONTENT_TYPE, "text/plain"); + + if (HttpHeaders.isKeepAlive(request)) { + response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } + HttpHeaders.setContentLength(response, fileLength); + + // write the initial line and the header. + ctx.write(response); + + // write the content. + ChannelFuture lastContentFuture; + final GenericFutureListener<Future<? super Void>> completionListener = future -> { + lastRequestPending.remove(taskManagerID); + fc.close(); + raf.close(); + }; + if (ctx.pipeline().get(SslHandler.class) == null) { + ctx.write( + new DefaultFileRegion(fc, 0, fileLength), ctx.newProgressivePromise()) + .addListener(completionListener); + lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + + } else { + try { + lastContentFuture = ctx.writeAndFlush( + new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), + ctx.newProgressivePromise()) + .addListener(completionListener); + } catch (IOException e) { + display(ctx, request, "Displaying TaskManager log failed."); + LOG.warn("Could not write http data.", e); + + return; + } + // HttpChunkedInput will write the end marker (LastHttpContent) for us. 
+ } + + // close the connection, if no keep-alive is needed + if (!HttpHeaders.isKeepAlive(request)) { + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } + }); + } catch (Exception e) { + display(ctx, request, "Error: " + e.getMessage()); + LOG.error("Fetching TaskManager log failed.", e); + lastRequestPending.remove(taskManagerID); + } + } else { + display(ctx, request, "loading..."); + } + } + + private void display(ChannelHandlerContext ctx, HttpRequest request, String message) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + response.headers().set(CONTENT_TYPE, "text/plain"); + + if (HttpHeaders.isKeepAlive(request)) { + response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } + + byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET); + + ByteBuf b = Unpooled.copiedBuffer(buf); + + HttpHeaders.setContentLength(response, buf.length); + + // write the initial line and the header. + ctx.write(response); + + ctx.write(b); + + ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + + // close the connection, if no keep-alive is needed + if (!HttpHeaders.isKeepAlive(request)) { + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } + } +}
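Editor's aside: the lastRequestPending.putIfAbsent(taskManagerID, true) == null check in respondAsLeader above is a single-flight guard — only the caller that wins the putIfAbsent race fetches the log, everyone else gets the "loading..." placeholder, and every completion and error path must remove the key again or that TaskManager stays locked out. A self-contained sketch of just that gating pattern (class and method names are invented for illustration):

import java.util.concurrent.ConcurrentHashMap;

public class SingleFlightGate {

    private final ConcurrentHashMap<String, Boolean> pending = new ConcurrentHashMap<>();

    /** Returns true iff the caller won the right to start the fetch for this key. */
    public boolean tryAcquire(String key) {
        // putIfAbsent returns null only for the first of any concurrent callers.
        return pending.putIfAbsent(key, true) == null;
    }

    /** Must be called from every completion and error path, or the key stays locked. */
    public void release(String key) {
        pending.remove(key);
    }

    public static void main(String[] args) {
        SingleFlightGate gate = new SingleFlightGate();
        System.out.println(gate.tryAcquire("tm-1")); // true: start the fetch
        System.out.println(gate.tryAcquire("tm-1")); // false: answer "loading..."
        gate.release("tm-1");
        System.out.println(gate.tryAcquire("tm-1")); // true again after release
    }
}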

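Editor's aside: the handler also memoizes the BlobCache inside a CompletableFuture field, so the blob-server port is requested at most once and later requests chain onto the same future even while it is still in flight. A minimal sketch of that memoization idea (all names invented; note this sketch synchronizes the null check, which the handler's own lazy initialization does not):

import java.util.concurrent.CompletableFuture;

public class FutureMemoExample {

    private CompletableFuture<String> cached;

    private String expensiveLookup() {
        System.out.println("performing the expensive lookup once");
        return "value";
    }

    // First caller triggers the lookup; later callers reuse the same future,
    // even while the lookup is still running.
    synchronized CompletableFuture<String> get() {
        if (cached == null) {
            cached = CompletableFuture.supplyAsync(this::expensiveLookup);
        }
        return cached;
    }

    public static void main(String[] args) {
        FutureMemoExample memo = new FutureMemoExample();
        CompletableFuture<Void> first =
            memo.get().thenAccept(v -> System.out.println("first consumer: " + v));
        CompletableFuture<Void> second =
            memo.get().thenAccept(v -> System.out.println("second consumer: " + v)); // no second lookup
        CompletableFuture.allOf(first, second).join();
    }
}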