AHeise commented on a change in pull request #15054:
URL: https://github.com/apache/flink/pull/15054#discussion_r600455939
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
##########
@@ -156,6 +156,48 @@
.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples")
.withDescription("This config option is no longer used");
+ /** Time, in milliseconds, after which cached stats are cleaned up if not accessed. */
Review comment:
Without looking at the documentation, I have a hard time figuring out how
all these parameters relate to each other.
Naively, I'd expect these settings (coming from VisualVM):
- Sample frequency/interval (=FLAMEGRAPH_DELAY?)
- Flame graph window size (=FLAMEGRAPH_NUM_SAMPLES)
- Flame graph window slide (=FLAMEGRAPH_REFRESH_INTERVAL? or =FLAMEGRAPH_CLEANUP_INTERVAL?)
I'd then assume a moving average based on Flink's sliding-window semantics.
Could you map these parameters to such known concepts and derive the
technical parameters from them?
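To make the question concrete, here is a minimal sketch of deriving the technical options from the three VisualVM-style settings. The class `FlameGraphWindow` is hypothetical, and the mapping to the `FLAMEGRAPH_*` options is only the guess from the list above (it treats the refresh interval, not the clean-up interval, as the window slide).

```java
import java.time.Duration;

/**
 * Hypothetical user-facing sampling settings in VisualVM terms. The technical
 * options are derived from them instead of being configured directly.
 */
final class FlameGraphWindow {
    private final Duration sampleInterval;
    private final Duration windowSize;
    private final Duration windowSlide;

    FlameGraphWindow(Duration sampleInterval, Duration windowSize, Duration windowSlide) {
        // At least 1 ms between samples; this also guards the division in numSamples().
        if (sampleInterval.toMillis() < 1) {
            throw new IllegalArgumentException("Sample interval must be at least 1 ms");
        }
        if (windowSize.compareTo(sampleInterval) < 0) {
            throw new IllegalArgumentException("Window must cover at least one sample interval");
        }
        if (windowSlide.isNegative()) {
            throw new IllegalArgumentException("Window slide must not be negative");
        }
        this.sampleInterval = sampleInterval;
        this.windowSize = windowSize;
        this.windowSlide = windowSlide;
    }

    /** Delay between two samples of the same task (assumed to be FLAMEGRAPH_DELAY). */
    long delayBetweenSamplesMillis() {
        return sampleInterval.toMillis();
    }

    /** Number of sample intervals that fit into one window (assumed FLAMEGRAPH_NUM_SAMPLES). */
    int numSamples() {
        return Math.toIntExact(windowSize.toMillis() / sampleInterval.toMillis());
    }

    /** Time until a new flame graph is computed (assumed FLAMEGRAPH_REFRESH_INTERVAL). */
    long refreshIntervalMillis() {
        return windowSlide.toMillis();
    }
}
```

With a 50 ms interval and a 5 s window this yields 100 samples per flame graph, recomputed once per slide.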
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Arrays;
+
+/** Termination mode query parameter. */
+public class FlameGraphTypeQueryParameter
+ extends MessageQueryParameter<FlameGraphTypeQueryParameter.Type> {
+
+ private static final String key = "type";
+
+ public FlameGraphTypeQueryParameter() {
+ super(key, MessageParameterRequisiteness.OPTIONAL);
+ }
+
+ @Override
+ public Type convertStringToValue(String value) {
+ return Type.valueOf(value.toUpperCase());
+ }
+
+ @Override
+ public String convertValueToString(Type value) {
+ return value.name().toLowerCase();
+ }
+
+ @Override
+ public String getDescription() {
+ return "String value that specifies the Flame Graph type. Supported options are: \""
+ + Arrays.toString(Type.values())
+ + "\".";
+ }
+
+ /** Termination mode. */
Review comment:
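Copy-and-paste error in the Javadoc? This should describe the flame graph
type, not a termination mode; the class Javadoc has the same issue.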
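A possible shape for the corrected Javadoc, as a sketch: the enum constants are an assumption (the `Type` body is not part of this hunk), chosen to mirror the full, On-CPU, and Off-CPU factory methods of `OperatorFlameGraphFactory`, and `FlameGraphTypeSketch` is a hypothetical holder class. The conversions are the same as in the diff.

```java
/** Sketch of the query parameter's value type with Javadoc that matches its purpose. */
final class FlameGraphTypeSketch {

    /** Type of the flame graph to compute. */
    enum Type {
        /** All threads, regardless of their state. */
        FULL,
        /** Only actively running (On-CPU) threads. */
        ON_CPU,
        /** Only blocked or waiting (Off-CPU) threads. */
        OFF_CPU
    }

    // Lower-case on the wire, enum constant internally (as in the diff).
    static Type convertStringToValue(String value) {
        return Type.valueOf(value.toUpperCase());
    }

    static String convertValueToString(Type value) {
        return value.name().toLowerCase();
    }

    private FlameGraphTypeSketch() {}
}
```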
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphInfo.java
##########
@@ -0,0 +1,60 @@
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobVertexFlameGraphHandler;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Response type of the {@link JobVertexFlameGraphHandler}. */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JobVertexFlameGraphInfo implements ResponseBody {
+
+ public static JobVertexFlameGraphInfo empty() {
+ return new JobVertexFlameGraphInfo(null, null);
+ }
+
+ private static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+ private static final String FIELD_NAME_ROOT = "data";
+
+ @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+ private final Long endTimestamp;
Review comment:
Should this be a primitive `long`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java
##########
@@ -0,0 +1,283 @@
+package org.apache.flink.runtime.messages;
+
+import java.io.Serializable;
+import java.lang.management.LockInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+/**
+ * A serializable wrapper around {@link java.lang.management.ThreadInfo} that excludes {@link
+ * java.lang.management.LockInfo}-based non-serializable fields.
+ */
+public class ThreadInfoSample implements Serializable {
+
+ private final String threadName;
+ private final long threadId;
+ private final long blockedTime;
+ private final long blockedCount;
+ private final long waitedTime;
+ private final long waitedCount;
+ private final String lockName;
+ private final long lockOwnerId;
+ private final String lockOwnerName;
+ private final boolean inNative;
+ private final boolean suspended;
+ private final Thread.State threadState;
+ private final StackTraceElement[] stackTrace;
+
+ private ThreadInfoSample(
+ String threadName,
+ long threadId,
+ long blockedTime,
+ long blockedCount,
+ long waitedTime,
+ long waitedCount,
+ String lockName,
+ long lockOwnerId,
+ String lockOwnerName,
+ boolean inNative,
+ boolean suspended,
+ Thread.State threadState,
+ StackTraceElement[] stackTrace) {
+ this.threadName = threadName;
+ this.threadId = threadId;
+ this.blockedTime = blockedTime;
+ this.blockedCount = blockedCount;
+ this.waitedTime = waitedTime;
+ this.waitedCount = waitedCount;
+ this.lockName = lockName;
+ this.lockOwnerId = lockOwnerId;
+ this.lockOwnerName = lockOwnerName;
+ this.inNative = inNative;
+ this.suspended = suspended;
+ this.threadState = threadState;
+ this.stackTrace = stackTrace;
+ }
+
+ /**
+ * Constructs a {@link ThreadInfoSample} from {@link ThreadInfo}.
+ *
+ * @param threadInfo {@link ThreadInfo} where the data will be copied from.
+ * @return new {@link ThreadInfoSample}
+ */
+ public static ThreadInfoSample from(ThreadInfo threadInfo) {
Review comment:
Do you really want to support `null` here instead of handling it at the
call site? If so, add `@Nullable` to the parameter and the return type.
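One place where `null` legitimately shows up: `ThreadMXBean#getThreadInfo(long[], int)` puts `null` into the result array for threads that are no longer alive or do not exist. A minimal sketch of dropping those entries at the call site, so that a conversion like `from(...)` never sees `null`; the helper class and method are hypothetical, and thread names stand in for `ThreadInfoSample`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

final class CallSiteNullHandling {

    /** Returns the names of all requested threads that are still alive. */
    static List<String> sampleAliveThreadNames(long[] threadIds, int maxDepth) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // Entries are null for threads that are not alive or do not exist.
        ThreadInfo[] infos = bean.getThreadInfo(threadIds, maxDepth);
        return Arrays.stream(infos)
                .filter(Objects::nonNull) // handle null here, not in the converter
                .map(ThreadInfo::getThreadName)
                .collect(Collectors.toList());
    }

    private CallSiteNullHandling() {}
}
```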
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java
##########
@@ -0,0 +1,121 @@
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraphFactory;
Review comment:
This class doesn't exist in this commit yet (wrong commit split?).
Each commit should compile and, ideally, pass all tests (although that is
not always easy to achieve).
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Factory class for creating Flame Graph representations. */
+public class OperatorFlameGraphFactory {
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph.
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure
+ */
+ public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included = Arrays.asList(Thread.State.values());
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+ * threads.
+ *
+ * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+ * Thread.State.WAITING].
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure.
+ */
+ public static OperatorFlameGraph createOffCpuFlameGraph(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included =
+ Arrays.asList(
+ Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING);
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing actively running
+ * (On-CPU) threads.
+ *
+ * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+ * Thread.State.WAITING].
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure
+ */
+ public static OperatorFlameGraph createOnCpuFlameGraph(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included = Arrays.asList(Thread.State.RUNNABLE, Thread.State.NEW);
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ private static OperatorFlameGraph createFlameGraphFromSample(
+ OperatorThreadInfoStats sample, Collection<Thread.State> threadStates) {
Review comment:
-> `Set<Thread.State> threadStates`
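A small sketch of why `Set` fits better: the parameter is only used for membership checks, and with an `EnumSet` argument `contains` is a constant-time bit test. `Sample` is a hypothetical stand-in for `ThreadInfoSample`, and `filterByState` is an illustration, not the PR's method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class ThreadStateFilter {

    /** Hypothetical stand-in for a sampled thread (name and state only). */
    static final class Sample {
        final String threadName;
        final Thread.State state;

        Sample(String threadName, Thread.State state) {
            this.threadName = threadName;
            this.state = state;
        }
    }

    /** Keeps only samples whose thread state is contained in {@code threadStates}. */
    static List<Sample> filterByState(List<Sample> samples, Set<Thread.State> threadStates) {
        List<Sample> included = new ArrayList<>();
        for (Sample sample : samples) {
            // Membership check only: a Set states that order and duplicates are irrelevant.
            if (threadStates.contains(sample.state)) {
                included.add(sample);
            }
        }
        return included;
    }

    private ThreadStateFilter() {}
}
```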
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing actively running
+ * (On-CPU) threads.
+ *
+ * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
Review comment:
Copy-and-paste error: the On-CPU graph includes `RUNNABLE` and `NEW`, not
the Off-CPU states listed here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/Stats.java
##########
@@ -0,0 +1,30 @@
+package org.apache.flink.runtime.webmonitor.stats;
+
+/** Represents one or more statistics samples. */
+public interface Stats {
Review comment:
Although `stats` does appear in the codebase, `Statistics` seems to be the
more common name.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+ public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included = Arrays.asList(Thread.State.values());
Review comment:
Use `EnumSet` here.
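Applied to all three factory methods, this could look like the sketch below, with the state selections hoisted into constants; the class and constant names are hypothetical. Wrapping them in `Collections.unmodifiableSet` keeps the shared static sets from being mutated, while `contains` still delegates to the underlying `EnumSet`.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

/** Sketch: the state selections of the three factory methods as read-only EnumSets. */
final class FlameGraphThreadStates {

    /** Full flame graph: every thread state. */
    static final Set<Thread.State> ALL =
            Collections.unmodifiableSet(EnumSet.allOf(Thread.State.class));

    /** On-CPU flame graph: Thread.State.[RUNNABLE, NEW]. */
    static final Set<Thread.State> ON_CPU =
            Collections.unmodifiableSet(EnumSet.of(Thread.State.RUNNABLE, Thread.State.NEW));

    /** Off-CPU flame graph: Thread.State.[TIMED_WAITING, BLOCKED, WAITING]. */
    static final Set<Thread.State> OFF_CPU =
            Collections.unmodifiableSet(
                    EnumSet.of(Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING));

    private FlameGraphThreadStates() {}
}
```

Note that `TERMINATED` is in neither the On-CPU nor the Off-CPU set, so only the full flame graph includes it.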
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/OperatorStatsTracker.java
##########
@@ -0,0 +1,56 @@
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Optional;
+
+/**
+ * Interface for a tracker of statistics for {@link AccessExecutionJobVertex}.
+ *
+ * @param <T> Type of statistics to track
+ */
+public interface OperatorStatsTracker<T extends Stats> {
+
+ /**
+ * Returns statistics for an operator. Automatically triggers sampling request if statistics are
+ * not available or outdated.
+ *
+ * @param vertex Operator to get the stats for.
+ * @return Statistics for an operator
+ */
+ Optional<T> getOperatorStats(AccessExecutionJobVertex vertex);
+
+ /**
+ * Cleans up the operator stats cache if it contains timed out entries.
+ *
+ * <p>The Guava cache only evicts as maintenance during normal operations. If this handler is
+ * inactive, it will never be cleaned.
Review comment:
This is an implementation detail; please remove it from the interface
Javadoc.
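To illustrate the split, a minimal sketch: the interface Javadoc states only the contract, while the note about lazy eviction moves to the implementing class. `StatsCache`, `ExpiringStatsCache`, and `cleanUpExpired` are hypothetical names (the real clean-up method signature is cut off in this hunk), and a plain `HashMap` with an injectable clock stands in for the Guava cache.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongSupplier;

/** Contract only: says nothing about how or when entries are evicted internally. */
interface StatsCache<K, V> {

    Optional<V> get(K key);

    /** Removes all entries that have not been accessed within the configured interval. */
    void cleanUpExpired();
}

/**
 * Implementation notes belong here: Guava's cache only evicts during other cache operations,
 * and this map-based sketch evicts only in cleanUpExpired(), so an idle owner must call it
 * periodically. Not thread-safe.
 */
final class ExpiringStatsCache<K, V> implements StatsCache<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private final Map<K, Long> lastAccessMillis = new HashMap<>();
    private final long expireAfterAccessMillis;
    private final LongSupplier clockMillis;

    ExpiringStatsCache(long expireAfterAccessMillis, LongSupplier clockMillis) {
        this.expireAfterAccessMillis = expireAfterAccessMillis;
        this.clockMillis = clockMillis;
    }

    void put(K key, V value) {
        values.put(key, value);
        lastAccessMillis.put(key, clockMillis.getAsLong());
    }

    @Override
    public Optional<V> get(K key) {
        V value = values.get(key);
        if (value != null) {
            lastAccessMillis.put(key, clockMillis.getAsLong()); // reads refresh the entry
        }
        return Optional.ofNullable(value);
    }

    @Override
    public void cleanUpExpired() {
        long now = clockMillis.getAsLong();
        Iterator<Map.Entry<K, Long>> it = lastAccessMillis.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> entry = it.next();
            if (now - entry.getValue() >= expireAfterAccessMillis) {
                values.remove(entry.getKey());
                it.remove();
            }
        }
    }
}
```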
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,403 @@
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.stats.OperatorStatsTracker;
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker of thread infos for {@link ExecutionJobVertex}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+public class ThreadInfoOperatorTracker<T extends Stats> implements OperatorStatsTracker<T> {
+
+ /**
+ * Create a new {@link Builder}.
+ *
+ * @param createStatsFn Function that converts a thread info sample into a derived statistic.
+ * Could be an identity function.
+ * @param <T> Type of the derived statistics to return.
+ * @return Builder.
+ */
+ public static <T extends Stats> Builder<T> newBuilder(
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+ Function<OperatorThreadInfoStats, T> createStatsFn,
+ ExecutorService executor) {
+ return new Builder<>(resourceManagerGatewayRetriever, createStatsFn, executor);
+ }
+
+ /**
+ * Builder for {@link ThreadInfoOperatorTracker}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+ public static class Builder<T extends Stats> {
+
+ private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+ private final Function<OperatorThreadInfoStats, T> createStatsFn;
+ private final ExecutorService executor;
+
+ private ThreadInfoRequestCoordinator coordinator;
+ private int cleanUpInterval;
+ private int numSamples;
+ private int statsRefreshInterval;
+ private Time delayBetweenSamples;
+ private int maxThreadInfoDepth;
+
+ private Builder(
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+ Function<OperatorThreadInfoStats, T> createStatsFn,
+ ExecutorService executor) {
+ this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+ this.createStatsFn = createStatsFn;
+ this.executor = executor;
+ }
+
+ /**
+ * Sets {@code cleanUpInterval}.
+ *
+ * @param coordinator Coordinator for thread info stats request.
+ * @return Builder.
+ */
+ public Builder<T> setCoordinator(ThreadInfoRequestCoordinator coordinator) {
+ this.coordinator = coordinator;
+ return this;
+ }
+
+ /**
+ * Sets {@code cleanUpInterval}.
+ *
+ * @param cleanUpInterval Clean up interval for completed stats.
+ * @return Builder.
+ */
+ public Builder<T> setCleanUpInterval(int cleanUpInterval) {
+ this.cleanUpInterval = cleanUpInterval;
+ return this;
+ }
+
+ /**
+ * Sets {@code numSamples}.
+ *
+ * @param numSamples Number of thread info samples to collect for each subtask.
+ * @return Builder.
+ */
+ public Builder<T> setNumSamples(int numSamples) {
+ this.numSamples = numSamples;
+ return this;
+ }
+
+ /**
+ * Sets {@code statsRefreshInterval}.
+ *
+ * @param statsRefreshInterval Time interval after which the available thread info stats are
+ * deprecated and need to be refreshed.
+ * @return Builder.
+ */
+ public Builder<T> setStatsRefreshInterval(int statsRefreshInterval) {
+ this.statsRefreshInterval = statsRefreshInterval;
+ return this;
+ }
+
+ /**
+ * Sets {@code delayBetweenSamples}.
+ *
+ * @param delayBetweenSamples Delay between individual samples per task.
+ * @return Builder.
+ */
+ public Builder<T> setDelayBetweenSamples(Time delayBetweenSamples) {
+ this.delayBetweenSamples = delayBetweenSamples;
+ return this;
+ }
+
+ /**
+ * Sets {@code delayBetweenSamples}.
+ *
+ * @param maxThreadInfoDepth Limit for the depth of the stack traces included when sampling
+ * threads.
+ * @return Builder.
+ */
+ public Builder<T> setMaxThreadInfoDepth(int maxThreadInfoDepth) {
+ this.maxThreadInfoDepth = maxThreadInfoDepth;
+ return this;
+ }
+
+ /**
+ * Constructs a new {@link ThreadInfoOperatorTracker}.
+ *
+ * @return a new {@link ThreadInfoOperatorTracker} instance.
+ */
+ public ThreadInfoOperatorTracker<T> build() {
+ return new ThreadInfoOperatorTracker<>(
+ coordinator,
+ resourceManagerGatewayRetriever,
+ createStatsFn,
+ executor,
+ cleanUpInterval,
+ numSamples,
+ statsRefreshInterval,
+ delayBetweenSamples,
+ maxThreadInfoDepth);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(ThreadInfoOperatorTracker.class);
+
+ /** Lock guarding trigger operations. */
+ private final Object lock = new Object();
+
+ private final ThreadInfoRequestCoordinator coordinator;
+
+ private final Function<OperatorThreadInfoStats, T> createStatsFn;
+
+ private final ExecutorService executor;
+
+ private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+ /**
+ * Completed stats. Important: Job vertex IDs need to be scoped by job ID, because they are
+ * potentially constant across runs messing up the cached data.
+ */
+ private final Cache<AccessExecutionJobVertex, T> operatorStatsCache;
+
+ /**
+ * Pending in progress stats. Important: Job vertex IDs need to be scoped by job ID, because
+ * they are potentially constant across runs messing up the cached data.
+ */
+ private final Set<AccessExecutionJobVertex> pendingStats = new HashSet<>();
+
+ private final int numSamples;
+
+ private final int statsRefreshInterval;
+
+ private final Time delayBetweenSamples;
+
+ private final int maxThreadInfoDepth;
+
+ // Used for testing purposes
+ private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();
+
+ /** Flag indicating whether the stats tracker has been shut down. */
+ private boolean shutDown;
+
+ private ThreadInfoOperatorTracker(
+ ThreadInfoRequestCoordinator coordinator,
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+ Function<OperatorThreadInfoStats, T> createStatsFn,
+ ExecutorService executor,
+ int cleanUpInterval,
+ int numSamples,
+ int statsRefreshInterval,
+ Time delayBetweenSamples,
+ int maxStackTraceDepth) {
+
+ this.coordinator = checkNotNull(coordinator, "Thread info samples coordinator");
+ this.resourceManagerGatewayRetriever =
+ checkNotNull(resourceManagerGatewayRetriever, "Gateway retriever");
+ this.createStatsFn = checkNotNull(createStatsFn, "Create stats function");
+ this.executor = checkNotNull(executor, "Scheduled executor");
+
+ checkArgument(cleanUpInterval >= 0, "Clean up interval");
+
+ checkArgument(numSamples >= 1, "Number of samples");
+ this.numSamples = numSamples;
+
+ checkArgument(
+ statsRefreshInterval >= 0,
+ "Stats refresh interval must be greater than or equal to 0");
+ this.statsRefreshInterval = statsRefreshInterval;
+
+ this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
+
+ checkArgument(
+ maxStackTraceDepth >= 0,
+ "Max stack trace depth must be greater than or equal to 0");
+ this.maxThreadInfoDepth = maxStackTraceDepth;
+
+ this.operatorStatsCache =
+ CacheBuilder.newBuilder()
+ .concurrencyLevel(1)
+ .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ @Override
+ public Optional<T> getOperatorStats(AccessExecutionJobVertex vertex) {
+ synchronized (lock) {
+ final T stats = operatorStatsCache.getIfPresent(vertex);
+ if (stats == null
+ || System.currentTimeMillis() >= stats.getEndTime() + statsRefreshInterval) {
Review comment:
Is this assuming that clocks are synchronized across machines?
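Whether `getEndTime()` is taken from the TaskManagers' clocks is not visible in this hunk. If it is, one way to avoid relying on synchronized clocks is to record, on the JobManager, the local `System.nanoTime()` at which the stats arrived and to compare only readings of that same monotonic clock. A minimal sketch; `LocallyTimestamped` is a hypothetical wrapper, not part of the PR.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper pairing derived stats with the local monotonic time of their arrival. */
final class LocallyTimestamped<T> {
    private final T stats;
    private final long receivedAtNanos;

    LocallyTimestamped(T stats, long receivedAtNanos) {
        this.stats = stats;
        this.receivedAtNanos = receivedAtNanos;
    }

    T stats() {
        return stats;
    }

    /**
     * True once at least refreshIntervalMillis have elapsed on the local clock since the stats
     * were received. Comparing the difference (not the raw values) stays correct even if
     * nanoTime wraps around.
     */
    boolean isOutdated(long nowNanos, long refreshIntervalMillis) {
        return nowNanos - receivedAtNanos >= TimeUnit.MILLISECONDS.toNanos(refreshIntervalMillis);
    }
}
```

On the caller's side this would read `new LocallyTimestamped<>(stats, System.nanoTime())` when a result arrives and `isOutdated(System.nanoTime(), statsRefreshInterval)` in `getOperatorStats`.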
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/TaskStatsRequestCoordinator.java
##########
@@ -0,0 +1,323 @@
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates the common functionality for requesting statistics from individual tasks and
+ * combining their responses.
+ *
+ * @param <T> Type of the statistics to be gathered.
+ * @param <V> Type of the combined response.
+ */
+public class TaskStatsRequestCoordinator<T, V> {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ protected final int numGhostSampleIds = 10;
+
+ protected final Object lock = new Object();
+
+ /** Executor used to run the futures. */
+ protected final Executor executor;
+
+ /** Request time out of a triggered task stats request. */
+ protected final Time requestTimeout;
+
+ /** In progress samples. */
+ @GuardedBy("lock")
+ protected final Map<Integer, PendingStatsRequest<T, V>> pendingRequests = new HashMap<>();
+
+ /** A list of recent request IDs to identify late messages vs. invalid ones. */
+ protected final ArrayDeque<Integer> recentPendingRequestIds =
+ new ArrayDeque<>(numGhostSampleIds);
+
+ /** Sample ID counter. */
+ @GuardedBy("lock")
+ protected int requestIdCounter;
+
+ /** Flag indicating whether the coordinator is still running. */
+ @GuardedBy("lock")
+ protected boolean isShutDown;
+
+ /**
+ * Creates a new coordinator for the cluster.
+ *
+ * @param executor Used to execute the futures.
+ * @param requestTimeout Request time out of a triggered task stats request.
+ */
+ public TaskStatsRequestCoordinator(Executor executor, long requestTimeout) {
+ checkArgument(requestTimeout >= 0L, "The request timeout must be non-negative.");
+ this.executor = Preconditions.checkNotNull(executor);
+ this.requestTimeout = Time.milliseconds(requestTimeout);
+ }
+
+ /**
+ * Handles the failed stats response by canceling the corresponding unfinished pending request.
+ *
+ * @param requestId ID of the request to cancel.
+ * @param cause Cause of the cancelling (can be <code>null</code>).
+ */
+ public void handleFailedResponse(int requestId, @Nullable Throwable cause) {
+ synchronized (lock) {
+ if (isShutDown) {
+ return;
+ }
+
+ PendingStatsRequest<T, V> pendingRequest = pendingRequests.remove(requestId);
+ if (pendingRequest != null) {
+ if (cause != null) {
+ log.info("Cancelling request " + requestId, cause);
+ } else {
+ log.info("Cancelling request {}", requestId);
+ }
+
+ pendingRequest.discard(cause);
+ rememberRecentRequestId(requestId);
+ }
+ }
+ }
+
+ /**
+ * Shuts down the coordinator.
+ *
+ * <p>After shut down, no further operations are executed.
+ */
+ public void shutDown() {
+ synchronized (lock) {
+ if (!isShutDown) {
+ log.info("Shutting down task stats request coordinator.");
+
+ for (PendingStatsRequest<T, V> pending : pendingRequests.values()) {
+ pending.discard(new RuntimeException("Shut down"));
+ }
+
+ pendingRequests.clear();
+ recentPendingRequestIds.clear();
+
+ isShutDown = true;
+ }
+ }
+ }
+
+ /**
+ * Handles the successfully returned task stats response by collecting the corresponding subtask
+ * samples.
+ *
+ * @param requestId ID of the request.
+ * @param executionId ID of the sampled task.
+ * @param result Result of stats request returned by an individual task.
+ * @throws IllegalStateException If unknown request ID and not recently finished or cancelled
+ * sample.
+ */
+ public void handleSuccessfulResponse(int requestId, ExecutionAttemptID executionId, T result) {
+
+ synchronized (lock) {
+ if (isShutDown) {
+ return;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Collecting stats sample {} of task {}", requestId, executionId);
+ }
+
+ PendingStatsRequest<T, V> pending = pendingRequests.get(requestId);
+
+ if (pending != null) {
+ pending.collectTaskStats(executionId, result);
+
+ // Publish the sample
+ if (pending.isComplete()) {
+ pendingRequests.remove(requestId);
+ rememberRecentRequestId(requestId);
+
+ pending.completePromiseAndDiscard();
+ }
+ } else if (recentPendingRequestIds.contains(requestId)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received late stats sample {} of task {}", requestId, executionId);
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Unknown request ID %d.", requestId));
+ }
+ }
+ }
+ }
+
+ private void rememberRecentRequestId(int sampleId) {
+ if (recentPendingRequestIds.size() >= numGhostSampleIds) {
+ recentPendingRequestIds.removeFirst();
+ }
+ recentPendingRequestIds.addLast(sampleId);
+ }
+
+ @VisibleForTesting
+ public int getNumberOfPendingRequests() {
+ synchronized (lock) {
+ return pendingRequests.size();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A pending task stats request, which collects samples from individual tasks and completes the
+ * response future upon gathering all of of them.
+ *
+ * <p>Has to be accessed in lock scope.
+ *
+ * @param <T> Type of the result collected from tasks.
+ * @param <V> Type of the result assembled and returned when all tasks where sampled.
+ */
+ protected abstract static class PendingStatsRequest<T, V> {
Review comment:
Move to top-level? I find nested abstract classes odd if you do not
include nested implementations.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/TaskStatsRequestCoordinator.java
##########
@@ -0,0 +1,323 @@
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates the common functionality for requesting statistics from individual tasks and
+ * combining their responses.
+ *
+ * @param <T> Type of the statistics to be gathered.
+ * @param <V> Type of the combined response.
+ */
+public class TaskStatsRequestCoordinator<T, V> {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
Review comment:
We typically use a static LOG and I don't see a reason why it shouldn't
be used here.
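A stdlib-only illustration of the difference, using `java.util.logging` as a stand-in for SLF4J: a logger created from `getClass()` in an instance field is looked up per instance and named after the runtime subclass, while a `static final` logger is created once per declaring class. `BaseCoordinator` and `FlameGraphCoordinator` are hypothetical names.

```java
import java.util.logging.Logger;

/** Hypothetical base class standing in for TaskStatsRequestCoordinator. */
class BaseCoordinator {
    // Conventional style: created once per declaring class and shared by all instances.
    static final Logger LOG = Logger.getLogger(BaseCoordinator.class.getName());

    // Style in the quoted diff: looked up per instance and named after the runtime subclass.
    final Logger log = Logger.getLogger(getClass().getName());
}

/** Hypothetical subclass. */
class FlameGraphCoordinator extends BaseCoordinator {}
```

If per-subclass logger names are the goal, each subclass can declare its own `private static final` logger instead.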
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
##########
@@ -156,6 +156,48 @@
.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples")
.withDescription("This config option is no longer used");
+ /** Time, in milliseconds, after which cached stats are cleaned up if not accessed. */
+ public static final ConfigOption<Integer> FLAMEGRAPH_CLEANUP_INTERVAL =
Review comment:
I'd prefer `Duration` for all time-based configurations.
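A minimal sketch of the benefit, using a hypothetical `FlameGraphIntervals` holder rather than the actual `WebOptions` constants: with `java.time.Duration` the unit travels with the value, so a conversion such as `cleanupInterval.toMillis()` is explicit at the point of use and nobody has to remember that an `Integer` option means milliseconds. On the Flink side, such options would presumably be declared with the `ConfigOptions` builder's `durationType()`; that wiring is not shown here, and the example values in the test are arbitrary, not the PR's defaults.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical holder: each time-based flame graph setting carries its unit with it. */
final class FlameGraphIntervals {
    final Duration cleanupInterval;
    final Duration refreshInterval;
    final Duration delayBetweenSamples;

    FlameGraphIntervals(
            Duration cleanupInterval, Duration refreshInterval, Duration delayBetweenSamples) {
        this.cleanupInterval = requireNonNegative(cleanupInterval, "cleanupInterval");
        this.refreshInterval = requireNonNegative(refreshInterval, "refreshInterval");
        this.delayBetweenSamples = requireNonNegative(delayBetweenSamples, "delayBetweenSamples");
    }

    // Validates once, in one place, instead of re-checking raw integers at every call site.
    private static Duration requireNonNegative(Duration value, String name) {
        Objects.requireNonNull(value, name);
        if (value.isNegative()) {
            throw new IllegalArgumentException(name + " must be non-negative");
        }
        return value;
    }
}
```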
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleService.java
##########
@@ -0,0 +1,107 @@
+/*
Review comment:
For such helper commits, it's always good to state when they are used.
The `ThreadInfoSampleService` is later injected into ... to ....
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraphFactory;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorThreadInfoStats;
+import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoOperatorTracker;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+
+/** Request handler for the job vertex Flame Graph. */
+public class JobVertexFlameGraphHandler
+ extends AbstractJobVertexHandler<JobVertexFlameGraphInfo, JobVertexMessageParameters> {
+
+ private final ThreadInfoOperatorTracker<OperatorThreadInfoStats> threadInfoOperatorTracker;
+
+ private static JobVertexFlameGraphInfo createJobVertexFlameGraphInfo(
+ OperatorFlameGraph flameGraph) {
+ return new JobVertexFlameGraphInfo(flameGraph.getEndTime(), flameGraph.getRoot());
+ }
+
+ public JobVertexFlameGraphHandler(
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, JobVertexFlameGraphInfo, JobVertexMessageParameters>
+ messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor,
+ ThreadInfoOperatorTracker<OperatorThreadInfoStats> threadInfoOperatorTracker) {
+ super(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ messageHeaders,
+ executionGraphCache,
+ executor);
+ this.threadInfoOperatorTracker = threadInfoOperatorTracker;
+ }
+
+ @Override
+ protected JobVertexFlameGraphInfo handleRequest(
+ @Nonnull HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request,
+ @Nonnull AccessExecutionJobVertex jobVertex)
+ throws RestHandlerException {
+
+ final Optional<OperatorThreadInfoStats> threadInfoSample =
+ threadInfoOperatorTracker.getOperatorStats(jobVertex);
+
+ final List<FlameGraphTypeQueryParameter.Type> flameGraphTypeParameter =
+ request.getQueryParameter(FlameGraphTypeQueryParameter.class);
+ final FlameGraphTypeQueryParameter.Type flameGraphType;
+
+ if (flameGraphTypeParameter.isEmpty()) {
+ flameGraphType = FlameGraphTypeQueryParameter.Type.FULL;
+ } else {
+ flameGraphType = flameGraphTypeParameter.get(0);
+ }
+
+ final Optional<OperatorFlameGraph> operatorFlameGraph;
+
+ switch (flameGraphType) {
+ case FULL:
+ operatorFlameGraph =
Review comment:
You could embed this mapping into the
`FlameGraphTypeQueryParameter.Type` enum, e.g., by making the enum constants
somewhat intelligent.
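A sketch of what a "smarter" enum could look like, using a hypothetical `FlameGraphType` in place of `FlameGraphTypeQueryParameter.Type`. Only `FULL` is visible in the quoted handler; `ON_CPU` and `OFF_CPU` are assumed names inferred from the factory's `createOnCpuFlameGraph` and `createOffCpuFlameGraph`, and each constant's thread states are copied from those factory methods.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical flame graph type that knows which thread states it includes. */
enum FlameGraphType {
    /** All thread states. */
    FULL(EnumSet.allOf(Thread.State.class)),
    /** Actively running (On-CPU) threads. */
    ON_CPU(EnumSet.of(Thread.State.RUNNABLE, Thread.State.NEW)),
    /** Blocked or waiting (Off-CPU) threads. */
    OFF_CPU(EnumSet.of(Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING));

    private final Set<Thread.State> includedStates;

    FlameGraphType(Set<Thread.State> includedStates) {
        this.includedStates = Collections.unmodifiableSet(includedStates);
    }

    /** Whether a sample taken in the given thread state belongs in this flame graph. */
    boolean includes(Thread.State state) {
        return includedStates.contains(state);
    }
}
```

The handler's `switch` could then shrink to passing the parsed constant into a single factory method (for example a hypothetical `createFlameGraph(sample, type)` that filters samples with `type.includes(...)`).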
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/messages/TaskThreadInfoResponse.java
##########
@@ -0,0 +1,79 @@
+/*
Review comment:
Merge this commit into the commit that actually uses the data structures.
Alternatively, find a more descriptive commit message and explain in the
commit why these structures are needed.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Arrays;
+
+/** Termination mode query parameter. */
Review comment:
Copy-and-paste error in the Javadoc?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
##########
@@ -166,6 +166,34 @@ public void testRequestTaskManagerInfo() throws Exception {
assertEquals(0, taskManagerInfo.getNumberAvailableSlots());
}
+ /**
Review comment:
Tests should be part of the respective main commits. You want each
commit to stand on its own feet (that's not always strictly possible).
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
##########
@@ -156,6 +156,48 @@
.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples")
.withDescription("This config option is no longer used");
+ /** Time, in milliseconds, after which cached stats are cleaned up if not accessed. */
Review comment:
The component of the commit message should be [rest].
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import java.io.Serializable;
+import java.lang.management.LockInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+/**
+ * A serializable wrapper around {@link java.lang.management.ThreadInfo} that excludes {@link
Review comment:
The Javadoc on the fields is copied from the JDK, right? I'm not sure if
this is allowed... @zentol could you please take a look?
Alternatively, link to the specific Javadoc part of `ThreadInfo`?
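A sketch of the linking alternative, assuming a trimmed-down, hypothetical `ThreadStateSnapshot` in place of `ThreadInfoSample` with only two of its fields: each field's Javadoc points at the matching `ThreadInfo` getter via `{@link ...}` instead of repeating the JDK wording.

```java
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;

/** Hypothetical, reduced serializable copy of selected {@link ThreadInfo} fields. */
final class ThreadStateSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;

    /** See {@link ThreadInfo#getThreadState()}. */
    private final Thread.State threadState;

    /** See {@link ThreadInfo#getStackTrace()}. */
    private final StackTraceElement[] stackTrace;

    ThreadStateSnapshot(ThreadInfo info) {
        this.threadState = info.getThreadState();
        this.stackTrace = info.getStackTrace();
    }

    Thread.State getThreadState() {
        return threadState;
    }

    StackTraceElement[] getStackTrace() {
        return stackTrace;
    }

    /** Captures the calling thread with at most {@code maxDepth} stack frames. */
    static ThreadStateSnapshot ofCurrentThread(int maxDepth) {
        ThreadInfo info =
                ManagementFactory.getThreadMXBean()
                        .getThreadInfo(Thread.currentThread().getId(), maxDepth);
        return new ThreadStateSnapshot(info);
    }
}
```

Linking keeps the field documentation precise while sidestepping the question of whether the JDK text may be reproduced.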
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Factory class for creating Flame Graph representations. */
+public class OperatorFlameGraphFactory {
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph.
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure
+ */
+ public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included = Arrays.asList(Thread.State.values());
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+ * threads.
+ *
+ * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
Review comment:
Formatting of enums.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoSamplesRequest.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.api.common.time.Time;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A wrapper for parameters of a thread info sampling request. */
+public class ThreadInfoSamplesRequest {
+ private final int requestId;
+ private final int numSubSamples;
+ private final Time delayBetweenSamples;
+ private final int maxStackTraceDepth;
+
+ /**
+ * @param requestId ID of the sampling request.
+ * @param numSamples The number of samples.
+ * @param delayBetweenSamples The time to wait between taking samples.
+ * @param maxStackTraceDepth The maximum depth of the returned stack traces.
+ */
+ public ThreadInfoSamplesRequest(
+ int requestId,
+ @Nonnegative int numSamples,
+ @Nonnull Time delayBetweenSamples,
Review comment:
We usually only annotate `@Nullable`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraph.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Flame Graph representation for an operator.
+ *
+ * <p>Statistics are gathered by sampling stack traces of running tasks.
+ */
+public class OperatorFlameGraph implements Serializable, Stats {
+
+ /** Graph node. */
+ public static class Node {
+
+ private final String name;
+ private final int value;
Review comment:
hitCount or numberOfSamples?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Factory class for creating Flame Graph representations. */
+public class OperatorFlameGraphFactory {
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph.
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure
+ */
+ public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included = Arrays.asList(Thread.State.values());
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+ * threads.
+ *
+ * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+ * Thread.State.WAITING].
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure.
+ */
+ public static OperatorFlameGraph createOffCpuFlameGraph(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included =
+ Arrays.asList(
+ Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING);
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing actively running
+ * (On-CPU) threads.
+ *
+ * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+ * Thread.State.WAITING].
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure
+ */
+ public static OperatorFlameGraph createOnCpuFlameGraph(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included = Arrays.asList(Thread.State.RUNNABLE, Thread.State.NEW);
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ private static OperatorFlameGraph createFlameGraphFromSample(
+ OperatorThreadInfoStats sample, Collection<Thread.State> threadStates) {
+ final NodeBuilder root = new NodeBuilder("root");
+ for (List<ThreadInfoSample> threadInfoSubSamples : sample.getSamplesBySubtask().values()) {
+ for (ThreadInfoSample threadInfo : threadInfoSubSamples) {
+ if (threadStates.contains(threadInfo.getThreadState())) {
+ StackTraceElement[] traces = threadInfo.getStackTrace();
+ root.increment();
+ NodeBuilder parent = root;
+ for (int i = traces.length - 1; i >= 0; i--) {
+ final String name =
+ traces[i].getClassName()
+ + "."
+ + traces[i].getMethodName()
+ + ":"
+ + traces[i].getLineNumber();
+ parent = parent.addChild(name);
+ }
+ }
+ }
+ }
+ return new OperatorFlameGraph(sample.getEndTime(), buildFlameGraph(root));
+ }
+
+ private static OperatorFlameGraph.Node buildFlameGraph(NodeBuilder builder) {
+ final List<OperatorFlameGraph.Node> children = new ArrayList<>();
+ for (NodeBuilder builderChild : builder.children.values()) {
+ children.add(buildFlameGraph(builderChild));
+ }
+ return new OperatorFlameGraph.Node(
+ builder.name, builder.value, Collections.unmodifiableList(children));
+ }
+
+ private static class NodeBuilder {
+
+ private final Map<String, NodeBuilder> children = new HashMap<>();
+
+ private final String name;
+
+ private int value = 0;
Review comment:
hitCount or numberOfSamples?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Factory class for creating Flame Graph representations. */
+public class OperatorFlameGraphFactory {
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph.
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure
+ */
+ public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included = Arrays.asList(Thread.State.values());
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+ * threads.
+ *
+ * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+ * Thread.State.WAITING].
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure.
+ */
+ public static OperatorFlameGraph createOffCpuFlameGraph(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included =
+ Arrays.asList(
+ Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING);
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing actively running
+ * (On-CPU) threads.
+ *
+ * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+ * Thread.State.WAITING].
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure
+ */
+ public static OperatorFlameGraph createOnCpuFlameGraph(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included = Arrays.asList(Thread.State.RUNNABLE, Thread.State.NEW);
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ private static OperatorFlameGraph createFlameGraphFromSample(
+ OperatorThreadInfoStats sample, Collection<Thread.State> threadStates) {
+ final NodeBuilder root = new NodeBuilder("root");
+ for (List<ThreadInfoSample> threadInfoSubSamples : sample.getSamplesBySubtask().values()) {
+ for (ThreadInfoSample threadInfo : threadInfoSubSamples) {
+ if (threadStates.contains(threadInfo.getThreadState())) {
+ StackTraceElement[] traces = threadInfo.getStackTrace();
+ root.increment();
+ NodeBuilder parent = root;
+ for (int i = traces.length - 1; i >= 0; i--) {
+ final String name =
+ traces[i].getClassName()
+ + "."
+ + traces[i].getMethodName()
+ + ":"
+ + traces[i].getLineNumber();
+ parent = parent.addChild(name);
+ }
+ }
+ }
+ }
+ return new OperatorFlameGraph(sample.getEndTime(), buildFlameGraph(root));
+ }
+
+ private static OperatorFlameGraph.Node buildFlameGraph(NodeBuilder builder) {
+ final List<OperatorFlameGraph.Node> children = new ArrayList<>();
+ for (NodeBuilder builderChild : builder.children.values()) {
+ children.add(buildFlameGraph(builderChild));
+ }
+ return new OperatorFlameGraph.Node(
+ builder.name, builder.value, Collections.unmodifiableList(children));
+ }
Review comment:
This should be an instance method of `NodeBuilder`, maybe `toNode`?
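A sketch of the suggested refactoring, using simplified, hypothetical stand-ins `FlameNode` (for `OperatorFlameGraph.Node`) and `FlameNodeBuilder` (for the private `NodeBuilder`), with the counter named `hitCount` as suggested earlier in this review. The quoted loop only calls `increment()` on `root` explicitly, so this sketch assumes `addChild` creates or reuses the child and counts one hit on it. `LinkedHashMap` is used here only to make child order deterministic; the PR uses `HashMap`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified immutable flame graph node. */
final class FlameNode {
    final String name;
    final int hitCount;
    final List<FlameNode> children;

    FlameNode(String name, int hitCount, List<FlameNode> children) {
        this.name = name;
        this.hitCount = hitCount;
        this.children = children;
    }
}

/** Hypothetical mutable builder that counts samples per stack frame. */
final class FlameNodeBuilder {
    private final Map<String, FlameNodeBuilder> children = new LinkedHashMap<>();
    private final String name;
    private int hitCount = 0;

    FlameNodeBuilder(String name) {
        this.name = name;
    }

    void increment() {
        hitCount++;
    }

    /** Returns the child for {@code childName}, creating it if needed, and counts one hit on it. */
    FlameNodeBuilder addChild(String childName) {
        FlameNodeBuilder child = children.computeIfAbsent(childName, FlameNodeBuilder::new);
        child.increment();
        return child;
    }

    /** Recursively converts this builder and its children into immutable nodes. */
    FlameNode toNode() {
        List<FlameNode> childNodes = new ArrayList<>();
        for (FlameNodeBuilder child : children.values()) {
            childNodes.add(child.toNode());
        }
        return new FlameNode(name, hitCount, Collections.unmodifiableList(childNodes));
    }
}
```

With this shape, `buildFlameGraph(root)` in `createFlameGraphFromSample` would become `root.toNode()`, and the builder no longer exposes its fields to the factory.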
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorThreadInfoStats.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Tnread info statistics of multiple tasks. Each subtask can deliver multiple samples for
Review comment:
Typo: `Tnread` should be `Thread`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,403 @@
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.stats.OperatorStatsTracker;
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker of thread infos for {@link ExecutionJobVertex}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+public class ThreadInfoOperatorTracker<T extends Stats> implements OperatorStatsTracker<T> {
+
+ /**
+ * Create a new {@link Builder}.
+ *
+ * @param createStatsFn Function that converts a thread info sample into a derived statistic.
+ * Could be an identity function.
+ * @param <T> Type of the derived statistics to return.
+ * @return Builder.
+ */
+ public static <T extends Stats> Builder<T> newBuilder(
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+ Function<OperatorThreadInfoStats, T> createStatsFn,
+ ExecutorService executor) {
+ return new Builder<>(resourceManagerGatewayRetriever, createStatsFn, executor);
+ }
+
+ /**
+ * Builder for {@link ThreadInfoOperatorTracker}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+ public static class Builder<T extends Stats> {
+
+ private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+ private final Function<OperatorThreadInfoStats, T> createStatsFn;
+ private final ExecutorService executor;
+
+ private ThreadInfoRequestCoordinator coordinator;
+ private int cleanUpInterval;
+ private int numSamples;
+ private int statsRefreshInterval;
+ private Time delayBetweenSamples;
+ private int maxThreadInfoDepth;
+
+ private Builder(
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+ Function<OperatorThreadInfoStats, T> createStatsFn,
+ ExecutorService executor) {
+ this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+ this.createStatsFn = createStatsFn;
+ this.executor = executor;
+ }
+
+ /**
+ * Sets {@code coordinator}.
+ *
+ * @param coordinator Coordinator for thread info stats request.
+ * @return Builder.
+ */
+ public Builder<T> setCoordinator(ThreadInfoRequestCoordinator coordinator) {
+ this.coordinator = coordinator;
+ return this;
+ }
+
+ /**
+ * Sets {@code cleanUpInterval}.
+ *
+ * @param cleanUpInterval Clean up interval for completed stats.
+ * @return Builder.
+ */
+ public Builder<T> setCleanUpInterval(int cleanUpInterval) {
+ this.cleanUpInterval = cleanUpInterval;
+ return this;
+ }
+
+ /**
+ * Sets {@code numSamples}.
+ *
+ * @param numSamples Number of thread info samples to collect for each subtask.
+ * @return Builder.
+ */
+ public Builder<T> setNumSamples(int numSamples) {
+ this.numSamples = numSamples;
+ return this;
+ }
+
+ /**
+ * Sets {@code statsRefreshInterval}.
+ *
+ * @param statsRefreshInterval Time interval after which the available thread info stats are
+ * deprecated and need to be refreshed.
+ * @return Builder.
+ */
+ public Builder<T> setStatsRefreshInterval(int statsRefreshInterval) {
+ this.statsRefreshInterval = statsRefreshInterval;
+ return this;
+ }
+
+ /**
+ * Sets {@code delayBetweenSamples}.
+ *
+ * @param delayBetweenSamples Delay between individual samples per task.
+ * @return Builder.
+ */
+ public Builder<T> setDelayBetweenSamples(Time delayBetweenSamples) {
+ this.delayBetweenSamples = delayBetweenSamples;
+ return this;
+ }
+
+ /**
+ * Sets {@code maxThreadInfoDepth}.
+ *
+ * @param maxThreadInfoDepth Limit for the depth of the stack traces included when sampling
+ * threads.
+ * @return Builder.
+ */
+ public Builder<T> setMaxThreadInfoDepth(int maxThreadInfoDepth) {
+ this.maxThreadInfoDepth = maxThreadInfoDepth;
+ return this;
+ }
+
+ /**
+ * Constructs a new {@link ThreadInfoOperatorTracker}.
+ *
+ * @return a new {@link ThreadInfoOperatorTracker} instance.
+ */
+ public ThreadInfoOperatorTracker<T> build() {
+ return new ThreadInfoOperatorTracker<>(
+ coordinator,
+ resourceManagerGatewayRetriever,
+ createStatsFn,
+ executor,
+ cleanUpInterval,
+ numSamples,
+ statsRefreshInterval,
+ delayBetweenSamples,
+ maxThreadInfoDepth);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(ThreadInfoOperatorTracker.class);
+
+ /** Lock guarding trigger operations. */
+ private final Object lock = new Object();
+
+ private final ThreadInfoRequestCoordinator coordinator;
+
+ private final Function<OperatorThreadInfoStats, T> createStatsFn;
+
+ private final ExecutorService executor;
+
+ private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+ /**
+ * Completed stats. Important: Job vertex IDs need to be scoped by job ID, because they are
+ * potentially constant across runs messing up the cached data.
+ */
+ private final Cache<AccessExecutionJobVertex, T> operatorStatsCache;
+
+ /**
+ * Pending in progress stats. Important: Job vertex IDs need to be scoped by job ID, because
+ * they are potentially constant across runs messing up the cached data.
+ */
+ private final Set<AccessExecutionJobVertex> pendingStats = new HashSet<>();
+
+ private final int numSamples;
+
+ private final int statsRefreshInterval;
+
+ private final Time delayBetweenSamples;
+
+ private final int maxThreadInfoDepth;
+
+ // Used for testing purposes
+ private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();
+
+ /** Flag indicating whether the stats tracker has been shut down. */
+ private boolean shutDown;
Review comment:
Please add `@GuardedBy` where appropriate.
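For illustration, here is a hypothetical stripped-down tracker in which every field accessed only under `lock` carries the annotation. Which fields of the real tracker are actually guarded by `lock` is an assumption here. To keep the sketch self-contained, it declares a local stand-in for `javax.annotation.concurrent.GuardedBy`, the annotation that `TaskStatsRequestCoordinator` already imports.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashSet;
import java.util.Set;

/** Local stand-in for javax.annotation.concurrent.GuardedBy so the sketch compiles without JSR-305. */
@Documented
@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.CLASS)
@interface GuardedBy {
    String value();
}

/** Hypothetical, stripped-down tracker: every field touched under `lock` is annotated. */
final class GuardedTracker<K> {
    private final Object lock = new Object();

    /** Plain HashSet, so it must only be read or written while holding lock. */
    @GuardedBy("lock")
    private final Set<K> pendingStats = new HashSet<>();

    @GuardedBy("lock")
    private boolean shutDown;

    /** Registers a pending request; returns false if one is already pending or the tracker is shut down. */
    boolean tryTrigger(K key) {
        synchronized (lock) {
            if (shutDown) {
                return false;
            }
            return pendingStats.add(key);
        }
    }

    /** Marks the request for the key as finished. */
    void complete(K key) {
        synchronized (lock) {
            pendingStats.remove(key);
        }
    }

    /** Shuts the tracker down; later triggers are rejected. */
    void close() {
        synchronized (lock) {
            shutDown = true;
            pendingStats.clear();
        }
    }
}
```

The annotation itself is not enforced at runtime. Its value is that static analysis tools and readers can check that every access happens inside `synchronized (lock)`.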
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -834,6 +834,19 @@ public void notifySlotAvailable(
}
}
+ @Override
Review comment:
This change should be squashed into the commit where it is actually used.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Factory class for creating Flame Graph representations. */
+public class OperatorFlameGraphFactory {
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph.
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure
+ */
+ public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included = Arrays.asList(Thread.State.values());
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+ * threads.
+ *
+ * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+ * Thread.State.WAITING].
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure.
+ */
+ public static OperatorFlameGraph createOffCpuFlameGraph(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included =
+ Arrays.asList(
+ Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING);
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ /**
+ * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing actively running
+ * (On-CPU) threads.
+ *
+ * <p>Includes threads in states Thread.State.[RUNNABLE, Thread.State.NEW].
+ *
+ * @param sample Thread details sample containing stack traces.
+ * @return FlameGraph data structure
+ */
+ public static OperatorFlameGraph createOnCpuFlameGraph(OperatorThreadInfoStats sample) {
+ Collection<Thread.State> included = Arrays.asList(Thread.State.RUNNABLE, Thread.State.NEW);
+ return createFlameGraphFromSample(sample, included);
+ }
+
+ private static OperatorFlameGraph createFlameGraphFromSample(
+ OperatorThreadInfoStats sample, Collection<Thread.State> threadStates) {
+ final NodeBuilder root = new NodeBuilder("root");
+ for (List<ThreadInfoSample> threadInfoSubSamples : sample.getSamplesBySubtask().values()) {
+ for (ThreadInfoSample threadInfo : threadInfoSubSamples) {
+ if (threadStates.contains(threadInfo.getThreadState())) {
+ StackTraceElement[] traces = threadInfo.getStackTrace();
+ root.increment();
+ NodeBuilder parent = root;
+ for (int i = traces.length - 1; i >= 0; i--) {
+ final String name =
+ traces[i].getClassName()
+ + "."
+ + traces[i].getMethodName()
+ + ":"
+ + traces[i].getLineNumber();
+ parent = parent.addChild(name);
+ }
+ }
+ }
+ }
+ return new OperatorFlameGraph(sample.getEndTime(), buildFlameGraph(root));
+ }
+
+ private static OperatorFlameGraph.Node buildFlameGraph(NodeBuilder builder) {
+ final List<OperatorFlameGraph.Node> children = new ArrayList<>();
Review comment:
Pre-size the list, e.g. `new ArrayList<>(builder.children.size())`?
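The loop in `createFlameGraphFromSample` folds every sampled stack into one tree. Frames are visited from the last array element (the outermost call) down to index 0 (the innermost call, as `Thread#getStackTrace` orders them). Assuming `addChild` increments the child it returns, a node's value is the number of samples whose stack passes through it. The full, On-CPU, and Off-CPU variants differ only in which `Thread.State`s pass the filter.

Below is a minimal self-contained sketch of that folding. `StackSample` and `FlameTree` are hypothetical types, not the PR's. It uses an `EnumSet` for the state filter where the PR uses `Arrays.asList`.

```java
import java.util.Collection;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sample: a thread state plus its stack, innermost frame first (as Thread#getStackTrace returns it). */
final class StackSample {
    final Thread.State state;
    final StackTraceElement[] stack;

    StackSample(Thread.State state, StackTraceElement[] stack) {
        this.state = state;
        this.stack = stack;
    }
}

/** Hypothetical mutable flame-graph node; count is the number of samples passing through it. */
final class FlameTree {
    final String name;
    final Map<String, FlameTree> children = new LinkedHashMap<>();
    int count;

    FlameTree(String name) {
        this.name = name;
    }

    /** Folds the samples whose state is included into a single tree rooted at "root". */
    static FlameTree fold(Collection<StackSample> samples, EnumSet<Thread.State> includedStates) {
        FlameTree root = new FlameTree("root");
        for (StackSample sample : samples) {
            if (!includedStates.contains(sample.state)) {
                continue;
            }
            root.count++;
            FlameTree parent = root;
            // Walk from the outermost frame (last element) to the innermost one (index 0).
            for (int i = sample.stack.length - 1; i >= 0; i--) {
                StackTraceElement frame = sample.stack[i];
                String frameName =
                        frame.getClassName() + "." + frame.getMethodName() + ":" + frame.getLineNumber();
                parent = parent.children.computeIfAbsent(frameName, FlameTree::new);
                parent.count++;
            }
        }
        return root;
    }
}
```

For example, `fold(samples, EnumSet.of(Thread.State.RUNNABLE, Thread.State.NEW))` corresponds to the On-CPU view, and `EnumSet.allOf(Thread.State.class)` to the full one.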
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,403 @@
+ private ThreadInfoOperatorTracker(
+ ThreadInfoRequestCoordinator coordinator,
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+ Function<OperatorThreadInfoStats, T> createStatsFn,
+ ExecutorService executor,
+ int cleanUpInterval,
+ int numSamples,
+ int statsRefreshInterval,
+ Time delayBetweenSamples,
+ int maxStackTraceDepth) {
+
+ this.coordinator = checkNotNull(coordinator, "Thread info samples coordinator");
+ this.resourceManagerGatewayRetriever =
+ checkNotNull(resourceManagerGatewayRetriever, "Gateway retriever");
+ this.createStatsFn = checkNotNull(createStatsFn, "Create stats function");
+ this.executor = checkNotNull(executor, "Scheduled executor");
+
+ checkArgument(cleanUpInterval >= 0, "Clean up interval");
Review comment:
What is an interval of 0 supposed to mean? The same question applies to the later parameters.
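If 0 has no sensible meaning for these parameters, one option is to require strictly positive values and state the constraint in the message. The current message, `"Clean up interval"`, names the parameter but not what was violated. If 0 is instead meant to disable something, that deserves a comment.

A minimal sketch in plain Java. In Flink the helper would more likely be `Preconditions.checkArgument`, and the class and field names here are hypothetical.

```java
/** Hypothetical settings holder: every interval and count must be strictly positive. */
final class TrackerSettings {
    final long cleanUpIntervalMs;
    final int numSamples;
    final long statsRefreshIntervalMs;

    TrackerSettings(long cleanUpIntervalMs, int numSamples, long statsRefreshIntervalMs) {
        this.cleanUpIntervalMs = requirePositive(cleanUpIntervalMs, "cleanUpInterval");
        this.numSamples = (int) requirePositive(numSamples, "numSamples");
        this.statsRefreshIntervalMs = requirePositive(statsRefreshIntervalMs, "statsRefreshInterval");
    }

    /** Rejects 0 and negative values with a message naming the parameter and the value. */
    private static long requirePositive(long value, String name) {
        if (value <= 0) {
            throw new IllegalArgumentException(name + " must be positive, but was " + value);
        }
        return value;
    }
}
```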
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,403 @@
+ /**
+ * Completed stats. Important: Job vertex IDs need to be scoped by job ID, because they are
+ * potentially constant across runs messing up the cached data.
+ */
+ private final Cache<AccessExecutionJobVertex, T> operatorStatsCache;
Review comment:
What is the actual purpose of the cache? How often is the cached data accessed, and what is
the flow?
Is the cache hit when a user looks at the flame graph of a job and switches back and forth
between tasks, or is it hit at a more fine-grained level? Is there an auto-refresh feature in
the UI that needs the cache for updates? Couldn't we use async requests instead? (Note that
these questions are not meant to result in changes to this PR; they are meant for a future
refactoring.)
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/TaskStatsRequestCoordinator.java
##########
@@ -0,0 +1,323 @@
+
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates the common functionality for requesting statistics from individual tasks and
+ * combining their responses.
+ *
+ * @param <T> Type of the statistics to be gathered.
+ * @param <V> Type of the combined response.
+ */
+public class TaskStatsRequestCoordinator<T, V> {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ protected final int numGhostSampleIds = 10;
Review comment:
Should this be a `static final` constant? And what is it used for?
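Judging by the Javadoc of `recentPendingRequestIds` ("A list of recent request IDs to identify late messages vs. invalid ones"), the field appears to be the capacity of that deque. If so, a `static final` constant with a comment would make that explicit.

A minimal sketch under that assumption. The class and method names are hypothetical, and the PR's actual eviction logic is not part of these hunks.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch: remembers recently finished request IDs so late responses can be told apart from invalid ones. */
final class RecentRequestIds {
    /** Number of recently finished request IDs to remember. */
    static final int NUM_GHOST_SAMPLE_IDS = 10;

    private final ArrayDeque<Integer> recentIds = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);

    /** Records a finished request ID, evicting the oldest one when the deque is full. */
    void remember(int requestId) {
        if (recentIds.size() == NUM_GHOST_SAMPLE_IDS) {
            recentIds.removeFirst();
        }
        recentIds.addLast(requestId);
    }

    /** True if a response for this ID is late (its request already finished), false if the ID is unknown. */
    boolean isLate(int requestId) {
        return recentIds.contains(requestId);
    }
}
```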
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/TaskStatsRequestCoordinator.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates the common functionality for requesting statistics from individual tasks and
+ * combining their responses.
+ *
+ * @param <T> Type of the statistics to be gathered.
+ * @param <V> Type of the combined response.
+ */
+public class TaskStatsRequestCoordinator<T, V> {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ protected final int numGhostSampleIds = 10;
+
+ protected final Object lock = new Object();
+
+ /** Executor used to run the futures. */
+ protected final Executor executor;
+
+ /** Request time out of a triggered task stats request. */
+ protected final Time requestTimeout;
+
+ /** In progress samples. */
+ @GuardedBy("lock")
+ protected final Map<Integer, PendingStatsRequest<T, V>> pendingRequests = new HashMap<>();
+
+ /** A list of recent request IDs to identify late messages vs. invalid ones. */
+ protected final ArrayDeque<Integer> recentPendingRequestIds =
+ new ArrayDeque<>(numGhostSampleIds);
+
+ /** Sample ID counter. */
+ @GuardedBy("lock")
+ protected int requestIdCounter;
+
+ /** Flag indicating whether the coordinator is still running. */
+ @GuardedBy("lock")
+ protected boolean isShutDown;
+
+ /**
+ * Creates a new coordinator for the cluster.
+ *
+ * @param executor Used to execute the futures.
+ * @param requestTimeout Request time out of a triggered task stats request.
+ */
+ public TaskStatsRequestCoordinator(Executor executor, long requestTimeout) {
+ checkArgument(requestTimeout >= 0L, "The request timeout must be non-negative.");
Review comment:
What are the semantics of timeout = 0?
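In `ThreadInfoRequestCoordinator#triggerThreadInfoRequest` from this PR, the subclass adds `requestTimeout` on top of `numSamples * delayBetweenSamples`. A value of `0` therefore seems to mean "no slack beyond the expected sampling duration" rather than "no timeout". The sketch below reproduces that arithmetic using `java.time.Duration` in place of Flink's `Time`. `ThreadInfoTimeouts` and `effectiveTimeout` are hypothetical names:

```java
import java.time.Duration;

/** Hypothetical helper mirroring the timeout arithmetic in triggerThreadInfoRequest. */
final class ThreadInfoTimeouts {

    private ThreadInfoTimeouts() {}

    /**
     * Expected sampling time (numSamples * delayBetweenSamples) plus the configured slack.
     * A slack of zero leaves no headroom for RPC latency or slow stack trace collection.
     */
    static Duration effectiveTimeout(
            int numSamples, Duration delayBetweenSamples, Duration requestTimeout) {
        if (numSamples < 1) {
            throw new IllegalArgumentException("numSamples must be >= 1");
        }
        if (delayBetweenSamples.isNegative() || requestTimeout.isNegative()) {
            throw new IllegalArgumentException("durations must be non-negative");
        }
        Duration expected = delayBetweenSamples.multipliedBy(numSamples);
        return expected.plus(requestTimeout);
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofMillis(50);
        System.out.println(effectiveTimeout(100, delay, Duration.ofSeconds(10)).toMillis()); // 15000
        System.out.println(effectiveTimeout(100, delay, Duration.ZERO).toMillis()); // 5000
    }
}
```

With 100 samples at 50 ms each, a zero slack makes the timeout passed to the TaskExecutors exactly the 5 s of expected sampling time.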
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
##########
@@ -0,0 +1,188 @@
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.webmonitor.stats.TaskStatsRequestCoordinator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A coordinator for triggering and collecting thread info stats of running operator subtasks. */
+public class ThreadInfoRequestCoordinator
+ extends TaskStatsRequestCoordinator<List<ThreadInfoSample>, OperatorThreadInfoStats> {
+
+ /**
+ * Creates a new coordinator for the job.
+ *
+ * @param executor Used to execute the futures.
+ * @param requestTimeout Time out after the expected sampling duration. This is added to the
+ * expected duration of a request, which is determined by the number of samples and the
+ * delay between each sample.
+ */
+ public ThreadInfoRequestCoordinator(Executor executor, long requestTimeout) {
+ super(executor, requestTimeout);
+ }
+
+ /**
+ * Triggers collection of thread info stats of an operator by combining thread info responses
+ * from given subtasks. A thread info response of a subtask in turn consists of {@code
+ * numSamples}, collected with {@code delayBetweenSamples} milliseconds delay between them.
+ *
+ * @param subtasksWithGateways Execution vertices together with TaskExecutors running them.
+ * @param numSamples Number of thread info samples to collect from each subtask.
+ * @param delayBetweenSamples Delay between consecutive samples (ms).
+ * @param maxStackTraceDepth Maximum depth of the stack traces collected within thread info
+ * samples.
+ * @return A future of the completed thread info stats.
+ */
+ public CompletableFuture<OperatorThreadInfoStats> triggerThreadInfoRequest(
+ List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
Review comment:
This parameter type is inducing nightmares ;) Maybe add a small record class?
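The holder class below uses generic type parameters only so that it compiles on its own. In the PR it would presumably use `AccessExecutionVertex` and `TaskExecutorGateway` directly, which would shrink the parameter to something like `List<SubtaskWithGateway>`. `SubtaskWithGateway` and its accessors are hypothetical names. A plain final class, rather than a Java 16 `record`, keeps it usable on older Java versions:

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical replacement for Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>.
 * V stands in for the execution vertex type, G for the gateway type.
 */
final class SubtaskWithGateway<V, G> {

    private final V executionVertex;
    private final CompletableFuture<G> gatewayFuture;

    SubtaskWithGateway(V executionVertex, CompletableFuture<G> gatewayFuture) {
        this.executionVertex = Objects.requireNonNull(executionVertex, "executionVertex");
        this.gatewayFuture = Objects.requireNonNull(gatewayFuture, "gatewayFuture");
    }

    /** Replaces tuple.f0. */
    V getExecutionVertex() {
        return executionVertex;
    }

    /** Replaces tuple.f1. */
    CompletableFuture<G> getGatewayFuture() {
        return gatewayFuture;
    }

    public static void main(String[] args) {
        SubtaskWithGateway<String, String> subtask =
                new SubtaskWithGateway<>("vertex (1/4)", CompletableFuture.completedFuture("tm-1"));
        System.out.println(subtask.getExecutionVertex() + " -> " + subtask.getGatewayFuture().join());
    }
}
```

Named accessors also make the loops in `triggerThreadInfoRequest` and `requestThreadInfo` easier to read than `f0`/`f1`.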
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/TaskStatsRequestCoordinator.java
##########
@@ -0,0 +1,323 @@
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates the common functionality for requesting statistics from individual tasks and
+ * combining their responses.
+ *
+ * @param <T> Type of the statistics to be gathered.
+ * @param <V> Type of the combined response.
+ */
+public class TaskStatsRequestCoordinator<T, V> {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ protected final int numGhostSampleIds = 10;
+
+ protected final Object lock = new Object();
+
+ /** Executor used to run the futures. */
+ protected final Executor executor;
+
+ /** Request time out of a triggered task stats request. */
+ protected final Time requestTimeout;
+
+ /** In progress samples. */
+ @GuardedBy("lock")
+ protected final Map<Integer, PendingStatsRequest<T, V>> pendingRequests = new HashMap<>();
+
+ /** A list of recent request IDs to identify late messages vs. invalid ones. */
+ protected final ArrayDeque<Integer> recentPendingRequestIds =
+ new ArrayDeque<>(numGhostSampleIds);
+
+ /** Sample ID counter. */
+ @GuardedBy("lock")
+ protected int requestIdCounter;
+
+ /** Flag indicating whether the coordinator is still running. */
+ @GuardedBy("lock")
+ protected boolean isShutDown;
+
+ /**
+ * Creates a new coordinator for the cluster.
+ *
+ * @param executor Used to execute the futures.
+ * @param requestTimeout Request time out of a triggered task stats request.
+ */
+ public TaskStatsRequestCoordinator(Executor executor, long requestTimeout) {
+ checkArgument(requestTimeout >= 0L, "The request timeout must be non-negative.");
+ this.executor = Preconditions.checkNotNull(executor);
+ this.requestTimeout = Time.milliseconds(requestTimeout);
+ }
+
+ /**
+ * Handles the failed stats response by canceling the corresponding unfinished pending request.
+ *
+ * @param requestId ID of the request to cancel.
+ * @param cause Cause of the cancelling (can be <code>null</code>).
+ */
+ public void handleFailedResponse(int requestId, @Nullable Throwable cause) {
+ synchronized (lock) {
+ if (isShutDown) {
+ return;
+ }
+
+ PendingStatsRequest<T, V> pendingRequest = pendingRequests.remove(requestId);
+ if (pendingRequest != null) {
+ if (cause != null) {
+ log.info("Cancelling request " + requestId, cause);
+ } else {
+ log.info("Cancelling request {}", requestId);
+ }
+
+ pendingRequest.discard(cause);
+ rememberRecentRequestId(requestId);
+ }
+ }
+ }
+
+ /**
+ * Shuts down the coordinator.
+ *
+ * <p>After shut down, no further operations are executed.
+ */
+ public void shutDown() {
+ synchronized (lock) {
+ if (!isShutDown) {
+ log.info("Shutting down task stats request coordinator.");
+
+ for (PendingStatsRequest<T, V> pending : pendingRequests.values()) {
+ pending.discard(new RuntimeException("Shut down"));
+ }
+
+ pendingRequests.clear();
+ recentPendingRequestIds.clear();
+
+ isShutDown = true;
+ }
+ }
+ }
+
+ /**
+ * Handles the successfully returned task stats response by collecting the corresponding subtask
+ * samples.
+ *
+ * @param requestId ID of the request.
+ * @param executionId ID of the sampled task.
+ * @param result Result of stats request returned by an individual task.
+ * @throws IllegalStateException If unknown request ID and not recently finished or cancelled
+ * sample.
+ */
+ public void handleSuccessfulResponse(int requestId, ExecutionAttemptID executionId, T result) {
+
+ synchronized (lock) {
+ if (isShutDown) {
+ return;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Collecting stats sample {} of task {}", requestId, executionId);
+ }
+
+ PendingStatsRequest<T, V> pending = pendingRequests.get(requestId);
+
+ if (pending != null) {
+ pending.collectTaskStats(executionId, result);
+
+ // Publish the sample
+ if (pending.isComplete()) {
+ pendingRequests.remove(requestId);
+ rememberRecentRequestId(requestId);
+
+ pending.completePromiseAndDiscard();
+ }
+ } else if (recentPendingRequestIds.contains(requestId)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received late stats sample {} of task {}", requestId, executionId);
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Unknown request ID %d.", requestId));
+ }
+ }
+ }
+ }
+
+ private void rememberRecentRequestId(int sampleId) {
+ if (recentPendingRequestIds.size() >= numGhostSampleIds) {
+ recentPendingRequestIds.removeFirst();
+ }
+ recentPendingRequestIds.addLast(sampleId);
+ }
+
+ @VisibleForTesting
+ public int getNumberOfPendingRequests() {
+ synchronized (lock) {
+ return pendingRequests.size();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A pending task stats request, which collects samples from individual tasks and completes the
+ * response future upon gathering all of them.
+ *
+ * <p>Has to be accessed in lock scope.
Review comment:
Might be easier to add `@NotThreadSafe`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,403 @@
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.stats.OperatorStatsTracker;
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker of thread infos for {@link ExecutionJobVertex}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+public class ThreadInfoOperatorTracker<T extends Stats> implements OperatorStatsTracker<T> {
+
+ /**
+ * Create a new {@link Builder}.
+ *
+ * @param createStatsFn Function that converts a thread info sample into a derived statistic.
+ * Could be an identity function.
+ * @param <T> Type of the derived statistics to return.
+ * @return Builder.
+ */
+ public static <T extends Stats> Builder<T> newBuilder(
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+ Function<OperatorThreadInfoStats, T> createStatsFn,
+ ExecutorService executor) {
+ return new Builder<>(resourceManagerGatewayRetriever, createStatsFn, executor);
+ }
+
+ /**
+ * Builder for {@link ThreadInfoOperatorTracker}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+ public static class Builder<T extends Stats> {
+
+ private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+ private final Function<OperatorThreadInfoStats, T> createStatsFn;
+ private final ExecutorService executor;
+
+ private ThreadInfoRequestCoordinator coordinator;
+ private int cleanUpInterval;
+ private int numSamples;
+ private int statsRefreshInterval;
+ private Time delayBetweenSamples;
+ private int maxThreadInfoDepth;
+
+ private Builder(
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+ Function<OperatorThreadInfoStats, T> createStatsFn,
+ ExecutorService executor) {
+ this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+ this.createStatsFn = createStatsFn;
+ this.executor = executor;
+ }
+
+ /**
+ * Sets {@code coordinator}.
+ *
+ * @param coordinator Coordinator for thread info stats request.
+ * @return Builder.
+ */
+ public Builder<T> setCoordinator(ThreadInfoRequestCoordinator coordinator) {
+ this.coordinator = coordinator;
+ return this;
+ }
+
+ /**
+ * Sets {@code cleanUpInterval}.
+ *
+ * @param cleanUpInterval Clean up interval for completed stats.
+ * @return Builder.
+ */
+ public Builder<T> setCleanUpInterval(int cleanUpInterval) {
+ this.cleanUpInterval = cleanUpInterval;
+ return this;
+ }
+
+ /**
+ * Sets {@code numSamples}.
+ *
+ * @param numSamples Number of thread info samples to collect for each subtask.
+ * @return Builder.
+ */
+ public Builder<T> setNumSamples(int numSamples) {
+ this.numSamples = numSamples;
+ return this;
+ }
+
+ /**
+ * Sets {@code statsRefreshInterval}.
+ *
+ * @param statsRefreshInterval Time interval after which the available thread info stats are
+ * deprecated and need to be refreshed.
+ * @return Builder.
+ */
+ public Builder<T> setStatsRefreshInterval(int statsRefreshInterval) {
+ this.statsRefreshInterval = statsRefreshInterval;
+ return this;
+ }
+
+ /**
+ * Sets {@code delayBetweenSamples}.
+ *
+ * @param delayBetweenSamples Delay between individual samples per task.
+ * @return Builder.
+ */
+ public Builder<T> setDelayBetweenSamples(Time delayBetweenSamples) {
+ this.delayBetweenSamples = delayBetweenSamples;
+ return this;
+ }
+
+ /**
+ * Sets {@code maxThreadInfoDepth}.
+ *
+ * @param maxThreadInfoDepth Limit for the depth of the stack traces included when sampling
+ * threads.
+ * @return Builder.
+ */
+ public Builder<T> setMaxThreadInfoDepth(int maxThreadInfoDepth) {
+ this.maxThreadInfoDepth = maxThreadInfoDepth;
+ return this;
+ }
+
+ /**
+ * Constructs a new {@link ThreadInfoOperatorTracker}.
+ *
+ * @return a new {@link ThreadInfoOperatorTracker} instance.
+ */
+ public ThreadInfoOperatorTracker<T> build() {
+ return new ThreadInfoOperatorTracker<>(
+ coordinator,
+ resourceManagerGatewayRetriever,
+ createStatsFn,
+ executor,
+ cleanUpInterval,
+ numSamples,
+ statsRefreshInterval,
+ delayBetweenSamples,
+ maxThreadInfoDepth);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(ThreadInfoOperatorTracker.class);
+
+ /** Lock guarding trigger operations. */
+ private final Object lock = new Object();
+
+ private final ThreadInfoRequestCoordinator coordinator;
+
+ private final Function<OperatorThreadInfoStats, T> createStatsFn;
+
+ private final ExecutorService executor;
+
+ private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+ /**
+ * Completed stats. Important: Job vertex IDs need to be scoped by job ID, because they are
+ * potentially constant across runs messing up the cached data.
+ */
+ private final Cache<AccessExecutionJobVertex, T> operatorStatsCache;
+
+ /**
+ * Pending in progress stats. Important: Job vertex IDs need to be scoped by job ID, because
+ * they are potentially constant across runs messing up the cached data.
+ */
+ private final Set<AccessExecutionJobVertex> pendingStats = new HashSet<>();
+
+ private final int numSamples;
+
+ private final int statsRefreshInterval;
+
+ private final Time delayBetweenSamples;
+
+ private final int maxThreadInfoDepth;
+
+ // Used for testing purposes
+ private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();
+
+ /** Flag indicating whether the stats tracker has been shut down. */
+ private boolean shutDown;
+
+ private ThreadInfoOperatorTracker(
+ ThreadInfoRequestCoordinator coordinator,
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+ Function<OperatorThreadInfoStats, T> createStatsFn,
+ ExecutorService executor,
+ int cleanUpInterval,
+ int numSamples,
+ int statsRefreshInterval,
+ Time delayBetweenSamples,
+ int maxStackTraceDepth) {
+
+ this.coordinator = checkNotNull(coordinator, "Thread info samples coordinator");
+ this.resourceManagerGatewayRetriever =
+ checkNotNull(resourceManagerGatewayRetriever, "Gateway retriever");
+ this.createStatsFn = checkNotNull(createStatsFn, "Create stats function");
+ this.executor = checkNotNull(executor, "Scheduled executor");
+
+ checkArgument(cleanUpInterval >= 0, "Clean up interval");
+
+ checkArgument(numSamples >= 1, "Number of samples");
+ this.numSamples = numSamples;
+
+ checkArgument(
+ statsRefreshInterval >= 0,
+ "Stats refresh interval must be greater than or equal to 0");
+ this.statsRefreshInterval = statsRefreshInterval;
+
+ this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
+
+ checkArgument(
+ maxStackTraceDepth >= 0,
+ "Max stack trace depth must be greater than or equal to 0");
+ this.maxThreadInfoDepth = maxStackTraceDepth;
+
+ this.operatorStatsCache =
+ CacheBuilder.newBuilder()
+ .concurrencyLevel(1)
+ .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ @Override
+ public Optional<T> getOperatorStats(AccessExecutionJobVertex vertex) {
+ synchronized (lock) {
+ final T stats = operatorStatsCache.getIfPresent(vertex);
+ if (stats == null
+ || System.currentTimeMillis() >= stats.getEndTime() + statsRefreshInterval) {
+ triggerThreadInfoSampleInternal(vertex);
+ }
+ return Optional.ofNullable(stats);
+ }
+ }
+
+ /**
+ * Triggers a request for an operator to gather the thread info statistics. If there is a sample
+ * in progress for the operator, the call is ignored.
+ *
+ * @param vertex Operator to get the stats for.
+ */
+ private void triggerThreadInfoSampleInternal(final AccessExecutionJobVertex vertex) {
+ assert (Thread.holdsLock(lock));
+
+ if (shutDown) {
+ return;
+ }
+
+ if (!pendingStats.contains(vertex)) {
+ pendingStats.add(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Triggering thread info sample for tasks: "
+ + Arrays.toString(vertex.getTaskVertices()));
+ }
+
+ final AccessExecutionVertex[] executionVertices = vertex.getTaskVertices();
+ final CompletableFuture<ResourceManagerGateway> gatewayFuture =
+ resourceManagerGatewayRetriever.getFuture();
+
+ CompletableFuture<OperatorThreadInfoStats> sample =
+ gatewayFuture.thenCompose(
+ (ResourceManagerGateway resourceManagerGateway) ->
+ coordinator.triggerThreadInfoRequest(
+ matchExecutionsWithGateways(
+ executionVertices, resourceManagerGateway),
+ numSamples,
+ delayBetweenSamples,
+ maxThreadInfoDepth));
+
+ sample.handleAsync(new ThreadInfoSampleCompletionCallback(vertex), executor);
Review comment:
`whenCompleteAsync`
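For context, `handleAsync` takes a `BiFunction` and returns a future of the callback's return value. `whenCompleteAsync` takes a `BiConsumer` and returns a future that completes with the original result or exception after the callback has run. Both run the callback on the given executor for success and failure alike. Assuming `ThreadInfoSampleCompletionCallback` only has side effects, such as updating the cache and `pendingStats`, a `BiConsumer` is enough. A self-contained sketch, where `SampleCompletionCallback` is a hypothetical stand-in for the real callback:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

final class WhenCompleteSketch {

    /** Side-effect-only callback (hypothetical stand-in for ThreadInfoSampleCompletionCallback). */
    static final class SampleCompletionCallback implements BiConsumer<String, Throwable> {
        volatile String lastOutcome;

        @Override
        public void accept(String stats, Throwable failure) {
            lastOutcome = failure == null ? "stored " + stats : "failed: " + failure.getMessage();
        }
    }

    /** Runs the callback on the executor; the returned future keeps the original outcome. */
    static CompletableFuture<String> attach(
            CompletableFuture<String> sample,
            SampleCompletionCallback callback,
            ExecutorService executor) {
        return sample.whenCompleteAsync(callback, executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            SampleCompletionCallback callback = new SampleCompletionCallback();
            String result =
                    attach(CompletableFuture.completedFuture("stats"), callback, executor).join();
            System.out.println(result + " / " + callback.lastOutcome); // stats / stored stats
        } finally {
            executor.shutdown();
        }
    }
}
```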
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleableTask.java
##########
@@ -0,0 +1,30 @@
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/** Task interface used by {@link ThreadInfoSampleService} for thread info tracking. */
+interface ThreadInfoSampleableTask {
Review comment:
Maybe just `SampleableTask`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
##########
@@ -0,0 +1,188 @@
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.webmonitor.stats.TaskStatsRequestCoordinator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A coordinator for triggering and collecting thread info stats of running operator subtasks. */
+public class ThreadInfoRequestCoordinator
+ extends TaskStatsRequestCoordinator<List<ThreadInfoSample>, OperatorThreadInfoStats> {
+
+ /**
+ * Creates a new coordinator for the job.
+ *
+ * @param executor Used to execute the futures.
+ * @param requestTimeout Time out after the expected sampling duration. This is added to the
+ * expected duration of a request, which is determined by the number of samples and the
+ * delay between each sample.
+ */
+ public ThreadInfoRequestCoordinator(Executor executor, long requestTimeout) {
+ super(executor, requestTimeout);
+ }
+
+ /**
+ * Triggers collection of thread info stats of an operator by combining thread info responses
+ * from given subtasks. A thread info response of a subtask in turn consists of {@code
+ * numSamples}, collected with {@code delayBetweenSamples} milliseconds delay between them.
+ *
+ * @param subtasksWithGateways Execution vertices together with TaskExecutors running them.
+ * @param numSamples Number of thread info samples to collect from each subtask.
+ * @param delayBetweenSamples Delay between consecutive samples (ms).
+ * @param maxStackTraceDepth Maximum depth of the stack traces collected within thread info
+ * samples.
+ * @return A future of the completed thread info stats.
+ */
+ public CompletableFuture<OperatorThreadInfoStats> triggerThreadInfoRequest(
+ List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
+ subtasksWithGateways,
+ int numSamples,
+ Time delayBetweenSamples,
+ int maxStackTraceDepth) {
+
+ checkNotNull(subtasksWithGateways, "Tasks to sample");
+ checkArgument(subtasksWithGateways.size() > 0, "No tasks to sample");
+ checkArgument(numSamples >= 1, "No number of samples");
+ checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth");
+
+ // Execution IDs of running tasks
+ List<ExecutionAttemptID> runningSubtasksIds = new ArrayList<>();
+
+ // Check that all tasks are RUNNING before triggering anything. The
+ // triggering can still fail.
+ for (Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>
+ executionsWithGateway : subtasksWithGateways) {
+ AccessExecution execution = executionsWithGateway.f0.getCurrentExecutionAttempt();
+ if (execution != null && execution.getState() == ExecutionState.RUNNING) {
+ runningSubtasksIds.add(execution.getAttemptId());
+ } else {
+ return FutureUtils.completedExceptionally(
+ new IllegalStateException(
+ "Task "
+ + executionsWithGateway.f0.getTaskNameWithSubtaskIndex()
+ + " is not running."));
+ }
+ }
+
+ synchronized (lock) {
+ if (isShutDown) {
+ return FutureUtils.completedExceptionally(new IllegalStateException("Shut down"));
+ }
+
+ final int requestId = requestIdCounter++;
+
+ log.debug("Triggering thread info request {}", requestId);
+
+ final PendingThreadInfoRequest pending =
+ new PendingThreadInfoRequest(requestId, runningSubtasksIds);
+
+ // requestTimeout is treated as the time on top of the expected sampling duration.
+ // Discard the request if it takes too long. We don't send cancel
+ // messages to the task managers, but only wait for the responses
+ // and then ignore them.
+ long expectedDuration = numSamples * delayBetweenSamples.toMilliseconds();
+ Time timeout = Time.milliseconds(expectedDuration + requestTimeout.toMilliseconds());
+
+ // Add the pending request before scheduling the discard task to
+ // prevent races with removing it again.
+ pendingRequests.put(requestId, pending);
+
+ ThreadInfoSamplesRequest requestParams =
+ new ThreadInfoSamplesRequest(
+ requestId, numSamples, delayBetweenSamples, maxStackTraceDepth);
+
+ requestThreadInfo(subtasksWithGateways, requestParams, timeout);
+
+ return pending.getStatsFuture();
+ }
+ }
+
+ /**
+ * Requests thread infos from given subtasks. The response would be ignored if it does not
+ * return within timeout.
+ */
+ private void requestThreadInfo(
+ List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
+ subtasksWithGateways,
+ ThreadInfoSamplesRequest requestParams,
+ Time timeout) {
+
+ // Trigger samples collection from all subtasks
+ for (Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>
+ executionWithGateway : subtasksWithGateways) {
+
+ CompletableFuture<TaskExecutorGateway> executorGatewayFuture = executionWithGateway.f1;
+
+ ExecutionAttemptID taskExecutionAttemptId =
+ executionWithGateway.f0.getCurrentExecutionAttempt().getAttemptId();
+
+ CompletableFuture<TaskThreadInfoResponse> threadInfo =
+ executorGatewayFuture.thenCompose(
+ executorGateway ->
+ executorGateway.requestThreadInfoSamples(
+ taskExecutionAttemptId, requestParams, timeout));
+
+ threadInfo.handleAsync(
Review comment:
`whenCompleteAsync`?
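The quoted hunk ends before the body of the `handleAsync` callback. Assuming the callback only routes each subtask response to either `handleSuccessfulResponse` or `handleFailedResponse` of the coordinator, nothing needs to be returned from it. The sketch below is written against a hypothetical `ResponseHandler` interface that mirrors those two methods, with `String` standing in for `ExecutionAttemptID`. `RecordingHandler` and `ResponseRouting` are also hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Mirrors the two coordinator callbacks from the diff (hypothetical interface). */
interface ResponseHandler<T> {
    void handleSuccessfulResponse(int requestId, String executionId, T result);

    void handleFailedResponse(int requestId, Throwable cause);
}

/** Records calls so the routing can be observed (hypothetical helper). */
final class RecordingHandler<T> implements ResponseHandler<T> {
    final List<String> events = new ArrayList<>();

    @Override
    public void handleSuccessfulResponse(int requestId, String executionId, T result) {
        events.add("ok " + requestId + " " + executionId + " " + result);
    }

    @Override
    public void handleFailedResponse(int requestId, Throwable cause) {
        events.add("failed " + requestId + " " + cause.getMessage());
    }
}

final class ResponseRouting {

    private ResponseRouting() {}

    /**
     * Routes one subtask response to the handler on the given executor. For futures derived via
     * thenCompose, the failure is typically a CompletionException wrapping the actual cause.
     */
    static <T> CompletableFuture<T> route(
            CompletableFuture<T> response,
            int requestId,
            String executionId,
            ResponseHandler<T> handler,
            Executor executor) {
        return response.whenCompleteAsync(
                (result, failure) -> {
                    if (failure == null) {
                        handler.handleSuccessfulResponse(requestId, executionId, result);
                    } else {
                        handler.handleFailedResponse(requestId, failure);
                    }
                },
                executor);
    }

    public static void main(String[] args) {
        RecordingHandler<Integer> handler = new RecordingHandler<>();
        Executor direct = Runnable::run; // runs callbacks on the calling thread
        route(CompletableFuture.completedFuture(42), 7, "attempt-a", handler, direct);
        System.out.println(handler.events);
    }
}
```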
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -74,6 +74,8 @@
import org.apache.flink.runtime.management.JMXService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
Review comment:
For the commit message, I'd reformulate it in a positive way: you need it for test mocks.
##########
File path: flink-runtime-web/web-dashboard/package.json
##########
@@ -43,11 +45,11 @@
"husky": "^1.3.1",
"jasmine-core": "~2.99.1",
"jasmine-spec-reporter": "~4.2.1",
+ "karma": "~4.0.0",
Review comment:
Reordering probably makes it much harder to track what packages you changed here.
##########
File path: flink-runtime-web/web-dashboard/package.json
##########
@@ -22,6 +22,8 @@
"@antv/g2": "^3.4.10",
"core-js": "^2.5.4",
"d3": "^5.9.1",
+ "d3-flame-graph": "^4.0.6",
Review comment:
I have no clue what's going on here ;) But it looks good.
One thing that I don't understand at all: why do you update package-lock in this commit and then again in the next commit with a huge diff?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]