zentol commented on a change in pull request #15054: URL: https://github.com/apache/flink/pull/15054#discussion_r602396389
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter; +import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphInfo; +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph; +import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraphFactory; +import 
org.apache.flink.runtime.webmonitor.threadinfo.OperatorThreadInfoStats; +import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoOperatorTracker; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Executor; + +/** Request handler for the job vertex Flame Graph. */ +public class JobVertexFlameGraphHandler + extends AbstractJobVertexHandler<JobVertexFlameGraphInfo, JobVertexMessageParameters> { + + private final ThreadInfoOperatorTracker<OperatorThreadInfoStats> threadInfoOperatorTracker; + + private static JobVertexFlameGraphInfo createJobVertexFlameGraphInfo( + OperatorFlameGraph flameGraph) { + return new JobVertexFlameGraphInfo(flameGraph.getEndTime(), flameGraph.getRoot()); + } + + public JobVertexFlameGraphHandler( + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + MessageHeaders<EmptyRequestBody, JobVertexFlameGraphInfo, JobVertexMessageParameters> Review comment: I know this is a common pattern in other handlers, but it's pretty eh; the handler should decide on its own what headers it implements. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter; +import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphInfo; +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph; +import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraphFactory; +import org.apache.flink.runtime.webmonitor.threadinfo.OperatorThreadInfoStats; +import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoOperatorTracker; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Executor; + +/** Request handler for the job vertex Flame Graph. 
*/ +public class JobVertexFlameGraphHandler + extends AbstractJobVertexHandler<JobVertexFlameGraphInfo, JobVertexMessageParameters> { + + private final ThreadInfoOperatorTracker<OperatorThreadInfoStats> threadInfoOperatorTracker; + + private static JobVertexFlameGraphInfo createJobVertexFlameGraphInfo( + OperatorFlameGraph flameGraph) { + return new JobVertexFlameGraphInfo(flameGraph.getEndTime(), flameGraph.getRoot()); + } + + public JobVertexFlameGraphHandler( + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + MessageHeaders<EmptyRequestBody, JobVertexFlameGraphInfo, JobVertexMessageParameters> + messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + ThreadInfoOperatorTracker<OperatorThreadInfoStats> threadInfoOperatorTracker) { + super( + leaderRetriever, + timeout, + responseHeaders, + messageHeaders, + executionGraphCache, + executor); + this.threadInfoOperatorTracker = threadInfoOperatorTracker; + } + + @Override + protected JobVertexFlameGraphInfo handleRequest( + @Nonnull HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, + @Nonnull AccessExecutionJobVertex jobVertex) Review comment: Nonnull is generally assumed default ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java ########## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages; + +import java.io.Serializable; +import java.lang.management.LockInfo; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; + +/** + * A serializable wrapper around {@link java.lang.management.ThreadInfo} that excludes {@link + * java.lang.management.LockInfo}-based non-serializable fields. + */ +public class ThreadInfoSample implements Serializable { + + private final String threadName; + private final long threadId; + private final long blockedTime; + private final long blockedCount; + private final long waitedTime; + private final long waitedCount; + private final String lockName; + private final long lockOwnerId; + private final String lockOwnerName; + private final boolean inNative; + private final boolean suspended; + private final Thread.State threadState; + private final StackTraceElement[] stackTrace; Review comment: Do we really need all these things? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.JobVertexFlameGraphHandler; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Message headers for the {@link JobVertexFlameGraphHandler}. */ +public class JobVertexFlameGraphHeaders + implements MessageHeaders< + EmptyRequestBody, JobVertexFlameGraphInfo, JobVertexMessageParameters> { + + private static final JobVertexFlameGraphHeaders INSTANCE = new JobVertexFlameGraphHeaders(); + + private static final String URL = + "/jobs/:" + + JobIDPathParameter.KEY + + "/vertices/:" + + JobVertexIdPathParameter.KEY + + "/flamegraph"; + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class<JobVertexFlameGraphInfo> getResponseClass() { + return JobVertexFlameGraphInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public JobVertexMessageParameters getUnresolvedMessageParameters() { + return new JobVertexMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static JobVertexFlameGraphHeaders getInstance() { + return INSTANCE; + } + + @Override + public String getDescription() { + return "Returns flame graph information for a job, and may initiate 
flame graph sampling if necessary."; Review comment: ```suggestion return "Returns flame graph information for a vertex, and may initiate flame graph sampling if necessary."; ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.threadinfo; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.messages.TaskThreadInfoResponse; +import org.apache.flink.runtime.messages.ThreadInfoSample; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.webmonitor.stats.TaskStatsRequestCoordinator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A coordinator for triggering and collecting thread info stats of running operator subtasks. */ +public class ThreadInfoRequestCoordinator + extends TaskStatsRequestCoordinator<List<ThreadInfoSample>, OperatorThreadInfoStats> { + + /** + * Creates a new coordinator for the job. + * + * @param executor Used to execute the futures. + * @param requestTimeout Time out after the expected sampling duration. This is added to the + * expected duration of a request, which is determined by the number of samples and the + * delay between each sample. + */ + public ThreadInfoRequestCoordinator(Executor executor, long requestTimeout) { + super(executor, requestTimeout); + } + + /** + * Triggers collection of thread info stats of an operator by combining thread info responses + * from given subtasks. A thread info response of a subtask in turn consists of {@code + * numSamples}, collected with {@code delayBetweenSamples} milliseconds delay between them. 
+ * + * @param subtasksWithGateways Execution vertices together with TaskExecutors running them. + * @param numSamples Number of thread info samples to collect from each subtask. + * @param delayBetweenSamples Delay between consecutive samples (ms). + * @param maxStackTraceDepth Maximum depth of the stack traces collected within thread info + * samples. + * @return A future of the completed thread info stats. + */ + public CompletableFuture<OperatorThreadInfoStats> triggerThreadInfoRequest( + List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>> Review comment: This code would be easier to test if it wouldn't specifically require a TaskExecutorGateway; we could move the newly added method into a separate interface that the TM extends. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphInfo.java ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexFlameGraphHandler; +import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Response type of the {@link JobVertexFlameGraphHandler}. */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class JobVertexFlameGraphInfo implements ResponseBody { + + public static JobVertexFlameGraphInfo empty() { + return new JobVertexFlameGraphInfo(null, null); + } + + private static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + private static final String FIELD_NAME_ROOT = "data"; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_ROOT) + private final OperatorFlameGraph.Node root; + + @JsonCreator + public JobVertexFlameGraphInfo( + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, Review comment: ```suggestion @JsonProperty(FIELD_NAME_END_TIMESTAMP) long endTimestamp, ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ########## @@ -266,4 +267,12 @@ void heartbeatFromTaskManager( */ CompletableFuture<ThreadDumpInfo> requestThreadDump( ResourceID taskManagerId, @RpcTimeout Time timeout); + + /** + * Requests the {@link TaskExecutorGateway}. + * + * @param taskManagerId identifying the {@link TaskExecutor}. + * @return Future containing the task executor gateway. + */ + CompletableFuture<TaskExecutorGateway> requestTaskExecutorGateway(ResourceID taskManagerId); Review comment: We generally don't expose other component gateways, because it makes it unnecessarily difficult to mock the RMGateway in tests. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.threadinfo; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.messages.TaskThreadInfoResponse; +import org.apache.flink.runtime.messages.ThreadInfoSample; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.webmonitor.stats.TaskStatsRequestCoordinator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; 
+ +/** A coordinator for triggering and collecting thread info stats of running operator subtasks. */ +public class ThreadInfoRequestCoordinator + extends TaskStatsRequestCoordinator<List<ThreadInfoSample>, OperatorThreadInfoStats> { + + /** + * Creates a new coordinator for the job. + * + * @param executor Used to execute the futures. + * @param requestTimeout Time out after the expected sampling duration. This is added to the + * expected duration of a request, which is determined by the number of samples and the + * delay between each sample. + */ + public ThreadInfoRequestCoordinator(Executor executor, long requestTimeout) { + super(executor, requestTimeout); + } + + /** + * Triggers collection of thread info stats of an operator by combining thread info responses + * from given subtasks. A thread info response of a subtask in turn consists of {@code + * numSamples}, collected with {@code delayBetweenSamples} milliseconds delay between them. + * + * @param subtasksWithGateways Execution vertices together with TaskExecutors running them. + * @param numSamples Number of thread info samples to collect from each subtask. + * @param delayBetweenSamples Delay between consecutive samples (ms). + * @param maxStackTraceDepth Maximum depth of the stack traces collected within thread info + * samples. + * @return A future of the completed thread info stats. 
+ */ + public CompletableFuture<OperatorThreadInfoStats> triggerThreadInfoRequest( + List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>> + subtasksWithGateways, + int numSamples, + Time delayBetweenSamples, + int maxStackTraceDepth) { + + checkNotNull(subtasksWithGateways, "Tasks to sample"); + checkArgument(subtasksWithGateways.size() > 0, "No tasks to sample"); + checkArgument(numSamples >= 1, "No number of samples"); + checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth"); + + // Execution IDs of running tasks + List<ExecutionAttemptID> runningSubtasksIds = new ArrayList<>(); + + // Check that all tasks are RUNNING before triggering anything. The + // triggering can still fail. + for (Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>> + executionsWithGateway : subtasksWithGateways) { + AccessExecution execution = executionsWithGateway.f0.getCurrentExecutionAttempt(); Review comment: If all we need are the attemptIds then we should only require that to be passed in. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.Arrays; + +/** Termination mode query parameter. */ +public class FlameGraphTypeQueryParameter + extends MessageQueryParameter<FlameGraphTypeQueryParameter.Type> { + + private static final String key = "type"; + + public FlameGraphTypeQueryParameter() { + super(key, MessageParameterRequisiteness.OPTIONAL); + } + + @Override + public Type convertStringToValue(String value) { + return Type.valueOf(value.toUpperCase()); + } + + @Override + public String convertValueToString(Type value) { + return value.name().toLowerCase(); + } + + @Override + public String getDescription() { + return "String value that specifies the Flame Graph type. Supported options are: \"" + + Arrays.toString(Type.values()) + + "\"."; + } + + /** Termination mode. */ + public enum Type { + FULL, + ON_CPU, + OFF_CPU Review comment: Can we add a short description to each of these? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.threadinfo; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.messages.TaskThreadInfoResponse; +import org.apache.flink.runtime.messages.ThreadInfoSample; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.webmonitor.stats.TaskStatsRequestCoordinator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A coordinator for triggering and collecting thread info stats of running operator subtasks. */ +public class ThreadInfoRequestCoordinator + extends TaskStatsRequestCoordinator<List<ThreadInfoSample>, OperatorThreadInfoStats> { + + /** + * Creates a new coordinator for the job. + * + * @param executor Used to execute the futures. + * @param requestTimeout Time out after the expected sampling duration. This is added to the + * expected duration of a request, which is determined by the number of samples and the + * delay between each sample. + */ + public ThreadInfoRequestCoordinator(Executor executor, long requestTimeout) { + super(executor, requestTimeout); + } + + /** + * Triggers collection of thread info stats of an operator by combining thread info responses + * from given subtasks. 
A thread info response of a subtask in turn consists of {@code + * numSamples}, collected with {@code delayBetweenSamples} milliseconds delay between them. + * + * @param subtasksWithGateways Execution vertices together with TaskExecutors running them. + * @param numSamples Number of thread info samples to collect from each subtask. + * @param delayBetweenSamples Delay between consecutive samples (ms). + * @param maxStackTraceDepth Maximum depth of the stack traces collected within thread info + * samples. + * @return A future of the completed thread info stats. + */ + public CompletableFuture<OperatorThreadInfoStats> triggerThreadInfoRequest( + List<Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>>> + subtasksWithGateways, + int numSamples, + Time delayBetweenSamples, + int maxStackTraceDepth) { + + checkNotNull(subtasksWithGateways, "Tasks to sample"); + checkArgument(subtasksWithGateways.size() > 0, "No tasks to sample"); + checkArgument(numSamples >= 1, "No number of samples"); + checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth"); + + // Execution IDs of running tasks + List<ExecutionAttemptID> runningSubtasksIds = new ArrayList<>(); + + // Check that all tasks are RUNNING before triggering anything. The + // triggering can still fail. + for (Tuple2<AccessExecutionVertex, CompletableFuture<TaskExecutorGateway>> + executionsWithGateway : subtasksWithGateways) { + AccessExecution execution = executionsWithGateway.f0.getCurrentExecutionAttempt(); + if (execution != null && execution.getState() == ExecutionState.RUNNING) { + runningSubtasksIds.add(execution.getAttemptId()); + } else { + return FutureUtils.completedExceptionally( + new IllegalStateException( Review comment: Will failing so hard not create quite a bit of noise? 
########## File path: docs/layouts/shortcodes/generated/rest_v1_dispatcher.html ########## @@ -3636,6 +3656,16 @@ </ul> </td> </tr> + <tr> + <td colspan="2">Query parameters</td> + </tr> + <tr> + <td colspan="2"> + <ul> +<li><code>type</code> (optional): String value that specifies the Flame Graph type. Supported options are: "[FULL, ON_CPU, OFF_CPU]".</li> Review comment: seems weird that the backpressure headers now have a flamegraph type? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphInfo.java ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexFlameGraphHandler; +import org.apache.flink.runtime.webmonitor.threadinfo.OperatorFlameGraph; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Response type of the {@link JobVertexFlameGraphHandler}. 
*/ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class JobVertexFlameGraphInfo implements ResponseBody { + + public static JobVertexFlameGraphInfo empty() { + return new JobVertexFlameGraphInfo(null, null); + } + + private static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + private static final String FIELD_NAME_ROOT = "data"; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_ROOT) + private final OperatorFlameGraph.Node root; + + @JsonCreator + public JobVertexFlameGraphInfo( + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_ROOT) OperatorFlameGraph.Node root) { + this.endTimestamp = endTimestamp; + this.root = root; + } + + public Long getEndTimestamp() { Review comment: ```suggestion public long getEndTimestamp() { ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexMessageParameters.java ########## @@ -26,6 +26,8 @@ public class JobVertexMessageParameters extends JobMessageParameters { public final JobVertexIdPathParameter jobVertexIdPathParameter = new JobVertexIdPathParameter(); + public final FlameGraphTypeQueryParameter flameGraphTypeQueryParameter = Review comment: Well this is pretty jank. Why is there not a separate parameters class extending this one? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ########## @@ -497,6 +505,32 @@ private void stopTaskExecutorServices() throws Exception { // RPC methods // ====================================================================== + @Override + public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples( + final ExecutionAttemptID taskExecutionAttemptId, + ThreadInfoSamplesRequest requestParams, Review comment: ```suggestion final ThreadInfoSamplesRequest requestParams, ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/stats/OperatorStatsTracker.java ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.stats; + +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.util.FlinkException; + +import java.util.Optional; + +/** + * Interface for a tracker of statistics for {@link AccessExecutionJobVertex}. + * + * @param <T> Type of statistics to track + */ +public interface OperatorStatsTracker<T extends Stats> { + + /** + * Returns statistics for an operator. 
Automatically triggers sampling request if statistics are + * not available or outdated. + * + * @param vertex Operator to get the stats for. + * @return Statistics for an operator + */ + Optional<T> getOperatorStats(AccessExecutionJobVertex vertex); Review comment: Javadocs are missing details on when the optional is empty ########## File path: flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java ########## @@ -156,6 +156,48 @@ .withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples") .withDescription("This config option is no longer used"); + /** Time, in milliseconds, after which cached stats are cleaned up if not accessed. */ + public static final ConfigOption<Integer> FLAMEGRAPH_CLEANUP_INTERVAL = Review comment: Duration types force the user to specify the unit when configuring it. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleService.java ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.messages.ThreadInfoSample; +import org.apache.flink.runtime.util.JvmUtils; +import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; Review comment: inconsistent usage of null checks ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java ########## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages; + +import java.io.Serializable; +import java.lang.management.LockInfo; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; + +/** + * A serializable wrapper around {@link java.lang.management.ThreadInfo} that excludes {@link Review comment: It seems a bit questionable. 
The licensing story around the JDK is a bit complicated, so let's be as pessimistic as possible. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleService.java ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.messages.ThreadInfoSample; +import org.apache.flink.runtime.util.JvmUtils; +import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Samples thread infos of tasks. 
*/ +class ThreadInfoSampleService { + + private final ScheduledExecutor scheduledExecutor; + + ThreadInfoSampleService(final ScheduledExecutor scheduledExecutor) { + this.scheduledExecutor = + requireNonNull(scheduledExecutor, "scheduledExecutor must not be null"); + } + + /** + * Returns a future that completes with a given number of thread info samples of a task thread. + * + * @param task The task to be sampled from. + * @param requestParams Parameters of the sampling request. + * @return A future containing the stack trace samples. + */ + public CompletableFuture<List<ThreadInfoSample>> requestThreadInfoSamples( + final ThreadInfoSampleableTask task, final ThreadInfoSamplesRequest requestParams) { + checkNotNull(task, "task must not be null"); + + return requestThreadInfoSamples( + task, + requestParams.getNumSamples(), + requestParams.getDelayBetweenSamples(), + requestParams.getMaxStackTraceDepth(), + new ArrayList<>(requestParams.getNumSamples()), + new CompletableFuture<>()); Review comment: seems a bit odd to have this as a parameter -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
