http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java new file mode 100644 index 0000000..95d417a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static java.util.Objects.requireNonNull; + +/** + * A request handler that provides an overview of all taskmanagers or details for a single one. + */ +public class TaskManagersHandler extends AbstractJsonRequestHandler { + + private static final String TASKMANAGERS_REST_PATH = "/taskmanagers"; + private static final String TASKMANAGER_DETAILS_REST_PATH = "/taskmanagers/:taskmanagerid"; + + public static final String TASK_MANAGER_ID_KEY = "taskmanagerid"; + + private final Time timeout; + + private final MetricFetcher fetcher; + + public TaskManagersHandler(Executor executor, Time timeout, MetricFetcher fetcher) { + super(executor); + this.timeout = requireNonNull(timeout); + this.fetcher = fetcher; + } + + @Override + public String[] getPaths() { + return new String[]{TASKMANAGERS_REST_PATH, TASKMANAGER_DETAILS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + if (jobManagerGateway != null) { + // whether one task manager's metrics are requested, or all task managers', we + // return them in an array. This avoids unnecessary code complexity.
+ // If only one task manager is requested, we fetch only that task manager's metrics. + if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { + InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY))); + CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout); + + return tmInstanceFuture.thenApplyAsync( + (Optional<Instance> optTaskManager) -> { + try { + return writeTaskManagersJson( + optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()), + pathParams); + } catch (IOException e) { + throw new FlinkFutureException("Could not write TaskManagers JSON.", e); + } + }, + executor); + } else { + CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout); + + return tmInstancesFuture.thenApplyAsync( + (Collection<Instance> taskManagers) -> { + try { + return writeTaskManagersJson(taskManagers, pathParams); + } catch (IOException e) { + throw new FlinkFutureException("Could not write TaskManagers JSON.", e); + } + }, + executor); + } + } + else { + return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager.")); + } + } + + private String writeTaskManagersJson(Collection<Instance> instances, Map<String, String> pathParams) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartObject(); + gen.writeArrayFieldStart("taskmanagers"); + + for (Instance instance : instances) { + gen.writeStartObject(); + gen.writeStringField("id", instance.getId().toString()); + gen.writeStringField("path", instance.getTaskManagerGateway().getAddress()); + gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort()); + gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat()); + gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots()); + gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots()); + gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores()); + gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory()); + gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap()); + gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory()); + + // only send metrics when the details of a single task manager are requested.
+ if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { + fetcher.update(); + MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString()); + if (metrics != null) { + gen.writeObjectFieldStart("metrics"); + long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); + long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); + long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); + + gen.writeNumberField("heapCommitted", heapCommitted); + gen.writeNumberField("heapUsed", heapUsed); + gen.writeNumberField("heapMax", heapTotal); + + long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); + long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); + long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); + + gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); + gen.writeNumberField("nonHeapUsed", nonHeapUsed); + gen.writeNumberField("nonHeapMax", nonHeapTotal); + + gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted); + gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed); + gen.writeNumberField("totalMax", heapTotal + nonHeapTotal); + + long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0")); + long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0")); + long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0")); + + gen.writeNumberField("directCount", directCount); + gen.writeNumberField("directUsed", directUsed); + gen.writeNumberField("directMax", directMax); + + long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0")); + long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0")); + long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0")); + + gen.writeNumberField("mappedCount", mappedCount); + gen.writeNumberField("mappedUsed", mappedUsed); + gen.writeNumberField("mappedMax", mappedMax); + + long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0")); + long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0")); + + gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable); + gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal); + + gen.writeArrayFieldStart("garbageCollectors"); + + for (String gcName : metrics.garbageCollectorNames) { + String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null); + String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null); + if (count != null && time != null) { + gen.writeStartObject(); + gen.writeStringField("name", gcName); + gen.writeNumberField("count", Long.valueOf(count)); + gen.writeNumberField("time", Long.valueOf(time)); + gen.writeEndObject(); + } + } + + gen.writeEndArray(); + gen.writeEndObject(); + } + } + + gen.writeEndObject(); + } + + gen.writeEndArray(); + gen.writeEndObject(); + + gen.close(); + return writer.toString(); + } +}
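For orientation, the handler above serves both paths registered in getPaths(): GET /taskmanagers returns the plain overview array, while GET /taskmanagers/:taskmanagerid additionally attaches the "metrics" object built from the MetricFetcher. A minimal usage sketch follows; the executor, metricFetcher, jobManagerGateway, and taskManagerIdHex values are assumed to exist in the surrounding code, and this is illustrative rather than wiring code from this commit:

    // Handler answering both the overview and the details path.
    TaskManagersHandler handler = new TaskManagersHandler(executor, Time.seconds(10L), metricFetcher);

    // Details request: the path parameter selects a single task manager, which
    // also makes writeTaskManagersJson attach the "metrics" object.
    Map<String, String> pathParams =
        Collections.singletonMap(TaskManagersHandler.TASK_MANAGER_ID_KEY, taskManagerIdHex);
    CompletableFuture<String> json =
        handler.handleJsonRequest(pathParams, Collections.emptyMap(), jobManagerGateway);

The returned string is a JSON object with a "taskmanagers" array whose entries carry the fields written above (id, path, dataPort, timeSinceLastHeartbeat, slotsNumber, freeSlots, cpuCores, physicalMemory, freeMemory, managedMemory, and, on the details path, "metrics" including the garbage collector list).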
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java new file mode 100644 index 0000000..96bf7ec --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobStatus; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Back pressure statistics tracker. + * + * <p>Back pressure is determined by sampling running tasks. If a task is + * slowed down by back pressure it will be stuck in memory requests to a + * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}. + * + * <p>The back pressured stack traces look like this: + * + * <pre> + * java.lang.Object.wait(Native Method) + * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163) + * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING + * request + * [...] + * </pre> + */ +public class BackPressureStatsTracker { + + private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTracker.class); + + /** Maximum stack trace depth for samples. */ + static final int MAX_STACK_TRACE_DEPTH = 3; + + /** Expected class name for back pressure indicating stack trace element. 
*/ + static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool"; + + /** Expected method name for back pressure indicating stack trace element. */ + static final String EXPECTED_METHOD_NAME = "requestBufferBlocking"; + + /** Lock guarding trigger operations. */ + private final Object lock = new Object(); + + /** Stack trace sample coordinator. */ + private final StackTraceSampleCoordinator coordinator; + + /** + * Completed stats. Important: Job vertex IDs need to be scoped by job ID, + * because they are potentially constant across runs, messing up the cached + * data. + */ + private final Cache<ExecutionJobVertex, OperatorBackPressureStats> operatorStatsCache; + + /** Pending in-progress stats. Important: Job vertex IDs need to be scoped + * by job ID, because they are potentially constant across runs, messing up + * the cached data.*/ + private final Set<ExecutionJobVertex> pendingStats = new HashSet<>(); + + /** Cleanup interval for completed stats cache. */ + private final int cleanUpInterval; + + private final int numSamples; + + private final Time delayBetweenSamples; + + /** Flag indicating whether the stats tracker has been shut down. */ + private boolean shutDown; + + /** + * Creates a back pressure statistics tracker. + * + * @param coordinator Coordinator for stack trace sampling. + * @param cleanUpInterval Clean up interval for completed stats. + * @param numSamples Number of stack trace samples when determining back pressure. + * @param delayBetweenSamples Delay between samples when determining back pressure. + */ + public BackPressureStatsTracker( + StackTraceSampleCoordinator coordinator, + int cleanUpInterval, + int numSamples, + Time delayBetweenSamples) { + + this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator"); + + checkArgument(cleanUpInterval >= 0, "Clean up interval"); + this.cleanUpInterval = cleanUpInterval; + + checkArgument(numSamples >= 1, "Number of samples"); + this.numSamples = numSamples; + + this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples"); + + this.operatorStatsCache = CacheBuilder.newBuilder() + .concurrencyLevel(1) + .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS) + .build(); + } + + /** Cleanup interval for completed stats cache. */ + public long getCleanUpInterval() { + return cleanUpInterval; + } + + /** + * Returns back pressure statistics for an operator. + * + * @param vertex Operator to get the stats for. + * + * @return Back pressure statistics for an operator + */ + public Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) { + return Optional.ofNullable(operatorStatsCache.getIfPresent(vertex)); + } + + /** + * Triggers a stack trace sample for an operator to gather the back pressure + * statistics. If there is a sample in progress for the operator, the call + * is ignored. + * + * @param vertex Operator to get the stats for. + * @return Flag indicating whether a sample was triggered.
+ */ + @SuppressWarnings("unchecked") + public boolean triggerStackTraceSample(ExecutionJobVertex vertex) { + synchronized (lock) { + if (shutDown) { + return false; + } + + if (!pendingStats.contains(vertex) && + !vertex.getGraph().getState().isGloballyTerminalState()) { + + Executor executor = vertex.getGraph().getFutureExecutor(); + + // Only trigger if still active job + if (executor != null) { + pendingStats.add(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices())); + } + + CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample( + vertex.getTaskVertices(), + numSamples, + delayBetweenSamples, + MAX_STACK_TRACE_DEPTH); + + sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor); + + return true; + } + } + + return false; + } + } + + /** + * Cleans up the operator stats cache if it contains timed out entries. + * + * <p>The Guava cache only evicts as maintenance during normal operations. + * If this handler is inactive, it will never be cleaned. + */ + public void cleanUpOperatorStatsCache() { + operatorStatsCache.cleanUp(); + } + + /** + * Shuts down the stats tracker. + * + * <p>Invalidates the cache and clears all pending stats. + */ + public void shutDown() { + synchronized (lock) { + if (!shutDown) { + operatorStatsCache.invalidateAll(); + pendingStats.clear(); + + shutDown = true; + } + } + } + + /** + * Invalidates the cache (irrespective of clean up interval). + */ + void invalidateOperatorStatsCache() { + operatorStatsCache.invalidateAll(); + } + + /** + * Callback on completed stack trace sample. + */ + class StackTraceSampleCompletionCallback implements BiFunction<StackTraceSample, Throwable, Void> { + + private final ExecutionJobVertex vertex; + + public StackTraceSampleCompletionCallback(ExecutionJobVertex vertex) { + this.vertex = vertex; + } + + @Override + public Void apply(StackTraceSample stackTraceSample, Throwable throwable) { + synchronized (lock) { + try { + if (shutDown) { + return null; + } + + // Job finished, ignore. + JobStatus jobState = vertex.getGraph().getState(); + if (jobState.isGloballyTerminalState()) { + LOG.debug("Ignoring sample, because job is in state " + jobState + "."); + } else if (stackTraceSample != null) { + OperatorBackPressureStats stats = createStatsFromSample(stackTraceSample); + operatorStatsCache.put(vertex, stats); + } else { + LOG.debug("Failed to gather stack trace sample.", throwable); + } + } catch (Throwable t) { + LOG.error("Error during stats completion.", t); + } finally { + pendingStats.remove(vertex); + } + + return null; + } + } + + /** + * Creates the back pressure stats from a stack trace sample. + * + * @param sample Stack trace sample to base stats on. + * + * @return Back pressure stats + */ + private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample) { + Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = sample.getStackTraces(); + + // Map task ID to subtask index, because the web interface expects + // it like that. 
+ Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps + .newHashMapWithExpectedSize(traces.size()); + + Set<ExecutionAttemptID> sampledTasks = sample.getStackTraces().keySet(); + + for (ExecutionVertex task : vertex.getTaskVertices()) { + ExecutionAttemptID taskId = task.getCurrentExecutionAttempt().getAttemptId(); + if (sampledTasks.contains(taskId)) { + subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex()); + } else { + LOG.debug("Outdated sample. A task that is part of the " + + "sample has been reset."); + } + } + + // Ratio of blocked samples to total samples per sub task. Array + // position corresponds to sub task index. + double[] backPressureRatio = new double[traces.size()]; + + for (Entry<ExecutionAttemptID, List<StackTraceElement[]>> entry : traces.entrySet()) { + int backPressureSamples = 0; + + List<StackTraceElement[]> taskTraces = entry.getValue(); + + for (StackTraceElement[] trace : taskTraces) { + for (int i = trace.length - 1; i >= 0; i--) { + StackTraceElement elem = trace[i]; + + if (elem.getClassName().equals(EXPECTED_CLASS_NAME) && + elem.getMethodName().equals(EXPECTED_METHOD_NAME)) { + + backPressureSamples++; + break; // Continue with next stack trace + } + } + } + + int subtaskIndex = subtaskIndexMap.get(entry.getKey()); + + int size = taskTraces.size(); + double ratio = (size > 0) + ? ((double) backPressureSamples) / size + : 0; + + backPressureRatio[subtaskIndex] = ratio; + } + + return new OperatorBackPressureStats( + sample.getSampleId(), + sample.getEndTime(), + backPressureRatio); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java new file mode 100644 index 0000000..1a78a17 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import java.util.Arrays; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Back pressure statistics of multiple tasks. + * + * <p>Statistics are gathered by sampling stack traces of running tasks.
The + * back pressure ratio denotes the ratio of traces indicating back pressure + * to the total number of sampled traces. + */ +public class OperatorBackPressureStats { + + /** ID of the corresponding sample. */ + private final int sampleId; + + /** End time stamp of the corresponding sample. */ + private final long endTimestamp; + + /** Back pressure ratio per subtask. */ + private final double[] subTaskBackPressureRatio; + + /** Maximum back pressure ratio. */ + private final double maxSubTaskBackPressureRatio; + + public OperatorBackPressureStats( + int sampleId, + long endTimestamp, + double[] subTaskBackPressureRatio) { + + this.sampleId = sampleId; + this.endTimestamp = endTimestamp; + this.subTaskBackPressureRatio = checkNotNull(subTaskBackPressureRatio, "Sub task back pressure ratio"); + checkArgument(subTaskBackPressureRatio.length >= 1, "No Sub task back pressure ratio specified"); + + double max = 0; + for (double ratio : subTaskBackPressureRatio) { + if (ratio > max) { + max = ratio; + } + } + + maxSubTaskBackPressureRatio = max; + } + + /** + * Returns the ID of the sample. + * + * @return ID of the sample + */ + public int getSampleId() { + return sampleId; + } + + /** + * Returns the time stamp, when all stack traces were collected at the + * JobManager. + * + * @return Time stamp, when all stack traces were collected at the + * JobManager + */ + public long getEndTimestamp() { + return endTimestamp; + } + + /** + * Returns the number of sub tasks. + * + * @return Number of sub tasks. + */ + public int getNumberOfSubTasks() { + return subTaskBackPressureRatio.length; + } + + /** + * Returns the ratio of stack traces indicating back pressure to total + * number of sampled stack traces. + * + * @param index Subtask index. + * + * @return Ratio of stack traces indicating back pressure to total number + * of sampled stack traces. + */ + public double getBackPressureRatio(int index) { + return subTaskBackPressureRatio[index]; + } + + /** + * Returns the maximum back pressure ratio of all sub tasks. + * + * @return Maximum back pressure ratio of all sub tasks. + */ + public double getMaxBackPressureRatio() { + return maxSubTaskBackPressureRatio; + } + + @Override + public String toString() { + return "OperatorBackPressureStats{" + + "sampleId=" + sampleId + + ", endTimestamp=" + endTimestamp + + ", subTaskBackPressureRatio=" + Arrays.toString(subTaskBackPressureRatio) + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java new file mode 100644 index 0000000..dda4e33 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A sample of stack traces for one or more tasks. + * + * <p>The sampling is triggered in {@link StackTraceSampleCoordinator}. + */ +public class StackTraceSample { + + /** ID of this sample (unique per job). */ + private final int sampleId; + + /** Time stamp, when the sample was triggered. */ + private final long startTime; + + /** Time stamp, when all stack traces were collected at the JobManager. */ + private final long endTime; + + /** Map of stack traces by execution ID. */ + private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask; + + /** + * Creates a stack trace sample. + * + * @param sampleId ID of the sample. + * @param startTime Time stamp, when the sample was triggered. + * @param endTime Time stamp, when all stack traces were + * collected at the JobManager. + * @param stackTracesByTask Map of stack traces by execution ID. + */ + public StackTraceSample( + int sampleId, + long startTime, + long endTime, + Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask) { + + checkArgument(sampleId >= 0, "Negative sample ID"); + checkArgument(startTime >= 0, "Negative start time"); + checkArgument(endTime >= startTime, "End time before start time"); + + this.sampleId = sampleId; + this.startTime = startTime; + this.endTime = endTime; + this.stackTracesByTask = Collections.unmodifiableMap(stackTracesByTask); + } + + /** + * Returns the ID of the sample. + * + * @return ID of the sample + */ + public int getSampleId() { + return sampleId; + } + + /** + * Returns the time stamp, when the sample was triggered. + * + * @return Time stamp, when the sample was triggered + */ + public long getStartTime() { + return startTime; + } + + /** + * Returns the time stamp, when all stack traces were collected at the + * JobManager. + * + * @return Time stamp, when all stack traces were collected at the + * JobManager + */ + public long getEndTime() { + return endTime; + } + + /** + * Returns the map of stack traces by execution ID.
+ * + * @return Map of stack traces by execution ID + */ + public Map<ExecutionAttemptID, List<StackTraceElement[]>> getStackTraces() { + return stackTracesByTask; + } + + @Override + public String toString() { + return "StackTraceSample{" + + "sampleId=" + sampleId + + ", startTime=" + startTime + + ", endTime=" + endTime + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java new file mode 100644 index 0000000..8c2ec6e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.messages.StackTraceSampleResponse; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A coordinator for triggering and collecting stack traces of running tasks. + */ +public class StackTraceSampleCoordinator { + + private static final Logger LOG = LoggerFactory.getLogger(StackTraceSampleCoordinator.class); + + private static final int NUM_GHOST_SAMPLE_IDS = 10; + + private final Object lock = new Object(); + + /** Executor used to run the futures. */ + private final Executor executor; + + /** Time out after the expected sampling duration. */ + private final long sampleTimeout; + + /** In progress samples (guarded by lock). 
*/ + private final Map<Integer, PendingStackTraceSample> pendingSamples = new HashMap<>(); + + /** A list of recent sample IDs to identify late messages vs. invalid ones. */ + private final ArrayDeque<Integer> recentPendingSamples = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS); + + /** Sample ID counter (guarded by lock). */ + private int sampleIdCounter; + + /** + * Flag indicating whether the coordinator is still running (guarded by + * lock). + */ + private boolean isShutDown; + + /** + * Creates a new coordinator for the job. + * + * @param executor to use to execute the futures + * @param sampleTimeout Time out after the expected sampling duration. + * This is added to the expected duration of a + * sample, which is determined by the number of + * samples and the delay between each sample. + */ + public StackTraceSampleCoordinator(Executor executor, long sampleTimeout) { + checkArgument(sampleTimeout >= 0L); + this.executor = Preconditions.checkNotNull(executor); + this.sampleTimeout = sampleTimeout; + } + + /** + * Triggers a stack trace sample to all tasks. + * + * @param tasksToSample Tasks to sample. + * @param numSamples Number of stack trace samples to collect. + * @param delayBetweenSamples Delay between consecutive samples. + * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates + * no maximum and keeps the complete stack trace. + * @return A future of the completed stack trace sample + */ + @SuppressWarnings("unchecked") + public CompletableFuture<StackTraceSample> triggerStackTraceSample( + ExecutionVertex[] tasksToSample, + int numSamples, + Time delayBetweenSamples, + int maxStackTraceDepth) { + + checkNotNull(tasksToSample, "Tasks to sample"); + checkArgument(tasksToSample.length >= 1, "No tasks to sample"); + checkArgument(numSamples >= 1, "No number of samples"); + checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth"); + + // Execution IDs of running tasks + ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasksToSample.length]; + Execution[] executions = new Execution[tasksToSample.length]; + + // Check that all tasks are RUNNING before triggering anything. The + // triggering can still fail. + for (int i = 0; i < triggerIds.length; i++) { + Execution execution = tasksToSample[i].getCurrentExecutionAttempt(); + if (execution != null && execution.getState() == ExecutionState.RUNNING) { + executions[i] = execution; + triggerIds[i] = execution.getAttemptId(); + } else { + return FutureUtils.completedExceptionally(new IllegalStateException("Task " + tasksToSample[i] + .getTaskNameWithSubtaskIndex() + " is not running.")); + } + } + + synchronized (lock) { + if (isShutDown) { + return FutureUtils.completedExceptionally(new IllegalStateException("Shut down")); + } + + final int sampleId = sampleIdCounter++; + + LOG.debug("Triggering stack trace sample {}", sampleId); + + final PendingStackTraceSample pending = new PendingStackTraceSample( + sampleId, triggerIds); + + // Discard the sample if it takes too long. We don't send cancel + // messages to the task managers, but only wait for the responses + // and then ignore them. + long expectedDuration = numSamples * delayBetweenSamples.toMilliseconds(); + Time timeout = Time.milliseconds(expectedDuration + sampleTimeout); + + // Add the pending sample before scheduling the discard task to + // prevent races with removing it again. 
+ pendingSamples.put(sampleId, pending); + + // Trigger all samples + for (Execution execution: executions) { + final CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = execution.requestStackTraceSample( + sampleId, + numSamples, + delayBetweenSamples, + maxStackTraceDepth, + timeout); + + stackTraceSampleFuture.handleAsync( + (StackTraceSampleResponse stackTraceSampleResponse, Throwable throwable) -> { + if (stackTraceSampleResponse != null) { + collectStackTraces( + stackTraceSampleResponse.getSampleId(), + stackTraceSampleResponse.getExecutionAttemptID(), + stackTraceSampleResponse.getSamples()); + } else { + cancelStackTraceSample(sampleId, throwable); + } + + return null; + }, + executor); + } + + return pending.getStackTraceSampleFuture(); + } + } + + /** + * Cancels a pending sample. + * + * @param sampleId ID of the sample to cancel. + * @param cause Cause of the cancelling (can be <code>null</code>). + */ + public void cancelStackTraceSample(int sampleId, Throwable cause) { + synchronized (lock) { + if (isShutDown) { + return; + } + + PendingStackTraceSample sample = pendingSamples.remove(sampleId); + if (sample != null) { + if (cause != null) { + LOG.info("Cancelling sample " + sampleId, cause); + } else { + LOG.info("Cancelling sample {}", sampleId); + } + + sample.discard(cause); + rememberRecentSampleId(sampleId); + } + } + } + + /** + * Shuts down the coordinator. + * + * <p>After shut down, no further operations are executed. + */ + public void shutDown() { + synchronized (lock) { + if (!isShutDown) { + LOG.info("Shutting down stack trace sample coordinator."); + + for (PendingStackTraceSample pending : pendingSamples.values()) { + pending.discard(new RuntimeException("Shut down")); + } + + pendingSamples.clear(); + + isShutDown = true; + } + } + } + + /** + * Collects stack traces of a task. + * + * @param sampleId ID of the sample. + * @param executionId ID of the sampled task. + * @param stackTraces Stack traces of the sampled task. + * + * @throws IllegalStateException If unknown sample ID and not recently + * finished or cancelled sample. + */ + public void collectStackTraces( + int sampleId, + ExecutionAttemptID executionId, + List<StackTraceElement[]> stackTraces) { + + synchronized (lock) { + if (isShutDown) { + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Collecting stack trace sample {} of task {}", sampleId, executionId); + } + + PendingStackTraceSample pending = pendingSamples.get(sampleId); + + if (pending != null) { + pending.collectStackTraces(executionId, stackTraces); + + // Publish the sample + if (pending.isComplete()) { + pendingSamples.remove(sampleId); + rememberRecentSampleId(sampleId); + + pending.completePromiseAndDiscard(); + } + } else if (recentPendingSamples.contains(sampleId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received late stack trace sample {} of task {}", + sampleId, executionId); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Unknown sample ID " + sampleId); + } + } + } + } + + private void rememberRecentSampleId(int sampleId) { + if (recentPendingSamples.size() >= NUM_GHOST_SAMPLE_IDS) { + recentPendingSamples.removeFirst(); + } + recentPendingSamples.addLast(sampleId); + } + + int getNumberOfPendingSamples() { + synchronized (lock) { + return pendingSamples.size(); + } + } + + // ------------------------------------------------------------------------ + + /** + * A pending stack trace sample, which collects stack traces and owns a + * {@link StackTraceSample} promise. 
+ * + * <p>Access pending sample in lock scope. + */ + private static class PendingStackTraceSample { + + private final int sampleId; + private final long startTime; + private final Set<ExecutionAttemptID> pendingTasks; + private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask; + private final CompletableFuture<StackTraceSample> stackTraceFuture; + + private boolean isDiscarded; + + PendingStackTraceSample( + int sampleId, + ExecutionAttemptID[] tasksToCollect) { + + this.sampleId = sampleId; + this.startTime = System.currentTimeMillis(); + this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect)); + this.stackTracesByTask = Maps.newHashMapWithExpectedSize(tasksToCollect.length); + this.stackTraceFuture = new CompletableFuture<>(); + } + + int getSampleId() { + return sampleId; + } + + long getStartTime() { + return startTime; + } + + boolean isDiscarded() { + return isDiscarded; + } + + boolean isComplete() { + if (isDiscarded) { + throw new IllegalStateException("Discarded"); + } + + return pendingTasks.isEmpty(); + } + + void discard(Throwable cause) { + if (!isDiscarded) { + pendingTasks.clear(); + stackTracesByTask.clear(); + + stackTraceFuture.completeExceptionally(new RuntimeException("Discarded", cause)); + + isDiscarded = true; + } + } + + void collectStackTraces(ExecutionAttemptID executionId, List<StackTraceElement[]> stackTraces) { + if (isDiscarded) { + throw new IllegalStateException("Discarded"); + } + + if (pendingTasks.remove(executionId)) { + stackTracesByTask.put(executionId, Collections.unmodifiableList(stackTraces)); + } else if (isComplete()) { + throw new IllegalStateException("Completed"); + } else { + throw new IllegalArgumentException("Unknown task " + executionId); + } + } + + void completePromiseAndDiscard() { + if (isComplete()) { + isDiscarded = true; + + long endTime = System.currentTimeMillis(); + + StackTraceSample stackTraceSample = new StackTraceSample( + sampleId, + startTime, + endTime, + stackTracesByTask); + + stackTraceFuture.complete(stackTraceSample); + } else { + throw new IllegalStateException("Not completed yet"); + } + } + + @SuppressWarnings("unchecked") + CompletableFuture<StackTraceSample> getStackTraceSampleFuture() { + return stackTraceFuture; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java new file mode 100644 index 0000000..2086628 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.checkpoints; + +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Handler that returns a job's snapshotting settings. + */ +public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandler { + + private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config"; + + public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + super(executionGraphHolder, executor); + } + + @Override + public String[] getPaths() { + return new String[]{CHECKPOINT_CONFIG_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createCheckpointConfigJson(graph); + } catch (IOException e) { + throw new FlinkFutureException("Could not create checkpoint config json.", e); + } + }, + executor); + } + + /** + * Archivist for the CheckpointConfigHandler. + */ + public static class CheckpointConfigJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String json = createCheckpointConfigJson(graph); + String path = CHECKPOINT_CONFIG_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } + } + + private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + JobCheckpointingSettings settings = graph.getJobCheckpointingSettings(); + + if (settings == null) { + return "{}"; + } + + gen.writeStartObject(); + { + gen.writeStringField("mode", settings.isExactlyOnce() ? 
"exactly_once" : "at_least_once"); + gen.writeNumberField("interval", settings.getCheckpointInterval()); + gen.writeNumberField("timeout", settings.getCheckpointTimeout()); + gen.writeNumberField("min_pause", settings.getMinPauseBetweenCheckpoints()); + gen.writeNumberField("max_concurrent", settings.getMaxConcurrentCheckpoints()); + + ExternalizedCheckpointSettings externalization = settings.getExternalizedCheckpointSettings(); + gen.writeObjectFieldStart("externalization"); + { + if (externalization.externalizeCheckpoints()) { + gen.writeBooleanField("enabled", true); + gen.writeBooleanField("delete_on_cancellation", externalization.deleteOnCancellation()); + } else { + gen.writeBooleanField("enabled", false); + } + } + gen.writeEndObject(); + + } + gen.writeEndObject(); + + gen.close(); + + return writer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java new file mode 100644 index 0000000..f21fc76 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.checkpoints; + +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +/** + * A size-based cache of accessed checkpoints for completed and failed + * checkpoints. + * + * <p>Having this cache in place for accessed stats improves the user + * experience quite a bit as accessed checkpoint stats stay available + * and don't expire. For example if you manage to click on the last + * checkpoint in the history, it is not available via the stats as soon + * as another checkpoint is triggered. With the cache in place, the + * checkpoint will still be available for investigation. 
+ */ +public class CheckpointStatsCache { + + @Nullable + private final Cache<Long, AbstractCheckpointStats> cache; + + public CheckpointStatsCache(int maxNumEntries) { + if (maxNumEntries > 0) { + this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder() + .maximumSize(maxNumEntries) + .build(); + } else { + this.cache = null; + } + } + + /** + * Try to add the checkpoint to the cache. + * + * @param checkpoint Checkpoint to be added. + */ + void tryAdd(AbstractCheckpointStats checkpoint) { + // Don't add in-progress checkpoints, as they will be replaced by their + // completed/failed version eventually. + if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) { + cache.put(checkpoint.getCheckpointId(), checkpoint); + } + } + + /** + * Try to look up a checkpoint by its ID in the cache. + * + * @param checkpointId ID of the checkpoint to look up. + * @return The checkpoint or <code>null</code> if checkpoint not found. + */ + AbstractCheckpointStats tryGet(long checkpointId) { + if (cache != null) { + return cache.getIfPresent(checkpointId); + } else { + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java new file mode 100644 index 0000000..61ebeda --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.rest.handler.legacy.checkpoints; + +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; +import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; +import org.apache.flink.runtime.checkpoint.TaskStateStats; +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns checkpoint stats for a single job vertex. + */ +public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequestHandler { + + private static final String CHECKPOINT_STATS_DETAILS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid"; + + private final CheckpointStatsCache cache; + + public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) { + super(executionGraphHolder, executor); + this.cache = cache; + } + + @Override + public String[] getPaths() { + return new String[]{CHECKPOINT_STATS_DETAILS_REST_PATH}; + } + + @Override + public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) { + return CompletableFuture.supplyAsync( + () -> { + long checkpointId = parseCheckpointId(params); + if (checkpointId == -1) { + return "{}"; + } + + CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); + if (snapshot == null) { + return "{}"; + } + + AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId); + + if (checkpoint != null) { + cache.tryAdd(checkpoint); + } else { + checkpoint = cache.tryGet(checkpointId); + + if (checkpoint == null) { + return "{}"; + } + } + + try { + return createCheckpointDetailsJson(checkpoint); + } catch (IOException e) { + throw new FlinkFutureException("Could not create checkpoint details json.", e); + } + }, + executor); + } + + /** + * Archivist for the CheckpointStatsDetails. 
+ */ + public static class CheckpointStatsDetailsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot(); + if (stats == null) { + return Collections.emptyList(); + } + CheckpointStatsHistory history = stats.getHistory(); + List<ArchivedJson> archive = new ArrayList<>(); + for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) { + String json = createCheckpointDetailsJson(checkpoint); + String path = CHECKPOINT_STATS_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId())); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + } + + public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + gen.writeStartObject(); + + gen.writeNumberField("id", checkpoint.getCheckpointId()); + gen.writeStringField("status", checkpoint.getStatus().toString()); + gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint()); + gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp()); + gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp()); + gen.writeNumberField("state_size", checkpoint.getStateSize()); + gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration()); + gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered()); + gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks()); + gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks()); + + if (checkpoint.getStatus().isCompleted()) { + // --- Completed --- + CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint; + + String externalPath = completed.getExternalPath(); + if (externalPath != null) { + gen.writeStringField("external_path", externalPath); + } + + gen.writeBooleanField("discarded", completed.isDiscarded()); + } + else if (checkpoint.getStatus().isFailed()) { + // --- Failed --- + FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint; + + gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp()); + + String failureMsg = failed.getFailureMessage(); + if (failureMsg != null) { + gen.writeStringField("failure_message", failureMsg); + } + } + + gen.writeObjectFieldStart("tasks"); + for (TaskStateStats taskStats : checkpoint.getAllTaskStateStats()) { + gen.writeObjectFieldStart(taskStats.getJobVertexId().toString()); + + gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp()); + gen.writeNumberField("state_size", taskStats.getStateSize()); + gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp())); + gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered()); + gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks()); + gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks()); + + gen.writeEndObject(); + } + gen.writeEndObject(); + + gen.writeEndObject(); + gen.close(); + + return writer.toString(); + } + + /** + * Returns the checkpoint ID parsed from the provided parameters. 
+	 *
+	 * @param params Path parameters
+	 * @return Parsed checkpoint ID or <code>-1</code> if not available.
+	 */
+	static long parseCheckpointId(Map<String, String> params) {
+		String param = params.get("checkpointid");
+		if (param == null) {
+			return -1;
+		}
+
+		try {
+			return Long.parseLong(param);
+		} catch (NumberFormatException ignored) {
+			return -1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
new file mode 100644
index 0000000..22a8db2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns checkpoint stats for a single job vertex with
+ * the summary stats and all subtasks.
+ */
+public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGraphRequestHandler {
+
+	private static final String CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid";
+
+	private final CheckpointStatsCache cache;
+
+	public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+		super(executionGraphHolder, executor);
+		this.cache = checkNotNull(cache);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) {
+		return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway);
+	}
+
+	@Override
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params);
+		if (checkpointId == -1) {
+			return CompletableFuture.completedFuture("{}");
+		}
+
+		JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params);
+		if (vertexId == null) {
+			return CompletableFuture.completedFuture("{}");
+		}
+
+		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+		if (snapshot == null) {
+			return CompletableFuture.completedFuture("{}");
+		}
+
+		AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+		if (checkpoint != null) {
+			cache.tryAdd(checkpoint);
+		} else {
+			checkpoint = cache.tryGet(checkpointId);
+
+			if (checkpoint == null) {
+				return CompletableFuture.completedFuture("{}");
+			}
+		}
+
+		TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
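+		// No stats are tracked for this vertex in the given checkpoint (for
+		// instance, an unknown vertex id): fall through to the empty response.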
+		if (taskStats == null) {
+			return CompletableFuture.completedFuture("{}");
+		}
+
+		try {
+			return CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint, taskStats));
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
+	}
+
+	/**
+	 * Archivist for the CheckpointStatsDetailsSubtasksHandler.
+	 */
+	public static class CheckpointStatsDetailsSubtasksJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
+			if (stats == null) {
+				return Collections.emptyList();
+			}
+			CheckpointStatsHistory history = stats.getHistory();
+			List<ArchivedJson> archive = new ArrayList<>();
+			for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+				for (TaskStateStats subtaskStats : checkpoint.getAllTaskStateStats()) {
+					String json = createSubtaskCheckpointDetailsJson(checkpoint, subtaskStats);
+					String path = CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH
+						.replace(":jobid", graph.getJobID().toString())
+						.replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId()))
+						.replace(":vertexid", subtaskStats.getJobVertexId().toString());
+					archive.add(new ArchivedJson(path, json));
+				}
+			}
+			return archive;
+		}
+	}
+
+	private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+		gen.writeStartObject();
+		// Overview
+		gen.writeNumberField("id", checkpoint.getCheckpointId());
+		gen.writeStringField("status", checkpoint.getStatus().toString());
+		gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
+		gen.writeNumberField("state_size", taskStats.getStateSize());
+		gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+		gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
+		gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
+		gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
+
+		if (taskStats.getNumberOfAcknowledgedSubtasks() > 0) {
+			gen.writeObjectFieldStart("summary");
+			gen.writeObjectFieldStart("state_size");
+			CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getStateSizeStats());
+			gen.writeEndObject();
+
+			gen.writeObjectFieldStart("end_to_end_duration");
+			MinMaxAvgStats ackTimestampStats = taskStats.getSummaryStats().getAckTimestampStats();
+			gen.writeNumberField("min", Math.max(0, ackTimestampStats.getMinimum() - checkpoint.getTriggerTimestamp()));
+			gen.writeNumberField("max", Math.max(0, ackTimestampStats.getMaximum() - checkpoint.getTriggerTimestamp()));
+			gen.writeNumberField("avg", Math.max(0, ackTimestampStats.getAverage() - checkpoint.getTriggerTimestamp()));
+			gen.writeEndObject();
+
+			gen.writeObjectFieldStart("checkpoint_duration");
+			gen.writeObjectFieldStart("sync");
+			CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getSyncCheckpointDurationStats());
+			gen.writeEndObject();
+			gen.writeObjectFieldStart("async");
+			CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getAsyncCheckpointDurationStats());
+			gen.writeEndObject();
+			gen.writeEndObject();
+
+			gen.writeObjectFieldStart("alignment");
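+			// Alignment summary: bytes buffered during alignment and the
+			// alignment duration, each as min/max/avg over acknowledged subtasks.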
+			gen.writeObjectFieldStart("buffered");
+			CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentBufferedStats());
+			gen.writeEndObject();
+			gen.writeObjectFieldStart("duration");
+			CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentDurationStats());
+			gen.writeEndObject();
+			gen.writeEndObject();
+			gen.writeEndObject();
+		}
+
+		SubtaskStateStats[] subtasks = taskStats.getSubtaskStats();
+
+		gen.writeArrayFieldStart("subtasks");
+		for (int i = 0; i < subtasks.length; i++) {
+			SubtaskStateStats subtask = subtasks[i];
+
+			gen.writeStartObject();
+			gen.writeNumberField("index", i);
+
+			if (subtask != null) {
+				gen.writeStringField("status", "completed");
+				gen.writeNumberField("ack_timestamp", subtask.getAckTimestamp());
+				gen.writeNumberField("end_to_end_duration", subtask.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+				gen.writeNumberField("state_size", subtask.getStateSize());
+
+				gen.writeObjectFieldStart("checkpoint");
+				gen.writeNumberField("sync", subtask.getSyncCheckpointDuration());
+				gen.writeNumberField("async", subtask.getAsyncCheckpointDuration());
+				gen.writeEndObject();
+
+				gen.writeObjectFieldStart("alignment");
+				gen.writeNumberField("buffered", subtask.getAlignmentBuffered());
+				gen.writeNumberField("duration", subtask.getAlignmentDuration());
+				gen.writeEndObject();
+			} else {
+				gen.writeStringField("status", "pending_or_failed");
+			}
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+
+		gen.writeEndObject();
+		gen.close();
+
+		return writer.toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
new file mode 100644
index 0000000..abb353e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler that returns checkpoint statistics for a job.
+ */
+public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler {
+
+	private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints";
+
+	public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{CHECKPOINT_STATS_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createCheckpointStatsJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create checkpoint stats JSON.", e);
+				}
+			},
+			executor);
+	}
+
+	/**
+	 * Archivist for the CheckpointStatsHandler.
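+	 *
+	 * <p>Archives the full stats JSON under the job's checkpoint path, e.g.
+	 * (illustrative): {@code /jobs/<jobid>/checkpoints}.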
+	 */
+	public static class CheckpointStatsJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			String json = createCheckpointStatsJson(graph);
+			String path = CHECKPOINT_STATS_REST_PATH
+				.replace(":jobid", graph.getJobID().toString());
+			return Collections.singletonList(new ArchivedJson(path, json));
+		}
+	}
+
+	private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException {
+		// Check for the snapshot before creating the generator, so that no
+		// generator is left unclosed on the early return.
+		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+		if (snapshot == null) {
+			return "{}";
+		}
+
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+		gen.writeStartObject();
+
+		// Counts
+		writeCounts(gen, snapshot.getCounts());
+
+		// Summary
+		writeSummary(gen, snapshot.getSummaryStats());
+
+		CheckpointStatsHistory history = snapshot.getHistory();
+
+		// Latest
+		writeLatestCheckpoints(
+			gen,
+			history.getLatestCompletedCheckpoint(),
+			history.getLatestSavepoint(),
+			history.getLatestFailedCheckpoint(),
+			snapshot.getLatestRestoredCheckpoint());
+
+		// History
+		writeHistory(gen, snapshot.getHistory());
+
+		gen.writeEndObject();
+		gen.close();
+
+		return writer.toString();
+	}
+
+	private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
+		gen.writeObjectFieldStart("counts");
+		gen.writeNumberField("restored", counts.getNumberOfRestoredCheckpoints());
+		gen.writeNumberField("total", counts.getTotalNumberOfCheckpoints());
+		gen.writeNumberField("in_progress", counts.getNumberOfInProgressCheckpoints());
+		gen.writeNumberField("completed", counts.getNumberOfCompletedCheckpoints());
+		gen.writeNumberField("failed", counts.getNumberOfFailedCheckpoints());
+		gen.writeEndObject();
+	}
+
+	private static void writeSummary(
+			JsonGenerator gen,
+			CompletedCheckpointStatsSummary summary) throws IOException {
+		gen.writeObjectFieldStart("summary");
+		gen.writeObjectFieldStart("state_size");
+		writeMinMaxAvg(gen, summary.getStateSizeStats());
+		gen.writeEndObject();
+
+		gen.writeObjectFieldStart("end_to_end_duration");
+		writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
+		gen.writeEndObject();
+
+		gen.writeObjectFieldStart("alignment_buffered");
+		writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
+		gen.writeEndObject();
+		gen.writeEndObject();
+	}
+
+	static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
+		gen.writeNumberField("min", minMaxAvg.getMinimum());
+		gen.writeNumberField("max", minMaxAvg.getMaximum());
+		gen.writeNumberField("avg", minMaxAvg.getAverage());
+	}
+
+	private static void writeLatestCheckpoints(
+			JsonGenerator gen,
+			@Nullable CompletedCheckpointStats completed,
+			@Nullable CompletedCheckpointStats savepoint,
+			@Nullable FailedCheckpointStats failed,
+			@Nullable RestoredCheckpointStats restored) throws IOException {
+
+		gen.writeObjectFieldStart("latest");
+		// Completed checkpoint
+		if (completed != null) {
+			gen.writeObjectFieldStart("completed");
+			writeCheckpoint(gen, completed);
+
+			String externalPath = completed.getExternalPath();
+			if (externalPath != null) {
+				gen.writeStringField("external_path", externalPath);
+			}
+
+			gen.writeEndObject();
+		}
+
+		// Completed savepoint
+		if (savepoint != null) {
+			gen.writeObjectFieldStart("savepoint");
+			writeCheckpoint(gen, savepoint);
+
+			String externalPath = savepoint.getExternalPath();
+			if (externalPath != null) {
+				gen.writeStringField("external_path", externalPath);
+			}
+			gen.writeEndObject();
+		}
+
+		// Failed checkpoint
+		if (failed != null) {
+			gen.writeObjectFieldStart("failed");
+			writeCheckpoint(gen, failed);
+
+			gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+			String failureMsg = failed.getFailureMessage();
+			if (failureMsg != null) {
+				gen.writeStringField("failure_message", failureMsg);
+			}
+			gen.writeEndObject();
+		}
+
+		// Restored checkpoint
+		if (restored != null) {
+			gen.writeObjectFieldStart("restored");
+			gen.writeNumberField("id", restored.getCheckpointId());
+			gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp());
+			gen.writeBooleanField("is_savepoint", restored.getProperties().isSavepoint());
+
+			String externalPath = restored.getExternalPath();
+			if (externalPath != null) {
+				gen.writeStringField("external_path", externalPath);
+			}
+			gen.writeEndObject();
+		}
+		gen.writeEndObject();
+	}
+
+	private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
+		gen.writeNumberField("id", checkpoint.getCheckpointId());
+		gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+		gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+		gen.writeNumberField("state_size", checkpoint.getStateSize());
+		gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+		gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+	}
+
+	private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
+		gen.writeArrayFieldStart("history");
+		for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+			gen.writeStartObject();
+			gen.writeNumberField("id", checkpoint.getCheckpointId());
+			gen.writeStringField("status", checkpoint.getStatus().toString());
+			gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
+			gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+			gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+			gen.writeNumberField("state_size", checkpoint.getStateSize());
+			gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+			gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+			gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
+			gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
+
+			if (checkpoint.getStatus().isCompleted()) {
+				// --- Completed ---
+				CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
+
+				String externalPath = completed.getExternalPath();
+				if (externalPath != null) {
+					gen.writeStringField("external_path", externalPath);
+				}
+
+				gen.writeBooleanField("discarded", completed.isDiscarded());
+			}
+			else if (checkpoint.getStatus().isFailed()) {
+				// --- Failed ---
+				FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
+
+				gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+
+				String failureMsg = failed.getFailureMessage();
+				if (failureMsg != null) {
+					gen.writeStringField("failure_message", failureMsg);
+				}
+			}
+
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+	}
+}
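
The handlers above serve their JSON under the legacy REST paths returned by getPaths(). As a rough illustration of how the resulting payload can be fetched over HTTP, here is a minimal client sketch; the endpoint address, port, and job id are assumptions, not part of this change:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CheckpointStatsClientSketch {

	public static void main(String[] args) throws Exception {
		String base = "http://localhost:8081"; // assumed JobManager web address
		String jobId = "<jobid>";              // placeholder: a real 32-char hex job id

		// CHECKPOINT_STATS_REST_PATH ("/jobs/:jobid/checkpoints") with :jobid substituted.
		URL url = new URL(base + "/jobs/" + jobId + "/checkpoints");
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		try (BufferedReader reader = new BufferedReader(
				new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
			StringBuilder json = new StringBuilder();
			String line;
			while ((line = reader.readLine()) != null) {
				json.append(line);
			}
			// Payload holds the "counts", "summary", "latest" and "history"
			// sections produced by CheckpointStatsHandler.
			System.out.println(json);
		} finally {
			conn.disconnect();
		}
	}
}

The details and subtask paths follow the same pattern, with :checkpointid and :vertexid substituted in addition to :jobid.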
