AHeise commented on a change in pull request #15054:
URL: https://github.com/apache/flink/pull/15054#discussion_r600455939



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
##########
@@ -156,6 +156,48 @@
                     .withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples")
                     .withDescription("This config option is no longer used");
 
+    /** Time, in milliseconds, after which cached stats are cleaned up if not accessed. */

Review comment:
       Without looking at the documentation, I have a hard time figuring out how all these parameters relate to each other.
   Naively, I'd expect these settings (as in VisualVM):
   - Sample frequency/interval (= FLAMEGRAPH_DELAY?)
   - Flame graph window size (= FLAMEGRAPH_NUM_SAMPLES)
   - Flame graph window slide (= FLAMEGRAPH_REFRESH_INTERVAL? Or FLAMEGRAPH_CLEANUP_INTERVAL?)
   
   Then I'd assume there is a moving average based on Flink's sliding-window semantics. Could you map these parameters to such well-known concepts and derive the technical parameters from them?
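To make the idea concrete, here is a minimal sketch of deriving the technical parameters from window-style settings. `FlameGraphWindowSketch` and its fields are hypothetical. The mapping of the derived values to FLAMEGRAPH_DELAY, FLAMEGRAPH_NUM_SAMPLES, and FLAMEGRAPH_REFRESH_INTERVAL follows the guesses in the list above. It does not reflect the PR's confirmed semantics.

```java
import java.time.Duration;

/**
 * Hypothetical user-facing flame graph settings, expressed as sliding-window concepts.
 * Which existing option each derived value maps to is an assumption, not the PR's semantics.
 */
public final class FlameGraphWindowSketch {

    private final Duration sampleInterval; // time between two consecutive samples of a task
    private final Duration windowSize; // time span aggregated into one flame graph
    private final Duration windowSlide; // how often a new flame graph is computed

    public FlameGraphWindowSketch(
            Duration sampleInterval, Duration windowSize, Duration windowSlide) {
        if (sampleInterval.toMillis() < 1) {
            throw new IllegalArgumentException("sampleInterval must be at least 1 ms");
        }
        if (windowSize.compareTo(sampleInterval) < 0) {
            throw new IllegalArgumentException("windowSize must cover at least one sample");
        }
        if (windowSlide.isNegative()) {
            throw new IllegalArgumentException("windowSlide must not be negative");
        }
        this.sampleInterval = sampleInterval;
        this.windowSize = windowSize;
        this.windowSlide = windowSlide;
    }

    /** Delay between two samples; might correspond to FLAMEGRAPH_DELAY. */
    public long delayBetweenSamplesMillis() {
        return sampleInterval.toMillis();
    }

    /** Samples needed to fill one window; might correspond to FLAMEGRAPH_NUM_SAMPLES. */
    public int numSamples() {
        return Math.toIntExact(windowSize.toMillis() / sampleInterval.toMillis());
    }

    /** Age after which a new graph is due; might correspond to FLAMEGRAPH_REFRESH_INTERVAL. */
    public long refreshIntervalMillis() {
        return windowSlide.toMillis();
    }
}
```

For example, a 50 ms interval, a 5 s window, and a 1 s slide yield 100 samples per graph and a 1000 ms refresh interval.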

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Arrays;
+
+/** Termination mode query parameter. */
+public class FlameGraphTypeQueryParameter
+        extends MessageQueryParameter<FlameGraphTypeQueryParameter.Type> {
+
+    private static final String key = "type";
+
+    public FlameGraphTypeQueryParameter() {
+        super(key, MessageParameterRequisiteness.OPTIONAL);
+    }
+
+    @Override
+    public Type convertStringToValue(String value) {
+        return Type.valueOf(value.toUpperCase());
+    }
+
+    @Override
+    public String convertValueToString(Type value) {
+        return value.name().toLowerCase();
+    }
+
+    @Override
+    public String getDescription() {
+        return "String value that specifies the Flame Graph type. Supported options are: \""
+                + Arrays.toString(Type.values())
+                + "\".";
+    }
+
+    /** Termination mode. */

Review comment:
       Copy-and-paste error in the Javadoc? This parameter is the flame graph type, not a termination mode.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphInfo.java
##########
@@ -0,0 +1,60 @@
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobVertexFlameGraphHandler;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Response type of the {@link JobVertexFlameGraphHandler}. */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JobVertexFlameGraphInfo implements ResponseBody {
+
+    public static JobVertexFlameGraphInfo empty() {
+        return new JobVertexFlameGraphInfo(null, null);
+    }
+
+    private static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+    private static final String FIELD_NAME_ROOT = "data";
+
+    @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+    private final Long endTimestamp;

Review comment:
       Could this be a primitive `long` instead of `Long`?
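For context on the trade-off: `empty()` passes `null` for the timestamp. `@JsonInclude(NON_NULL)` then omits the field. A primitive `long` would need another way to express "no data". The following is a minimal sketch of one option: a hypothetical stand-in class without the Jackson annotations, where emptiness is an explicit flag exposed as `OptionalLong`.

```java
import java.util.OptionalLong;

/** Hypothetical primitive-based variant of the response: "empty" is an explicit flag. */
public final class PrimitiveTimestampInfo {

    private final boolean empty;
    private final long endTimestamp; // only meaningful if !empty

    private PrimitiveTimestampInfo(boolean empty, long endTimestamp) {
        this.empty = empty;
        this.endTimestamp = endTimestamp;
    }

    /** Counterpart of empty(): no sentinel value leaks to callers. */
    public static PrimitiveTimestampInfo empty() {
        return new PrimitiveTimestampInfo(true, 0L);
    }

    public static PrimitiveTimestampInfo of(long endTimestamp) {
        return new PrimitiveTimestampInfo(false, endTimestamp);
    }

    /** Callers must handle the empty case explicitly, unlike with an unchecked nullable Long. */
    public OptionalLong getEndTimestamp() {
        return empty ? OptionalLong.empty() : OptionalLong.of(endTimestamp);
    }
}
```

Whether this is worth it over a `@Nullable Long` mostly depends on the desired JSON shape. A primitive field is never null, so `NON_NULL` would no longer drop it.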

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java
##########
@@ -0,0 +1,283 @@
+package org.apache.flink.runtime.messages;
+
+import java.io.Serializable;
+import java.lang.management.LockInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+/**
+ * A serializable wrapper around {@link java.lang.management.ThreadInfo} that excludes {@link
+ * java.lang.management.LockInfo}-based non-serializable fields.
+ */
+public class ThreadInfoSample implements Serializable {
+
+    private final String threadName;
+    private final long threadId;
+    private final long blockedTime;
+    private final long blockedCount;
+    private final long waitedTime;
+    private final long waitedCount;
+    private final String lockName;
+    private final long lockOwnerId;
+    private final String lockOwnerName;
+    private final boolean inNative;
+    private final boolean suspended;
+    private final Thread.State threadState;
+    private final StackTraceElement[] stackTrace;
+
+    private ThreadInfoSample(
+            String threadName,
+            long threadId,
+            long blockedTime,
+            long blockedCount,
+            long waitedTime,
+            long waitedCount,
+            String lockName,
+            long lockOwnerId,
+            String lockOwnerName,
+            boolean inNative,
+            boolean suspended,
+            Thread.State threadState,
+            StackTraceElement[] stackTrace) {
+        this.threadName = threadName;
+        this.threadId = threadId;
+        this.blockedTime = blockedTime;
+        this.blockedCount = blockedCount;
+        this.waitedTime = waitedTime;
+        this.waitedCount = waitedCount;
+        this.lockName = lockName;
+        this.lockOwnerId = lockOwnerId;
+        this.lockOwnerName = lockOwnerName;
+        this.inNative = inNative;
+        this.suspended = suspended;
+        this.threadState = threadState;
+        this.stackTrace = stackTrace;
+    }
+
+    /**
+     * Constructs a {@link ThreadInfoSample} from {@link ThreadInfo}.
+     *
+     * @param threadInfo {@link ThreadInfo} where the data will be copied from.
+     * @return new {@link ThreadInfoSample}
+     */
+    public static ThreadInfoSample from(ThreadInfo threadInfo) {

Review comment:
       Do you really want to support `null` here instead of handling it at the call site? If so, add `@Nullable` to the parameter and the return type.
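Some background for the question: `ThreadMXBean#getThreadInfo(long[], int)` returns `null` entries for threads that are no longer alive or do not exist, so `null`s are expected when sampling. The following is a minimal sketch of filtering them at the call site, which would let a conversion method like `from` stay non-null. The class and method names are hypothetical, and the thread name stands in for a `ThreadInfoSample`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical call-site handling of null ThreadInfo entries. */
public final class ThreadInfoCallSiteSketch {

    /** Samples the given threads, skipping threads that terminated or never existed. */
    public static List<String> sampleThreadNames(long[] threadIds, int maxDepth) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        ThreadInfo[] infos = bean.getThreadInfo(threadIds, maxDepth);
        List<String> samples = new ArrayList<>();
        for (ThreadInfo info : infos) {
            if (info == null) {
                continue; // thread not alive (or unknown ID): handled here, not in from()
            }
            // Stand-in for a non-null ThreadInfoSample.from(info).
            samples.add(info.getThreadName());
        }
        return samples;
    }
}
```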

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java
##########
@@ -0,0 +1,121 @@
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraphFactory;

Review comment:
       This class doesn't exist in this commit yet (improper commit split?).
   Each commit should compile and ideally pass all tests (although this is not always easily possible).

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Factory class for creating Flame Graph representations. */
+public class OperatorFlameGraphFactory {
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph.
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure
+     */
+    public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included = Arrays.asList(Thread.State.values());
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+     * threads.
+     *
+     * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+     * Thread.State.WAITING].
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure.
+     */
+    public static OperatorFlameGraph createOffCpuFlameGraph(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included =
+                Arrays.asList(
+                        Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING);
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing actively running
+     * (On-CPU) threads.
+     *
+     * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+     * Thread.State.WAITING].
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure
+     */
+    public static OperatorFlameGraph createOnCpuFlameGraph(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included = Arrays.asList(Thread.State.RUNNABLE, Thread.State.NEW);
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    private static OperatorFlameGraph createFlameGraphFromSample(
+            OperatorThreadInfoStats sample, Collection<Thread.State> threadStates) {

Review comment:
       -> `Set<Thread.State> threadStates`
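A `Set` expresses the intent better than a `Collection`: the states are only used for membership checks, and duplicates or ordering carry no meaning. The following is a minimal sketch of a state filter keyed on such a set. `Sample` is a hypothetical stand-in for the per-thread data held in `OperatorThreadInfoStats`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of filtering sampled threads by a set of thread states. */
public final class StateFilterSketch {

    /** Hypothetical stand-in for one sampled thread. */
    public static final class Sample {
        public final String threadName;
        public final Thread.State state;

        public Sample(String threadName, Thread.State state) {
            this.threadName = threadName;
            this.state = state;
        }
    }

    /** Keeps only the samples whose thread state is contained in {@code threadStates}. */
    public static List<Sample> filterByState(List<Sample> samples, Set<Thread.State> threadStates) {
        List<Sample> result = new ArrayList<>();
        for (Sample sample : samples) {
            if (threadStates.contains(sample.state)) {
                result.add(sample);
            }
        }
        return result;
    }
}
```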

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing actively running
+     * (On-CPU) threads.
+     *
+     * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,

Review comment:
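       Copy-and-paste error: the On-CPU graph includes `RUNNABLE` and `NEW`, not the waiting and blocked states listed here.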

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/Stats.java
##########
@@ -0,0 +1,30 @@
+package org.apache.flink.runtime.webmonitor.stats;
+
+/** Represents one or more statistics samples. */
+public interface Stats {

Review comment:
       Although I have found `stats` in the codebase, `Statistics` seems to be more common.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph.
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure
+     */
+    public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included = Arrays.asList(Thread.State.values());

Review comment:
       Use `EnumSet` here.
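The following is a minimal sketch of what the three state sets could look like as `EnumSet`s. `EnumSet.allOf(Thread.State.class)` replaces `Arrays.asList(Thread.State.values())`. The On-CPU and Off-CPU sets mirror the states already used in the code of this hunk. The class name `ThreadStateSets` is hypothetical. `uncovered()` checks which states fall into neither graph.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical constants mirroring the state sets used in OperatorFlameGraphFactory. */
public final class ThreadStateSets {

    /** Full graph: every thread state. */
    public static final Set<Thread.State> FULL =
            Collections.unmodifiableSet(EnumSet.allOf(Thread.State.class));

    /** On-CPU graph: running or about-to-run threads. */
    public static final Set<Thread.State> ON_CPU =
            Collections.unmodifiableSet(EnumSet.of(Thread.State.RUNNABLE, Thread.State.NEW));

    /** Off-CPU graph: blocked or waiting threads. */
    public static final Set<Thread.State> OFF_CPU =
            Collections.unmodifiableSet(
                    EnumSet.of(
                            Thread.State.TIMED_WAITING,
                            Thread.State.BLOCKED,
                            Thread.State.WAITING));

    /** States that appear in neither the On-CPU nor the Off-CPU graph. */
    public static Set<Thread.State> uncovered() {
        EnumSet<Thread.State> rest = EnumSet.allOf(Thread.State.class);
        rest.removeAll(ON_CPU);
        rest.removeAll(OFF_CPU);
        return rest;
    }
}
```

With these sets, `uncovered()` contains only `TERMINATED`. This makes it easy to see that the On-CPU and Off-CPU graphs do not overlap.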

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/OperatorStatsTracker.java
##########
@@ -0,0 +1,56 @@
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Optional;
+
+/**
+ * Interface for a tracker of statistics for {@link AccessExecutionJobVertex}.
+ *
+ * @param <T> Type of statistics to track
+ */
+public interface OperatorStatsTracker<T extends Stats> {
+
+    /**
+     * Returns statistics for an operator. Automatically triggers sampling request if statistics are
+     * not available or outdated.
+     *
+     * @param vertex Operator to get the stats for.
+     * @return Statistics for an operator
+     */
+    Optional<T> getOperatorStats(AccessExecutionJobVertex vertex);
+
+    /**
+     * Cleans up the operator stats cache if it contains timed out entries.
+     *
+     * <p>The Guava cache only evicts as maintenance during normal operations. If this handler is
+     * inactive, it will never be cleaned.

Review comment:
       This is an implementation detail; please remove it from the interface documentation.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,403 @@
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.stats.OperatorStatsTracker;
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker of thread infos for {@link ExecutionJobVertex}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+public class ThreadInfoOperatorTracker<T extends Stats> implements OperatorStatsTracker<T> {
+
+    /**
+     * Create a new {@link Builder}.
+     *
+     * @param createStatsFn Function that converts a thread info sample into a derived statistic.
+     *     Could be an identity function.
+     * @param <T> Type of the derived statistics to return.
+     * @return Builder.
+     */
+    public static <T extends Stats> Builder<T> newBuilder(
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor) {
+        return new Builder<>(resourceManagerGatewayRetriever, createStatsFn, executor);
+    }
+
+    /**
+     * Builder for {@link ThreadInfoOperatorTracker}.
+     *
+     * @param <T> Type of the derived statistics to return.
+     */
+    public static class Builder<T extends Stats> {
+
+        private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+        private final Function<OperatorThreadInfoStats, T> createStatsFn;
+        private final ExecutorService executor;
+
+        private ThreadInfoRequestCoordinator coordinator;
+        private int cleanUpInterval;
+        private int numSamples;
+        private int statsRefreshInterval;
+        private Time delayBetweenSamples;
+        private int maxThreadInfoDepth;
+
+        private Builder(
+                GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+                Function<OperatorThreadInfoStats, T> createStatsFn,
+                ExecutorService executor) {
+            this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+            this.createStatsFn = createStatsFn;
+            this.executor = executor;
+        }
+
+        /**
+         * Sets {@code cleanUpInterval}.
+         *
+         * @param coordinator Coordinator for thread info stats request.
+         * @return Builder.
+         */
+        public Builder<T> setCoordinator(ThreadInfoRequestCoordinator coordinator) {
+            this.coordinator = coordinator;
+            return this;
+        }
+
+        /**
+         * Sets {@code cleanUpInterval}.
+         *
+         * @param cleanUpInterval Clean up interval for completed stats.
+         * @return Builder.
+         */
+        public Builder<T> setCleanUpInterval(int cleanUpInterval) {
+            this.cleanUpInterval = cleanUpInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code numSamples}.
+         *
+         * @param numSamples Number of thread info samples to collect for each subtask.
+         * @return Builder.
+         */
+        public Builder<T> setNumSamples(int numSamples) {
+            this.numSamples = numSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code statsRefreshInterval}.
+         *
+         * @param statsRefreshInterval Time interval after which the available thread info stats are
+         *     deprecated and need to be refreshed.
+         * @return Builder.
+         */
+        public Builder<T> setStatsRefreshInterval(int statsRefreshInterval) {
+            this.statsRefreshInterval = statsRefreshInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code delayBetweenSamples}.
+         *
+         * @param delayBetweenSamples Delay between individual samples per task.
+         * @return Builder.
+         */
+        public Builder<T> setDelayBetweenSamples(Time delayBetweenSamples) {
+            this.delayBetweenSamples = delayBetweenSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code delayBetweenSamples}.
+         *
+         * @param maxThreadInfoDepth Limit for the depth of the stack traces included when sampling
+         *     threads.
+         * @return Builder.
+         */
+        public Builder<T> setMaxThreadInfoDepth(int maxThreadInfoDepth) {
+            this.maxThreadInfoDepth = maxThreadInfoDepth;
+            return this;
+        }
+
+        /**
+         * Constructs a new {@link ThreadInfoOperatorTracker}.
+         *
+         * @return a new {@link ThreadInfoOperatorTracker} instance.
+         */
+        public ThreadInfoOperatorTracker<T> build() {
+            return new ThreadInfoOperatorTracker<>(
+                    coordinator,
+                    resourceManagerGatewayRetriever,
+                    createStatsFn,
+                    executor,
+                    cleanUpInterval,
+                    numSamples,
+                    statsRefreshInterval,
+                    delayBetweenSamples,
+                    maxThreadInfoDepth);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadInfoOperatorTracker.class);
+
+    /** Lock guarding trigger operations. */
+    private final Object lock = new Object();
+
+    private final ThreadInfoRequestCoordinator coordinator;
+
+    private final Function<OperatorThreadInfoStats, T> createStatsFn;
+
+    private final ExecutorService executor;
+
+    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+    /**
+     * Completed stats. Important: Job vertex IDs need to be scoped by job ID, because they are
+     * potentially constant across runs messing up the cached data.
+     */
+    private final Cache<AccessExecutionJobVertex, T> operatorStatsCache;
+
+    /**
+     * Pending in progress stats. Important: Job vertex IDs need to be scoped by job ID, because
+     * they are potentially constant across runs messing up the cached data.
+     */
+    private final Set<AccessExecutionJobVertex> pendingStats = new HashSet<>();
+
+    private final int numSamples;
+
+    private final int statsRefreshInterval;
+
+    private final Time delayBetweenSamples;
+
+    private final int maxThreadInfoDepth;
+
+    // Used for testing purposes
+    private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();
+
+    /** Flag indicating whether the stats tracker has been shut down. */
+    private boolean shutDown;
+
+    private ThreadInfoOperatorTracker(
+            ThreadInfoRequestCoordinator coordinator,
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor,
+            int cleanUpInterval,
+            int numSamples,
+            int statsRefreshInterval,
+            Time delayBetweenSamples,
+            int maxStackTraceDepth) {
+
+        this.coordinator = checkNotNull(coordinator, "Thread info samples coordinator");
+        this.resourceManagerGatewayRetriever =
+                checkNotNull(resourceManagerGatewayRetriever, "Gateway retriever");
+        this.createStatsFn = checkNotNull(createStatsFn, "Create stats function");
+        this.executor = checkNotNull(executor, "Scheduled executor");
+
+        checkArgument(cleanUpInterval >= 0, "Clean up interval");
+
+        checkArgument(numSamples >= 1, "Number of samples");
+        this.numSamples = numSamples;
+
+        checkArgument(
+                statsRefreshInterval >= 0,
+                "Stats refresh interval must be greater than or equal to 0");
+        this.statsRefreshInterval = statsRefreshInterval;
+
+        this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
+
+        checkArgument(
+                maxStackTraceDepth >= 0,
+                "Max stack trace depth must be greater than or equal to 0");
+        this.maxThreadInfoDepth = maxStackTraceDepth;
+
+        this.operatorStatsCache =
+                CacheBuilder.newBuilder()
+                        .concurrencyLevel(1)
+                        .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
+                        .build();
+    }
+
+    @Override
+    public Optional<T> getOperatorStats(AccessExecutionJobVertex vertex) {
+        synchronized (lock) {
+            final T stats = operatorStatsCache.getIfPresent(vertex);
+            if (stats == null
+                    || System.currentTimeMillis() >= stats.getEndTime() + statsRefreshInterval) {

Review comment:
       Is this assuming that clocks are synchronized across machines?
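One way to sidestep the question, sketched here under the assumption that only the caching process's own clock should matter: stamp each cache entry with a local monotonic timestamp when it is stored, and decide staleness from that stamp instead of comparing `System.currentTimeMillis()` with `stats.getEndTime()`. Whether `getEndTime()` is ever stamped on another machine is not visible in this hunk; `LocalRefreshPolicy`, `stampNow` and `needsRefresh` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper: decides whether cached stats need a refresh using only the local
 * monotonic clock of the process holding the cache, so no cross-machine clock agreement
 * is needed.
 */
public class LocalRefreshPolicy {
    private final long refreshIntervalNanos;
    private final LongSupplier nanoClock; // System::nanoTime in production, a fake in tests

    public LocalRefreshPolicy(long refreshIntervalMillis, LongSupplier nanoClock) {
        if (refreshIntervalMillis < 0) {
            throw new IllegalArgumentException("Refresh interval must be >= 0");
        }
        this.refreshIntervalNanos = TimeUnit.MILLISECONDS.toNanos(refreshIntervalMillis);
        this.nanoClock = nanoClock;
    }

    /** Call when stats are put into the cache; store the returned value next to them. */
    public long stampNow() {
        return nanoClock.getAsLong();
    }

    /** True if nothing is cached yet or the cached entry is at least one interval old. */
    public boolean needsRefresh(Long cachedAtNanos) {
        if (cachedAtNanos == null) {
            return true;
        }
        // nanoTime values are only meaningful relative to each other, so compare differences.
        return nanoClock.getAsLong() - cachedAtNanos >= refreshIntervalNanos;
    }
}
```

The only timestamps ever compared come from the same JVM, which is what makes the check independent of clock skew between JobManager and TaskManagers.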

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/TaskStatsRequestCoordinator.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates the common functionality for requesting statistics from individual tasks and
+ * combining their responses.
+ *
+ * @param <T> Type of the statistics to be gathered.
+ * @param <V> Type of the combined response.
+ */
+public class TaskStatsRequestCoordinator<T, V> {
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected final int numGhostSampleIds = 10;
+
+    protected final Object lock = new Object();
+
+    /** Executor used to run the futures. */
+    protected final Executor executor;
+
+    /** Request time out of a triggered task stats request. */
+    protected final Time requestTimeout;
+
+    /** In progress samples. */
+    @GuardedBy("lock")
+    protected final Map<Integer, PendingStatsRequest<T, V>> pendingRequests = new HashMap<>();
+
+    /** A list of recent request IDs to identify late messages vs. invalid ones. */
+    protected final ArrayDeque<Integer> recentPendingRequestIds =
+            new ArrayDeque<>(numGhostSampleIds);
+
+    /** Sample ID counter. */
+    @GuardedBy("lock")
+    protected int requestIdCounter;
+
+    /** Flag indicating whether the coordinator is still running. */
+    @GuardedBy("lock")
+    protected boolean isShutDown;
+
+    /**
+     * Creates a new coordinator for the cluster.
+     *
+     * @param executor Used to execute the futures.
+     * @param requestTimeout Request time out of a triggered task stats request.
+     */
+    public TaskStatsRequestCoordinator(Executor executor, long requestTimeout) {
+        checkArgument(requestTimeout >= 0L, "The request timeout must be non-negative.");
+        this.executor = Preconditions.checkNotNull(executor);
+        this.requestTimeout = Time.milliseconds(requestTimeout);
+    }
+
+    /**
+     * Handles the failed stats response by canceling the corresponding unfinished pending request.
+     *
+     * @param requestId ID of the request to cancel.
+     * @param cause Cause of the cancelling (can be <code>null</code>).
+     */
+    public void handleFailedResponse(int requestId, @Nullable Throwable cause) {
+        synchronized (lock) {
+            if (isShutDown) {
+                return;
+            }
+
+            PendingStatsRequest<T, V> pendingRequest = pendingRequests.remove(requestId);
+            if (pendingRequest != null) {
+                if (cause != null) {
+                    log.info("Cancelling request " + requestId, cause);
+                } else {
+                    log.info("Cancelling request {}", requestId);
+                }
+
+                pendingRequest.discard(cause);
+                rememberRecentRequestId(requestId);
+            }
+        }
+    }
+
+    /**
+     * Shuts down the coordinator.
+     *
+     * <p>After shut down, no further operations are executed.
+     */
+    public void shutDown() {
+        synchronized (lock) {
+            if (!isShutDown) {
+                log.info("Shutting down task stats request coordinator.");
+
+                for (PendingStatsRequest<T, V> pending : pendingRequests.values()) {
+                    pending.discard(new RuntimeException("Shut down"));
+                }
+
+                pendingRequests.clear();
+                recentPendingRequestIds.clear();
+
+                isShutDown = true;
+            }
+        }
+    }
+
+    /**
+     * Handles the successfully returned task stats response by collecting the corresponding subtask
+     * samples.
+     *
+     * @param requestId ID of the request.
+     * @param executionId ID of the sampled task.
+     * @param result Result of stats request returned by an individual task.
+     * @throws IllegalStateException If unknown request ID and not recently finished or cancelled
+     *     sample.
+     */
+    public void handleSuccessfulResponse(int requestId, ExecutionAttemptID executionId, T result) {
+
+        synchronized (lock) {
+            if (isShutDown) {
+                return;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Collecting stats sample {} of task {}", requestId, executionId);
+            }
+
+            PendingStatsRequest<T, V> pending = pendingRequests.get(requestId);
+
+            if (pending != null) {
+                pending.collectTaskStats(executionId, result);
+
+                // Publish the sample
+                if (pending.isComplete()) {
+                    pendingRequests.remove(requestId);
+                    rememberRecentRequestId(requestId);
+
+                    pending.completePromiseAndDiscard();
+                }
+            } else if (recentPendingRequestIds.contains(requestId)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Received late stats sample {} of task {}", requestId, executionId);
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Unknown request ID %d.", requestId));
+                }
+            }
+        }
+    }
+
+    private void rememberRecentRequestId(int sampleId) {
+        if (recentPendingRequestIds.size() >= numGhostSampleIds) {
+            recentPendingRequestIds.removeFirst();
+        }
+        recentPendingRequestIds.addLast(sampleId);
+    }
+
+    @VisibleForTesting
+    public int getNumberOfPendingRequests() {
+        synchronized (lock) {
+            return pendingRequests.size();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * A pending task stats request, which collects samples from individual tasks and completes the
+     * response future upon gathering all of them.
+     *
+     * <p>Has to be accessed in lock scope.
+     *
+     * @param <T> Type of the result collected from tasks.
+     * @param <V> Type of the result assembled and returned when all tasks were sampled.
+     */
+    protected abstract static class PendingStatsRequest<T, V> {

Review comment:
       Move to top-level? I find nested abstract classes odd if you do not include nested implementations.
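If `PendingStatsRequest` moved to its own file, it could look roughly like the sketch below. The method names (`collectTaskStats`, `isComplete`, `completePromiseAndDiscard`, `discard`) come from the hunk above; the fields, the `assembleCompleteStats()` hook and the generic key type `K` (standing in for `ExecutionAttemptID` so the sketch has no Flink dependency) are assumptions, not the PR's code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical top-level version of the nested PendingStatsRequest. Not thread-safe: as in
 * the original, callers are expected to hold the coordinator's lock.
 *
 * @param <K> task identifier (stand-in for ExecutionAttemptID)
 * @param <T> per-task result type
 * @param <V> combined result type
 */
public abstract class PendingStatsRequest<K, T, V> {
    protected final int requestId;
    protected final Set<K> pendingTasks;
    protected final Map<K, T> statsResultByTask = new HashMap<>();
    protected final CompletableFuture<V> resultFuture = new CompletableFuture<>();
    private boolean discarded;

    protected PendingStatsRequest(int requestId, Set<K> tasksToCollect) {
        this.requestId = requestId;
        this.pendingTasks = new HashSet<>(tasksToCollect);
    }

    public boolean isComplete() {
        return !discarded && pendingTasks.isEmpty();
    }

    public void collectTaskStats(K taskId, T stats) {
        if (discarded) {
            throw new IllegalStateException("Request " + requestId + " was discarded");
        }
        if (!pendingTasks.remove(taskId)) {
            throw new IllegalArgumentException("Unknown task " + taskId);
        }
        statsResultByTask.put(taskId, stats);
    }

    public void completePromiseAndDiscard() {
        if (!isComplete()) {
            throw new IllegalStateException("Not all tasks have reported yet");
        }
        resultFuture.complete(assembleCompleteStats());
        discarded = true;
    }

    public void discard(Throwable cause) {
        if (!discarded) {
            discarded = true;
            pendingTasks.clear();
            statsResultByTask.clear();
            resultFuture.completeExceptionally(
                    cause != null ? cause : new RuntimeException("Discarded"));
        }
    }

    public CompletableFuture<V> getResultFuture() {
        return resultFuture;
    }

    /** Subclasses combine the per-task results into the response type. */
    protected abstract V assembleCompleteStats();
}
```

Concrete requests (such as the thread info one) would then extend this class from their own files instead of nesting inside the coordinator.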

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/TaskStatsRequestCoordinator.java
##########
@@ -0,0 +1,323 @@
+public class TaskStatsRequestCoordinator<T, V> {
+    protected final Logger log = LoggerFactory.getLogger(getClass());

Review comment:
       We typically use a static LOG and I don't see a reason why it shouldn't be used here.
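The difference between the two patterns is small but not zero: a logger created from `getClass()` in a base class is named after the runtime subclass, whereas a `static final` logger is created once and always carries the declaring class's name. The sketch uses `java.util.logging` only to stay dependency-free; with SLF4J the static form would be `private static final Logger LOG = LoggerFactory.getLogger(TaskStatsRequestCoordinator.class);`. `StatsCoordinatorBase` is a hypothetical name.

```java
import java.util.logging.Logger;

/** Hypothetical base class contrasting a static per-class logger with a per-instance one. */
public class StatsCoordinatorBase {
    // Created once per class and shared by all instances; named after this class.
    private static final Logger LOG = Logger.getLogger(StatsCoordinatorBase.class.getName());

    // The pattern under review: one field per instance, named after the runtime subclass.
    protected final Logger instanceLog = Logger.getLogger(getClass().getName());

    static Logger staticLogger() {
        return LOG;
    }
}
```

The only thing lost by switching to the static form is that log lines emitted by subclasses would be attributed to the base class.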

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
##########
@@ -156,6 +156,48 @@
                     .withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples")
                     .withDescription("This config option is no longer used");
 
+    /** Time, in milliseconds, after which cached stats are cleaned up if not accessed. */
+    public static final ConfigOption<Integer> FLAMEGRAPH_CLEANUP_INTERVAL =

Review comment:
       I'd prefer `Duration` for all time-based configurations.
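A sketch of what the `Duration`-based variant might boil down to on the consuming side, assuming the options are declared with Flink's `ConfigOptions.key(...).durationType()` builder. `FlameGraphTimings` is a hypothetical holder; the millisecond helper exists because the shaded Guava 18 `CacheBuilder#expireAfterAccess` takes a `(long, TimeUnit)` pair rather than a `Duration`.

```java
import java.time.Duration;

/**
 * Hypothetical holder for the flame graph timing settings, typed as Durations instead of
 * raw millisecond ints.
 */
public final class FlameGraphTimings {
    final Duration cleanUpInterval;
    final Duration refreshInterval;
    final Duration delayBetweenSamples;

    public FlameGraphTimings(
            Duration cleanUpInterval, Duration refreshInterval, Duration delayBetweenSamples) {
        this.cleanUpInterval = requireNonNegative(cleanUpInterval, "cleanUpInterval");
        this.refreshInterval = requireNonNegative(refreshInterval, "refreshInterval");
        this.delayBetweenSamples = requireNonNegative(delayBetweenSamples, "delayBetweenSamples");
    }

    private static Duration requireNonNegative(Duration value, String name) {
        if (value == null || value.isNegative()) {
            throw new IllegalArgumentException(name + " must be a non-negative duration");
        }
        return value;
    }

    /** Guava 18's expireAfterAccess takes (long, TimeUnit), so convert in one place. */
    public long cleanUpIntervalMillis() {
        return cleanUpInterval.toMillis();
    }
}
```

With typed durations, users can also write values such as `10 min` or `500 ms` in the configuration instead of having to know the unit of an integer.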

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleService.java
##########
@@ -0,0 +1,107 @@
+/*

Review comment:
       For such helper commits, it's always good to state when they are used, e.g., "The `ThreadInfoSampleService` is later injected into ... to ...."

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java
##########
@@ -0,0 +1,121 @@
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraphFactory;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorThreadInfoStats;
+import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoOperatorTracker;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+
+/** Request handler for the job vertex Flame Graph. */
+public class JobVertexFlameGraphHandler
+        extends AbstractJobVertexHandler<JobVertexFlameGraphInfo, JobVertexMessageParameters> {
+
+    private final ThreadInfoOperatorTracker<OperatorThreadInfoStats> threadInfoOperatorTracker;
+
+    private static JobVertexFlameGraphInfo createJobVertexFlameGraphInfo(
+            OperatorFlameGraph flameGraph) {
+        return new JobVertexFlameGraphInfo(flameGraph.getEndTime(), flameGraph.getRoot());
+    }
+
+    public JobVertexFlameGraphHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, JobVertexFlameGraphInfo, JobVertexMessageParameters>
+                    messageHeaders,
+            ExecutionGraphCache executionGraphCache,
+            Executor executor,
+            ThreadInfoOperatorTracker<OperatorThreadInfoStats> threadInfoOperatorTracker) {
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                messageHeaders,
+                executionGraphCache,
+                executor);
+        this.threadInfoOperatorTracker = threadInfoOperatorTracker;
+    }
+
+    @Override
+    protected JobVertexFlameGraphInfo handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request,
+            @Nonnull AccessExecutionJobVertex jobVertex)
+            throws RestHandlerException {
+
+        final Optional<OperatorThreadInfoStats> threadInfoSample =
+                threadInfoOperatorTracker.getOperatorStats(jobVertex);
+
+        final List<FlameGraphTypeQueryParameter.Type> flameGraphTypeParameter =
+                request.getQueryParameter(FlameGraphTypeQueryParameter.class);
+        final FlameGraphTypeQueryParameter.Type flameGraphType;
+
+        if (flameGraphTypeParameter.isEmpty()) {
+            flameGraphType = FlameGraphTypeQueryParameter.Type.FULL;
+        } else {
+            flameGraphType = flameGraphTypeParameter.get(0);
+        }
+
+        final Optional<OperatorFlameGraph> operatorFlameGraph;
+
+        switch (flameGraphType) {
+            case FULL:
+                operatorFlameGraph =

Review comment:
       You could embed this mapping into the `FlameGraphTypeQueryParameter.Type` enum, e.g., by making the enum constants somewhat intelligent.
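A minimal sketch of such an "intelligent" enum, where each constant carries the factory that builds its graph. `Stats` and `Graph` stand in for `OperatorThreadInfoStats` and `OperatorFlameGraph`; the constant names other than `FULL` are assumed to mirror the three factory methods in `OperatorFlameGraphFactory`, and `FlameGraphTypeSketch`/`createGraph` are hypothetical names.

```java
import java.util.function.Function;

/** Hypothetical sketch: each enum constant knows how to build its own flame graph. */
public class FlameGraphTypeSketch {

    /** Stand-in for OperatorThreadInfoStats. */
    static final class Stats {
        final String id;

        Stats(String id) {
            this.id = id;
        }
    }

    /** Stand-in for OperatorFlameGraph. */
    static final class Graph {
        final String description;

        Graph(String description) {
            this.description = description;
        }
    }

    enum Type {
        // In Flink these could be method references such as
        // OperatorFlameGraphFactory::createOnCpuFlameGraph.
        FULL(stats -> new Graph("full:" + stats.id)),
        ON_CPU(stats -> new Graph("on-cpu:" + stats.id)),
        OFF_CPU(stats -> new Graph("off-cpu:" + stats.id));

        private final Function<Stats, Graph> factory;

        Type(Function<Stats, Graph> factory) {
            this.factory = factory;
        }

        Graph createGraph(Stats stats) {
            return factory.apply(stats);
        }
    }
}
```

The handler's `switch` would then collapse to something like `threadInfoSample.map(flameGraphType::createGraph)`, and adding a new graph type would no longer require touching the handler.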

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/TaskThreadInfoResponse.java
##########
@@ -0,0 +1,79 @@
+/*

Review comment:
       Merge this commit into the commit that actually uses the data structures.
   Alternatively, find a more descriptive commit message and explain in the commit why these structures are needed.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java
##########
@@ -0,0 +1,56 @@
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Arrays;
+
+/** Termination mode query parameter. */

Review comment:
       c&p error in doc?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
##########
@@ -166,6 +166,34 @@ public void testRequestTaskManagerInfo() throws Exception {
         assertEquals(0, taskManagerInfo.getNumberAvailableSlots());
     }
 
+    /**

Review comment:
       Tests should be part of the respective main commits. You want each commit to stand on its own feet (that's not always strictly possible).

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
##########
@@ -156,6 +156,48 @@
                     .withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples")
                     .withDescription("This config option is no longer used");
 
+    /** Time, in milliseconds, after which cached stats are cleaned up if not accessed. */

Review comment:
       The component in the commit message should be [rest].

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java
##########
@@ -0,0 +1,283 @@
+package org.apache.flink.runtime.messages;
+
+import java.io.Serializable;
+import java.lang.management.LockInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+/**
+ * A serializable wrapper around {@link java.lang.management.ThreadInfo} that excludes {@link

Review comment:
       The javadoc on the fields is copied from Java, right? I'm not sure if this is allowed... @zentol could you please take a look?
   
   Alternatively, link to the specific javadoc part of ThreadInfo?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Factory class for creating Flame Graph representations. */
+public class OperatorFlameGraphFactory {
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph.
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure
+     */
+    public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included = Arrays.asList(Thread.State.values());
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+     * threads.
+     *
+     * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,

Review comment:
       Formatting of enums.
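One way to address this is to use `{@link Thread.State#...}` references, which Javadoc renders and checks; the sketch also keeps the state sets in `EnumSet` constants so each factory method and its Javadoc refer to a single definition. `ThreadStateFilters` is a hypothetical name; the membership of each set is taken from the factory methods in this file (the on-CPU method uses `RUNNABLE` and `NEW`).

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical holder for the thread-state filters used by the flame graph factory. */
public final class ThreadStateFilters {

    /**
     * States counted as off-CPU: {@link Thread.State#TIMED_WAITING}, {@link
     * Thread.State#BLOCKED} and {@link Thread.State#WAITING}.
     */
    static final Set<Thread.State> OFF_CPU =
            Collections.unmodifiableSet(
                    EnumSet.of(Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING));

    /** States counted as on-CPU: {@link Thread.State#RUNNABLE} and {@link Thread.State#NEW}. */
    static final Set<Thread.State> ON_CPU =
            Collections.unmodifiableSet(EnumSet.of(Thread.State.RUNNABLE, Thread.State.NEW));

    /** All states, used for the full flame graph (the only one that includes TERMINATED). */
    static final Set<Thread.State> ALL =
            Collections.unmodifiableSet(EnumSet.allOf(Thread.State.class));

    private ThreadStateFilters() {}
}
```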

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoSamplesRequest.java
##########
@@ -0,0 +1,92 @@
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.api.common.time.Time;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A wrapper for parameters of a thread info sampling request. */
+public class ThreadInfoSamplesRequest {
+    private final int requestId;
+    private final int numSubSamples;
+    private final Time delayBetweenSamples;
+    private final int maxStackTraceDepth;
+
+    /**
+     * @param requestId ID of the sampling request.
+     * @param numSamples The number of samples.
+     * @param delayBetweenSamples The time to wait between taking samples.
+     * @param maxStackTraceDepth The maximum depth of the returned stack traces.
+     */
+    public ThreadInfoSamplesRequest(
+            int requestId,
+            @Nonnegative int numSamples,
+            @Nonnull Time delayBetweenSamples,

Review comment:
       We usually only annotate `@Nullable`.
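Following that convention, the constructor would drop `@Nonnull` and `@Nonnegative` (non-null is the default assumption, and only parameters that may legitimately be `null` get `@Nullable`) and enforce value constraints at runtime. A minimal sketch, using `java.time.Duration` in place of Flink's `Time` and plain JDK checks in place of `Preconditions`; `SamplesRequestSketch` is hypothetical, and the `>= 1` bound on `numSamples` mirrors the check in `ThreadInfoOperatorTracker` rather than anything visible in this file.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical sketch of the sampling request without @Nonnull/@Nonnegative annotations. */
public final class SamplesRequestSketch {
    private final int requestId;
    private final int numSamples;
    private final Duration delayBetweenSamples;
    private final int maxStackTraceDepth;

    // Parameters are non-null unless marked @Nullable; numeric bounds are checked here
    // instead of being declared through annotations.
    public SamplesRequestSketch(
            int requestId, int numSamples, Duration delayBetweenSamples, int maxStackTraceDepth) {
        if (numSamples < 1) {
            throw new IllegalArgumentException("Number of samples must be positive");
        }
        if (maxStackTraceDepth < 0) {
            throw new IllegalArgumentException("Max stack trace depth must be non-negative");
        }
        this.requestId = requestId;
        this.numSamples = numSamples;
        this.delayBetweenSamples =
                Objects.requireNonNull(delayBetweenSamples, "delayBetweenSamples");
        this.maxStackTraceDepth = maxStackTraceDepth;
    }

    public int getRequestId() {
        return requestId;
    }

    public int getNumSamples() {
        return numSamples;
    }

    public Duration getDelayBetweenSamples() {
        return delayBetweenSamples;
    }

    public int getMaxStackTraceDepth() {
        return maxStackTraceDepth;
    }
}
```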

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraph.java
##########
@@ -0,0 +1,96 @@
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Flame Graph representation for an operator.
+ *
+ * <p>Statistics are gathered by sampling stack traces of running tasks.
+ */
+public class OperatorFlameGraph implements Serializable, Stats {
+
+    /** Graph node. */
+    public static class Node {
+
+        private final String name;
+        private final int value;

Review comment:
       hitCount or numberOfSamples?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+     * threads.
+     *
+     * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+     * Thread.State.WAITING].
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure.
+     */
+    public static OperatorFlameGraph createOffCpuFlameGraph(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included =
+                Arrays.asList(
+                        Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING);
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing actively running
+     * (On-CPU) threads.
+     *
+     * <p>Includes threads in states Thread.State.[RUNNABLE, Thread.State.NEW].
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure
+     */
+    public static OperatorFlameGraph createOnCpuFlameGraph(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included = Arrays.asList(Thread.State.RUNNABLE, Thread.State.NEW);
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    private static OperatorFlameGraph createFlameGraphFromSample(
+            OperatorThreadInfoStats sample, Collection<Thread.State> threadStates) {
+        final NodeBuilder root = new NodeBuilder("root");
+        for (List<ThreadInfoSample> threadInfoSubSamples : sample.getSamplesBySubtask().values()) {
+            for (ThreadInfoSample threadInfo : threadInfoSubSamples) {
+                if (threadStates.contains(threadInfo.getThreadState())) {
+                    StackTraceElement[] traces = threadInfo.getStackTrace();
+                    root.increment();
+                    NodeBuilder parent = root;
+                    for (int i = traces.length - 1; i >= 0; i--) {
+                        final String name =
+                                traces[i].getClassName()
+                                        + "."
+                                        + traces[i].getMethodName()
+                                        + ":"
+                                        + traces[i].getLineNumber();
+                        parent = parent.addChild(name);
+                    }
+                }
+            }
+        }
+        return new OperatorFlameGraph(sample.getEndTime(), buildFlameGraph(root));
+    }
+
+    private static OperatorFlameGraph.Node buildFlameGraph(NodeBuilder builder) {
+        final List<OperatorFlameGraph.Node> children = new ArrayList<>();
+        for (NodeBuilder builderChild : builder.children.values()) {
+            children.add(buildFlameGraph(builderChild));
+        }
+        return new OperatorFlameGraph.Node(
+                builder.name, builder.value, Collections.unmodifiableList(children));
+    }
+
+    private static class NodeBuilder {
+
+        private final Map<String, NodeBuilder> children = new HashMap<>();
+
+        private final String name;
+
+        private int value = 0;

Review comment:
       hitCount or numberOfSamples?
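For reference, this is how the counter is used while building the tree, sketched with the suggested `hitCount` name: each sampled stack is walked from its outermost frame to its innermost one, and every node on the path counts one more hit. The sketch assumes that `addChild` (not shown in the hunk) increments the child it returns; `FlameGraphBuilderSketch` and `addStack` are hypothetical, and frames are plain strings instead of `StackTraceElement`s.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the flame graph NodeBuilder with the counter renamed to hitCount. */
public final class FlameGraphBuilderSketch {

    static final class NodeBuilder {
        final String name;
        final Map<String, NodeBuilder> children = new LinkedHashMap<>();
        int hitCount; // number of sampled stack traces that pass through this frame

        NodeBuilder(String name) {
            this.name = name;
        }

        void increment() {
            hitCount++;
        }

        /** Returns the child for the given frame, creating it if needed, and counts one hit. */
        NodeBuilder addChild(String frame) {
            NodeBuilder child = children.computeIfAbsent(frame, NodeBuilder::new);
            child.increment();
            return child;
        }
    }

    /**
     * Adds one sampled stack. Frames are ordered innermost first, like the array returned by
     * Thread#getStackTrace, so the walk starts at the end of the array.
     */
    static void addStack(NodeBuilder root, String[] framesInnermostFirst) {
        root.increment();
        NodeBuilder parent = root;
        for (int i = framesInnermostFirst.length - 1; i >= 0; i--) {
            parent = parent.addChild(framesInnermostFirst[i]);
        }
    }
}
```

With `hitCount`, it is immediately clear that a node's width in the rendered flame graph is proportional to the number of samples that contained that frame, which the generic name `value` obscures.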

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Factory class for creating Flame Graph representations. */
+public class OperatorFlameGraphFactory {
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph.
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure
+     */
+    public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included = Arrays.asList(Thread.State.values());
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+     * threads.
+     *
+     * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+     * Thread.State.WAITING].
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure.
+     */
+    public static OperatorFlameGraph createOffCpuFlameGraph(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included =
+                Arrays.asList(
+                        Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING);
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing actively running
+     * (On-CPU) threads.
+     *
+     * <p>Includes threads in states Thread.State.[RUNNABLE, Thread.State.NEW].
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure
+     */
+    public static OperatorFlameGraph createOnCpuFlameGraph(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included = Arrays.asList(Thread.State.RUNNABLE, Thread.State.NEW);
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    private static OperatorFlameGraph createFlameGraphFromSample(
+            OperatorThreadInfoStats sample, Collection<Thread.State> threadStates) {
+        final NodeBuilder root = new NodeBuilder("root");
+        for (List<ThreadInfoSample> threadInfoSubSamples : sample.getSamplesBySubtask().values()) {
+            for (ThreadInfoSample threadInfo : threadInfoSubSamples) {
+                if (threadStates.contains(threadInfo.getThreadState())) {
+                    StackTraceElement[] traces = threadInfo.getStackTrace();
+                    root.increment();
+                    NodeBuilder parent = root;
+                    for (int i = traces.length - 1; i >= 0; i--) {
+                        final String name =
+                                traces[i].getClassName()
+                                        + "."
+                                        + traces[i].getMethodName()
+                                        + ":"
+                                        + traces[i].getLineNumber();
+                        parent = parent.addChild(name);
+                    }
+                }
+            }
+        }
+        return new OperatorFlameGraph(sample.getEndTime(), buildFlameGraph(root));
+    }
+
+    private static OperatorFlameGraph.Node buildFlameGraph(NodeBuilder builder) {
+        final List<OperatorFlameGraph.Node> children = new ArrayList<>();
+        for (NodeBuilder builderChild : builder.children.values()) {
+            children.add(buildFlameGraph(builderChild));
+        }
+        return new OperatorFlameGraph.Node(
+                builder.name, builder.value, Collections.unmodifiableList(children));
+    }

Review comment:
       This should be an instance method of `NodeBuilder`, maybe `toNode`? 
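A sketch of the suggested refactoring, under stated assumptions: the recursion moves into the builder as an instance method `toNode()`, and the child list is created with `children.size()` as its initial capacity because the number of children is known before the loop. `FlameNode` is a hypothetical stand-in for `OperatorFlameGraph.Node` that assumes only the `(name, value, children)` constructor shape used in `buildFlameGraph`; the builder's `addChild` semantics (get or create, then count a hit) are assumed as well.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical immutable node standing in for OperatorFlameGraph.Node. */
final class FlameNode {
    final String name;
    final int value;
    final List<FlameNode> children;

    FlameNode(String name, int value, List<FlameNode> children) {
        this.name = name;
        this.value = value;
        this.children = children;
    }
}

/** Simplified stand-in for the PR's NodeBuilder, with the conversion as an instance method. */
final class NodeBuilder {
    private final Map<String, NodeBuilder> children = new HashMap<>();
    private final String name;
    private int value = 0;

    NodeBuilder(String name) {
        this.name = name;
    }

    void increment() {
        value++;
    }

    /** Assumed semantics: get or create the child frame and count one hit on it. */
    NodeBuilder addChild(String childName) {
        NodeBuilder child = children.computeIfAbsent(childName, NodeBuilder::new);
        child.increment();
        return child;
    }

    /** Recursively converts this builder and its subtree into immutable nodes. */
    FlameNode toNode() {
        // Pre-sized: the number of children is already known.
        final List<FlameNode> childNodes = new ArrayList<>(children.size());
        for (NodeBuilder child : children.values()) {
            childNodes.add(child.toNode());
        }
        return new FlameNode(name, value, Collections.unmodifiableList(childNodes));
    }
}
```

With the method on the builder, `createFlameGraphFromSample` could end with `new OperatorFlameGraph(sample.getEndTime(), root.toNode())`, and the static `buildFlameGraph` helper would no longer be needed.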

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorThreadInfoStats.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Tnread info statistics of multiple tasks. Each subtask can deliver multiple samples for

Review comment:
       Typo: `Tnread` should be `Thread`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.stats.OperatorStatsTracker;
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker of thread infos for {@link ExecutionJobVertex}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+public class ThreadInfoOperatorTracker<T extends Stats> implements OperatorStatsTracker<T> {
+
+    /**
+     * Create a new {@link Builder}.
+     *
+     * @param createStatsFn Function that converts a thread info sample into a derived statistic.
+     *     Could be an identity function.
+     * @param <T> Type of the derived statistics to return.
+     * @return Builder.
+     */
+    public static <T extends Stats> Builder<T> newBuilder(
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor) {
+        return new Builder<>(resourceManagerGatewayRetriever, createStatsFn, executor);
+    }
+
+    /**
+     * Builder for {@link ThreadInfoOperatorTracker}.
+     *
+     * @param <T> Type of the derived statistics to return.
+     */
+    public static class Builder<T extends Stats> {
+
+        private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+        private final Function<OperatorThreadInfoStats, T> createStatsFn;
+        private final ExecutorService executor;
+
+        private ThreadInfoRequestCoordinator coordinator;
+        private int cleanUpInterval;
+        private int numSamples;
+        private int statsRefreshInterval;
+        private Time delayBetweenSamples;
+        private int maxThreadInfoDepth;
+
+        private Builder(
+                GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+                Function<OperatorThreadInfoStats, T> createStatsFn,
+                ExecutorService executor) {
+            this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+            this.createStatsFn = createStatsFn;
+            this.executor = executor;
+        }
+
+        /**
+         * Sets {@code coordinator}.
+         *
+         * @param coordinator Coordinator for thread info stats request.
+         * @return Builder.
+         */
+        public Builder<T> setCoordinator(ThreadInfoRequestCoordinator coordinator) {
+            this.coordinator = coordinator;
+            return this;
+        }
+
+        /**
+         * Sets {@code cleanUpInterval}.
+         *
+         * @param cleanUpInterval Clean up interval for completed stats.
+         * @return Builder.
+         */
+        public Builder<T> setCleanUpInterval(int cleanUpInterval) {
+            this.cleanUpInterval = cleanUpInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code numSamples}.
+         *
+         * @param numSamples Number of thread info samples to collect for each subtask.
+         * @return Builder.
+         */
+        public Builder<T> setNumSamples(int numSamples) {
+            this.numSamples = numSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code statsRefreshInterval}.
+         *
+         * @param statsRefreshInterval Time interval after which the available thread info stats are
+         *     deprecated and need to be refreshed.
+         * @return Builder.
+         */
+        public Builder<T> setStatsRefreshInterval(int statsRefreshInterval) {
+            this.statsRefreshInterval = statsRefreshInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code delayBetweenSamples}.
+         *
+         * @param delayBetweenSamples Delay between individual samples per task.
+         * @return Builder.
+         */
+        public Builder<T> setDelayBetweenSamples(Time delayBetweenSamples) {
+            this.delayBetweenSamples = delayBetweenSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code maxThreadInfoDepth}.
+         *
+         * @param maxThreadInfoDepth Limit for the depth of the stack traces included when sampling
+         *     threads.
+         * @return Builder.
+         */
+        public Builder<T> setMaxThreadInfoDepth(int maxThreadInfoDepth) {
+            this.maxThreadInfoDepth = maxThreadInfoDepth;
+            return this;
+        }
+
+        /**
+         * Constructs a new {@link ThreadInfoOperatorTracker}.
+         *
+         * @return a new {@link ThreadInfoOperatorTracker} instance.
+         */
+        public ThreadInfoOperatorTracker<T> build() {
+            return new ThreadInfoOperatorTracker<>(
+                    coordinator,
+                    resourceManagerGatewayRetriever,
+                    createStatsFn,
+                    executor,
+                    cleanUpInterval,
+                    numSamples,
+                    statsRefreshInterval,
+                    delayBetweenSamples,
+                    maxThreadInfoDepth);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadInfoOperatorTracker.class);
+
+    /** Lock guarding trigger operations. */
+    private final Object lock = new Object();
+
+    private final ThreadInfoRequestCoordinator coordinator;
+
+    private final Function<OperatorThreadInfoStats, T> createStatsFn;
+
+    private final ExecutorService executor;
+
+    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+    /**
+     * Completed stats. Important: Job vertex IDs need to be scoped by job ID, because they are
+     * potentially constant across runs messing up the cached data.
+     */
+    private final Cache<AccessExecutionJobVertex, T> operatorStatsCache;
+
+    /**
+     * Pending in progress stats. Important: Job vertex IDs need to be scoped by job ID, because
+     * they are potentially constant across runs messing up the cached data.
+     */
+    private final Set<AccessExecutionJobVertex> pendingStats = new HashSet<>();
+
+    private final int numSamples;
+
+    private final int statsRefreshInterval;
+
+    private final Time delayBetweenSamples;
+
+    private final int maxThreadInfoDepth;
+
+    // Used for testing purposes
+    private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();
+
+    /** Flag indicating whether the stats tracker has been shut down. */
+    private boolean shutDown;

Review comment:
       Please add `@GuardedBy` where appropriate.
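A minimal sketch of what the annotations could look like. Flink code generally uses `javax.annotation.concurrent.GuardedBy` from JSR-305; so that this snippet compiles without that dependency, it declares a local stand-in annotation with the same name. Which fields `lock` should actually guard is an assumption here: `pendingStats` and `shutDown` are treated as only read or written inside `synchronized (lock)`, and the class `GuardedTracker` with its methods is hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashSet;
import java.util.Set;

/** Local stand-in for javax.annotation.concurrent.GuardedBy (JSR-305). */
@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.CLASS)
@interface GuardedBy {
    String value();
}

/** Hypothetical tracker documenting which state is protected by {@code lock}. */
final class GuardedTracker {

    /** Lock guarding trigger operations. */
    private final Object lock = new Object();

    // Assumption: the set's contents are only accessed while holding lock.
    @GuardedBy("lock")
    private final Set<String> pendingStats = new HashSet<>();

    @GuardedBy("lock")
    private boolean shutDown;

    /** Returns true if a new request was registered; false if one is pending or after close. */
    boolean triggerRequest(String vertexId) {
        synchronized (lock) {
            if (shutDown) {
                return false;
            }
            return pendingStats.add(vertexId);
        }
    }

    /** Marks the tracker as shut down and drops pending requests. */
    void close() {
        synchronized (lock) {
            shutDown = true;
            pendingStats.clear();
        }
    }
}
```

The annotation enforces nothing at runtime; it documents the locking contract for readers and for static analysis tools that understand it.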

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -834,6 +834,19 @@ public void notifySlotAvailable(
         }
     }
 
+    @Override

Review comment:
       This commit should be squashed into the commit where the new method is first used.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Factory class for creating Flame Graph representations. */
+public class OperatorFlameGraphFactory {
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph.
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure
+     */
+    public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included = Arrays.asList(Thread.State.values());
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+     * threads.
+     *
+     * <p>Includes threads in states Thread.State.[TIMED_WAITING, Thread.State.BLOCKED,
+     * Thread.State.WAITING].
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure.
+     */
+    public static OperatorFlameGraph createOffCpuFlameGraph(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included =
+                Arrays.asList(
+                        Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING);
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing actively running
+     * (On-CPU) threads.
+     *
+     * <p>Includes threads in states Thread.State.[RUNNABLE, Thread.State.NEW].
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure
+     */
+    public static OperatorFlameGraph createOnCpuFlameGraph(OperatorThreadInfoStats sample) {
+        Collection<Thread.State> included = Arrays.asList(Thread.State.RUNNABLE, Thread.State.NEW);
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    private static OperatorFlameGraph createFlameGraphFromSample(
+            OperatorThreadInfoStats sample, Collection<Thread.State> threadStates) {
+        final NodeBuilder root = new NodeBuilder("root");
+        for (List<ThreadInfoSample> threadInfoSubSamples : sample.getSamplesBySubtask().values()) {
+            for (ThreadInfoSample threadInfo : threadInfoSubSamples) {
+                if (threadStates.contains(threadInfo.getThreadState())) {
+                    StackTraceElement[] traces = threadInfo.getStackTrace();
+                    root.increment();
+                    NodeBuilder parent = root;
+                    for (int i = traces.length - 1; i >= 0; i--) {
+                        final String name =
+                                traces[i].getClassName()
+                                        + "."
+                                        + traces[i].getMethodName()
+                                        + ":"
+                                        + traces[i].getLineNumber();
+                        parent = parent.addChild(name);
+                    }
+                }
+            }
+        }
+        return new OperatorFlameGraph(sample.getEndTime(), buildFlameGraph(root));
+    }
+
+    private static OperatorFlameGraph.Node buildFlameGraph(NodeBuilder builder) {
+        final List<OperatorFlameGraph.Node> children = new ArrayList<>();

Review comment:
       Pre-size the list with `builder.children.size()`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.stats.OperatorStatsTracker;
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker of thread infos for {@link ExecutionJobVertex}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+public class ThreadInfoOperatorTracker<T extends Stats> implements OperatorStatsTracker<T> {
+
+    /**
+     * Create a new {@link Builder}.
+     *
+     * @param createStatsFn Function that converts a thread info sample into a derived statistic.
+     *     Could be an identity function.
+     * @param <T> Type of the derived statistics to return.
+     * @return Builder.
+     */
+    public static <T extends Stats> Builder<T> newBuilder(
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor) {
+        return new Builder<>(resourceManagerGatewayRetriever, createStatsFn, executor);
+    }
+
+    /**
+     * Builder for {@link ThreadInfoOperatorTracker}.
+     *
+     * @param <T> Type of the derived statistics to return.
+     */
+    public static class Builder<T extends Stats> {
+
+        private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+        private final Function<OperatorThreadInfoStats, T> createStatsFn;
+        private final ExecutorService executor;
+
+        private ThreadInfoRequestCoordinator coordinator;
+        private int cleanUpInterval;
+        private int numSamples;
+        private int statsRefreshInterval;
+        private Time delayBetweenSamples;
+        private int maxThreadInfoDepth;
+
+        private Builder(
+                GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+                Function<OperatorThreadInfoStats, T> createStatsFn,
+                ExecutorService executor) {
+            this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+            this.createStatsFn = createStatsFn;
+            this.executor = executor;
+        }
+
+        /**
+         * Sets {@code coordinator}.
+         *
+         * @param coordinator Coordinator for thread info stats request.
+         * @return Builder.
+         */
+        public Builder<T> setCoordinator(ThreadInfoRequestCoordinator coordinator) {
+            this.coordinator = coordinator;
+            return this;
+        }
+
+        /**
+         * Sets {@code cleanUpInterval}.
+         *
+         * @param cleanUpInterval Clean up interval for completed stats.
+         * @return Builder.
+         */
+        public Builder<T> setCleanUpInterval(int cleanUpInterval) {
+            this.cleanUpInterval = cleanUpInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code numSamples}.
+         *
+         * @param numSamples Number of thread info samples to collect for each subtask.
+         * @return Builder.
+         */
+        public Builder<T> setNumSamples(int numSamples) {
+            this.numSamples = numSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code statsRefreshInterval}.
+         *
+         * @param statsRefreshInterval Time interval after which the available thread info stats are
+         *     deprecated and need to be refreshed.
+         * @return Builder.
+         */
+        public Builder<T> setStatsRefreshInterval(int statsRefreshInterval) {
+            this.statsRefreshInterval = statsRefreshInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code delayBetweenSamples}.
+         *
+         * @param delayBetweenSamples Delay between individual samples per task.
+         * @return Builder.
+         */
+        public Builder<T> setDelayBetweenSamples(Time delayBetweenSamples) {
+            this.delayBetweenSamples = delayBetweenSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code maxThreadInfoDepth}.
+         *
+         * @param maxThreadInfoDepth Limit for the depth of the stack traces included when sampling
+         *     threads.
+         * @return Builder.
+         */
+        public Builder<T> setMaxThreadInfoDepth(int maxThreadInfoDepth) {
+            this.maxThreadInfoDepth = maxThreadInfoDepth;
+            return this;
+        }
+
+        /**
+         * Constructs a new {@link ThreadInfoOperatorTracker}.
+         *
+         * @return a new {@link ThreadInfoOperatorTracker} instance.
+         */
+        public ThreadInfoOperatorTracker<T> build() {
+            return new ThreadInfoOperatorTracker<>(
+                    coordinator,
+                    resourceManagerGatewayRetriever,
+                    createStatsFn,
+                    executor,
+                    cleanUpInterval,
+                    numSamples,
+                    statsRefreshInterval,
+                    delayBetweenSamples,
+                    maxThreadInfoDepth);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadInfoOperatorTracker.class);
+
+    /** Lock guarding trigger operations. */
+    private final Object lock = new Object();
+
+    private final ThreadInfoRequestCoordinator coordinator;
+
+    private final Function<OperatorThreadInfoStats, T> createStatsFn;
+
+    private final ExecutorService executor;
+
+    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+    /**
+     * Completed stats. Important: Job vertex IDs need to be scoped by job ID, because they are
+     * potentially constant across runs messing up the cached data.
+     */
+    private final Cache<AccessExecutionJobVertex, T> operatorStatsCache;
+
+    /**
+     * Pending in progress stats. Important: Job vertex IDs need to be scoped by job ID, because
+     * they are potentially constant across runs messing up the cached data.
+     */
+    private final Set<AccessExecutionJobVertex> pendingStats = new HashSet<>();
+
+    private final int numSamples;
+
+    private final int statsRefreshInterval;
+
+    private final Time delayBetweenSamples;
+
+    private final int maxThreadInfoDepth;
+
+    // Used for testing purposes
+    private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();
+
+    /** Flag indicating whether the stats tracker has been shut down. */
+    private boolean shutDown;
+
+    private ThreadInfoOperatorTracker(
+            ThreadInfoRequestCoordinator coordinator,
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor,
+            int cleanUpInterval,
+            int numSamples,
+            int statsRefreshInterval,
+            Time delayBetweenSamples,
+            int maxStackTraceDepth) {
+
+        this.coordinator = checkNotNull(coordinator, "Thread info samples coordinator");
+        this.resourceManagerGatewayRetriever =
+                checkNotNull(resourceManagerGatewayRetriever, "Gateway retriever");
+        this.createStatsFn = checkNotNull(createStatsFn, "Create stats function");
+        this.executor = checkNotNull(executor, "Scheduled executor");
+
+        checkArgument(cleanUpInterval >= 0, "Clean up interval");

Review comment:
       What is an interval of 0 supposed to do? The same question applies to the later parameters.
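One way to answer the question is to reject values that have no sensible meaning when the tracker is built. The sketch below is a proposal, not the PR's behavior: it assumes 0 is invalid for the intervals, the sample count, and the depth limit, while a delay of 0 is allowed and read as "sample back to back". The review leaves open whether 0 should instead mean "disabled". `TrackerSettings` is hypothetical and uses plain millisecond numbers instead of Flink's `Time`; a plain `IllegalArgumentException` stands in for Flink's `Preconditions.checkArgument`, which throws the same exception type.

```java
/** Hypothetical holder for the tracker's numeric settings, validated eagerly. */
final class TrackerSettings {
    final int cleanUpIntervalMs;
    final int numSamples;
    final int statsRefreshIntervalMs;
    final long delayBetweenSamplesMs;
    final int maxThreadInfoDepth;

    TrackerSettings(
            int cleanUpIntervalMs,
            int numSamples,
            int statsRefreshIntervalMs,
            long delayBetweenSamplesMs,
            int maxThreadInfoDepth) {
        // Assumption: 0 has no meaningful interpretation for these settings.
        this.cleanUpIntervalMs = requirePositive(cleanUpIntervalMs, "cleanUpInterval");
        this.numSamples = requirePositive(numSamples, "numSamples");
        this.statsRefreshIntervalMs = requirePositive(statsRefreshIntervalMs, "statsRefreshInterval");
        this.maxThreadInfoDepth = requirePositive(maxThreadInfoDepth, "maxThreadInfoDepth");
        // Assumption: a delay of 0 means "take samples back to back", so only negatives fail.
        if (delayBetweenSamplesMs < 0) {
            throw new IllegalArgumentException(
                    "delayBetweenSamples must be >= 0, was " + delayBetweenSamplesMs);
        }
        this.delayBetweenSamplesMs = delayBetweenSamplesMs;
    }

    /** Throws if the value is not strictly positive; otherwise returns it unchanged. */
    private static int requirePositive(int value, String name) {
        if (value <= 0) {
            throw new IllegalArgumentException(name + " must be > 0, was " + value);
        }
        return value;
    }
}
```

Whichever rule is chosen, stating it in the error message and the option descriptions answers the "what does 0 do" question for users as well.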

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.stats.OperatorStatsTracker;
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker of thread infos for {@link ExecutionJobVertex}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+public class ThreadInfoOperatorTracker<T extends Stats> implements OperatorStatsTracker<T> {
+
+    /**
+     * Create a new {@link Builder}.
+     *
+     * @param createStatsFn Function that converts a thread info sample into a derived statistic.
+     *     Could be an identity function.
+     * @param <T> Type of the derived statistics to return.
+     * @return Builder.
+     */
+    public static <T extends Stats> Builder<T> newBuilder(
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor) {
+        return new Builder<>(resourceManagerGatewayRetriever, createStatsFn, executor);
+    }
+
+    /**
+     * Builder for {@link ThreadInfoOperatorTracker}.
+     *
+     * @param <T> Type of the derived statistics to return.
+     */
+    public static class Builder<T extends Stats> {
+
+        private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+        private final Function<OperatorThreadInfoStats, T> createStatsFn;
+        private final ExecutorService executor;
+
+        private ThreadInfoRequestCoordinator coordinator;
+        private int cleanUpInterval;
+        private int numSamples;
+        private int statsRefreshInterval;
+        private Time delayBetweenSamples;
+        private int maxThreadInfoDepth;
+
+        private Builder(
+                GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+                Function<OperatorThreadInfoStats, T> createStatsFn,
+                ExecutorService executor) {
+            this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+            this.createStatsFn = createStatsFn;
+            this.executor = executor;
+        }
+
+        /**
+         * Sets {@code coordinator}.
+         *
+         * @param coordinator Coordinator for thread info stats requests.
+         * @return Builder.
+         */
+        public Builder<T> setCoordinator(ThreadInfoRequestCoordinator coordinator) {
+            this.coordinator = coordinator;
+            return this;
+        }
+
+        /**
+         * Sets {@code cleanUpInterval}.
+         *
+         * @param cleanUpInterval Clean up interval for completed stats.
+         * @return Builder.
+         */
+        public Builder<T> setCleanUpInterval(int cleanUpInterval) {
+            this.cleanUpInterval = cleanUpInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code numSamples}.
+         *
+         * @param numSamples Number of thread info samples to collect for each subtask.
+         * @return Builder.
+         */
+        public Builder<T> setNumSamples(int numSamples) {
+            this.numSamples = numSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code statsRefreshInterval}.
+         *
+         * @param statsRefreshInterval Time interval after which the available thread info stats are
+         *     outdated and need to be refreshed.
+         * @return Builder.
+         */
+        public Builder<T> setStatsRefreshInterval(int statsRefreshInterval) {
+            this.statsRefreshInterval = statsRefreshInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code delayBetweenSamples}.
+         *
+         * @param delayBetweenSamples Delay between individual samples per task.
+         * @return Builder.
+         */
+        public Builder<T> setDelayBetweenSamples(Time delayBetweenSamples) {
+            this.delayBetweenSamples = delayBetweenSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code maxThreadInfoDepth}.
+         *
+         * @param maxThreadInfoDepth Limit for the depth of the stack traces included when sampling
+         *     threads.
+         * @return Builder.
+         */
+        public Builder<T> setMaxThreadInfoDepth(int maxThreadInfoDepth) {
+            this.maxThreadInfoDepth = maxThreadInfoDepth;
+            return this;
+        }
+
+        /**
+         * Constructs a new {@link ThreadInfoOperatorTracker}.
+         *
+         * @return a new {@link ThreadInfoOperatorTracker} instance.
+         */
+        public ThreadInfoOperatorTracker<T> build() {
+            return new ThreadInfoOperatorTracker<>(
+                    coordinator,
+                    resourceManagerGatewayRetriever,
+                    createStatsFn,
+                    executor,
+                    cleanUpInterval,
+                    numSamples,
+                    statsRefreshInterval,
+                    delayBetweenSamples,
+                    maxThreadInfoDepth);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadInfoOperatorTracker.class);
+
+    /** Lock guarding trigger operations. */
+    private final Object lock = new Object();
+
+    private final ThreadInfoRequestCoordinator coordinator;
+
+    private final Function<OperatorThreadInfoStats, T> createStatsFn;
+
+    private final ExecutorService executor;
+
+    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+    /**
+     * Completed stats. Important: Job vertex IDs need to be scoped by job ID, because they are
+     * potentially constant across runs, which would mess up the cached data.
+     */
+    private final Cache<AccessExecutionJobVertex, T> operatorStatsCache;

Review comment:
       What is the actual purpose of the cache? How often do you access cached data? What is the flow?
   
   Is the cache hit when a user looks at the flame graph of a job and switches back and forth between tasks? Or is it hit at a more fine-grained level? Do you have an auto-refresh feature in the UI that needs the cache for updates? Couldn't we use async requests instead? (Note that these questions should not result in any changes to this PR but are rather meant for a future refactoring.)
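   The sketch below shows the cache-or-trigger flow in question. It assumes the tracker behaves like the existing back-pressure stats tracker. On each read it returns whatever is cached, which may be stale or absent. It triggers one asynchronous request per key when nothing is cached or the entry is older than the refresh interval. `CachingStatsTracker`, `getStats` and the injected clock are hypothetical names, not the PR's API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical sketch: serves cached stats and refreshes stale entries asynchronously. */
final class CachingStatsTracker<K, V> {

    /** A cached value together with the time it was stored. */
    private static final class Entry<T> {
        final T value;
        final long storedAtMillis;

        Entry(T value, long storedAtMillis) {
            this.value = value;
            this.storedAtMillis = storedAtMillis;
        }
    }

    private final Object lock = new Object();
    private final Map<K, Entry<V>> cache = new HashMap<>();
    private final Set<K> pending = new HashSet<>();
    private final Function<K, CompletableFuture<V>> requestFn;
    private final long refreshIntervalMillis;
    private final LongSupplier clock;

    CachingStatsTracker(
            Function<K, CompletableFuture<V>> requestFn,
            long refreshIntervalMillis,
            LongSupplier clock) {
        this.requestFn = requestFn;
        this.refreshIntervalMillis = refreshIntervalMillis;
        this.clock = clock;
    }

    /**
     * Returns the cached value (possibly stale or absent) and triggers at most one in-flight
     * refresh per key when the entry is missing or older than the refresh interval.
     */
    Optional<V> getStats(K key) {
        synchronized (lock) {
            Entry<V> cached = cache.get(key);
            boolean stale =
                    cached == null
                            || clock.getAsLong() - cached.storedAtMillis >= refreshIntervalMillis;
            if (stale && pending.add(key)) {
                requestFn
                        .apply(key)
                        .whenComplete(
                                (value, error) -> {
                                    synchronized (lock) {
                                        pending.remove(key);
                                        if (error == null) {
                                            cache.put(key, new Entry<>(value, clock.getAsLong()));
                                        }
                                    }
                                });
            }
            return cached == null ? Optional.empty() : Optional.ofNullable(cached.value);
        }
    }
}
```

   The key type `K` is where the job-ID scoping from the Javadoc would apply. For example, it could be a pair of job ID and job vertex ID instead of the vertex object.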

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/TaskStatsRequestCoordinator.java
##########
@@ -0,0 +1,323 @@
+
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates the common functionality for requesting statistics from individual tasks and
+ * combining their responses.
+ *
+ * @param <T> Type of the statistics to be gathered.
+ * @param <V> Type of the combined response.
+ */
+public class TaskStatsRequestCoordinator<T, V> {
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected final int numGhostSampleIds = 10;

Review comment:
       Convert to constant? What is this about?
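   From what the diff shows, `numGhostSampleIds` bounds the FIFO of recently finished or cancelled request IDs (see `rememberRecentRequestId`). A response for such a request is then logged as late rather than unknown. The sketch below implements the same logic with the size as a `static final` constant. `RecentRequestIds` and its methods are hypothetical names:

```java
import java.util.ArrayDeque;

/** Hypothetical sketch: remembers the last few finished request IDs to classify late responses. */
final class RecentRequestIds {

    /** How many finished or cancelled request IDs to remember (the PR's numGhostSampleIds). */
    static final int NUM_GHOST_SAMPLE_IDS = 10;

    private final ArrayDeque<Integer> recent = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);

    /** Records a finished or cancelled request, evicting the oldest ID once the bound is hit. */
    void remember(int requestId) {
        if (recent.size() >= NUM_GHOST_SAMPLE_IDS) {
            recent.removeFirst();
        }
        recent.addLast(requestId);
    }

    /** A response for a remembered ID is "late"; any other unknown ID is invalid. */
    boolean isLate(int requestId) {
        return recent.contains(requestId);
    }
}
```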

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/TaskStatsRequestCoordinator.java
##########
@@ -0,0 +1,323 @@
+
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates the common functionality for requesting statistics from individual tasks and
+ * combining their responses.
+ *
+ * @param <T> Type of the statistics to be gathered.
+ * @param <V> Type of the combined response.
+ */
+public class TaskStatsRequestCoordinator<T, V> {
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected final int numGhostSampleIds = 10;
+
+    protected final Object lock = new Object();
+
+    /** Executor used to run the futures. */
+    protected final Executor executor;
+
+    /** Request timeout of a triggered task stats request. */
+    protected final Time requestTimeout;
+
+    /** In-progress requests. */
+    @GuardedBy("lock")
+    protected final Map<Integer, PendingStatsRequest<T, V>> pendingRequests = new HashMap<>();
+
+    /** A list of recent request IDs to identify late messages vs. invalid ones. */
+    protected final ArrayDeque<Integer> recentPendingRequestIds =
+            new ArrayDeque<>(numGhostSampleIds);
+
+    /** Request ID counter. */
+    @GuardedBy("lock")
+    protected int requestIdCounter;
+
+    /** Flag indicating whether the coordinator has been shut down. */
+    @GuardedBy("lock")
+    protected boolean isShutDown;
+
+    /**
+     * Creates a new coordinator for the cluster.
+     *
+     * @param executor Used to execute the futures.
+     * @param requestTimeout Request timeout of a triggered task stats request.
+     */
+    public TaskStatsRequestCoordinator(Executor executor, long requestTimeout) {
+        checkArgument(requestTimeout >= 0L, "The request timeout must be non-negative.");

Review comment:
       What are the semantics of timeout = 0?
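   For reference, `0` passes the current precondition. According to the `ThreadInfoRequestCoordinator` Javadoc, the timeout is added on top of the expected sampling duration, so `0` would leave no slack for RPC latency. The sketch below shows a stricter check. It assumes the expected duration is approximated as `numSamples * delayBetweenSamples`; the PR may compute it differently. `RequestTimeouts` and its methods are hypothetical:

```java
/** Hypothetical sketch of a stricter timeout check for the stats request coordinators. */
final class RequestTimeouts {

    /** Rejects zero and negative values, since a zero timeout leaves no slack for RPC latency. */
    static long checkTimeout(long requestTimeoutMillis) {
        if (requestTimeoutMillis <= 0L) {
            throw new IllegalArgumentException("The request timeout must be positive.");
        }
        return requestTimeoutMillis;
    }

    /**
     * Deadline of a thread info request relative to its start, assuming the expected sampling
     * duration is numSamples * delayBetweenSamples and the timeout is added on top of it.
     */
    static long effectiveTimeoutMillis(
            int numSamples, long delayBetweenSamplesMillis, long requestTimeoutMillis) {
        return numSamples * delayBetweenSamplesMillis + checkTimeout(requestTimeoutMillis);
    }
}
```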

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.webmonitor.stats.TaskStatsRequestCoordinator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A coordinator for triggering and collecting thread info stats of running operator subtasks. */
+public class ThreadInfoRequestCoordinator
+        extends TaskStatsRequestCoordinator<List<ThreadInfoSample>, OperatorThreadInfoStats> {
+
+    /**
+     * Creates a new coordinator for the job.
+     *
+     * @param executor Used to execute the futures.
+     * @param requestTimeout Timeout after the expected sampling duration. This is added to the
+     *     expected duration of a request, which is determined by the number of samples and the
+     *     delay between each sample.
+     */
+    public ThreadInfoRequestCoordinator(Executor executor, long requestTimeout) {
+        super(executor, requestTimeout);
+    }
+
+    /**
+     * Triggers collection of thread info stats of an operator by combining thread info responses
+     * from given subtasks. A thread info response of a subtask in turn consists of
+     * {@code numSamples} samples, collected with a delay of {@code delayBetweenSamples}
+     * milliseconds between them.
+     *
+     * @param subtasksWithGateways Execution vertices together with TaskExecutors running them.
+     * @param numSamples Number of thread info samples to collect from each subtask.
+     * @param delayBetweenSamples Delay between consecutive samples (ms).
+     * @param maxStackTraceDepth Maximum depth of the stack traces collected within thread info
+     *     samples.
+     * @return A future of the completed thread info stats.
+     */
+    public CompletableFuture<OperatorThreadInfoStats> triggerThreadInfoRequest(
+            List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>

Review comment:
       This parameter type is inducing nightmares ;) Maybe add a small record class?
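   The sketch below shows such a holder, written as a final class so it also works on Java 8; a `record` would do on newer JDKs. It is generic only to stay free of Flink dependencies. In the PR it would hold `AccessExecutionVertex` and `CompletableFuture<TaskExecutorGateway>`. `ExecutionWithGateway` is a hypothetical name:

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical value class replacing
 * {@code Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>}. Generic only so
 * the sketch compiles without Flink on the classpath.
 */
final class ExecutionWithGateway<E, G> {
    private final E execution;
    private final CompletableFuture<G> gateway;

    ExecutionWithGateway(E execution, CompletableFuture<G> gateway) {
        this.execution = Objects.requireNonNull(execution, "execution");
        this.gateway = Objects.requireNonNull(gateway, "gateway");
    }

    /** The subtask execution to sample. */
    E getExecution() {
        return execution;
    }

    /** Future of the gateway of the TaskExecutor running the execution. */
    CompletableFuture<G> getGateway() {
        return gateway;
    }
}
```

   With this holder, `triggerThreadInfoRequest` could take something like a `List<ExecutionWithGateway>` (non-generic in the real code) instead of the nested `Tuple2` list.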

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/TaskStatsRequestCoordinator.java
##########
@@ -0,0 +1,323 @@
+
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates the common functionality for requesting statistics from individual tasks and
+ * combining their responses.
+ *
+ * @param <T> Type of the statistics to be gathered.
+ * @param <V> Type of the combined response.
+ */
+public class TaskStatsRequestCoordinator<T, V> {
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected final int numGhostSampleIds = 10;
+
+    protected final Object lock = new Object();
+
+    /** Executor used to run the futures. */
+    protected final Executor executor;
+
+    /** Request timeout of a triggered task stats request. */
+    protected final Time requestTimeout;
+
+    /** In-progress requests. */
+    @GuardedBy("lock")
+    protected final Map<Integer, PendingStatsRequest<T, V>> pendingRequests = new HashMap<>();
+
+    /** A list of recent request IDs to identify late messages vs. invalid ones. */
+    protected final ArrayDeque<Integer> recentPendingRequestIds =
+            new ArrayDeque<>(numGhostSampleIds);
+
+    /** Request ID counter. */
+    @GuardedBy("lock")
+    protected int requestIdCounter;
+
+    /** Flag indicating whether the coordinator has been shut down. */
+    @GuardedBy("lock")
+    protected boolean isShutDown;
+
+    /**
+     * Creates a new coordinator for the cluster.
+     *
+     * @param executor Used to execute the futures.
+     * @param requestTimeout Request timeout of a triggered task stats request.
+     */
+    public TaskStatsRequestCoordinator(Executor executor, long requestTimeout) {
+        checkArgument(requestTimeout >= 0L, "The request timeout must be non-negative.");
+        this.executor = Preconditions.checkNotNull(executor);
+        this.requestTimeout = Time.milliseconds(requestTimeout);
+    }
+
+    /**
+     * Handles the failed stats response by canceling the corresponding unfinished pending request.
+     *
+     * @param requestId ID of the request to cancel.
+     * @param cause Cause of the cancellation (can be <code>null</code>).
+     */
+    public void handleFailedResponse(int requestId, @Nullable Throwable cause) {
+        synchronized (lock) {
+            if (isShutDown) {
+                return;
+            }
+
+            PendingStatsRequest<T, V> pendingRequest = pendingRequests.remove(requestId);
+            if (pendingRequest != null) {
+                if (cause != null) {
+                    log.info("Cancelling request " + requestId, cause);
+                } else {
+                    log.info("Cancelling request {}", requestId);
+                }
+
+                pendingRequest.discard(cause);
+                rememberRecentRequestId(requestId);
+            }
+        }
+    }
+
+    /**
+     * Shuts down the coordinator.
+     *
+     * <p>After shut down, no further operations are executed.
+     */
+    public void shutDown() {
+        synchronized (lock) {
+            if (!isShutDown) {
+                log.info("Shutting down task stats request coordinator.");
+
+                for (PendingStatsRequest<T, V> pending : pendingRequests.values()) {
+                    pending.discard(new RuntimeException("Shut down"));
+                }
+
+                pendingRequests.clear();
+                recentPendingRequestIds.clear();
+
+                isShutDown = true;
+            }
+        }
+    }
+
+    /**
+     * Handles the successfully returned task stats response by collecting the corresponding subtask
+     * samples. Responses with an unknown request ID that was neither recently finished nor
+     * cancelled are logged at debug level and ignored.
+     *
+     * @param requestId ID of the request.
+     * @param executionId ID of the sampled task.
+     * @param result Result of stats request returned by an individual task.
+     */
+    public void handleSuccessfulResponse(int requestId, ExecutionAttemptID executionId, T result) {
+
+        synchronized (lock) {
+            if (isShutDown) {
+                return;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Collecting stats sample {} of task {}", requestId, executionId);
+            }
+
+            PendingStatsRequest<T, V> pending = pendingRequests.get(requestId);
+
+            if (pending != null) {
+                pending.collectTaskStats(executionId, result);
+
+                // Publish the sample
+                if (pending.isComplete()) {
+                    pendingRequests.remove(requestId);
+                    rememberRecentRequestId(requestId);
+
+                    pending.completePromiseAndDiscard();
+                }
+            } else if (recentPendingRequestIds.contains(requestId)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Received late stats sample {} of task {}", requestId, executionId);
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("Unknown request ID {}.", requestId);
+                }
+            }
+        }
+    }
+
+    private void rememberRecentRequestId(int sampleId) {
+        if (recentPendingRequestIds.size() >= numGhostSampleIds) {
+            recentPendingRequestIds.removeFirst();
+        }
+        recentPendingRequestIds.addLast(sampleId);
+    }
+
+    @VisibleForTesting
+    public int getNumberOfPendingRequests() {
+        synchronized (lock) {
+            return pendingRequests.size();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * A pending task stats request, which collects samples from individual tasks and completes the
+     * response future upon gathering all of them.
+     *
+     * <p>Has to be accessed in lock scope.

Review comment:
       Might be easier to add `@NotThreadSafe`.
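   The sketch below shows the pattern the Javadoc describes. The pending request does no synchronization itself, and the coordinator mutates it only while holding its lock. In Flink the class could carry JSR-305's `@javax.annotation.concurrent.NotThreadSafe`, from the same package as the `@GuardedBy` already imported. The sketch keeps the annotation as a comment so it compiles without JSR-305. All names are hypothetical:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical pending request. Not thread-safe: in Flink this is where
 * {@code @javax.annotation.concurrent.NotThreadSafe} would go; callers must hold the
 * coordinator's lock while mutating it.
 */
final class PendingRequestSketch<K, T> {
    private final Set<K> pendingTasks;
    private final Map<K, T> results = new HashMap<>();
    private final CompletableFuture<Map<K, T>> future = new CompletableFuture<>();

    PendingRequestSketch(Set<K> tasks) {
        this.pendingTasks = new HashSet<>(tasks);
    }

    /** Records one task's result; unknown or duplicate tasks are rejected. */
    void collect(K task, T result) {
        if (!pendingTasks.remove(task)) {
            throw new IllegalArgumentException("Unknown or duplicate task " + task);
        }
        results.put(task, result);
    }

    /** Completes the future once every task has reported. */
    void completeIfDone() {
        if (pendingTasks.isEmpty()) {
            future.complete(results);
        }
    }

    CompletableFuture<Map<K, T>> getFuture() {
        return future;
    }
}

/** Hypothetical coordinator: the pending request's mutable state is only touched under one lock. */
final class CoordinatorSketch<K, T> {
    private final Object lock = new Object();
    private final PendingRequestSketch<K, T> pending;

    CoordinatorSketch(Set<K> tasks) {
        this.pending = new PendingRequestSketch<>(tasks);
    }

    void handleResponse(K task, T result) {
        synchronized (lock) {
            pending.collect(task, result);
            pending.completeIfDone();
        }
    }

    /** Reading the final future field is safe without the lock. */
    CompletableFuture<Map<K, T>> result() {
        return pending.getFuture();
    }
}
```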

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,403 @@
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.stats.OperatorStatsTracker;
+import org.apache.flink.runtime.webmonitor.stats.Stats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker of thread infos for {@link ExecutionJobVertex}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+public class ThreadInfoOperatorTracker<T extends Stats> implements OperatorStatsTracker<T> {
+
+    /**
+     * Create a new {@link Builder}.
+     *
+     * @param createStatsFn Function that converts a thread info sample into a derived statistic.
+     *     Could be an identity function.
+     * @param <T> Type of the derived statistics to return.
+     * @return Builder.
+     */
+    public static <T extends Stats> Builder<T> newBuilder(
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor) {
+        return new Builder<>(resourceManagerGatewayRetriever, createStatsFn, executor);
+    }
+
+    /**
+     * Builder for {@link ThreadInfoOperatorTracker}.
+     *
+     * @param <T> Type of the derived statistics to return.
+     */
+    public static class Builder<T extends Stats> {
+
+        private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+        private final Function<OperatorThreadInfoStats, T> createStatsFn;
+        private final ExecutorService executor;
+
+        private ThreadInfoRequestCoordinator coordinator;
+        private int cleanUpInterval;
+        private int numSamples;
+        private int statsRefreshInterval;
+        private Time delayBetweenSamples;
+        private int maxThreadInfoDepth;
+
+        private Builder(
+                GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+                Function<OperatorThreadInfoStats, T> createStatsFn,
+                ExecutorService executor) {
+            this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+            this.createStatsFn = createStatsFn;
+            this.executor = executor;
+        }
+
+        /**
+         * Sets {@code coordinator}.
+         *
+         * @param coordinator Coordinator for thread info stats requests.
+         * @return Builder.
+         */
+        public Builder<T> setCoordinator(ThreadInfoRequestCoordinator coordinator) {
+            this.coordinator = coordinator;
+            return this;
+        }
+
+        /**
+         * Sets {@code cleanUpInterval}.
+         *
+         * @param cleanUpInterval Clean up interval for completed stats.
+         * @return Builder.
+         */
+        public Builder<T> setCleanUpInterval(int cleanUpInterval) {
+            this.cleanUpInterval = cleanUpInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code numSamples}.
+         *
+         * @param numSamples Number of thread info samples to collect for each subtask.
+         * @return Builder.
+         */
+        public Builder<T> setNumSamples(int numSamples) {
+            this.numSamples = numSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code statsRefreshInterval}.
+         *
+         * @param statsRefreshInterval Time interval after which the available thread info stats are
+         *     outdated and need to be refreshed.
+         * @return Builder.
+         */
+        public Builder<T> setStatsRefreshInterval(int statsRefreshInterval) {
+            this.statsRefreshInterval = statsRefreshInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code delayBetweenSamples}.
+         *
+         * @param delayBetweenSamples Delay between individual samples per task.
+         * @return Builder.
+         */
+        public Builder<T> setDelayBetweenSamples(Time delayBetweenSamples) {
+            this.delayBetweenSamples = delayBetweenSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code maxThreadInfoDepth}.
+         *
+         * @param maxThreadInfoDepth Limit for the depth of the stack traces included when sampling
+         *     threads.
+         * @return Builder.
+         */
+        public Builder<T> setMaxThreadInfoDepth(int maxThreadInfoDepth) {
+            this.maxThreadInfoDepth = maxThreadInfoDepth;
+            return this;
+        }
+
+        /**
+         * Constructs a new {@link ThreadInfoOperatorTracker}.
+         *
+         * @return a new {@link ThreadInfoOperatorTracker} instance.
+         */
+        public ThreadInfoOperatorTracker<T> build() {
+            return new ThreadInfoOperatorTracker<>(
+                    coordinator,
+                    resourceManagerGatewayRetriever,
+                    createStatsFn,
+                    executor,
+                    cleanUpInterval,
+                    numSamples,
+                    statsRefreshInterval,
+                    delayBetweenSamples,
+                    maxThreadInfoDepth);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadInfoOperatorTracker.class);
+
+    /** Lock guarding trigger operations. */
+    private final Object lock = new Object();
+
+    private final ThreadInfoRequestCoordinator coordinator;
+
+    private final Function<OperatorThreadInfoStats, T> createStatsFn;
+
+    private final ExecutorService executor;
+
+    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+    /**
+     * Completed stats. Important: Job vertex IDs need to be scoped by job ID, because they are
+     * potentially constant across runs, which would mess up the cached data.
+     */
+    private final Cache<AccessExecutionJobVertex, T> operatorStatsCache;
+
+    /**
+     * Pending in-progress stats. Important: Job vertex IDs need to be scoped by job ID, because
+     * they are potentially constant across runs, which would mess up the cached data.
+     */
+    private final Set<AccessExecutionJobVertex> pendingStats = new HashSet<>();
+
+    private final int numSamples;
+
+    private final int statsRefreshInterval;
+
+    private final Time delayBetweenSamples;
+
+    private final int maxThreadInfoDepth;
+
+    // Used for testing purposes
+    private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();
+
+    /** Flag indicating whether the stats tracker has been shut down. */
+    private boolean shutDown;
+
+    private ThreadInfoOperatorTracker(
+            ThreadInfoRequestCoordinator coordinator,
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor,
+            int cleanUpInterval,
+            int numSamples,
+            int statsRefreshInterval,
+            Time delayBetweenSamples,
+            int maxStackTraceDepth) {
+
+        this.coordinator = checkNotNull(coordinator, "Thread info samples coordinator");
+        this.resourceManagerGatewayRetriever =
+                checkNotNull(resourceManagerGatewayRetriever, "Gateway retriever");
+        this.createStatsFn = checkNotNull(createStatsFn, "Create stats function");
+        this.executor = checkNotNull(executor, "Scheduled executor");
+
+        checkArgument(cleanUpInterval >= 0, "Clean up interval");
+
+        checkArgument(numSamples >= 1, "Number of samples");
+        this.numSamples = numSamples;
+
+        checkArgument(
+                statsRefreshInterval >= 0,
+                "Stats refresh interval must be greater than or equal to 0");
+        this.statsRefreshInterval = statsRefreshInterval;
+
+        this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
+
+        checkArgument(
+                maxStackTraceDepth >= 0,
+                "Max stack trace depth must be greater than or equal to 0");
+        this.maxThreadInfoDepth = maxStackTraceDepth;
+
+        this.operatorStatsCache =
+                CacheBuilder.newBuilder()
+                        .concurrencyLevel(1)
+                        .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
+                        .build();
+    }
+
+    @Override
+    public Optional<T> getOperatorStats(AccessExecutionJobVertex vertex) {
+        synchronized (lock) {
+            final T stats = operatorStatsCache.getIfPresent(vertex);
+            if (stats == null
+                    || System.currentTimeMillis() >= stats.getEndTime() + statsRefreshInterval) {
+                triggerThreadInfoSampleInternal(vertex);
+            }
+            return Optional.ofNullable(stats);
+        }
+    }
+
+    /**
+     * Triggers a request for an operator to gather the thread info statistics. If there is a sample
+     * in progress for the operator, the call is ignored.
+     *
+     * @param vertex Operator to get the stats for.
+     */
+    private void triggerThreadInfoSampleInternal(final AccessExecutionJobVertex vertex) {
+        assert (Thread.holdsLock(lock));
+
+        if (shutDown) {
+            return;
+        }
+
+        if (!pendingStats.contains(vertex)) {
+            pendingStats.add(vertex);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Triggering thread info sample for tasks: "
+                                + Arrays.toString(vertex.getTaskVertices()));
+            }
+
+            final AccessExecutionVertex[] executionVertices = vertex.getTaskVertices();
+            final CompletableFuture<ResourceManagerGateway> gatewayFuture =
+                    resourceManagerGatewayRetriever.getFuture();
+
+            CompletableFuture<OperatorThreadInfoStats> sample =
+                    gatewayFuture.thenCompose(
+                            (ResourceManagerGateway resourceManagerGateway) ->
+                                    coordinator.triggerThreadInfoRequest(
+                                            matchExecutionsWithGateways(
+                                                    executionVertices, resourceManagerGateway),
+                                            numSamples,
+                                            delayBetweenSamples,
+                                            maxThreadInfoDepth));
+
+            sample.handleAsync(new ThreadInfoSampleCompletionCallback(vertex), executor);

Review comment:
       Consider `whenCompleteAsync` instead of `handleAsync`: the returned future is not used here.
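   To illustrate the difference: `handleAsync` takes a `BiFunction`, and the future it returns completes with whatever the function returns (so a failure counts as handled), while `whenCompleteAsync` takes a `BiConsumer`, and the returned future carries the original result or exception. A minimal, self-contained sketch of the latter; `Stats` is a hypothetical stand-in for `OperatorThreadInfoStats`, and whether `ThreadInfoSampleCompletionCallback` can be turned into a `BiConsumer` is an assumption about code not shown in this hunk:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch: a side-effect-only completion callback via whenCompleteAsync. */
class WhenCompleteSketch {

    /** Hypothetical stand-in for OperatorThreadInfoStats; not the Flink class. */
    static final class Stats {
        final int numSamples;

        Stats(int numSamples) {
            this.numSamples = numSamples;
        }
    }

    /**
     * Records the outcome of {@code sample} on {@code executor}. The BiConsumer returns nothing,
     * and the returned future completes with the same value or exception as {@code sample}.
     */
    static CompletableFuture<Stats> observe(
            CompletableFuture<Stats> sample,
            AtomicReference<String> outcome,
            ExecutorService executor) {
        return sample.whenCompleteAsync(
                (stats, failure) -> {
                    if (failure != null) {
                        outcome.set("failed");
                    } else {
                        outcome.set("samples=" + stats.numSamples);
                    }
                },
                executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            AtomicReference<String> outcome = new AtomicReference<>();

            // Success: the value passes through unchanged.
            Stats stats =
                    observe(CompletableFuture.completedFuture(new Stats(100)), outcome, executor)
                            .join();
            System.out.println(stats.numSamples + " " + outcome.get()); // 100 samples=100

            // Failure: the callback sees the exception, and it still propagates downstream.
            CompletableFuture<Stats> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("Shut down"));
            try {
                observe(failed, outcome, executor).join();
            } catch (CompletionException e) {
                System.out.println(outcome.get() + " " + e.getCause().getMessage()); // failed Shut down
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

   With a side-effect-only callback, `whenCompleteAsync` also avoids returning a dummy value (e.g. a `null` typed as `Void`) just to satisfy the `BiFunction` signature.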

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleableTask.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/** Task interface used by {@link ThreadInfoSampleService} for thread info tracking. */
+interface ThreadInfoSampleableTask {

Review comment:
       Maybe just `SampleableTask`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.webmonitor.stats.TaskStatsRequestCoordinator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A coordinator for triggering and collecting thread info stats of running operator subtasks. */
+public class ThreadInfoRequestCoordinator
+        extends TaskStatsRequestCoordinator<List<ThreadInfoSample>, OperatorThreadInfoStats> {
+
+    /**
+     * Creates a new coordinator for the job.
+     *
+     * @param executor Used to execute the futures.
+     * @param requestTimeout Time out after the expected sampling duration. This is added to the
+     *     expected duration of a request, which is determined by the number of samples and the
+     *     delay between each sample.
+     */
+    public ThreadInfoRequestCoordinator(Executor executor, long requestTimeout) {
+        super(executor, requestTimeout);
+    }
+
+    /**
+     * Triggers collection of thread info stats of an operator by combining thread info responses
+     * from given subtasks. A thread info response of a subtask in turn consists of {@code
+     * numSamples}, collected with {@code delayBetweenSamples} milliseconds delay between them.
+     *
+     * @param subtasksWithGateways Execution vertices together with TaskExecutors running them.
+     * @param numSamples Number of thread info samples to collect from each subtask.
+     * @param delayBetweenSamples Delay between consecutive samples (ms).
+     * @param maxStackTraceDepth Maximum depth of the stack traces collected within thread info
+     *     samples.
+     * @return A future of the completed thread info stats.
+     */
+    public CompletableFuture<OperatorThreadInfoStats> triggerThreadInfoRequest(
+            List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
+                    subtasksWithGateways,
+            int numSamples,
+            Time delayBetweenSamples,
+            int maxStackTraceDepth) {
+
+        checkNotNull(subtasksWithGateways, "Tasks to sample");
+        checkArgument(subtasksWithGateways.size() > 0, "No tasks to sample");
+        checkArgument(numSamples >= 1, "No number of samples");
+        checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth");
+
+        // Execution IDs of running tasks
+        List<ExecutionAttemptID> runningSubtasksIds = new ArrayList<>();
+
+        // Check that all tasks are RUNNING before triggering anything. The
+        // triggering can still fail.
+        for (Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>
+                executionsWithGateway : subtasksWithGateways) {
+            AccessExecution execution = executionsWithGateway.f0.getCurrentExecutionAttempt();
+            if (execution != null && execution.getState() == ExecutionState.RUNNING) {
+                runningSubtasksIds.add(execution.getAttemptId());
+            } else {
+                return FutureUtils.completedExceptionally(
+                        new IllegalStateException(
+                                "Task "
+                                        + executionsWithGateway.f0.getTaskNameWithSubtaskIndex()
+                                        + " is not running."));
+            }
+        }
+
+        synchronized (lock) {
+            if (isShutDown) {
+                return FutureUtils.completedExceptionally(new IllegalStateException("Shut down"));
+            }
+
+            final int requestId = requestIdCounter++;
+
+            log.debug("Triggering thread info request {}", requestId);
+
+            final PendingThreadInfoRequest pending =
+                    new PendingThreadInfoRequest(requestId, runningSubtasksIds);
+
+            // requestTimeout is treated as the time on top of the expected sampling duration.
+            // Discard the request if it takes too long. We don't send cancel
+            // messages to the task managers, but only wait for the responses
+            // and then ignore them.
+            long expectedDuration = numSamples * delayBetweenSamples.toMilliseconds();
+            Time timeout = Time.milliseconds(expectedDuration + requestTimeout.toMilliseconds());
+
+            // Add the pending request before scheduling the discard task to
+            // prevent races with removing it again.
+            pendingRequests.put(requestId, pending);
+
+            ThreadInfoSamplesRequest requestParams =
+                    new ThreadInfoSamplesRequest(
+                            requestId, numSamples, delayBetweenSamples, maxStackTraceDepth);
+
+            requestThreadInfo(subtasksWithGateways, requestParams, timeout);
+
+            return pending.getStatsFuture();
+        }
+    }
+
+    /**
+     * Requests thread infos from given subtasks. The response would be ignored if it does not
+     * return within timeout.
+     */
+    private void requestThreadInfo(
+            List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
+                    subtasksWithGateways,
+            ThreadInfoSamplesRequest requestParams,
+            Time timeout) {
+
+        // Trigger samples collection from all subtasks
+        for (Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>
+                executionWithGateway : subtasksWithGateways) {
+
+            CompletableFuture<TaskExecutorGateway> executorGatewayFuture = executionWithGateway.f1;
+
+            ExecutionAttemptID taskExecutionAttemptId =
+                    executionWithGateway.f0.getCurrentExecutionAttempt().getAttemptId();
+
+            CompletableFuture<TaskThreadInfoResponse> threadInfo =
+                    executorGatewayFuture.thenCompose(
+                            executorGateway ->
+                                    executorGateway.requestThreadInfoSamples(
+                                            taskExecutionAttemptId, requestParams, timeout));
+
+            threadInfo.handleAsync(

Review comment:
       `whenCompleteAsync`?
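   For reference, the deadline passed to `requestThreadInfo` in `triggerThreadInfoRequest` above is the expected sampling duration (`numSamples` times `delayBetweenSamples`) plus `requestTimeout`. A minimal sketch of that arithmetic, using `java.time.Duration` in place of Flink's `Time` only to keep it self-contained; the class and method names are hypothetical, and the values in `main` are illustrative, not Flink defaults:

```java
import java.time.Duration;

/** Sketch of the request deadline arithmetic, with java.time.Duration standing in for Time. */
class SamplingTimeoutSketch {

    /** Expected time to collect all samples from one subtask: numSamples x delayBetweenSamples. */
    static Duration expectedSamplingDuration(int numSamples, Duration delayBetweenSamples) {
        if (numSamples < 1) {
            throw new IllegalArgumentException("numSamples must be >= 1");
        }
        return delayBetweenSamples.multipliedBy(numSamples);
    }

    /** requestTimeout is slack added on top of the expected sampling duration. */
    static Duration requestDeadline(
            int numSamples, Duration delayBetweenSamples, Duration requestTimeout) {
        return expectedSamplingDuration(numSamples, delayBetweenSamples).plus(requestTimeout);
    }

    public static void main(String[] args) {
        // Illustrative values only: 100 samples, 50 ms apart, plus 10 s of slack.
        Duration deadline = requestDeadline(100, Duration.ofMillis(50), Duration.ofSeconds(10));
        System.out.println(deadline.toMillis()); // 15000
    }
}
```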

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -74,6 +74,8 @@
 import org.apache.flink.runtime.management.JMXService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;

Review comment:
       For the commit message, I'd reformulate it in a positive way: you need it for test mocks.

##########
File path: flink-runtime-web/web-dashboard/package.json
##########
@@ -43,11 +45,11 @@
     "husky": "^1.3.1",
     "jasmine-core": "~2.99.1",
     "jasmine-spec-reporter": "~4.2.1",
+    "karma": "~4.0.0",

Review comment:
       Reordering probably makes it much harder to track what packages you changed here.

##########
File path: flink-runtime-web/web-dashboard/package.json
##########
@@ -22,6 +22,8 @@
     "@antv/g2": "^3.4.10",
     "core-js": "^2.5.4",
     "d3": "^5.9.1",
+    "d3-flame-graph": "^4.0.6",

Review comment:
       I have no clue what's going on here ;) But it looks good.
   
   One thing that I don't understand at all: why do you update package-lock in this commit and then again in the next commit with a huge diff?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

