zentol commented on a change in pull request #15054:
URL: https://github.com/apache/flink/pull/15054#discussion_r603178228



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -310,6 +315,9 @@ public TaskExecutor(
                 createJobManagerHeartbeatManager(heartbeatServices, resourceId);
         this.resourceManagerHeartbeatManager =
                 createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
+
+        this.threadInfoSampleService =
+                new ThreadInfoSampleService(rpcService.getScheduledExecutor());

Review comment:
       The use of the `RpcService` executors is discouraged; `getScheduledExecutor()` will be removed at some point because it is too difficult to control what runs on it. The sampling service should have a dedicated executor.
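A minimal sketch of such a dedicated executor, using only JDK classes. The class `SamplingExecutors`, the thread name prefix and the single-thread sizing are assumptions rather than existing Flink code; the executor would still have to be adapted to the `ScheduledExecutor` type that `ThreadInfoSampleService` takes, and the TaskExecutor would have to shut it down when it stops its services.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: creates an executor that is used only for thread info sampling. */
final class SamplingExecutors {

    private SamplingExecutors() {}

    static ScheduledExecutorService newSamplingExecutor(String threadNamePrefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory =
                runnable -> {
                    Thread thread =
                            new Thread(runnable, threadNamePrefix + "-" + counter.incrementAndGet());
                    // Daemon thread, so a forgotten shutdown does not keep the JVM alive.
                    thread.setDaemon(true);
                    return thread;
                };
        // Assumption: one thread suffices, since each sample is short and delays are scheduled.
        return Executors.newSingleThreadScheduledExecutor(factory);
    }
}
```

Keeping the executor separate means sampling work can neither starve nor be starved by whatever else runs on the RPC service's executor.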

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/OperatorStatsTracker.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.stats;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Optional;
+
+/**
+ * Interface for a tracker of statistics for {@link AccessExecutionJobVertex}.
+ *
+ * @param <T> Type of statistics to track
+ */
+public interface OperatorStatsTracker<T extends Statistics> {

Review comment:
       The "operator" naming seems a bit off; we aren't sampling individual operators but tasks, are we? This also affects the commit messages, the JIRA ticket title and, of course, the javadocs.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import com.sun.istack.Nullable;

Review comment:
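       Replace this with another annotation, e.g. `javax.annotation.Nullable`, which matches the `javax.annotation.Nonnull` used elsewhere in this PR.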

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphInfo.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobVertexFlameGraphHandler;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Response type of the {@link JobVertexFlameGraphHandler}. */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JobVertexFlameGraphInfo implements ResponseBody {
+
+    public static JobVertexFlameGraphInfo empty() {
+        return new JobVertexFlameGraphInfo(-1, null);
+    }
+
+    private static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";

Review comment:
       ```suggestion
       private static final String FIELD_NAME_END_TIMESTAMP = "endTimestamp";
   ```
   We are trying to phase out dashes in REST field names.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
##########
@@ -216,6 +227,49 @@ public WebMonitorEndpoint(
 
         this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService);
         this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
+
+        this.threadInfoOperatorTracker = initializeThreadInfoTracker(executor);
+    }
+
+    private ThreadInfoOperatorTracker<OperatorThreadInfoStats> initializeThreadInfoTracker(

Review comment:
       Maintenance-wise it would be good for this method to be static, so that we don't have to worry about accessing uninitialized fields if we reorder things (or rather, so that this dependency is more explicit).
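A minimal sketch of the suggested pattern, with hypothetical stand-ins (`Endpoint`, `Tracker`, `initializeTracker`) rather than the real `WebMonitorEndpoint` types. Because the helper is `static`, it cannot read instance fields; every dependency is passed in as an argument, so reordering the constructor's assignments cannot make it observe a field that has not been assigned yet (an instance method reading `this.cleanUpIntervalMs` at that point would silently see `0`).

```java
import java.util.Objects;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for the thread info tracker. */
final class Tracker {
    final Executor executor;
    final long cleanUpIntervalMs;

    Tracker(Executor executor, long cleanUpIntervalMs) {
        this.executor = executor;
        this.cleanUpIntervalMs = cleanUpIntervalMs;
    }
}

/** Hypothetical stand-in for the endpoint that owns the tracker. */
final class Endpoint {
    private final Tracker tracker;
    private final long cleanUpIntervalMs;

    Endpoint(Executor executor, long cleanUpIntervalMs) {
        // The tracker is created before cleanUpIntervalMs is assigned; this is safe only
        // because the static helper gets its inputs as arguments, not from fields.
        this.tracker = initializeTracker(executor, cleanUpIntervalMs);
        this.cleanUpIntervalMs = cleanUpIntervalMs;
    }

    private static Tracker initializeTracker(Executor executor, long cleanUpIntervalMs) {
        return new Tracker(Objects.requireNonNull(executor, "executor"), cleanUpIntervalMs);
    }

    Tracker tracker() {
        return tracker;
    }
}
```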

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
##########
@@ -179,6 +189,7 @@
     private final LeaderElectionService leaderElectionService;
 
     private final FatalErrorHandler fatalErrorHandler;
+    private final ThreadInfoOperatorTracker<OperatorThreadInfoStats> threadInfoOperatorTracker;

Review comment:
       I think we could create this within `initializeHandlers()`; there isn't really a need to keep it as a field.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.webmonitor.stats.TaskStatsRequestCoordinator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A coordinator for triggering and collecting thread info stats of running operator subtasks. */
+public class ThreadInfoRequestCoordinator
+        extends TaskStatsRequestCoordinator<List<ThreadInfoSample>, OperatorThreadInfoStats> {
+
+    /**
+     * Creates a new coordinator for the job.
+     *
+     * @param executor Used to execute the futures.
+     * @param requestTimeout Time out after the expected sampling duration. This is added to the
+     *     expected duration of a request, which is determined by the number of samples and the
+     *     delay between each sample.
+     */
+    public ThreadInfoRequestCoordinator(Executor executor, long requestTimeout) {
+        super(executor, requestTimeout);
+    }
+
+    /**
+     * Triggers collection of thread info stats of an operator by combining thread info responses
+     * from given subtasks. A thread info response of a subtask in turn consists of {@code
+     * numSamples}, collected with {@code delayBetweenSamples} milliseconds delay between them.
+     *
+     * @param subtasksWithGateways Execution vertices together with TaskExecutors running them.
+     * @param numSamples Number of thread info samples to collect from each subtask.
+     * @param delayBetweenSamples Delay between consecutive samples (ms).
+     * @param maxStackTraceDepth Maximum depth of the stack traces collected within thread info
+     *     samples.
+     * @return A future of the completed thread info stats.
+     */
+    public CompletableFuture<OperatorThreadInfoStats> triggerThreadInfoRequest(
+            List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
+                    subtasksWithGateways,
+            int numSamples,
+            Time delayBetweenSamples,
+            int maxStackTraceDepth) {
+
+        checkNotNull(subtasksWithGateways, "Tasks to sample");
+        checkArgument(subtasksWithGateways.size() > 0, "No tasks to sample");
+        checkArgument(numSamples >= 1, "No number of samples");
+        checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth");
+
+        // Execution IDs of running tasks
+        List<ExecutionAttemptID> runningSubtasksIds = new ArrayList<>();
+
+        // Check that all tasks are RUNNING before triggering anything. The
+        // triggering can still fail.
+        for (Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>
+                executionsWithGateway : subtasksWithGateways) {
+            AccessExecution execution = executionsWithGateway.f0.getCurrentExecutionAttempt();
+            if (execution != null && execution.getState() == ExecutionState.RUNNING) {
+                runningSubtasksIds.add(execution.getAttemptId());
+            } else {
+                return FutureUtils.completedExceptionally(
+                        new IllegalStateException(
+                                "Task "
+                                        + executionsWithGateway.f0.getTaskNameWithSubtaskIndex()
+                                        + " is not running."));
+            }
+        }
+
+        synchronized (lock) {
+            if (isShutDown) {
+                return FutureUtils.completedExceptionally(new IllegalStateException("Shut down"));
+            }
+
+            final int requestId = requestIdCounter++;
+
+            log.debug("Triggering thread info request {}", requestId);
+
+            final PendingThreadInfoRequest pending =
+                    new PendingThreadInfoRequest(requestId, runningSubtasksIds);
+
+            // requestTimeout is treated as the time on top of the expected sampling duration.
+            // Discard the request if it takes too long. We don't send cancel
+            // messages to the task managers, but only wait for the responses
+            // and then ignore them.
+            long expectedDuration = numSamples * delayBetweenSamples.toMilliseconds();
+            Time timeout = Time.milliseconds(expectedDuration + requestTimeout.toMilliseconds());
+
+            // Add the pending request before scheduling the discard task to
+            // prevent races with removing it again.
+            pendingRequests.put(requestId, pending);
+
+            ThreadInfoSamplesRequest requestParams =
+                    new ThreadInfoSamplesRequest(
+                            requestId, numSamples, delayBetweenSamples, maxStackTraceDepth);
+
+            requestThreadInfo(subtasksWithGateways, requestParams, timeout);
+
+            return pending.getStatsFuture();
+        }
+    }
+
+    /**
+     * Requests thread infos from given subtasks. The response would be ignored if it does not
+     * return within timeout.
+     */
+    private void requestThreadInfo(
+            List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
+                    subtasksWithGateways,
+            ThreadInfoSamplesRequest requestParams,
+            Time timeout) {
+
+        // Trigger samples collection from all subtasks
+        for (Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>
+                executionWithGateway : subtasksWithGateways) {
+
+            CompletableFuture<TaskExecutorGateway> executorGatewayFuture = executionWithGateway.f1;
+
+            ExecutionAttemptID taskExecutionAttemptId =
+                    executionWithGateway.f0.getCurrentExecutionAttempt().getAttemptId();
+
+            CompletableFuture<TaskThreadInfoResponse> threadInfo =
+                    executorGatewayFuture.thenCompose(
+                            executorGateway ->
+                                    executorGateway.requestThreadInfoSamples(
+                                            taskExecutionAttemptId, requestParams, timeout));
+
+            threadInfo.whenCompleteAsync(
+                    (TaskThreadInfoResponse threadInfoSamplesResponse, Throwable throwable) -> {
+                        if (threadInfoSamplesResponse != null) {
+                            handleSuccessfulResponse(
+                                    threadInfoSamplesResponse.getRequestId(),
+                                    threadInfoSamplesResponse.getExecutionAttemptID(),

Review comment:
       Related to another comment about simplifying the TaskExecutor interface.
   ```suggestion
                                       requestParams.getRequestId(),
                                   taskExecutionAttemptId,
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import java.io.Serializable;
+import java.lang.management.LockInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+/**
+ * A serializable wrapper around {@link java.lang.management.ThreadInfo} that excludes {@link
+ * java.lang.management.LockInfo}-based non-serializable fields.
+ */
+public class ThreadInfoSample implements Serializable {
+
+    private final String threadName;
+    private final long threadId;
+    private final long blockedTime;
+    private final long blockedCount;
+    private final long waitedTime;
+    private final long waitedCount;
+    private final String lockName;
+    private final long lockOwnerId;
+    private final String lockOwnerName;
+    private final boolean inNative;
+    private final boolean suspended;
+    private final Thread.State threadState;
+    private final StackTraceElement[] stackTrace;

Review comment:
       Let's keep only the fields we actually need; that way we may also get around some of the JDK licensing issues.
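A sketch of a trimmed-down sample. Which fields are "needed" is an assumption based on what `OperatorFlameGraphFactory` reads in this PR (`getThreadState()` and `getStackTrace()`); the class `TrimmedThreadInfoSample` and its `from` helper are hypothetical. `Thread.State` (an enum) and `StackTraceElement` are both serializable, so no `LockInfo`-derived data has to be copied.

```java
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Optional;

/** Hypothetical trimmed sample: only the fields the flame graph code reads. */
final class TrimmedThreadInfoSample implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Thread.State threadState;
    private final StackTraceElement[] stackTrace;

    private TrimmedThreadInfoSample(Thread.State threadState, StackTraceElement[] stackTrace) {
        this.threadState = threadState;
        this.stackTrace = stackTrace;
    }

    /** Samples the given thread; empty if the thread does not exist or is no longer alive. */
    static Optional<TrimmedThreadInfoSample> from(long threadId, int maxStackTraceDepth) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // getThreadInfo returns null for threads that do not exist or have terminated.
        ThreadInfo info = bean.getThreadInfo(threadId, maxStackTraceDepth);
        if (info == null) {
            return Optional.empty();
        }
        return Optional.of(
                new TrimmedThreadInfoSample(info.getThreadState(), info.getStackTrace()));
    }

    Thread.State getThreadState() {
        return threadState;
    }

    StackTraceElement[] getStackTrace() {
        return stackTrace;
    }
}
```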

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -497,6 +505,32 @@ private void stopTaskExecutorServices() throws Exception {
     //  RPC methods
     // ======================================================================
 
+    @Override
+    public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(
+            final ExecutionAttemptID taskExecutionAttemptId,
+            final ThreadInfoSamplesRequest requestParams,
+            final Time timeout) {
+
+        final Task task = taskSlotTable.getTask(taskExecutionAttemptId);
+        if (task == null) {
+            return FutureUtils.completedExceptionally(
+                    new IllegalStateException(
+                            String.format(
+                                    "Cannot sample task %s. "
+                                            + "Task is not known to the task manager.",
+                                    taskExecutionAttemptId)));
+        }
+
+        final CompletableFuture<List<ThreadInfoSample>> stackTracesFuture =
+                threadInfoSampleService.requestThreadInfoSamples(
+                        SampleableTaskAdapter.fromTask(task), requestParams);
+
+        return stackTracesFuture.thenApply(
+                stackTraces ->
+                        new TaskThreadInfoResponse(
+                                requestParams.getRequestId(), taskExecutionAttemptId, stackTraces));

Review comment:
       It seems unnecessary for the TaskExecutor to return the attempt and request IDs again. The ThreadInfoRequestCoordinator should be able to differentiate between responses on its own.
   We can then, of course, simplify the return type to just a `CompletableFuture<List<ThreadInfoSample>>`.
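A sketch of how the coordinator can attribute responses without the TaskExecutor echoing IDs back: each per-subtask future is keyed by the attempt ID the coordinator already holds when it sends the request. Everything here is hypothetical and simplified (`SampleAggregator`, `String` attempt IDs, a generic sample type `S`, and a `Function` standing in for the RPC call).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical, simplified coordinator-side aggregation. */
final class SampleAggregator {

    private SampleAggregator() {}

    /**
     * Requests samples for every attempt ID and collects them into a map keyed by that ID. The
     * RPC stand-in returns only the samples; the key comes from the coordinator's own state.
     */
    static <S> CompletableFuture<Map<String, List<S>>> collect(
            List<String> attemptIds,
            Function<String, CompletableFuture<List<S>>> requestSamples) {
        Map<String, CompletableFuture<List<S>>> futures = new HashMap<>();
        for (String attemptId : attemptIds) {
            futures.put(attemptId, requestSamples.apply(attemptId));
        }
        // If any request fails, allOf fails and so does the combined result.
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]))
                .thenApply(
                        ignored -> {
                            Map<String, List<S>> result = new HashMap<>();
                            // join() does not block here: allOf has already completed.
                            futures.forEach((id, future) -> result.put(id, future.join()));
                            return result;
                        });
    }
}
```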

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -497,6 +505,32 @@ private void stopTaskExecutorServices() throws Exception {
     //  RPC methods
     // ======================================================================
 
+    @Override
+    public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(
+            final ExecutionAttemptID taskExecutionAttemptId,
+            final ThreadInfoSamplesRequest requestParams,
+            final Time timeout) {
+
+        final Task task = taskSlotTable.getTask(taskExecutionAttemptId);
+        if (task == null) {
+            return FutureUtils.completedExceptionally(
+                    new IllegalStateException(
+                            String.format(
+                                    "Cannot sample task %s. "
+                                            + "Task is not known to the task manager.",

Review comment:
       This is quite likely to occur at some point when the flame graph is used (because the job finished or the task failed), and we should make sure it doesn't pollute the logs unnecessarily. In other words, we may want to log this failure in the OperatorTracker on TRACE level.
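A sketch of that logging policy, using `java.util.logging` levels as a stand-in for SLF4J (`FINEST` playing the role of TRACE) so it runs on the JDK alone. The exception type `TaskNotRunningException` and the `levelFor` helper are hypothetical; the point is to classify the expected case (task finished or failed before it could be sampled) separately from genuine errors. Unwrapping `CompletionException` matters because failures observed in `CompletableFuture` callbacks are often wrapped.

```java
import java.util.concurrent.CompletionException;
import java.util.logging.Level;

/** Hypothetical exception for "the task is not (or no longer) known to the TaskExecutor". */
class TaskNotRunningException extends IllegalStateException {
    private static final long serialVersionUID = 1L;

    TaskNotRunningException(String message) {
        super(message);
    }
}

/** Hypothetical helper deciding how loudly a failed sampling request is logged. */
final class SamplingFailureLogLevel {

    private SamplingFailureLogLevel() {}

    static Level levelFor(Throwable failure) {
        Throwable cause = failure;
        // Look through CompletionException wrappers to the actual cause.
        while (cause instanceof CompletionException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        // Expected when the job finished or the task failed: keep it out of the regular logs.
        return cause instanceof TaskNotRunningException ? Level.FINEST : Level.WARNING;
    }
}
```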

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleService.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.util.JvmUtils;
+import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Samples thread infos of tasks. */
+class ThreadInfoSampleService {
+
+    private final ScheduledExecutor scheduledExecutor;
+
+    ThreadInfoSampleService(final ScheduledExecutor scheduledExecutor) {
+        this.scheduledExecutor =
+                checkNotNull(scheduledExecutor, "scheduledExecutor must not be null");
+    }
+
+    /**
+     * Returns a future that completes with a given number of thread info samples of a task thread.
+     *
+     * @param task The task to be sampled from.
+     * @param requestParams Parameters of the sampling request.
+     * @return A future containing the stack trace samples.
+     */
+    public CompletableFuture<List<ThreadInfoSample>> requestThreadInfoSamples(
+            final SampleableTask task, final ThreadInfoSamplesRequest requestParams) {
+        checkNotNull(task, "task must not be null");
+        checkNotNull(requestParams, "requestParams must not be null");
+
+        return requestThreadInfoSamples(
+                task,
+                requestParams.getNumSamples(),
+                requestParams.getDelayBetweenSamples(),
+                requestParams.getMaxStackTraceDepth(),
+                new ArrayList<>(requestParams.getNumSamples()),
+                new CompletableFuture<>());
+    }
+
+    private CompletableFuture<List<ThreadInfoSample>> requestThreadInfoSamples(
+            final SampleableTask task,
+            final int numSamples,
+            final Time delayBetweenSamples,
+            final int maxStackTraceDepth,
+            final List<ThreadInfoSample> currentTraces,
+            final CompletableFuture<List<ThreadInfoSample>> resultFuture) {
+
+        final long threadId = task.getExecutingThread().getId();
+        final Optional<ThreadInfoSample> threadInfoSample =
+                JvmUtils.createThreadInfoSample(threadId, maxStackTraceDepth);
+
+        if (threadInfoSample.isPresent()) {
+            currentTraces.add(threadInfoSample.get());
+        } else if (!currentTraces.isEmpty()) {
+            resultFuture.complete(currentTraces);
+            return resultFuture;
+        } else {
+            throw new IllegalStateException(

Review comment:
       This should complete the future in some form, either normally with an empty list or exceptionally. As is, the result future will never be completed, which can result in memory leaks.
   
   Unfortunately, this method is also called by different threads: the first call happens on the TaskManager thread, the remaining ones on the executor.
   This implies that the TaskManager thread is doing work it isn't supposed to be doing, and if this exception (or any exception, really) is thrown, it will also crash the TaskExecutor.
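A sketch of a sampling loop that addresses both points, using only JDK types: the first sample is also taken on the executor (so the calling thread only submits work), every step is wrapped in a try/catch that completes the future exceptionally, and a thread that disappears ends the loop with a normal completion, possibly with an empty list. `SamplingLoop` and the `Supplier<Optional<T>>` sampler are hypothetical stand-ins for `ThreadInfoSampleService` and `JvmUtils.createThreadInfoSample`; completing normally rather than exceptionally is one of the two options mentioned above, chosen here for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sampling loop whose result future is completed on every code path. */
final class SamplingLoop {

    private SamplingLoop() {}

    /** Assumes numSamples >= 1, as the coordinator already checks. */
    static <T> CompletableFuture<List<T>> sample(
            Supplier<Optional<T>> sampler,
            int numSamples,
            long delayMillis,
            ScheduledExecutorService executor) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        List<T> collected = new ArrayList<>(numSamples);
        try {
            // Even the first sample is taken on the executor, never on the calling thread.
            executor.execute(
                    () -> takeSample(sampler, numSamples, delayMillis, executor, collected, result));
        } catch (RejectedExecutionException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    private static <T> void takeSample(
            Supplier<Optional<T>> sampler,
            int remaining,
            long delayMillis,
            ScheduledExecutorService executor,
            List<T> collected,
            CompletableFuture<List<T>> result) {
        try {
            Optional<T> sample = sampler.get();
            if (!sample.isPresent()) {
                // The sampled thread is gone: finish with what we have, possibly an empty list.
                result.complete(collected);
                return;
            }
            collected.add(sample.get());
            if (remaining <= 1) {
                result.complete(collected);
            } else {
                executor.schedule(
                        () -> takeSample(sampler, remaining - 1, delayMillis, executor, collected, result),
                        delayMillis,
                        TimeUnit.MILLISECONDS);
            }
        } catch (Throwable t) {
            // Report any failure (including a rejected schedule) through the future instead of
            // letting it escape into the executor thread.
            result.completeExceptionally(t);
        }
    }
}
```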

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraphFactory;
+import org.apache.flink.runtime.webmonitor.threadinfo.OperatorThreadInfoStats;
+import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoOperatorTracker;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+
+/** Request handler for the job vertex Flame Graph. */
+public class JobVertexFlameGraphHandler
+        extends AbstractJobVertexHandler<JobVertexFlameGraphInfo, JobVertexMessageParameters> {
+
+    private final ThreadInfoOperatorTracker<OperatorThreadInfoStats> threadInfoOperatorTracker;
+
+    private static JobVertexFlameGraphInfo createJobVertexFlameGraphInfo(
+            OperatorFlameGraph flameGraph) {
+        return new JobVertexFlameGraphInfo(flameGraph.getEndTime(), flameGraph.getRoot());
+    }
+
+    public JobVertexFlameGraphHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, JobVertexFlameGraphInfo, JobVertexMessageParameters>
+                    messageHeaders,
+            ExecutionGraphCache executionGraphCache,
+            Executor executor,
+            ThreadInfoOperatorTracker<OperatorThreadInfoStats> threadInfoOperatorTracker) {
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                messageHeaders,
+                executionGraphCache,
+                executor);

Review comment:
       
https://github.com/apache/flink/pull/15054/files/1a96fd9f2de79afc26fc7a7977bccfdb37c8cb3a#r602933301
   
   Essentially we should do this:
   ```suggestion
           super(
                   leaderRetriever,
                   timeout,
                   responseHeaders,
                   JobVertexFlameGraphHeaders.getInstance(),
                   executionGraphCache,
                   executor);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraphFactory.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Factory class for creating Flame Graph representations. */
+public class OperatorFlameGraphFactory {
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph.
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure
+     */
+    public static OperatorFlameGraph createFullFlameGraphFrom(OperatorThreadInfoStats sample) {
+        EnumSet<Thread.State> included = EnumSet.allOf(Thread.State.class);
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+     * threads.
+     *
+     * <p>Includes threads in states Thread.State.[TIMED_WAITING, BLOCKED, WAITING].
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure.
+     */
+    public static OperatorFlameGraph createOffCpuFlameGraph(OperatorThreadInfoStats sample) {
+        EnumSet<Thread.State> included =
+                EnumSet.of(Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING);
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    /**
+     * Converts {@link OperatorThreadInfoStats} into a FlameGraph representing actively running
+     * (On-CPU) threads.
+     *
+     * <p>Includes threads in states Thread.State.[RUNNABLE, NEW].
+     *
+     * @param sample Thread details sample containing stack traces.
+     * @return FlameGraph data structure
+     */
+    public static OperatorFlameGraph createOnCpuFlameGraph(OperatorThreadInfoStats sample) {
+        EnumSet<Thread.State> included = EnumSet.of(Thread.State.RUNNABLE, Thread.State.NEW);
+        return createFlameGraphFromSample(sample, included);
+    }
+
+    private static OperatorFlameGraph createFlameGraphFromSample(
+            OperatorThreadInfoStats sample, Set<Thread.State> threadStates) {
+        final NodeBuilder root = new NodeBuilder("root");
+        for (List<ThreadInfoSample> threadInfoSubSamples : sample.getSamplesBySubtask().values()) {
+            for (ThreadInfoSample threadInfo : threadInfoSubSamples) {
+                if (threadStates.contains(threadInfo.getThreadState())) {
+                    StackTraceElement[] traces = threadInfo.getStackTrace();
+                    root.increment();
+                    NodeBuilder parent = root;
+                    for (int i = traces.length - 1; i >= 0; i--) {
+                        final String name =
+                                traces[i].getClassName()
+                                        + "."
+                                        + traces[i].getMethodName()
+                                        + ":"
+                                        + traces[i].getLineNumber();
+                        parent = parent.addChild(name);
+                    }
+                }
+            }
+        }
+        return new OperatorFlameGraph(sample.getEndTime(), root.toNode());
+    }
+
+    private static class NodeBuilder {
+
+        private final Map<String, NodeBuilder> children = new HashMap<>();
+
+        private final String name;
+
+        private int hitCount = 0;
+
+        NodeBuilder(String name) {
+            this.name = name;
+        }
+
+        NodeBuilder addChild(String name) {
+            final NodeBuilder child = children.computeIfAbsent(name, NodeBuilder::new);
+            child.increment();
+            return child;
+        }
+
+        void increment() {

Review comment:
       Let's rename this to something more descriptive, like `incrementHitCount`.
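For illustration, here is a minimal, self-contained sketch of the aggregation that `createFlameGraphFromSample` performs, with the counter renamed as suggested. It is an assumption-laden stand-in, not the PR's code: frames are plain strings instead of `StackTraceElement`s, and `FlameGraphSketch`, `build`, `getChild` and `getName` are hypothetical names. Each sampled stack is walked from its outermost frame to its innermost one, and every node on that path gets one hit, so a node's hit count is the number of samples that passed through it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the NodeBuilder trie used to build a flame graph. */
public class FlameGraphSketch {

    /** Mutable trie node; counts how many sampled stacks passed through it. */
    static final class NodeBuilder {
        private final Map<String, NodeBuilder> children = new HashMap<>();
        private final String name;
        private int hitCount = 0;

        NodeBuilder(String name) {
            this.name = name;
        }

        /** Returns the child for {@code childName}, creating it if needed, and counts one hit on it. */
        NodeBuilder addChild(String childName) {
            NodeBuilder child = children.computeIfAbsent(childName, NodeBuilder::new);
            child.incrementHitCount();
            return child;
        }

        void incrementHitCount() {
            hitCount++;
        }

        String getName() {
            return name;
        }

        int getHitCount() {
            return hitCount;
        }

        NodeBuilder getChild(String childName) {
            return children.get(childName);
        }
    }

    /** Each stack is ordered innermost frame first, like Thread#getStackTrace(). */
    static NodeBuilder build(List<String[]> stacks) {
        NodeBuilder root = new NodeBuilder("root");
        for (String[] frames : stacks) {
            root.incrementHitCount();
            NodeBuilder parent = root;
            // Walk from the outermost frame (last element) to the innermost one (index 0).
            for (int i = frames.length - 1; i >= 0; i--) {
                parent = parent.addChild(frames[i]);
            }
        }
        return root;
    }

    public static void main(String[] args) {
        List<String[]> stacks = new ArrayList<>();
        stacks.add(new String[] {"Foo.work:10", "Main.run:5"});
        stacks.add(new String[] {"Bar.idle:20", "Main.run:5"});
        stacks.add(new String[] {"Foo.work:10", "Main.run:5"});
        NodeBuilder root = build(stacks);
        System.out.println(root.getHitCount()); // 3
        System.out.println(root.getChild("Main.run:5").getHitCount()); // 3
        System.out.println(root.getChild("Main.run:5").getChild("Foo.work:10").getHitCount()); // 2
    }
}
```

With the three sample stacks in `main`, the root and `Main.run:5` each count 3 hits and `Foo.work:10` counts 2, which is the relative width a flame graph would give those frames.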

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.lang.management.LockInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+/**
+ * A serializable wrapper around {@link java.lang.management.ThreadInfo} that excludes {@link
+ * java.lang.management.LockInfo}-based non-serializable fields.
+ */
+public class ThreadInfoSample implements Serializable {
+
+    private final String threadName;
+    private final long threadId;
+    private final long blockedTime;
+    private final long blockedCount;
+    private final long waitedTime;
+    private final long waitedCount;
+    private final String lockName;
+    private final long lockOwnerId;
+    private final String lockOwnerName;
+    private final boolean inNative;
+    private final boolean suspended;
+    private final Thread.State threadState;
+    private final StackTraceElement[] stackTrace;
+
+    private ThreadInfoSample(
+            String threadName,
+            long threadId,
+            long blockedTime,
+            long blockedCount,
+            long waitedTime,
+            long waitedCount,
+            String lockName,
+            long lockOwnerId,
+            String lockOwnerName,
+            boolean inNative,
+            boolean suspended,
+            Thread.State threadState,
+            StackTraceElement[] stackTrace) {
+        this.threadName = threadName;
+        this.threadId = threadId;
+        this.blockedTime = blockedTime;
+        this.blockedCount = blockedCount;
+        this.waitedTime = waitedTime;
+        this.waitedCount = waitedCount;
+        this.lockName = lockName;
+        this.lockOwnerId = lockOwnerId;
+        this.lockOwnerName = lockOwnerName;
+        this.inNative = inNative;
+        this.suspended = suspended;
+        this.threadState = threadState;
+        this.stackTrace = stackTrace;
+    }
+
+    /**
+     * Constructs a {@link ThreadInfoSample} from {@link ThreadInfo}.
+     *
+     * @param threadInfo {@link ThreadInfo} where the data will be copied from.
+     * @return new {@link ThreadInfoSample}
+     */
+    public static @Nullable ThreadInfoSample from(@Nullable ThreadInfo threadInfo) {

Review comment:
       This could already return an `Optional`.
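A sketch of what an `Optional`-returning factory could look like. `ThreadInfoSampleSketch` is a hypothetical, trimmed-down stand-in that keeps only two of the fields; the real class would copy all of them. `ThreadMXBean#getThreadInfo` returns `null` for a thread that is no longer alive, and the `Optional` makes that case explicit at the call site instead of relying on a `@Nullable` return annotation.

```java
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Optional;

/** Hypothetical, trimmed-down stand-in for ThreadInfoSample. */
public final class ThreadInfoSampleSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String threadName;
    private final Thread.State threadState;

    private ThreadInfoSampleSketch(String threadName, Thread.State threadState) {
        this.threadName = threadName;
        this.threadState = threadState;
    }

    /**
     * Copies the serializable parts of {@code threadInfo}. Returns an empty Optional if the JVM
     * reported no info, for example because the thread has already terminated.
     */
    public static Optional<ThreadInfoSampleSketch> from(ThreadInfo threadInfo) {
        return Optional.ofNullable(threadInfo)
                .map(info -> new ThreadInfoSampleSketch(info.getThreadName(), info.getThreadState()));
    }

    public String getThreadName() {
        return threadName;
    }

    public Thread.State getThreadState() {
        return threadState;
    }

    public static void main(String[] args) {
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        ThreadInfo info = threadMXBean.getThreadInfo(Thread.currentThread().getId(), 8);
        System.out.println(from(info).isPresent()); // true
        System.out.println(from(null).isPresent()); // false
    }
}
```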

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoSamplesRequest.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.api.common.time.Time;
+
+import javax.annotation.Nonnegative;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A wrapper for parameters of a thread info sampling request. */
+public class ThreadInfoSamplesRequest {
+    private final int requestId;
+    private final int numSubSamples;
+    private final Time delayBetweenSamples;
+    private final int maxStackTraceDepth;
+
+    /**
+     * @param requestId ID of the sampling request.
+     * @param numSamples The number of samples.
+     * @param delayBetweenSamples The time to wait between taking samples.
+     * @param maxStackTraceDepth The maximum depth of the returned stack traces.
+     */
+    public ThreadInfoSamplesRequest(
+            int requestId,
+            @Nonnegative int numSamples,
+            Time delayBetweenSamples,
+            int maxStackTraceDepth) {

Review comment:
       Is this allowed to be negative? `ThreadMXBean#getThreadInfo(long, int)`, 
for one, seems to reject negative depths.
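The JDK behavior the comment refers to can be checked directly: per its Javadoc, `ThreadMXBean#getThreadInfo(long, int)` throws `IllegalArgumentException` for a negative `maxDepth`, while `0` is accepted and requests no stack frames. A minimal sketch, where `MaxDepthCheck`, `checkMaxStackTraceDepth` and `jdkRejects` are hypothetical helpers; in Flink the precondition would more likely be written as `checkArgument(maxStackTraceDepth >= 0, ...)`, as `ThreadInfoOperatorTracker` already does for its own depth parameter.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

/** Hypothetical helpers showing which stack trace depths the JDK accepts. */
public class MaxDepthCheck {

    /** Mirrors the JDK contract: negative depths are rejected, 0 is allowed. */
    static int checkMaxStackTraceDepth(int maxStackTraceDepth) {
        if (maxStackTraceDepth < 0) {
            throw new IllegalArgumentException(
                    "Max stack trace depth must be greater than or equal to 0, was "
                            + maxStackTraceDepth);
        }
        return maxStackTraceDepth;
    }

    /** Returns true if ThreadMXBean#getThreadInfo(long, int) rejects the given depth. */
    static boolean jdkRejects(int maxDepth) {
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        try {
            threadMXBean.getThreadInfo(Thread.currentThread().getId(), maxDepth);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(jdkRejects(-1)); // true
        System.out.println(jdkRejects(0)); // false
        System.out.println(checkMaxStackTraceDepth(0)); // 0
    }
}
```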

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/OperatorFlameGraph.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.webmonitor.stats.Statistics;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Flame Graph representation for an operator.
+ *
+ * <p>Statistics are gathered by sampling stack traces of running tasks.
+ */
+public class OperatorFlameGraph implements Serializable, Statistics {
+
+    /** Graph node. */
+    public static class Node {

Review comment:
       Inner classes are usually placed at the bottom.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/Statistics.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.stats;
+
+/** Represents one or more statistics samples. */
+public interface Statistics {

Review comment:
       I'm not sure if we really need this interface, and the generic parameter 
for the ThreadInfoOperatorTracker for that matter.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.stats.OperatorStatsTracker;
+import org.apache.flink.runtime.webmonitor.stats.Statistics;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker of thread infos for {@link ExecutionJobVertex}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+public class ThreadInfoOperatorTracker<T extends Statistics> implements OperatorStatsTracker<T> {
+
+    /**
+     * Create a new {@link Builder}.
+     *
+     * @param createStatsFn Function that converts a thread info sample into a derived statistic.
+     *     Could be an identity function.
+     * @param <T> Type of the derived statistics to return.
+     * @return Builder.
+     */
+    public static <T extends Statistics> Builder<T> newBuilder(
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor) {
+        return new Builder<>(resourceManagerGatewayRetriever, createStatsFn, executor);
+    }
+
+    /**
+     * Builder for {@link ThreadInfoOperatorTracker}.
+     *
+     * @param <T> Type of the derived statistics to return.
+     */
+    public static class Builder<T extends Statistics> {
+
+        private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+        private final Function<OperatorThreadInfoStats, T> createStatsFn;
+        private final ExecutorService executor;
+
+        private ThreadInfoRequestCoordinator coordinator;
+        private int cleanUpInterval;
+        private int numSamples;
+        private int statsRefreshInterval;
+        private Time delayBetweenSamples;
+        private int maxThreadInfoDepth;
+
+        private Builder(
+                GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+                Function<OperatorThreadInfoStats, T> createStatsFn,
+                ExecutorService executor) {
+            this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+            this.createStatsFn = createStatsFn;
+            this.executor = executor;
+        }
+
+        /**
+         * Sets {@code coordinator}.
+         *
+         * @param coordinator Coordinator for thread info stats request.
+         * @return Builder.
+         */
+        public Builder<T> setCoordinator(ThreadInfoRequestCoordinator coordinator) {
+            this.coordinator = coordinator;
+            return this;
+        }
+
+        /**
+         * Sets {@code cleanUpInterval}.
+         *
+         * @param cleanUpInterval Clean up interval for completed stats.
+         * @return Builder.
+         */
+        public Builder<T> setCleanUpInterval(int cleanUpInterval) {
+            this.cleanUpInterval = cleanUpInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code numSamples}.
+         *
+         * @param numSamples Number of thread info samples to collect for each subtask.
+         * @return Builder.
+         */
+        public Builder<T> setNumSamples(int numSamples) {
+            this.numSamples = numSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code statsRefreshInterval}.
+         *
+         * @param statsRefreshInterval Time interval after which the available thread info stats are
+         *     deprecated and need to be refreshed.
+         * @return Builder.
+         */
+        public Builder<T> setStatsRefreshInterval(int statsRefreshInterval) {
+            this.statsRefreshInterval = statsRefreshInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code delayBetweenSamples}.
+         *
+         * @param delayBetweenSamples Delay between individual samples per task.
+         * @return Builder.
+         */
+        public Builder<T> setDelayBetweenSamples(Time delayBetweenSamples) {
+            this.delayBetweenSamples = delayBetweenSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code maxThreadInfoDepth}.
+         *
+         * @param maxThreadInfoDepth Limit for the depth of the stack traces included when sampling
+         *     threads.
+         * @return Builder.
+         */
+        public Builder<T> setMaxThreadInfoDepth(int maxThreadInfoDepth) {
+            this.maxThreadInfoDepth = maxThreadInfoDepth;
+            return this;
+        }
+
+        /**
+         * Constructs a new {@link ThreadInfoOperatorTracker}.
+         *
+         * @return a new {@link ThreadInfoOperatorTracker} instance.
+         */
+        public ThreadInfoOperatorTracker<T> build() {
+            return new ThreadInfoOperatorTracker<>(
+                    coordinator,
+                    resourceManagerGatewayRetriever,
+                    createStatsFn,
+                    executor,
+                    cleanUpInterval,
+                    numSamples,
+                    statsRefreshInterval,
+                    delayBetweenSamples,
+                    maxThreadInfoDepth);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadInfoOperatorTracker.class);
+
+    /** Lock guarding trigger operations. */
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private final ThreadInfoRequestCoordinator coordinator;
+
+    private final Function<OperatorThreadInfoStats, T> createStatsFn;
+
+    private final ExecutorService executor;
+
+    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+    /**
+     * Completed stats. Important: Job vertex IDs need to be scoped by job ID, because they are
+     * potentially constant across runs messing up the cached data.
+     */
+    @GuardedBy("lock")
+    private final Cache<AccessExecutionJobVertex, T> operatorStatsCache;
+
+    /**
+     * Pending in progress stats. Important: Job vertex IDs need to be scoped by job ID, because
+     * they are potentially constant across runs messing up the cached data.
+     */
+    @GuardedBy("lock")
+    private final Set<AccessExecutionJobVertex> pendingStats = new HashSet<>();
+
+    private final int numSamples;
+
+    private final int statsRefreshInterval;
+
+    private final Time delayBetweenSamples;
+
+    private final int maxThreadInfoDepth;
+
+    // Used for testing purposes
+    private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();
+
+    /** Flag indicating whether the stats tracker has been shut down. */
+    private boolean shutDown;
+
+    private ThreadInfoOperatorTracker(
+            ThreadInfoRequestCoordinator coordinator,
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor,
+            int cleanUpInterval,
+            int numSamples,
+            int statsRefreshInterval,
+            Time delayBetweenSamples,
+            int maxStackTraceDepth) {
+
+        this.coordinator = checkNotNull(coordinator, "Thread info samples coordinator");
+        this.resourceManagerGatewayRetriever =
+                checkNotNull(resourceManagerGatewayRetriever, "Gateway retriever");
+        this.createStatsFn = checkNotNull(createStatsFn, "Create stats function");
+        this.executor = checkNotNull(executor, "Scheduled executor");
+
+        checkArgument(cleanUpInterval >= 0, "Clean up interval");
+
+        checkArgument(numSamples >= 1, "Number of samples");
+        this.numSamples = numSamples;
+
+        checkArgument(
+                statsRefreshInterval >= 0,
+                "Stats refresh interval must be greater than or equal to 0");
+        this.statsRefreshInterval = statsRefreshInterval;
+
+        this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
+
+        checkArgument(
+                maxStackTraceDepth >= 0,
+                "Max stack trace depth must be greater than or equal to 0");
+        this.maxThreadInfoDepth = maxStackTraceDepth;
+
+        this.operatorStatsCache =
+                CacheBuilder.newBuilder()
+                        .concurrencyLevel(1)
+                        .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
+                        .build();
+    }
+
+    @Override
+    public Optional<T> getOperatorStats(AccessExecutionJobVertex vertex) {
+        synchronized (lock) {
+            final T stats = operatorStatsCache.getIfPresent(vertex);
+            if (stats == null
+                    || System.currentTimeMillis() >= stats.getEndTime() + statsRefreshInterval) {
+                triggerThreadInfoSampleInternal(vertex);
+            }
+            return Optional.ofNullable(stats);
+        }
+    }
+
+    /**
+     * Triggers a request for an operator to gather the thread info statistics. If there is a sample
+     * in progress for the operator, the call is ignored.
+     *
+     * @param vertex Operator to get the stats for.
+     */
+    private void triggerThreadInfoSampleInternal(final AccessExecutionJobVertex vertex) {
+        assert (Thread.holdsLock(lock));
+
+        if (shutDown) {
+            return;
+        }
+
+        if (!pendingStats.contains(vertex)) {
+            pendingStats.add(vertex);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Triggering thread info sample for tasks: "
+                                + Arrays.toString(vertex.getTaskVertices()));
+            }
+
+            final AccessExecutionVertex[] executionVertices = vertex.getTaskVertices();
+            final CompletableFuture<ResourceManagerGateway> gatewayFuture =
+                    resourceManagerGatewayRetriever.getFuture();
+
+            CompletableFuture<OperatorThreadInfoStats> sample =
+                    gatewayFuture.thenCompose(
+                            (ResourceManagerGateway resourceManagerGateway) ->
+                                    coordinator.triggerThreadInfoRequest(
+                                            matchExecutionsWithGateways(
+                                                    executionVertices, resourceManagerGateway),
+                                            numSamples,
+                                            delayBetweenSamples,
+                                            maxThreadInfoDepth));
+
+            sample.whenCompleteAsync(new ThreadInfoSampleCompletionCallback(vertex), executor);
+        }
+    }
+
+    private List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
+            matchExecutionsWithGateways(
+                    AccessExecutionVertex[] executionVertices,
+                    ResourceManagerGateway resourceManagerGateway) {
+
+        List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
+                executionsWithGateways = new ArrayList<>();
+
+        for (AccessExecutionVertex executionVertex : executionVertices) {
+            TaskManagerLocation tmLocation = executionVertex.getCurrentAssignedResourceLocation();
+
+            if (tmLocation != null) {
+                CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture =
+                        resourceManagerGateway.requestTaskExecutorGateway(
+                                tmLocation.getResourceID());
+
+                executionsWithGateways.add(
+                        new Tuple2<>(executionVertex, taskExecutorGatewayFuture));
+            } else {
+                LOG.warn("ExecutionVertex {} is currently not assigned", executionVertex);
+            }
+        }
+
+        return executionsWithGateways;
+    }
+
+    @Override
+    public void cleanUpOperatorStatsCache() {
+        operatorStatsCache.cleanUp();
+    }
+
+    @Override
+    public void shutDown() {
+        synchronized (lock) {
+            if (!shutDown) {
+                operatorStatsCache.invalidateAll();
+                pendingStats.clear();
+
+                shutDown = true;
+            }
+        }
+    }
+
+    @VisibleForTesting
+    CompletableFuture<Void> getResultAvailableFuture() {
+        return resultAvailableFuture;
+    }
+
+    /** Callback on completed thread info sample. */
+    class ThreadInfoSampleCompletionCallback

Review comment:
       Why is this package-private?
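In the code shown here, the callback is only instantiated by the tracker itself (in `triggerThreadInfoSampleInternal`), so a `private` inner class would seem to suffice. The sketch below is hypothetical: the real callback's body is not part of this hunk, so the bookkeeping (cache the result on success, always clear the pending marker) is an assumption modeled on the `pendingStats` and `operatorStatsCache` fields, with strings standing in for vertices and stats. It implements `BiConsumer`, which the file imports, and is registered via `whenCompleteAsync` on a dedicated executor, as the tracker does.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

/** Hypothetical sketch of a private completion callback inside a tracker. */
public class CallbackSketch {

    private final Object lock = new Object();
    private final Set<String> pendingStats = new HashSet<>();
    private final Map<String, String> statsCache = new HashMap<>();

    /** Private: only the enclosing tracker creates and registers it. */
    private final class SampleCompletionCallback implements BiConsumer<String, Throwable> {
        private final String vertexId;

        private SampleCompletionCallback(String vertexId) {
            this.vertexId = vertexId;
        }

        @Override
        public void accept(String stats, Throwable failure) {
            synchronized (lock) {
                // Cache only successful results; always clear the pending marker so a later call can retry.
                if (failure == null) {
                    statsCache.put(vertexId, stats);
                }
                pendingStats.remove(vertexId);
            }
        }
    }

    CompletableFuture<String> trigger(
            String vertexId, CompletableFuture<String> sample, ExecutorService executor) {
        synchronized (lock) {
            pendingStats.add(vertexId);
        }
        return sample.whenCompleteAsync(new SampleCompletionCallback(vertexId), executor);
    }

    String cached(String vertexId) {
        synchronized (lock) {
            return statsCache.get(vertexId);
        }
    }

    boolean isPending(String vertexId) {
        synchronized (lock) {
            return pendingStats.contains(vertexId);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CallbackSketch tracker = new CallbackSketch();
            tracker.trigger("v1", CompletableFuture.completedFuture("stats-v1"), executor).join();
            System.out.println(tracker.cached("v1")); // stats-v1
            System.out.println(tracker.isPending("v1")); // false
        } finally {
            executor.shutdown();
        }
    }
}
```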

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.stats.OperatorStatsTracker;
+import org.apache.flink.runtime.webmonitor.stats.Statistics;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker of thread infos for {@link ExecutionJobVertex}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+public class ThreadInfoOperatorTracker<T extends Statistics> implements OperatorStatsTracker<T> {
+
+    /**
+     * Create a new {@link Builder}.
+     *
+     * @param createStatsFn Function that converts a thread info sample into a derived statistic.
+     *     Could be an identity function.
+     * @param <T> Type of the derived statistics to return.
+     * @return Builder.
+     */
+    public static <T extends Statistics> Builder<T> newBuilder(
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor) {
+        return new Builder<>(resourceManagerGatewayRetriever, createStatsFn, executor);
+    }
+
+    /**
+     * Builder for {@link ThreadInfoOperatorTracker}.
+     *
+     * @param <T> Type of the derived statistics to return.
+     */
+    public static class Builder<T extends Statistics> {
+
+        private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+        private final Function<OperatorThreadInfoStats, T> createStatsFn;

Review comment:
       Unless I'm missing something, this is always `Function.identity()`.
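If `createStatsFn` is indeed always `Function.identity()`, the type parameter collapses to `OperatorThreadInfoStats` and the tracker could store that type directly. Below is a hypothetical sketch of the caching logic from `getOperatorStats` with a concrete stats type. `OperatorStats`, the `String` vertex keys, the `LongSupplier` clock and `onSampleCompleted` are illustrative stand-ins, and where the real tracker would fire a sampling request, the sketch only records the vertex as pending.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.LongSupplier;

/** Hypothetical sketch: the tracker's caching logic without a generic stats type. */
public class NonGenericTrackerSketch {

    /** Stand-in for OperatorThreadInfoStats; only the end time matters here. */
    static final class OperatorStats {
        final long endTime;

        OperatorStats(long endTime) {
            this.endTime = endTime;
        }
    }

    private final Object lock = new Object();
    private final Map<String, OperatorStats> cache = new HashMap<>();
    private final Set<String> pending = new HashSet<>();
    private final long statsRefreshInterval;
    private final LongSupplier clock;

    NonGenericTrackerSketch(long statsRefreshInterval, LongSupplier clock) {
        this.statsRefreshInterval = statsRefreshInterval;
        this.clock = clock;
    }

    /** Returns the cached stats (possibly stale) and marks the vertex for re-sampling if needed. */
    Optional<OperatorStats> getOperatorStats(String vertexId) {
        synchronized (lock) {
            OperatorStats stats = cache.get(vertexId);
            if (stats == null || clock.getAsLong() >= stats.endTime + statsRefreshInterval) {
                // A Set ignores duplicates, mirroring "ignore if a sample is already in progress".
                pending.add(vertexId);
            }
            return Optional.ofNullable(stats);
        }
    }

    /** Completion path: store the fresh result and clear the pending marker. */
    void onSampleCompleted(String vertexId, OperatorStats stats) {
        synchronized (lock) {
            cache.put(vertexId, stats);
            pending.remove(vertexId);
        }
    }

    boolean isPending(String vertexId) {
        synchronized (lock) {
            return pending.contains(vertexId);
        }
    }

    public static void main(String[] args) {
        long[] now = {1_000L};
        NonGenericTrackerSketch tracker = new NonGenericTrackerSketch(500L, () -> now[0]);
        System.out.println(tracker.getOperatorStats("v1").isPresent()); // false
        tracker.onSampleCompleted("v1", new OperatorStats(1_000L));
        now[0] = 1_200L;
        tracker.getOperatorStats("v1");
        System.out.println(tracker.isPending("v1")); // false (1200 < 1000 + 500)
        now[0] = 1_500L;
        tracker.getOperatorStats("v1");
        System.out.println(tracker.isPending("v1")); // true (1500 >= 1000 + 500)
    }
}
```

A vertex is re-sampled once `now >= endTime + statsRefreshInterval`, matching the condition in `getOperatorStats`; until then the cached value is returned as-is.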

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoOperatorTracker.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.stats.OperatorStatsTracker;
+import org.apache.flink.runtime.webmonitor.stats.Statistics;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker of thread infos for {@link ExecutionJobVertex}.
+ *
+ * @param <T> Type of the derived statistics to return.
+ */
+public class ThreadInfoOperatorTracker<T extends Statistics> implements OperatorStatsTracker<T> {
+
+    /**
+     * Create a new {@link Builder}.
+     *
+     * @param createStatsFn Function that converts a thread info sample into a derived statistic.
+     *     Could be an identity function.
+     * @param <T> Type of the derived statistics to return.
+     * @return Builder.
+     */
+    public static <T extends Statistics> Builder<T> newBuilder(
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor) {
+        return new Builder<>(resourceManagerGatewayRetriever, createStatsFn, executor);
+    }
+
+    /**
+     * Builder for {@link ThreadInfoOperatorTracker}.
+     *
+     * @param <T> Type of the derived statistics to return.
+     */
+    public static class Builder<T extends Statistics> {
+
+        private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+        private final Function<OperatorThreadInfoStats, T> createStatsFn;
+        private final ExecutorService executor;
+
+        private ThreadInfoRequestCoordinator coordinator;
+        private int cleanUpInterval;
+        private int numSamples;
+        private int statsRefreshInterval;
+        private Time delayBetweenSamples;
+        private int maxThreadInfoDepth;
+
+        private Builder(
+                GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+                Function<OperatorThreadInfoStats, T> createStatsFn,
+                ExecutorService executor) {
+            this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+            this.createStatsFn = createStatsFn;
+            this.executor = executor;
+        }
+
+        /**
+         * Sets {@code coordinator}.
+         *
+         * @param coordinator Coordinator for thread info stats request.
+         * @return Builder.
+         */
+        public Builder<T> setCoordinator(ThreadInfoRequestCoordinator coordinator) {
+            this.coordinator = coordinator;
+            return this;
+        }
+
+        /**
+         * Sets {@code cleanUpInterval}.
+         *
+         * @param cleanUpInterval Clean up interval for completed stats.
+         * @return Builder.
+         */
+        public Builder<T> setCleanUpInterval(int cleanUpInterval) {
+            this.cleanUpInterval = cleanUpInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code numSamples}.
+         *
+         * @param numSamples Number of thread info samples to collect for each subtask.
+         * @return Builder.
+         */
+        public Builder<T> setNumSamples(int numSamples) {
+            this.numSamples = numSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code statsRefreshInterval}.
+         *
+         * @param statsRefreshInterval Time interval after which the available thread info stats are
+         *     deprecated and need to be refreshed.
+         * @return Builder.
+         */
+        public Builder<T> setStatsRefreshInterval(int statsRefreshInterval) {
+            this.statsRefreshInterval = statsRefreshInterval;
+            return this;
+        }
+
+        /**
+         * Sets {@code delayBetweenSamples}.
+         *
+         * @param delayBetweenSamples Delay between individual samples per task.
+         * @return Builder.
+         */
+        public Builder<T> setDelayBetweenSamples(Time delayBetweenSamples) {
+            this.delayBetweenSamples = delayBetweenSamples;
+            return this;
+        }
+
+        /**
+         * Sets {@code maxThreadInfoDepth}.
+         *
+         * @param maxThreadInfoDepth Limit for the depth of the stack traces included when sampling
+         *     threads.
+         * @return Builder.
+         */
+        public Builder<T> setMaxThreadInfoDepth(int maxThreadInfoDepth) {
+            this.maxThreadInfoDepth = maxThreadInfoDepth;
+            return this;
+        }
+
+        /**
+         * Constructs a new {@link ThreadInfoOperatorTracker}.
+         *
+         * @return a new {@link ThreadInfoOperatorTracker} instance.
+         */
+        public ThreadInfoOperatorTracker<T> build() {
+            return new ThreadInfoOperatorTracker<>(
+                    coordinator,
+                    resourceManagerGatewayRetriever,
+                    createStatsFn,
+                    executor,
+                    cleanUpInterval,
+                    numSamples,
+                    statsRefreshInterval,
+                    delayBetweenSamples,
+                    maxThreadInfoDepth);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadInfoOperatorTracker.class);
+
+    /** Lock guarding trigger operations. */
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private final ThreadInfoRequestCoordinator coordinator;
+
+    private final Function<OperatorThreadInfoStats, T> createStatsFn;
+
+    private final ExecutorService executor;
+
+    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+    /**
+     * Completed stats. Important: Job vertex IDs need to be scoped by job ID, because they are
+     * potentially constant across runs, messing up the cached data.
+     */
+    @GuardedBy("lock")
+    private final Cache<AccessExecutionJobVertex, T> operatorStatsCache;
+
+    /**
+     * Pending in progress stats. Important: Job vertex IDs need to be scoped by job ID, because
+     * they are potentially constant across runs, messing up the cached data.
+     */
+    @GuardedBy("lock")
+    private final Set<AccessExecutionJobVertex> pendingStats = new HashSet<>();
+
+    private final int numSamples;
+
+    private final int statsRefreshInterval;
+
+    private final Time delayBetweenSamples;
+
+    private final int maxThreadInfoDepth;
+
+    // Used for testing purposes
+    private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();
+
+    /** Flag indicating whether the stats tracker has been shut down. */
+    private boolean shutDown;
+
+    private ThreadInfoOperatorTracker(
+            ThreadInfoRequestCoordinator coordinator,
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            Function<OperatorThreadInfoStats, T> createStatsFn,
+            ExecutorService executor,
+            int cleanUpInterval,
+            int numSamples,
+            int statsRefreshInterval,
+            Time delayBetweenSamples,
+            int maxStackTraceDepth) {
+
+        this.coordinator = checkNotNull(coordinator, "Thread info samples coordinator");
+        this.resourceManagerGatewayRetriever =
+                checkNotNull(resourceManagerGatewayRetriever, "Gateway retriever");
+        this.createStatsFn = checkNotNull(createStatsFn, "Create stats function");
+        this.executor = checkNotNull(executor, "Scheduled executor");
+
+        checkArgument(cleanUpInterval >= 0, "Clean up interval");
+
+        checkArgument(numSamples >= 1, "Number of samples");
+        this.numSamples = numSamples;
+
+        checkArgument(
+                statsRefreshInterval >= 0,
+                "Stats refresh interval must be greater than or equal to 0");
+        this.statsRefreshInterval = statsRefreshInterval;
+
+        this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
+
+        checkArgument(
+                maxStackTraceDepth >= 0,
+                "Max stack trace depth must be greater than or equal to 0");
+        this.maxThreadInfoDepth = maxStackTraceDepth;
+
+        this.operatorStatsCache =
+                CacheBuilder.newBuilder()
+                        .concurrencyLevel(1)
+                        .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
+                        .build();
+    }
+
+    @Override
+    public Optional<T> getOperatorStats(AccessExecutionJobVertex vertex) {
+        synchronized (lock) {
+            final T stats = operatorStatsCache.getIfPresent(vertex);
+            if (stats == null

Review comment:
       Where do we prevent multiple requests from being initiated when the first 
one has not completed yet? Asking since the check is based on the 
operatorStatsCache, which is only populated once the response from the 
TaskExecutor is received.
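A minimal sketch of one way to guard against this, assuming requests are keyed by the job vertex: the first caller registers a future with `ConcurrentHashMap.putIfAbsent` and triggers the request, and any caller arriving before it completes gets that same future back. `InFlightRequests` and `requestOnce` are hypothetical names, not existing Flink API; the `pendingStats` set guarded by `lock` in this class could play the same role.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical helper: shares one pending request per key instead of starting duplicates. */
final class InFlightRequests<K, V> {

    private final ConcurrentMap<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();

    CompletableFuture<V> requestOnce(K key, Function<K, CompletableFuture<V>> trigger) {
        CompletableFuture<V> result = new CompletableFuture<>();
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, result);
        if (existing != null) {
            // A request for this key has not completed yet: reuse its future.
            return existing;
        }

        CompletableFuture<V> response;
        try {
            response = trigger.apply(key);
        } catch (RuntimeException e) {
            // Triggering failed synchronously; do not leave a stale entry behind.
            inFlight.remove(key, result);
            result.completeExceptionally(e);
            return result;
        }

        response.whenComplete(
                (value, error) -> {
                    // Remove first so that callers reacting to the result can trigger a new request.
                    inFlight.remove(key, result);
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(value);
                    }
                });
        return result;
    }

    int pendingCount() {
        return inFlight.size();
    }
}
```

Removing the entry before completing the shared future lets a caller that reacts to the result start a fresh request right away.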

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.webmonitor.stats.TaskStatsRequestCoordinator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A coordinator for triggering and collecting thread info stats of running operator subtasks. */
+public class ThreadInfoRequestCoordinator
+        extends TaskStatsRequestCoordinator<List<ThreadInfoSample>, OperatorThreadInfoStats> {
+
+    /**
+     * Creates a new coordinator for the job.
+     *
+     * @param executor Used to execute the futures.
+     * @param requestTimeout Time out after the expected sampling duration. This is added to the
+     *     expected duration of a request, which is determined by the number of samples and the
+     *     delay between each sample.
+     */
+    public ThreadInfoRequestCoordinator(Executor executor, long requestTimeout) {
+        super(executor, requestTimeout);
+    }
+
+    /**
+     * Triggers collection of thread info stats of an operator by combining thread info responses
+     * from given subtasks. A thread info response of a subtask in turn consists of {@code
+     * numSamples}, collected with {@code delayBetweenSamples} milliseconds delay between them.
+     *
+     * @param subtasksWithGateways Execution vertices together with TaskExecutors running them.
+     * @param numSamples Number of thread info samples to collect from each subtask.
+     * @param delayBetweenSamples Delay between consecutive samples (ms).
+     * @param maxStackTraceDepth Maximum depth of the stack traces collected within thread info
+     *     samples.
+     * @return A future of the completed thread info stats.
+     */
+    public CompletableFuture<OperatorThreadInfoStats> triggerThreadInfoRequest(
+            List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
+                    subtasksWithGateways,
+            int numSamples,
+            Time delayBetweenSamples,
+            int maxStackTraceDepth) {
+
+        checkNotNull(subtasksWithGateways, "Tasks to sample");
+        checkArgument(subtasksWithGateways.size() > 0, "No tasks to sample");
+        checkArgument(numSamples >= 1, "No number of samples");
+        checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth");
+
+        // Execution IDs of running tasks
+        List<ExecutionAttemptID> runningSubtasksIds = new ArrayList<>();
+
+        // Check that all tasks are RUNNING before triggering anything. The
+        // triggering can still fail.
+        for (Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>
+                executionsWithGateway : subtasksWithGateways) {
+            AccessExecution execution = executionsWithGateway.f0.getCurrentExecutionAttempt();
+            if (execution != null && execution.getState() == ExecutionState.RUNNING) {
+                runningSubtasksIds.add(execution.getAttemptId());
+            } else {
+                return FutureUtils.completedExceptionally(
+                        new IllegalStateException(

Review comment:
       Similar to what I said elsewhere, we may be alright with logging this on 
trace.
   
   It's not really a good solution, though. I don't think we really want these 
to be logged in cases where a job finished / task failed, but I don't see a way 
to differentiate between these.
   Maybe we can solve this entirely in the UI. We don't log any errors (or log 
them on trace after all, to have them somewhere at least), but if a returned 
sample contains nothing, we display something along the lines of "no sample 
could be gathered, task not running ...".
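One possible shape for the UI-based approach, sketched under assumptions: instead of failing on the first subtask that is not RUNNING, the coordinator keeps the subtasks it can sample, records the others, and leaves it to the UI to turn an empty result into a "no sample could be gathered" message. `SamplingOutcome` is a hypothetical type, and subtasks are reduced to a name plus a running flag for brevity. It still cannot tell a finished job from a failed task; it only avoids logging either case as an error.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical result type: records which subtasks were sampled and which were skipped,
 * instead of failing the whole request on the first subtask that is not running.
 */
final class SamplingOutcome {

    final List<String> sampledSubtasks;
    final List<String> skippedSubtasks;

    private SamplingOutcome(List<String> sampledSubtasks, List<String> skippedSubtasks) {
        this.sampledSubtasks = Collections.unmodifiableList(sampledSubtasks);
        this.skippedSubtasks = Collections.unmodifiableList(skippedSubtasks);
    }

    /** Splits subtasks by whether they are running; nothing is logged or thrown. */
    static SamplingOutcome partition(Map<String, Boolean> runningBySubtask) {
        List<String> sampled = new ArrayList<>();
        List<String> skipped = new ArrayList<>();
        for (Map.Entry<String, Boolean> entry : runningBySubtask.entrySet()) {
            if (Boolean.TRUE.equals(entry.getValue())) {
                sampled.add(entry.getKey());
            } else {
                skipped.add(entry.getKey());
            }
        }
        return new SamplingOutcome(sampled, skipped);
    }

    /** Message the UI could render, including the case where no sample is available. */
    String displayMessage() {
        if (sampledSubtasks.isEmpty()) {
            return "No sample could be gathered: task not running.";
        }
        return "Sampled " + sampledSubtasks.size() + " of "
                + (sampledSubtasks.size() + skippedSubtasks.size()) + " subtasks.";
    }
}
```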

Review comment:
       We don't really have a good pattern for handling such cases.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
##########
@@ -216,6 +227,49 @@ public WebMonitorEndpoint(
 
         this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService);
         this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
+
+        this.threadInfoOperatorTracker = initializeThreadInfoTracker(executor);
+    }
+
+    private ThreadInfoOperatorTracker<OperatorThreadInfoStats> initializeThreadInfoTracker(
+            ScheduledExecutorService executor) {
+        final Duration akkaTimeout;
+        try {
+            akkaTimeout = AkkaUtils.getTimeout(clusterConfiguration);
+        } catch (NumberFormatException e) {
+            throw new IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage());
+        }
+
+        final int flameGraphCleanUpInterval =
+                clusterConfiguration.getInteger(WebOptions.FLAMEGRAPH_CLEANUP_INTERVAL);
+        final ThreadInfoRequestCoordinator threadInfoRequestCoordinator =
+                new ThreadInfoRequestCoordinator(executor, akkaTimeout.toMillis());
+        final ThreadInfoOperatorTracker<OperatorThreadInfoStats> threadInfoOperatorTracker =
+                ThreadInfoOperatorTracker.newBuilder(
+                                resourceManagerRetriever, Function.identity(), executor)
+                        .setCoordinator(threadInfoRequestCoordinator)
+                        .setCleanUpInterval(flameGraphCleanUpInterval)
+                        .setNumSamples(
+                                clusterConfiguration.getInteger(WebOptions.FLAMEGRAPH_NUM_SAMPLES))
+                        .setStatsRefreshInterval(
+                                clusterConfiguration.getInteger(
+                                        WebOptions.FLAMEGRAPH_REFRESH_INTERVAL))
+                        .setDelayBetweenSamples(
+                                Time.milliseconds(
+                                        clusterConfiguration.getInteger(
+                                                WebOptions.FLAMEGRAPH_DELAY)))
+                        .setMaxThreadInfoDepth(
+                                clusterConfiguration.getInteger(
+                                        WebOptions.FLAMEGRAPH_STACK_TRACE_DEPTH))
+                        .build();
+
+        executor.scheduleWithFixedDelay(

Review comment:
       shouldn't the tracker take care of initiating this cleanup?
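A minimal sketch of what that could look like, assuming the tracker is handed a `ScheduledExecutorService`: it schedules its own periodic cleanup when constructed and cancels it in `close()`, so `WebMonitorEndpoint` would not need a separate `scheduleWithFixedDelay` call. To stay self-contained it replaces the Guava cache with a plain `ConcurrentHashMap` using expire-after-write (the PR uses `expireAfterAccess`); the class name and the injected clock are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical tracker that owns the scheduling of its own cache cleanup. */
final class SelfCleaningStatsTracker<K, V> implements AutoCloseable {

    private static final class Timestamped<V> {
        final V value;
        final long createdAtMillis;

        Timestamped(V value, long createdAtMillis) {
            this.value = value;
            this.createdAtMillis = createdAtMillis;
        }
    }

    private final Map<K, Timestamped<V>> stats = new ConcurrentHashMap<>();
    private final long expireAfterMillis;
    private final LongSupplier clock;
    private final ScheduledFuture<?> cleanupTask;

    SelfCleaningStatsTracker(
            ScheduledExecutorService executor, long expireAfterMillis, LongSupplier clock) {
        this.expireAfterMillis = expireAfterMillis;
        this.clock = clock;
        // The tracker registers its periodic cleanup itself; callers only have to close it.
        // The first run is delayed by one full interval.
        this.cleanupTask =
                executor.scheduleWithFixedDelay(
                        this::cleanUp, expireAfterMillis, expireAfterMillis, TimeUnit.MILLISECONDS);
    }

    void put(K key, V value) {
        stats.put(key, new Timestamped<>(value, clock.getAsLong()));
    }

    V get(K key) {
        Timestamped<V> entry = stats.get(key);
        return entry == null ? null : entry.value;
    }

    int size() {
        return stats.size();
    }

    /** Drops entries that are at least one expiry interval old. */
    void cleanUp() {
        long now = clock.getAsLong();
        stats.values().removeIf(entry -> now - entry.createdAtMillis >= expireAfterMillis);
    }

    @Override
    public void close() {
        // Stop the periodic cleanup together with the tracker.
        cleanupTask.cancel(false);
    }
}
```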

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
##########
@@ -0,0 +1,187 @@
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.webmonitor.stats.TaskStatsRequestCoordinator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A coordinator for triggering and collecting thread info stats of running operator subtasks. */
+public class ThreadInfoRequestCoordinator
+        extends TaskStatsRequestCoordinator<List<ThreadInfoSample>, OperatorThreadInfoStats> {
+
+    /**
+     * Creates a new coordinator for the job.
+     *
+     * @param executor Used to execute the futures.
+     * @param requestTimeout Time out after the expected sampling duration. This is added to the
+     *     expected duration of a request, which is determined by the number of samples and the
+     *     delay between each sample.
+     */
+    public ThreadInfoRequestCoordinator(Executor executor, long requestTimeout) {
+        super(executor, requestTimeout);
+    }
+
+    /**
+     * Triggers collection of thread info stats of an operator by combining thread info responses
+     * from given subtasks. A thread info response of a subtask in turn consists of {@code
+     * numSamples}, collected with {@code delayBetweenSamples} milliseconds delay between them.
+     *
+     * @param subtasksWithGateways Execution vertices together with TaskExecutors running them.
+     * @param numSamples Number of thread info samples to collect from each subtask.
+     * @param delayBetweenSamples Delay between consecutive samples (ms).
+     * @param maxStackTraceDepth Maximum depth of the stack traces collected within thread info
+     *     samples.
+     * @return A future of the completed thread info stats.
+     */
+    public CompletableFuture<OperatorThreadInfoStats> triggerThreadInfoRequest(
+            List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
+                    subtasksWithGateways,
+            int numSamples,
+            Time delayBetweenSamples,
+            int maxStackTraceDepth) {
+
+        checkNotNull(subtasksWithGateways, "Tasks to sample");
+        checkArgument(subtasksWithGateways.size() > 0, "No tasks to sample");
+        checkArgument(numSamples >= 1, "No number of samples");
+        checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth");
+
+        // Execution IDs of running tasks
+        List<ExecutionAttemptID> runningSubtasksIds = new ArrayList<>();
+
+        // Check that all tasks are RUNNING before triggering anything. The
+        // triggering can still fail.
+        for (Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>
+                executionsWithGateway : subtasksWithGateways) {
+            AccessExecution execution = executionsWithGateway.f0.getCurrentExecutionAttempt();
+            if (execution != null && execution.getState() == ExecutionState.RUNNING) {
+                runningSubtasksIds.add(execution.getAttemptId());
+            } else {
+                return FutureUtils.completedExceptionally(
+                        new IllegalStateException(
+                                "Task "
+                                        + executionsWithGateway.f0.getTaskNameWithSubtaskIndex()
+                                        + " is not running."));
+            }
+        }
+
+        synchronized (lock) {
+            if (isShutDown) {
+                return FutureUtils.completedExceptionally(new IllegalStateException("Shut down"));
+            }
+
+            final int requestId = requestIdCounter++;
+
+            log.debug("Triggering thread info request {}", requestId);
+
+            final PendingThreadInfoRequest pending =
+                    new PendingThreadInfoRequest(requestId, runningSubtasksIds);
+
+            // requestTimeout is treated as the time on top of the expected sampling duration.
+            // Discard the request if it takes too long. We don't send cancel
+            // messages to the task managers, but only wait for the responses
+            // and then ignore them.
+            long expectedDuration = numSamples * delayBetweenSamples.toMilliseconds();
+            Time timeout = Time.milliseconds(expectedDuration + requestTimeout.toMilliseconds());
+
+            // Add the pending request before scheduling the discard task to
+            // prevent races with removing it again.
+            pendingRequests.put(requestId, pending);
+
+            ThreadInfoSamplesRequest requestParams =
+                    new ThreadInfoSamplesRequest(
+                            requestId, numSamples, delayBetweenSamples, maxStackTraceDepth);
+
+            requestThreadInfo(subtasksWithGateways, requestParams, timeout);
+
+            return pending.getStatsFuture();
+        }
+    }
+
+    /**
+     * Requests thread infos from given subtasks. The response would be ignored if it does not
+     * return within timeout.
+     */
+    private void requestThreadInfo(
+            List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>>
+                    subtasksWithGateways,
+            ThreadInfoSamplesRequest requestParams,
+            Time timeout) {
+
+        // Trigger samples collection from all subtasks
+        for (Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>
+                executionWithGateway : subtasksWithGateways) {
+
+            CompletableFuture<TaskExecutorGateway> executorGatewayFuture = executionWithGateway.f1;
+
+            ExecutionAttemptID taskExecutionAttemptId =
+                    executionWithGateway.f0.getCurrentExecutionAttempt().getAttemptId();
+
+            CompletableFuture<TaskThreadInfoResponse> threadInfo =
+                    executorGatewayFuture.thenCompose(
+                            executorGateway ->
+                                    executorGateway.requestThreadInfoSamples(

Review comment:
       this will send an RPC for every attempt. To reduce the number of issued 
RPC calls, we could batch requests for executions running in the same task 
executor.
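A rough sketch of the grouping step, assuming each attempt can be tagged with an identifier of the task executor running it (for example derived from its `TaskManagerLocation`); plain strings stand in for `ExecutionAttemptID` and that identifier. A gateway method accepting a collection of attempt IDs would be new and is not shown; `Attempt` and `groupByTaskExecutor` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class ThreadInfoBatching {

    /** Hypothetical pair of an execution attempt and the task executor it runs on. */
    static final class Attempt {
        final String attemptId;
        final String taskExecutorId;

        Attempt(String attemptId, String taskExecutorId) {
            this.attemptId = attemptId;
            this.taskExecutorId = taskExecutorId;
        }
    }

    /**
     * Groups attempt IDs by task executor, so that a single RPC per task executor could carry
     * all of its attempts. LinkedHashMap keeps the batches in first-seen order.
     */
    static Map<String, List<String>> groupByTaskExecutor(List<Attempt> attempts) {
        return attempts.stream()
                .collect(
                        Collectors.groupingBy(
                                (Attempt attempt) -> attempt.taskExecutorId,
                                LinkedHashMap::new,
                                Collectors.mapping(
                                        (Attempt attempt) -> attempt.attemptId,
                                        Collectors.toList())));
    }
}
```

With N subtasks spread over M task executors, this would issue M requests instead of N.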




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

