http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java deleted file mode 100644 index 9f83ed0..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; -import org.apache.flink.runtime.webmonitor.metrics.MetricStore; -import org.apache.flink.util.StringUtils; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -import static java.util.Objects.requireNonNull; - -/** - * A request handler that provides an overview over all taskmanagers or details for a single one. - */ -public class TaskManagersHandler extends AbstractJsonRequestHandler { - - private static final String TASKMANAGERS_REST_PATH = "/taskmanagers"; - private static final String TASKMANAGER_DETAILS_REST_PATH = "/taskmanagers/:taskmanagerid"; - - public static final String TASK_MANAGER_ID_KEY = "taskmanagerid"; - - private final Time timeout; - - private final MetricFetcher fetcher; - - public TaskManagersHandler(Executor executor, Time timeout, MetricFetcher fetcher) { - super(executor); - this.timeout = requireNonNull(timeout); - this.fetcher = fetcher; - } - - @Override - public String[] getPaths() { - return new String[]{TASKMANAGERS_REST_PATH, TASKMANAGER_DETAILS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { - if (jobManagerGateway != null) { - // whether one task manager's metrics are requested, or all task manager, we - // return them in an array. This avoids unnecessary code complexity. 
- // If only one task manager is requested, we only fetch one task manager metrics. - if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { - InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY))); - CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout); - - return tmInstanceFuture.thenApplyAsync( - (Optional<Instance> optTaskManager) -> { - try { - return writeTaskManagersJson( - optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()), - pathParams); - } catch (IOException e) { - throw new FlinkFutureException("Could not write TaskManagers JSON.", e); - } - }, - executor); - } else { - CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout); - - return tmInstancesFuture.thenApplyAsync( - (Collection<Instance> taskManagers) -> { - try { - return writeTaskManagersJson(taskManagers, pathParams); - } catch (IOException e) { - throw new FlinkFutureException("Could not write TaskManagers JSON.", e); - } - }, - executor); - } - } - else { - return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager.")); - } - } - - private String writeTaskManagersJson(Collection<Instance> instances, Map<String, String> pathParams) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartObject(); - gen.writeArrayFieldStart("taskmanagers"); - - for (Instance instance : instances) { - gen.writeStartObject(); - gen.writeStringField("id", instance.getId().toString()); - gen.writeStringField("path", instance.getTaskManagerGateway().getAddress()); - gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort()); - gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat()); - gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots()); - gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots()); - gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores()); - gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory()); - gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap()); - gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory()); - - // only send metrics when only one task manager requests them. 
- if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { - fetcher.update(); - MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString()); - if (metrics != null) { - gen.writeObjectFieldStart("metrics"); - long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); - long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); - long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); - - gen.writeNumberField("heapCommitted", heapCommitted); - gen.writeNumberField("heapUsed", heapUsed); - gen.writeNumberField("heapMax", heapTotal); - - long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); - long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); - long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); - - gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); - gen.writeNumberField("nonHeapUsed", nonHeapUsed); - gen.writeNumberField("nonHeapMax", nonHeapTotal); - - gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted); - gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed); - gen.writeNumberField("totalMax", heapTotal + nonHeapTotal); - - long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0")); - long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0")); - long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0")); - - gen.writeNumberField("directCount", directCount); - gen.writeNumberField("directUsed", directUsed); - gen.writeNumberField("directMax", directMax); - - long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0")); - long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0")); - long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0")); - - gen.writeNumberField("mappedCount", mappedCount); - gen.writeNumberField("mappedUsed", mappedUsed); - gen.writeNumberField("mappedMax", mappedMax); - - long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0")); - long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0")); - - gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable); - gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal); - - gen.writeArrayFieldStart("garbageCollectors"); - - for (String gcName : metrics.garbageCollectorNames) { - String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null); - String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null); - if (count != null && time != null) { - gen.writeStartObject(); - gen.writeStringField("name", gcName); - gen.writeNumberField("count", Long.valueOf(count)); - gen.writeNumberField("time", Long.valueOf(time)); - gen.writeEndObject(); - } - } - - gen.writeEndArray(); - gen.writeEndObject(); - } - } - - gen.writeEndObject(); - } - - gen.writeEndArray(); - gen.writeEndObject(); - - gen.close(); - return writer.toString(); - } -}
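Note: the deleted TaskManagersHandler above always wraps its response in a "taskmanagers" JSON array (even when a single TaskManager is requested) and writes it with Jackson's streaming JsonGenerator. The following minimal, self-contained sketch only illustrates that serialization pattern; the id, path, and slot counts are placeholders rather than values from a real cluster.

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonGenerator;

    import java.io.IOException;
    import java.io.StringWriter;

    public class TaskManagersJsonSketch {
        public static void main(String[] args) throws IOException {
            StringWriter writer = new StringWriter();
            JsonGenerator gen = new JsonFactory().createGenerator(writer);

            gen.writeStartObject();
            gen.writeArrayFieldStart("taskmanagers");

            // one object per TaskManager instance; all values below are placeholders
            gen.writeStartObject();
            gen.writeStringField("id", "0123456789abcdef");                  // InstanceID rendered as a hex string
            gen.writeStringField("path", "akka://flink/user/taskmanager_0"); // TaskManager gateway address
            gen.writeNumberField("slotsNumber", 4);
            gen.writeNumberField("freeSlots", 2);
            gen.writeEndObject();

            gen.writeEndArray();
            gen.writeEndObject();
            gen.close();

            // prints {"taskmanagers":[{"id":"0123456789abcdef", ... }]}
            System.out.println(writer.toString());
        }
    }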
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java deleted file mode 100644 index 3affd7c..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers.checkpoints; - -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; -import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Handler that returns a job's snapshotting settings. - */ -public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandler { - - private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config"; - - public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); - } - - @Override - public String[] getPaths() { - return new String[]{CHECKPOINT_CONFIG_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createCheckpointConfigJson(graph); - } catch (IOException e) { - throw new FlinkFutureException("Could not create checkpoint config json.", e); - } - }, - executor); - } - - /** - * Archivist for the CheckpointConfigHandler. 
- */ - public static class CheckpointConfigJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - String json = createCheckpointConfigJson(graph); - String path = CHECKPOINT_CONFIG_REST_PATH - .replace(":jobid", graph.getJobID().toString()); - return Collections.singletonList(new ArchivedJson(path, json)); - } - } - - private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - JobCheckpointingSettings settings = graph.getJobCheckpointingSettings(); - - if (settings == null) { - return "{}"; - } - - gen.writeStartObject(); - { - gen.writeStringField("mode", settings.isExactlyOnce() ? "exactly_once" : "at_least_once"); - gen.writeNumberField("interval", settings.getCheckpointInterval()); - gen.writeNumberField("timeout", settings.getCheckpointTimeout()); - gen.writeNumberField("min_pause", settings.getMinPauseBetweenCheckpoints()); - gen.writeNumberField("max_concurrent", settings.getMaxConcurrentCheckpoints()); - - ExternalizedCheckpointSettings externalization = settings.getExternalizedCheckpointSettings(); - gen.writeObjectFieldStart("externalization"); - { - if (externalization.externalizeCheckpoints()) { - gen.writeBooleanField("enabled", true); - gen.writeBooleanField("delete_on_cancellation", externalization.deleteOnCancellation()); - } else { - gen.writeBooleanField("enabled", false); - } - } - gen.writeEndObject(); - - } - gen.writeEndObject(); - - gen.close(); - - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java deleted file mode 100644 index 974364d..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor.handlers.checkpoints; - -import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; - -import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; -import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; - -import javax.annotation.Nullable; - -/** - * A size-based cache of accessed checkpoints for completed and failed - * checkpoints. - * - * <p>Having this cache in place for accessed stats improves the user - * experience quite a bit as accessed checkpoint stats stay available - * and don't expire. For example if you manage to click on the last - * checkpoint in the history, it is not available via the stats as soon - * as another checkpoint is triggered. With the cache in place, the - * checkpoint will still be available for investigation. - */ -public class CheckpointStatsCache { - - @Nullable - private final Cache<Long, AbstractCheckpointStats> cache; - - public CheckpointStatsCache(int maxNumEntries) { - if (maxNumEntries > 0) { - this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder() - .maximumSize(maxNumEntries) - .build(); - } else { - this.cache = null; - } - } - - /** - * Try to add the checkpoint to the cache. - * - * @param checkpoint Checkpoint to be added. - */ - void tryAdd(AbstractCheckpointStats checkpoint) { - // Don't add in progress checkpoints as they will be replaced by their - // completed/failed version eventually. - if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) { - cache.put(checkpoint.getCheckpointId(), checkpoint); - } - } - - /** - * Try to look up a checkpoint by it's ID in the cache. - * - * @param checkpointId ID of the checkpoint to look up. - * @return The checkpoint or <code>null</code> if checkpoint not found. - */ - AbstractCheckpointStats tryGet(long checkpointId) { - if (cache != null) { - return cache.getIfPresent(checkpointId); - } else { - return null; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java deleted file mode 100644 index 96cc3e0..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor.handlers.checkpoints; - -import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; -import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; -import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; -import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; -import org.apache.flink.runtime.checkpoint.TaskStateStats; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; -import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Request handler that returns checkpoint stats for a single job vertex. - */ -public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequestHandler { - - private static final String CHECKPOINT_STATS_DETAILS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid"; - - private final CheckpointStatsCache cache; - - public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) { - super(executionGraphHolder, executor); - this.cache = cache; - } - - @Override - public String[] getPaths() { - return new String[]{CHECKPOINT_STATS_DETAILS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - long checkpointId = parseCheckpointId(params); - if (checkpointId == -1) { - return "{}"; - } - - CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); - if (snapshot == null) { - return "{}"; - } - - AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId); - - if (checkpoint != null) { - cache.tryAdd(checkpoint); - } else { - checkpoint = cache.tryGet(checkpointId); - - if (checkpoint == null) { - return "{}"; - } - } - - try { - return createCheckpointDetailsJson(checkpoint); - } catch (IOException e) { - throw new FlinkFutureException("Could not create checkpoint details json.", e); - } - }, - executor); - } - - /** - * Archivist for the CheckpointStatsDetails. 
- */ - public static class CheckpointStatsDetailsJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot(); - if (stats == null) { - return Collections.emptyList(); - } - CheckpointStatsHistory history = stats.getHistory(); - List<ArchivedJson> archive = new ArrayList<>(); - for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) { - String json = createCheckpointDetailsJson(checkpoint); - String path = CHECKPOINT_STATS_DETAILS_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId())); - archive.add(new ArchivedJson(path, json)); - } - return archive; - } - } - - public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - gen.writeStartObject(); - - gen.writeNumberField("id", checkpoint.getCheckpointId()); - gen.writeStringField("status", checkpoint.getStatus().toString()); - gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint()); - gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp()); - gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp()); - gen.writeNumberField("state_size", checkpoint.getStateSize()); - gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration()); - gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered()); - gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks()); - gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks()); - - if (checkpoint.getStatus().isCompleted()) { - // --- Completed --- - CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint; - - String externalPath = completed.getExternalPath(); - if (externalPath != null) { - gen.writeStringField("external_path", externalPath); - } - - gen.writeBooleanField("discarded", completed.isDiscarded()); - } - else if (checkpoint.getStatus().isFailed()) { - // --- Failed --- - FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint; - - gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp()); - - String failureMsg = failed.getFailureMessage(); - if (failureMsg != null) { - gen.writeStringField("failure_message", failureMsg); - } - } - - gen.writeObjectFieldStart("tasks"); - for (TaskStateStats taskStats : checkpoint.getAllTaskStateStats()) { - gen.writeObjectFieldStart(taskStats.getJobVertexId().toString()); - - gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp()); - gen.writeNumberField("state_size", taskStats.getStateSize()); - gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp())); - gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered()); - gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks()); - gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks()); - - gen.writeEndObject(); - } - gen.writeEndObject(); - - gen.writeEndObject(); - gen.close(); - - return writer.toString(); - } - - /** - * Returns the checkpoint ID parsed from the provided parameters. 
- * - * @param params Path parameters - * @return Parsed checkpoint ID or <code>-1</code> if not available. - */ - static long parseCheckpointId(Map<String, String> params) { - String param = params.get("checkpointid"); - if (param == null) { - return -1; - } - - try { - return Long.parseLong(param); - } catch (NumberFormatException ignored) { - return -1; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java deleted file mode 100644 index 045248b..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor.handlers.checkpoints; - -import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; -import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; -import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; -import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; -import org.apache.flink.runtime.checkpoint.SubtaskStateStats; -import org.apache.flink.runtime.checkpoint.TaskStateStats; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; -import org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler; -import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Request handler that returns checkpoint stats for a single job vertex with - * the summary stats and all subtasks. 
- */ -public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGraphRequestHandler { - - private static final String CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid"; - - private final CheckpointStatsCache cache; - - public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) { - super(executionGraphHolder, executor); - this.cache = checkNotNull(cache); - } - - @Override - public String[] getPaths() { - return new String[]{CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleJsonRequest( - Map<String, String> pathParams, - Map<String, String> queryParams, - JobManagerGateway jobManagerGateway) { - return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway); - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { - long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params); - if (checkpointId == -1) { - return CompletableFuture.completedFuture("{}"); - } - - JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params); - if (vertexId == null) { - return CompletableFuture.completedFuture("{}"); - } - - CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); - if (snapshot == null) { - return CompletableFuture.completedFuture("{}"); - } - - AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId); - - if (checkpoint != null) { - cache.tryAdd(checkpoint); - } else { - checkpoint = cache.tryGet(checkpointId); - - if (checkpoint == null) { - return CompletableFuture.completedFuture("{}"); - } - } - - TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId); - if (taskStats == null) { - return CompletableFuture.completedFuture("{}"); - } - - try { - return CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint, taskStats)); - } catch (IOException e) { - return FutureUtils.completedExceptionally(e); - } - } - - /** - * Archivist for the CheckpointStatsDetailsSubtasksHandler. 
- */ - public static class CheckpointStatsDetailsSubtasksJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot(); - if (stats == null) { - return Collections.emptyList(); - } - CheckpointStatsHistory history = stats.getHistory(); - List<ArchivedJson> archive = new ArrayList<>(); - for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) { - for (TaskStateStats subtaskStats : checkpoint.getAllTaskStateStats()) { - String json = createSubtaskCheckpointDetailsJson(checkpoint, subtaskStats); - String path = CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH - .replace(":jobid", graph.getJobID().toString()) - .replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId())) - .replace(":vertexid", subtaskStats.getJobVertexId().toString()); - archive.add(new ArchivedJson(path, json)); - } - } - return archive; - } - } - - private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartObject(); - // Overview - gen.writeNumberField("id", checkpoint.getCheckpointId()); - gen.writeStringField("status", checkpoint.getStatus().toString()); - gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp()); - gen.writeNumberField("state_size", taskStats.getStateSize()); - gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp())); - gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered()); - gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks()); - gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks()); - - if (taskStats.getNumberOfAcknowledgedSubtasks() > 0) { - gen.writeObjectFieldStart("summary"); - gen.writeObjectFieldStart("state_size"); - writeMinMaxAvg(gen, taskStats.getSummaryStats().getStateSizeStats()); - gen.writeEndObject(); - - gen.writeObjectFieldStart("end_to_end_duration"); - MinMaxAvgStats ackTimestampStats = taskStats.getSummaryStats().getAckTimestampStats(); - gen.writeNumberField("min", Math.max(0, ackTimestampStats.getMinimum() - checkpoint.getTriggerTimestamp())); - gen.writeNumberField("max", Math.max(0, ackTimestampStats.getMaximum() - checkpoint.getTriggerTimestamp())); - gen.writeNumberField("avg", Math.max(0, ackTimestampStats.getAverage() - checkpoint.getTriggerTimestamp())); - gen.writeEndObject(); - - gen.writeObjectFieldStart("checkpoint_duration"); - gen.writeObjectFieldStart("sync"); - writeMinMaxAvg(gen, taskStats.getSummaryStats().getSyncCheckpointDurationStats()); - gen.writeEndObject(); - gen.writeObjectFieldStart("async"); - writeMinMaxAvg(gen, taskStats.getSummaryStats().getAsyncCheckpointDurationStats()); - gen.writeEndObject(); - gen.writeEndObject(); - - gen.writeObjectFieldStart("alignment"); - gen.writeObjectFieldStart("buffered"); - writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentBufferedStats()); - gen.writeEndObject(); - gen.writeObjectFieldStart("duration"); - writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentDurationStats()); - gen.writeEndObject(); - gen.writeEndObject(); - gen.writeEndObject(); - } - - SubtaskStateStats[] subtasks = taskStats.getSubtaskStats(); - - gen.writeArrayFieldStart("subtasks"); - 
for (int i = 0; i < subtasks.length; i++) { - SubtaskStateStats subtask = subtasks[i]; - - gen.writeStartObject(); - gen.writeNumberField("index", i); - - if (subtask != null) { - gen.writeStringField("status", "completed"); - gen.writeNumberField("ack_timestamp", subtask.getAckTimestamp()); - gen.writeNumberField("end_to_end_duration", subtask.getEndToEndDuration(checkpoint.getTriggerTimestamp())); - gen.writeNumberField("state_size", subtask.getStateSize()); - - gen.writeObjectFieldStart("checkpoint"); - gen.writeNumberField("sync", subtask.getSyncCheckpointDuration()); - gen.writeNumberField("async", subtask.getAsyncCheckpointDuration()); - gen.writeEndObject(); - - gen.writeObjectFieldStart("alignment"); - gen.writeNumberField("buffered", subtask.getAlignmentBuffered()); - gen.writeNumberField("duration", subtask.getAlignmentDuration()); - gen.writeEndObject(); - } else { - gen.writeStringField("status", "pending_or_failed"); - } - gen.writeEndObject(); - } - gen.writeEndArray(); - - gen.writeEndObject(); - gen.close(); - - return writer.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java deleted file mode 100644 index a60aee0..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor.handlers.checkpoints; - -import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; -import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts; -import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; -import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary; -import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; -import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; -import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; -import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.core.JsonGenerator; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Handler that returns checkpoint statistics for a job. - */ -public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler { - - private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints"; - - public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); - } - - @Override - public String[] getPaths() { - return new String[]{CHECKPOINT_STATS_REST_PATH}; - } - - @Override - public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createCheckpointStatsJson(graph); - } catch (IOException e) { - throw new FlinkFutureException("Could not create checkpoint stats json.", e); - } - }, - executor); - } - - /** - * Archivist for the CheckpointStatsJsonHandler. 
- */ - public static class CheckpointStatsJsonArchivist implements JsonArchivist { - - @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { - String json = createCheckpointStatsJson(graph); - String path = CHECKPOINT_STATS_REST_PATH - .replace(":jobid", graph.getJobID().toString()); - return Collections.singletonList(new ArchivedJson(path, json)); - } - } - - private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); - if (snapshot == null) { - return "{}"; - } - - gen.writeStartObject(); - - // Counts - writeCounts(gen, snapshot.getCounts()); - - // Summary - writeSummary(gen, snapshot.getSummaryStats()); - - CheckpointStatsHistory history = snapshot.getHistory(); - - // Latest - writeLatestCheckpoints( - gen, - history.getLatestCompletedCheckpoint(), - history.getLatestSavepoint(), - history.getLatestFailedCheckpoint(), - snapshot.getLatestRestoredCheckpoint()); - - // History - writeHistory(gen, snapshot.getHistory()); - - gen.writeEndObject(); - gen.close(); - - return writer.toString(); - } - - private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException { - gen.writeObjectFieldStart("counts"); - gen.writeNumberField("restored", counts.getNumberOfRestoredCheckpoints()); - gen.writeNumberField("total", counts.getTotalNumberOfCheckpoints()); - gen.writeNumberField("in_progress", counts.getNumberOfInProgressCheckpoints()); - gen.writeNumberField("completed", counts.getNumberOfCompletedCheckpoints()); - gen.writeNumberField("failed", counts.getNumberOfFailedCheckpoints()); - gen.writeEndObject(); - } - - private static void writeSummary( - JsonGenerator gen, - CompletedCheckpointStatsSummary summary) throws IOException { - gen.writeObjectFieldStart("summary"); - gen.writeObjectFieldStart("state_size"); - writeMinMaxAvg(gen, summary.getStateSizeStats()); - gen.writeEndObject(); - - gen.writeObjectFieldStart("end_to_end_duration"); - writeMinMaxAvg(gen, summary.getEndToEndDurationStats()); - gen.writeEndObject(); - - gen.writeObjectFieldStart("alignment_buffered"); - writeMinMaxAvg(gen, summary.getAlignmentBufferedStats()); - gen.writeEndObject(); - gen.writeEndObject(); - } - - static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException { - gen.writeNumberField("min", minMaxAvg.getMinimum()); - gen.writeNumberField("max", minMaxAvg.getMaximum()); - gen.writeNumberField("avg", minMaxAvg.getAverage()); - } - - private static void writeLatestCheckpoints( - JsonGenerator gen, - @Nullable CompletedCheckpointStats completed, - @Nullable CompletedCheckpointStats savepoint, - @Nullable FailedCheckpointStats failed, - @Nullable RestoredCheckpointStats restored) throws IOException { - - gen.writeObjectFieldStart("latest"); - // Completed checkpoint - if (completed != null) { - gen.writeObjectFieldStart("completed"); - writeCheckpoint(gen, completed); - - String externalPath = completed.getExternalPath(); - if (externalPath != null) { - gen.writeStringField("external_path", completed.getExternalPath()); - } - - gen.writeEndObject(); - } - - // Completed savepoint - if (savepoint != null) { - gen.writeObjectFieldStart("savepoint"); - writeCheckpoint(gen, savepoint); - - String externalPath = savepoint.getExternalPath(); - if (externalPath != 
null) { - gen.writeStringField("external_path", savepoint.getExternalPath()); - } - gen.writeEndObject(); - } - - // Failed checkpoint - if (failed != null) { - gen.writeObjectFieldStart("failed"); - writeCheckpoint(gen, failed); - - gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp()); - String failureMsg = failed.getFailureMessage(); - if (failureMsg != null) { - gen.writeStringField("failure_message", failureMsg); - } - gen.writeEndObject(); - } - - // Restored checkpoint - if (restored != null) { - gen.writeObjectFieldStart("restored"); - gen.writeNumberField("id", restored.getCheckpointId()); - gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp()); - gen.writeBooleanField("is_savepoint", restored.getProperties().isSavepoint()); - - String externalPath = restored.getExternalPath(); - if (externalPath != null) { - gen.writeStringField("external_path", externalPath); - } - gen.writeEndObject(); - } - gen.writeEndObject(); - } - - private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException { - gen.writeNumberField("id", checkpoint.getCheckpointId()); - gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp()); - gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp()); - gen.writeNumberField("state_size", checkpoint.getStateSize()); - gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration()); - gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered()); - - } - - private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException { - gen.writeArrayFieldStart("history"); - for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) { - gen.writeStartObject(); - gen.writeNumberField("id", checkpoint.getCheckpointId()); - gen.writeStringField("status", checkpoint.getStatus().toString()); - gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint()); - gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp()); - gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp()); - gen.writeNumberField("state_size", checkpoint.getStateSize()); - gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration()); - gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered()); - gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks()); - gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks()); - - if (checkpoint.getStatus().isCompleted()) { - // --- Completed --- - CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint; - - String externalPath = completed.getExternalPath(); - if (externalPath != null) { - gen.writeStringField("external_path", externalPath); - } - - gen.writeBooleanField("discarded", completed.isDiscarded()); - } - else if (checkpoint.getStatus().isFailed()) { - // --- Failed --- - FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint; - - gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp()); - - String failureMsg = failed.getFailureMessage(); - if (failureMsg != null) { - gen.writeStringField("failure_message", failureMsg); - } - } - - gen.writeEndObject(); - } - gen.writeEndArray(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java 
---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 6da8115..01228d5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -27,9 +27,9 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.history.FsJobArchivist; import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; -import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index 0fc4314..bae8e21 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -24,8 +24,8 @@ import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler; import org.apache.flink.runtime.util.ExecutorThreadFactory; -import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; import org.apache.flink.util.FileUtils; import com.fasterxml.jackson.core.JsonFactory; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java index c5943dc..12a27a7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java @@ -26,7 +26,7 @@ package org.apache.flink.runtime.webmonitor.history; * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java *****************************************************************************/ -import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; +import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java deleted file mode 100644 index cf286ce..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler; -import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; -import org.apache.flink.util.Preconditions; - -import com.fasterxml.jackson.core.JsonGenerator; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Abstract request handler that returns a list of all available metrics or the values for a set of metrics. - * - * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. - * {@code [ { "id" : "X" } ] } - * - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. - * {@code /get?X,Y} - * The handler will then return a list containing the values of the requested metrics. - * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } - */ -public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler { - private final MetricFetcher fetcher; - - public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) { - super(executor); - this.fetcher = Preconditions.checkNotNull(fetcher); - } - - @Override - public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { - return CompletableFuture.supplyAsync( - () -> { - fetcher.update(); - String requestedMetricsList = queryParams.get("get"); - try { - return requestedMetricsList != null - ? 
getMetricsValues(pathParams, requestedMetricsList) - : getAvailableMetricsList(pathParams); - } catch (IOException e) { - throw new FlinkFutureException("Could not retrieve metrics.", e); - } - }, - executor); - - } - - /** - * Returns a Map containing the metrics belonging to the entity pointed to by the path parameters. - * - * @param pathParams REST path parameters - * @param metrics MetricStore containing all metrics - * @return Map containing metrics, or null if no metric exists - */ - protected abstract Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics); - - private String getMetricsValues(Map<String, String> pathParams, String requestedMetricsList) throws IOException { - if (requestedMetricsList.isEmpty()) { - /* - * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a - * request for which the "get" parameter is an empty string. - */ - return ""; - } - MetricStore metricStore = fetcher.getMetricStore(); - synchronized (metricStore) { - Map<String, String> metrics = getMapFor(pathParams, metricStore); - if (metrics == null) { - return ""; - } - String[] requestedMetrics = requestedMetricsList.split(","); - - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartArray(); - for (String requestedMetric : requestedMetrics) { - Object metricValue = metrics.get(requestedMetric); - if (metricValue != null) { - gen.writeStartObject(); - gen.writeStringField("id", requestedMetric); - gen.writeStringField("value", metricValue.toString()); - gen.writeEndObject(); - } - } - gen.writeEndArray(); - - gen.close(); - return writer.toString(); - } - } - - private String getAvailableMetricsList(Map<String, String> pathParams) throws IOException { - MetricStore metricStore = fetcher.getMetricStore(); - synchronized (metricStore) { - Map<String, String> metrics = getMapFor(pathParams, metricStore); - if (metrics == null) { - return ""; - } - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartArray(); - for (String m : metrics.keySet()) { - gen.writeStartObject(); - gen.writeStringField("id", m); - gen.writeEndObject(); - } - gen.writeEndArray(); - - gen.close(); - return writer.toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java deleted file mode 100644 index 2bd6683..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import java.util.Map; -import java.util.concurrent.Executor; - -/** - * Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics. - * - * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. - * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } - * - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. - * {@code /get?X,Y} - * The handler will then return a list containing the values of the requested metrics. - * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } - */ -public class JobManagerMetricsHandler extends AbstractMetricsHandler { - - private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics"; - - public JobManagerMetricsHandler(Executor executor, MetricFetcher fetcher) { - super(executor, fetcher); - } - - @Override - public String[] getPaths() { - return new String[]{JOBMANAGER_METRICS_REST_PATH}; - } - - @Override - protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore(); - if (jobManager == null) { - return null; - } else { - return jobManager.metrics; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java deleted file mode 100644 index e5e2500..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import java.util.Map; -import java.util.concurrent.Executor; - -/** - * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics. 
- * - * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. - * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } - * - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. - * {@code /get?X,Y} - * The handler will then return a list containing the values of the requested metrics. - * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } - */ -public class JobMetricsHandler extends AbstractMetricsHandler { - public static final String PARAMETER_JOB_ID = "jobid"; - private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics"; - - public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { - super(executor, fetcher); - } - - @Override - public String[] getPaths() { - return new String[]{JOB_METRICS_REST_PATH}; - } - - @Override - protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); - return job != null - ? job.metrics - : null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java deleted file mode 100644 index 1d2cd84..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import java.util.Map; -import java.util.concurrent.Executor; - -/** - * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics. - * - * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. - * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } - * - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. - * {@code /get?X,Y} - * The handler will then return a list containing the values of the requested metrics. 
- * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } - */ -public class JobVertexMetricsHandler extends AbstractMetricsHandler { - public static final String PARAMETER_VERTEX_ID = "vertexid"; - private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics"; - - public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) { - super(executor, fetcher); - } - - @Override - public String[] getPaths() { - return new String[]{JOB_VERTEX_METRICS_REST_PATH}; - } - - @Override - protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.TaskMetricStore task = metrics.getTaskMetricStore( - pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID), - pathParams.get(PARAMETER_VERTEX_ID)); - return task != null - ? task.metrics - : null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java deleted file mode 100644 index a5f4ca5..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.metrics.dump.MetricDump; -import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; -import org.apache.flink.runtime.metrics.dump.MetricQueryService; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; -import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; -import org.apache.flink.util.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.stream.Collectors; - -import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer; - -/** - * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers. - * - * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since - * the last call has passed. - */ -public class MetricFetcher { - private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class); - - private final GatewayRetriever<JobManagerGateway> retriever; - private final MetricQueryServiceRetriever queryServiceRetriever; - private final Executor executor; - private final Time timeout; - - private final MetricStore metrics = new MetricStore(); - private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer(); - - private long lastUpdateTime; - - public MetricFetcher( - GatewayRetriever<JobManagerGateway> retriever, - MetricQueryServiceRetriever queryServiceRetriever, - Executor executor, - Time timeout) { - this.retriever = Preconditions.checkNotNull(retriever); - this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever); - this.executor = Preconditions.checkNotNull(executor); - this.timeout = Preconditions.checkNotNull(timeout); - } - - /** - * Returns the MetricStore containing all stored metrics. - * - * @return MetricStore containing all stored metrics; - */ - public MetricStore getMetricStore() { - return metrics; - } - - /** - * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated. - */ - public void update() { - synchronized (this) { - long currentTime = System.currentTimeMillis(); - if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update - lastUpdateTime = currentTime; - fetchMetrics(); - } - } - } - - private void fetchMetrics() { - try { - Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow(); - if (optJobManagerGateway.isPresent()) { - final JobManagerGateway jobManagerGateway = optJobManagerGateway.get(); - - /** - * Remove all metrics that belong to a job that is not running and no longer archived. 
- */ - CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout); - - jobDetailsFuture.whenCompleteAsync( - (MultipleJobsDetails jobDetails, Throwable throwable) -> { - if (throwable != null) { - LOG.debug("Fetching of JobDetails failed.", throwable); - } else { - ArrayList<String> toRetain = new ArrayList<>(); - for (JobDetails job : jobDetails.getRunningJobs()) { - toRetain.add(job.getJobId().toString()); - } - for (JobDetails job : jobDetails.getFinishedJobs()) { - toRetain.add(job.getJobId().toString()); - } - synchronized (metrics) { - metrics.jobs.keySet().retainAll(toRetain); - } - } - }, - executor); - - String jobManagerPath = jobManagerGateway.getAddress(); - String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; - - retrieveAndQueryMetrics(jmQueryServicePath); - - /** - * We first request the list of all registered task managers from the job manager, and then - * request the respective metric dump from each task manager. - * - * <p>All stored metrics that do not belong to a registered task manager will be removed. - */ - CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout); - - taskManagersFuture.whenCompleteAsync( - (Collection<Instance> taskManagers, Throwable throwable) -> { - if (throwable != null) { - LOG.debug("Fetching list of registered TaskManagers failed.", throwable); - } else { - List<String> activeTaskManagers = taskManagers.stream().map( - taskManagerInstance -> { - final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress(); - final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString(); - - retrieveAndQueryMetrics(tmQueryServicePath); - - return taskManagerInstance.getId().toString(); - }).collect(Collectors.toList()); - - synchronized (metrics) { - metrics.taskManagers.keySet().retainAll(activeTaskManagers); - } - } - }, - executor); - } - } catch (Exception e) { - LOG.warn("Exception while fetching metrics.", e); - } - } - - /** - * Retrieves and queries the specified QueryServiceGateway. - * - * @param queryServicePath specifying the QueryServiceGateway - */ - private void retrieveAndQueryMetrics(String queryServicePath) { - final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath); - - queryServiceGatewayFuture.whenCompleteAsync( - (MetricQueryServiceGateway queryServiceGateway, Throwable t) -> { - if (t != null) { - LOG.debug("Could not retrieve QueryServiceGateway.", t); - } else { - queryMetrics(queryServiceGateway); - } - }, - executor); - } - - /** - * Query the metrics from the given QueryServiceGateway. 
- * - * @param queryServiceGateway to query for metrics - */ - private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) { - queryServiceGateway - .queryMetrics(timeout) - .whenCompleteAsync( - (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> { - if (t != null) { - LOG.debug("Fetching metrics failed.", t); - } else { - List<MetricDump> dumpedMetrics = deserializer.deserialize(result); - synchronized (metrics) { - for (MetricDump metric : dumpedMetrics) { - metrics.add(metric); - } - } - } - }, - executor); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java deleted file mode 100644 index e36dca8..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import org.apache.flink.runtime.metrics.dump.MetricDump; -import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER; -import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE; -import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM; -import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER; -import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM; -import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB; -import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR; -import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK; -import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM; - -/** - * Nested data-structure to store metrics. - * - * <p>This structure is not thread-safe. 
- */ -public class MetricStore { - private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class); - - final JobManagerMetricStore jobManager = new JobManagerMetricStore(); - final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>(); - final Map<String, JobMetricStore> jobs = new HashMap<>(); - - // ----------------------------------------------------------------------------------------------------------------- - // Adding metrics - // ----------------------------------------------------------------------------------------------------------------- - public void add(MetricDump metric) { - try { - QueryScopeInfo info = metric.scopeInfo; - TaskManagerMetricStore tm; - JobMetricStore job; - TaskMetricStore task; - SubtaskMetricStore subtask; - - String name = info.scope.isEmpty() - ? metric.name - : info.scope + "." + metric.name; - - if (name.isEmpty()) { // malformed transmission - return; - } - - switch (info.getCategory()) { - case INFO_CATEGORY_JM: - addMetric(jobManager.metrics, name, metric); - break; - case INFO_CATEGORY_TM: - String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID; - tm = taskManagers.get(tmID); - if (tm == null) { - tm = new TaskManagerMetricStore(); - taskManagers.put(tmID, tm); - } - if (name.contains("GarbageCollector")) { - String gcName = name.substring("Status.JVM.GarbageCollector.".length(), name.lastIndexOf('.')); - tm.addGarbageCollectorName(gcName); - } - addMetric(tm.metrics, name, metric); - break; - case INFO_CATEGORY_JOB: - QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info; - job = jobs.get(jobInfo.jobID); - if (job == null) { - job = new JobMetricStore(); - jobs.put(jobInfo.jobID, job); - } - addMetric(job.metrics, name, metric); - break; - case INFO_CATEGORY_TASK: - QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info; - job = jobs.get(taskInfo.jobID); - if (job == null) { - job = new JobMetricStore(); - jobs.put(taskInfo.jobID, job); - } - task = job.tasks.get(taskInfo.vertexID); - if (task == null) { - task = new TaskMetricStore(); - job.tasks.put(taskInfo.vertexID, task); - } - subtask = task.subtasks.get(taskInfo.subtaskIndex); - if (subtask == null) { - subtask = new SubtaskMetricStore(); - task.subtasks.put(taskInfo.subtaskIndex, subtask); - } - /** - * The duplication is intended. Metrics scoped by subtask are useful for several job/task handlers, - * while the WebInterface task metric queries currently do not account for subtasks, so we don't - * divide by subtask and instead use the concatenation of subtask index and metric name as the name - * for those. - */ - addMetric(subtask.metrics, name, metric); - addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric); - break; - case INFO_CATEGORY_OPERATOR: - QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info; - job = jobs.get(operatorInfo.jobID); - if (job == null) { - job = new JobMetricStore(); - jobs.put(operatorInfo.jobID, job); - } - task = job.tasks.get(operatorInfo.vertexID); - if (task == null) { - task = new TaskMetricStore(); - job.tasks.put(operatorInfo.vertexID, task); - } - /** - * As the WebInterface does not account for operators (because it can't) we don't - * divide by operator and instead use the concatenation of subtask index, operator name and metric name - * as the name. - */ - addMetric(task.metrics, operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." 
+ name, metric); - break; - default: - LOG.debug("Invalid metric dump category: " + info.getCategory()); - } - } catch (Exception e) { - LOG.debug("Malformed metric dump.", e); - } - } - - private void addMetric(Map<String, String> target, String name, MetricDump metric) { - switch (metric.getCategory()) { - case METRIC_CATEGORY_COUNTER: - MetricDump.CounterDump counter = (MetricDump.CounterDump) metric; - target.put(name, String.valueOf(counter.count)); - break; - case METRIC_CATEGORY_GAUGE: - MetricDump.GaugeDump gauge = (MetricDump.GaugeDump) metric; - target.put(name, gauge.value); - break; - case METRIC_CATEGORY_HISTOGRAM: - MetricDump.HistogramDump histogram = (MetricDump.HistogramDump) metric; - target.put(name + "_min", String.valueOf(histogram.min)); - target.put(name + "_max", String.valueOf(histogram.max)); - target.put(name + "_mean", String.valueOf(histogram.mean)); - target.put(name + "_median", String.valueOf(histogram.median)); - target.put(name + "_stddev", String.valueOf(histogram.stddev)); - target.put(name + "_p75", String.valueOf(histogram.p75)); - target.put(name + "_p90", String.valueOf(histogram.p90)); - target.put(name + "_p95", String.valueOf(histogram.p95)); - target.put(name + "_p98", String.valueOf(histogram.p98)); - target.put(name + "_p99", String.valueOf(histogram.p99)); - target.put(name + "_p999", String.valueOf(histogram.p999)); - break; - case METRIC_CATEGORY_METER: - MetricDump.MeterDump meter = (MetricDump.MeterDump) metric; - target.put(name, String.valueOf(meter.rate)); - break; - } - } - - // ----------------------------------------------------------------------------------------------------------------- - // Accessors for sub MetricStores - // ----------------------------------------------------------------------------------------------------------------- - - /** - * Returns the {@link JobManagerMetricStore}. - * - * @return JobManagerMetricStore - */ - public JobManagerMetricStore getJobManagerMetricStore() { - return jobManager; - } - - /** - * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID. - * - * @param tmID taskmanager ID - * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists - */ - public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) { - return taskManagers.get(tmID); - } - - /** - * Returns the {@link JobMetricStore} for the given job ID. - * - * @param jobID job ID - * @return JobMetricStore for the given ID, or null if no store for the given argument exists - */ - public JobMetricStore getJobMetricStore(String jobID) { - return jobs.get(jobID); - } - - /** - * Returns the {@link TaskMetricStore} for the given job/task ID. - * - * @param jobID job ID - * @param taskID task ID - * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists - */ - public TaskMetricStore getTaskMetricStore(String jobID, String taskID) { - JobMetricStore job = getJobMetricStore(jobID); - if (job == null) { - return null; - } - return job.getTaskMetricStore(taskID); - } - - /** - * Returns the {@link SubtaskMetricStore} for the given job/task ID and subtask index. 
- * - * @param jobID job ID - * @param taskID task ID - * @param subtaskIndex subtask index - * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists - */ - public SubtaskMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) { - TaskMetricStore task = getTaskMetricStore(jobID, taskID); - if (task == null) { - return null; - } - return task.getSubtaskMetricStore(subtaskIndex); - } - - // ----------------------------------------------------------------------------------------------------------------- - // sub MetricStore classes - // ----------------------------------------------------------------------------------------------------------------- - private abstract static class ComponentMetricStore { - public final Map<String, String> metrics = new HashMap<>(); - - public String getMetric(String name, String defaultValue) { - String value = this.metrics.get(name); - return value != null - ? value - : defaultValue; - } - } - - /** - * Sub-structure containing metrics of the JobManager. - */ - public static class JobManagerMetricStore extends ComponentMetricStore { - } - - /** - * Sub-structure containing metrics of a single TaskManager. - */ - public static class TaskManagerMetricStore extends ComponentMetricStore { - public final Set<String> garbageCollectorNames = new HashSet<>(); - - public void addGarbageCollectorName(String name) { - garbageCollectorNames.add(name); - } - } - - /** - * Sub-structure containing metrics of a single Job. - */ - public static class JobMetricStore extends ComponentMetricStore { - private final Map<String, TaskMetricStore> tasks = new HashMap<>(); - - public TaskMetricStore getTaskMetricStore(String taskID) { - return tasks.get(taskID); - } - } - - /** - * Sub-structure containing metrics of a single Task. - */ - public static class TaskMetricStore extends ComponentMetricStore { - private final Map<Integer, SubtaskMetricStore> subtasks = new HashMap<>(); - - public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) { - return subtasks.get(subtaskIndex); - } - } - - /** - * Sub-structure containing metrics of a single Subtask. - */ - public static class SubtaskMetricStore extends ComponentMetricStore { - } -}
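For reference, the metrics handlers deleted above all follow the contract described in their Javadoc: they are registered under the paths returned by getPaths() ("/jobmanager/metrics", "/jobs/:jobid/metrics", "/jobs/:jobid/vertices/:vertexid/metrics"), return the list of available metric ids when no "get" query parameter is present, and return id/value pairs for a comma-separated "get" list otherwise. The following self-contained sketch illustrates that request pattern against a running web monitor; the address, job id and metric names are placeholders, not values taken from this change.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class LegacyMetricsQuery {
        public static void main(String[] args) throws Exception {
            // Placeholder web monitor address and job id -- adjust for the actual cluster.
            String base = "http://localhost:8081";
            String jobId = "00000000000000000000000000000000";

            // Without "get" the handler lists the available metric ids, e.g. [ { "id" : "X" } ].
            // With "get" it returns the requested values, e.g. [ { "id" : "X", "value" : "S" } ].
            URL url = new URL(base + "/jobs/" + jobId + "/metrics?get=someMetric,anotherMetric");

            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("GET");
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(connection.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }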
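On the server side, MetricFetcher and MetricStore (also deleted above) back those handlers: update() re-fetches from the JobManager and the registered TaskManagers at most once every 10 seconds, and the store nests job -> task -> subtask, keying task-level entries as "<subtaskIndex>.<metricName>" and flattening histograms into suffixed entries such as "_min", "_mean" or "_p99". A minimal read-path sketch, assuming an already constructed MetricFetcher and placeholder job/vertex ids and metric name:

    import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    import org.apache.flink.runtime.webmonitor.metrics.MetricStore;

    class MetricStoreReadSketch {
        // jobId, vertexId and "someMetric" are placeholders used for illustration only.
        static String firstSubtaskMetric(MetricFetcher fetcher, String jobId, String vertexId) {
            fetcher.update(); // triggers a new fetch only if the last one is older than 10 seconds
            MetricStore store = fetcher.getMetricStore();
            synchronized (store) { // the store itself is not thread-safe
                MetricStore.TaskMetricStore task = store.getTaskMetricStore(jobId, vertexId);
                // Task-level entries are keyed "<subtaskIndex>.<metricName>"; histogram values
                // show up as suffixed entries such as "someMetric_mean".
                return task == null ? "0" : task.getMetric("0.someMetric", "0");
            }
        }
    }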