This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 98057245162 [FLINK-32137][flamegraph] Added filtering of lambdas when
building flame graphs
98057245162 is described below
commit 98057245162acd8f7008f72272564d536d461fd3
Author: Vladimir Matveev <[email protected]>
AuthorDate: Wed Jun 21 15:26:13 2023 -0700
[FLINK-32137][flamegraph] Added filtering of lambdas when building flame
graphs
---
.../threadinfo/VertexFlameGraphFactory.java | 44 +++++++++-
.../taskexecutor/ThreadInfoSampleServiceTest.java | 40 +++------
.../threadinfo/VertexFlameGraphFactoryTest.java | 99 ++++++++++++++++++++++
3 files changed, 156 insertions(+), 27 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
index 35f06663274..266f379c699 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/** Factory class for creating Flame Graph representations. */
public class VertexFlameGraphFactory {
@@ -79,7 +81,7 @@ public class VertexFlameGraphFactory {
sample.getSamplesBySubtask().values()) {
for (ThreadInfoSample threadInfo : threadInfoSubSamples) {
if (threadStates.contains(threadInfo.getThreadState())) {
- StackTraceElement[] traces = threadInfo.getStackTrace();
+ StackTraceElement[] traces =
cleanLambdaNames(threadInfo.getStackTrace());
root.incrementHitCount();
NodeBuilder parent = root;
for (int i = traces.length - 1; i >= 0; i--) {
@@ -97,6 +99,46 @@ public class VertexFlameGraphFactory {
return new VertexFlameGraph(sample.getEndTime(), root.toNode());
}
+ // Matches class names like
+ //
org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$773/0x00000001007f84a0
+ //
org.apache.flink.runtime.taskexecutor.IdleTestTask$$Lambda$351/605293351
+ private static final Pattern LAMBDA_CLASS_NAME =
+ Pattern.compile("(\\$Lambda\\$)\\d+/(0x)?\\p{XDigit}+$");
+
+ // Normalizes the class names of stack trace elements that match the above regular expression.
+ // The generated suffix of a lambda class name is useless: it does not provide any additional
+ // information beyond the fact that a lambda is used (there is no source information, for
+ // example), and it can differ across JVMs, which pollutes flame graphs.
+ // The frames themselves are kept; only the JVM-specific part of the class name is replaced.
+ // Note that Thread.getStackTrace() performs similar filtering - the stack trace it returns
+ // does not contain lambda frames. ThreadMXBean, however, does report them, so we have to
+ // clean them up explicitly.
+ private static StackTraceElement[] cleanLambdaNames(StackTraceElement[]
stackTrace) {
+ StackTraceElement[] result = new StackTraceElement[stackTrace.length];
+ for (int i = 0; i < stackTrace.length; i++) {
+ StackTraceElement element = stackTrace[i];
+ Matcher matcher =
LAMBDA_CLASS_NAME.matcher(element.getClassName());
+ if (matcher.find()) {
+ //
org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$773/0x00000001007f84a0
+ // -->
+ //
org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$0/0x0
+ // This ensures that the name is stable across JVMs, but at the same time
+ // keeps the stack frame in the call stack since it has the method name,
+ // which may be useful for analysis.
+ String newClassName = matcher.replaceFirst("$10/$20");
+ result[i] =
+ new StackTraceElement(
+ newClassName,
+ element.getMethodName(),
+ element.getFileName(),
+ element.getLineNumber());
+ } else {
+ result[i] = element;
+ }
+ }
+ return result;
+ }
+
private static class NodeBuilder {
private final Map<String, NodeBuilder> children = new HashMap<>();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java
index 4e14bafd6d6..ea41d22f9f2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java
@@ -78,12 +78,7 @@ class ThreadInfoSampleServiceTest {
tasks.add(new IdleTestTask());
Thread.sleep(2000);
- Map<Long, ExecutionAttemptID> threads =
- tasks.stream()
- .collect(
- Collectors.toMap(
- task ->
task.getExecutingThread().getId(),
-
IdleTestTask::getExecutionId));
+ Map<Long, ExecutionAttemptID> threads =
collectExecutionAttempts(tasks);
final Map<ExecutionAttemptID,
Collection<ThreadInfoSample>> threadInfoSamples =
threadInfoSampleService
.requestThreadInfoSamples(threads,
requestParams)
@@ -109,12 +104,8 @@ class ThreadInfoSampleServiceTest {
executeWithTerminationGuarantee(
() -> {
tasks.add(new IdleTestTask());
- Map<Long, ExecutionAttemptID> threads =
- tasks.stream()
- .collect(
- Collectors.toMap(
- task ->
task.getExecutingThread().getId(),
-
IdleTestTask::getExecutionId));
+ Map<Long, ExecutionAttemptID> threads =
collectExecutionAttempts(tasks);
+
final Map<ExecutionAttemptID,
Collection<ThreadInfoSample>> threadInfoSamples1 =
threadInfoSampleService
.requestThreadInfoSamples(threads,
requestParams)
@@ -158,14 +149,7 @@ class ThreadInfoSampleServiceTest {
tasks.add(new IdleTestTask());
Map<Long, ExecutionAttemptID>
threads =
- tasks.stream()
- .collect(
-
Collectors.toMap(
-
task ->
-
task.getExecutingThread()
-
.getId(),
-
IdleTestTask
-
::getExecutionId));
+
collectExecutionAttempts(tasks);
threadInfoSampleService.requestThreadInfoSamples(
threads,
new
ThreadInfoSamplesRequest(
@@ -186,12 +170,7 @@ class ThreadInfoSampleServiceTest {
Set<SampleableTask> tasks = new HashSet<>();
tasks.add(new NotRunningTask());
- Map<Long, ExecutionAttemptID> threads =
- tasks.stream()
- .collect(
- Collectors.toMap(
- task ->
task.getExecutingThread().getId(),
- SampleableTask::getExecutionId));
+ Map<Long, ExecutionAttemptID> threads =
collectExecutionAttempts(tasks);
final CompletableFuture<Map<ExecutionAttemptID,
Collection<ThreadInfoSample>>>
sampleFuture =
threadInfoSampleService.requestThreadInfoSamples(threads, requestParams);
@@ -201,6 +180,15 @@ class ThreadInfoSampleServiceTest {
.isInstanceOf(IllegalStateException.class);
}
+ private static Map<Long, ExecutionAttemptID> collectExecutionAttempts(
+ Set<? extends SampleableTask> tasks) {
+ return tasks.stream()
+ .collect(
+ Collectors.toMap(
+ task -> task.getExecutingThread().getId(),
+ SampleableTask::getExecutionId));
+ }
+
private static class NotRunningTask implements SampleableTask {
private final ExecutionAttemptID executionId =
ExecutionAttemptID.randomId();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactoryTest.java
new file mode 100644
index 00000000000..583f8ae83f0
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+/** Tests for {@link VertexFlameGraphFactory}. */
+public class VertexFlameGraphFactoryTest extends TestLogger {
+ /** Tests that lambda class names are cleaned up inside the stack traces.
*/
+ @Test
+ public void testLambdaClassNamesCleanUp() {
+ Map<ExecutionAttemptID, Collection<ThreadInfoSample>> samplesBySubtask
= generateSamples();
+
+ VertexThreadInfoStats sample = new VertexThreadInfoStats(0, 0, 0,
samplesBySubtask);
+
+ VertexFlameGraph graph =
VertexFlameGraphFactory.createFullFlameGraphFrom(sample);
+ int encounteredLambdas = verifyRecursively(graph.getRoot());
+ if (encounteredLambdas == 0) {
+ fail("No lambdas encountered in the test, cleanup functionality
was not tested");
+ }
+ }
+
+ private int verifyRecursively(VertexFlameGraph.Node node) {
+ String location = node.getStackTraceLocation();
+ int lambdas = 0;
+ if (location.contains("$Lambda$")) {
+ lambdas++;
+ // com.example.ClassName.method:123
+ // -> com.example.ClassName.method
+ // -> com.example.ClassName
+ String locationWithoutLineNumber = location.substring(0,
location.lastIndexOf(":"));
+ String className =
+ locationWithoutLineNumber.substring(
+ 0, locationWithoutLineNumber.lastIndexOf("."));
+ assertThat(className).endsWith("$Lambda$0/0");
+ }
+ return lambdas +
node.getChildren().stream().mapToInt(this::verifyRecursively).sum();
+ }
+
+ private Map<ExecutionAttemptID, Collection<ThreadInfoSample>>
generateSamples() {
+ ThreadInfoSample sample1 =
ThreadInfoSample.from(getStackTraceWithLambda()).get();
+
+ List<ThreadInfoSample> samples = new ArrayList<>();
+ samples.add(sample1);
+
+ ExecutionAttemptID executionAttemptID =
+ new ExecutionAttemptID(
+ new ExecutionGraphID(), new ExecutionVertexID(new
JobVertexID(), 0), 0);
+
+ Map<ExecutionAttemptID, Collection<ThreadInfoSample>> result = new
HashMap<>();
+ result.put(executionAttemptID, samples);
+
+ return result;
+ }
+
+ private ThreadInfo getStackTraceWithLambda() {
+ Supplier<ThreadInfo> r1 =
+ () ->
+ ManagementFactory.getThreadMXBean()
+ .getThreadInfo(Thread.currentThread().getId(),
Integer.MAX_VALUE);
+ Supplier<ThreadInfo> r2 = () -> r1.get();
+ return r2.get();
+ }
+}