This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 98057245162 [FLINK-32137][flamegraph] Added filtering of lambdas when 
building flame graphs
98057245162 is described below

commit 98057245162acd8f7008f72272564d536d461fd3
Author: Vladimir Matveev <[email protected]>
AuthorDate: Wed Jun 21 15:26:13 2023 -0700

    [FLINK-32137][flamegraph] Added filtering of lambdas when building flame 
graphs
---
 .../threadinfo/VertexFlameGraphFactory.java        | 44 +++++++++-
 .../taskexecutor/ThreadInfoSampleServiceTest.java  | 40 +++------
 .../threadinfo/VertexFlameGraphFactoryTest.java    | 99 ++++++++++++++++++++++
 3 files changed, 156 insertions(+), 27 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
index 35f06663274..266f379c699 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /** Factory class for creating Flame Graph representations. */
 public class VertexFlameGraphFactory {
@@ -79,7 +81,7 @@ public class VertexFlameGraphFactory {
                 sample.getSamplesBySubtask().values()) {
             for (ThreadInfoSample threadInfo : threadInfoSubSamples) {
                 if (threadStates.contains(threadInfo.getThreadState())) {
-                    StackTraceElement[] traces = threadInfo.getStackTrace();
+                    StackTraceElement[] traces = 
cleanLambdaNames(threadInfo.getStackTrace());
                     root.incrementHitCount();
                     NodeBuilder parent = root;
                     for (int i = traces.length - 1; i >= 0; i--) {
@@ -97,6 +99,46 @@ public class VertexFlameGraphFactory {
         return new VertexFlameGraph(sample.getEndTime(), root.toNode());
     }
 
+    // Matches class names like
+    //   
org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$773/0x00000001007f84a0
+    //   
org.apache.flink.runtime.taskexecutor.IdleTestTask$$Lambda$351/605293351
+    private static final Pattern LAMBDA_CLASS_NAME =
+            Pattern.compile("(\\$Lambda\\$)\\d+/(0x)?\\p{XDigit}+$");
+
+    // Normalizes stack trace elements whose class names match the above
+    // regular expression: the frame itself is kept, but the JVM-generated
+    // numeric suffix of the lambda class name is replaced with a constant.
+    // The generated class names don't provide any additional information
+    // except the fact that a lambda is used (they don't have source
+    // information, for example), and they can be different across different
+    // JVMs, which pollutes flame graphs.
+    // Note that Thread.getStackTrace() performs similar logic - the stack
+    // trace returned by that method will not contain lambda references at
+    // all. But ThreadMXBean does collect lambdas, so we have to clean them
+    // up explicitly.
+    private static StackTraceElement[] cleanLambdaNames(StackTraceElement[] 
stackTrace) {
+        StackTraceElement[] result = new StackTraceElement[stackTrace.length];
+        for (int i = 0; i < stackTrace.length; i++) {
+            StackTraceElement element = stackTrace[i];
+            Matcher matcher = 
LAMBDA_CLASS_NAME.matcher(element.getClassName());
+            if (matcher.find()) {
+                // 
org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$773/0x00000001007f84a0
+                //  -->
+                // 
org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$0/0x0
+                // This ensures that the name is stable across JVMs, but at 
the same time
+                // keeps the stack frame in the call since it has the method 
name, which
+                // may be useful for analysis.
+                String newClassName = matcher.replaceFirst("$10/$20");
+                result[i] =
+                        new StackTraceElement(
+                                newClassName,
+                                element.getMethodName(),
+                                element.getFileName(),
+                                element.getLineNumber());
+            } else {
+                result[i] = element;
+            }
+        }
+        return result;
+    }
+
     private static class NodeBuilder {
 
         private final Map<String, NodeBuilder> children = new HashMap<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java
index 4e14bafd6d6..ea41d22f9f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java
@@ -78,12 +78,7 @@ class ThreadInfoSampleServiceTest {
                     tasks.add(new IdleTestTask());
                     Thread.sleep(2000);
 
-                    Map<Long, ExecutionAttemptID> threads =
-                            tasks.stream()
-                                    .collect(
-                                            Collectors.toMap(
-                                                    task -> 
task.getExecutingThread().getId(),
-                                                    
IdleTestTask::getExecutionId));
+                    Map<Long, ExecutionAttemptID> threads = 
collectExecutionAttempts(tasks);
                     final Map<ExecutionAttemptID, 
Collection<ThreadInfoSample>> threadInfoSamples =
                             threadInfoSampleService
                                     .requestThreadInfoSamples(threads, 
requestParams)
@@ -109,12 +104,8 @@ class ThreadInfoSampleServiceTest {
         executeWithTerminationGuarantee(
                 () -> {
                     tasks.add(new IdleTestTask());
-                    Map<Long, ExecutionAttemptID> threads =
-                            tasks.stream()
-                                    .collect(
-                                            Collectors.toMap(
-                                                    task -> 
task.getExecutingThread().getId(),
-                                                    
IdleTestTask::getExecutionId));
+                    Map<Long, ExecutionAttemptID> threads = 
collectExecutionAttempts(tasks);
+
                     final Map<ExecutionAttemptID, 
Collection<ThreadInfoSample>> threadInfoSamples1 =
                             threadInfoSampleService
                                     .requestThreadInfoSamples(threads, 
requestParams)
@@ -158,14 +149,7 @@ class ThreadInfoSampleServiceTest {
                                             tasks.add(new IdleTestTask());
 
                                             Map<Long, ExecutionAttemptID> 
threads =
-                                                    tasks.stream()
-                                                            .collect(
-                                                                    
Collectors.toMap(
-                                                                            
task ->
-                                                                               
     task.getExecutingThread()
-                                                                               
             .getId(),
-                                                                            
IdleTestTask
-                                                                               
     ::getExecutionId));
+                                                    
collectExecutionAttempts(tasks);
                                             
threadInfoSampleService.requestThreadInfoSamples(
                                                     threads,
                                                     new 
ThreadInfoSamplesRequest(
@@ -186,12 +170,7 @@ class ThreadInfoSampleServiceTest {
         Set<SampleableTask> tasks = new HashSet<>();
         tasks.add(new NotRunningTask());
 
-        Map<Long, ExecutionAttemptID> threads =
-                tasks.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        task -> 
task.getExecutingThread().getId(),
-                                        SampleableTask::getExecutionId));
+        Map<Long, ExecutionAttemptID> threads = 
collectExecutionAttempts(tasks);
         final CompletableFuture<Map<ExecutionAttemptID, 
Collection<ThreadInfoSample>>>
                 sampleFuture =
                         
threadInfoSampleService.requestThreadInfoSamples(threads, requestParams);
@@ -201,6 +180,15 @@ class ThreadInfoSampleServiceTest {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    private static Map<Long, ExecutionAttemptID> collectExecutionAttempts(
+            Set<? extends SampleableTask> tasks) {
+        return tasks.stream()
+                .collect(
+                        Collectors.toMap(
+                                task -> task.getExecutingThread().getId(),
+                                SampleableTask::getExecutionId));
+    }
+
     private static class NotRunningTask implements SampleableTask {
 
         private final ExecutionAttemptID executionId = 
ExecutionAttemptID.randomId();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactoryTest.java
new file mode 100644
index 00000000000..583f8ae83f0
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.threadinfo;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.ThreadInfoSample;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+/** Tests for {@link VertexFlameGraphFactory}. */
+public class VertexFlameGraphFactoryTest extends TestLogger {
+    /** Tests that lambda class names are cleaned up inside the stack traces. 
*/
+    @Test
+    public void testLambdaClassNamesCleanUp() {
+        Map<ExecutionAttemptID, Collection<ThreadInfoSample>> samplesBySubtask 
= generateSamples();
+
+        VertexThreadInfoStats sample = new VertexThreadInfoStats(0, 0, 0, 
samplesBySubtask);
+
+        VertexFlameGraph graph = 
VertexFlameGraphFactory.createFullFlameGraphFrom(sample);
+        int encounteredLambdas = verifyRecursively(graph.getRoot());
+        if (encounteredLambdas == 0) {
+            fail("No lambdas encountered in the test, cleanup functionality 
was not tested");
+        }
+    }
+
+    /**
+     * Walks the flame graph depth-first and verifies every node whose location
+     * contains a lambda class name.
+     *
+     * @param node the node to check, together with all of its children
+     * @return the number of lambda locations found in the subtree rooted at
+     *     {@code node}
+     */
+    private int verifyRecursively(VertexFlameGraph.Node node) {
+        String location = node.getStackTraceLocation();
+        int lambdas = 0;
+        if (location.contains("$Lambda$")) {
+            lambdas++;
+            //    com.example.ClassName.method:123
+            // -> com.example.ClassName.method
+            // -> com.example.ClassName
+            String locationWithoutLineNumber =
+                    location.substring(0, location.lastIndexOf(":"));
+            String className =
+                    locationWithoutLineNumber.substring(
+                            0, locationWithoutLineNumber.lastIndexOf("."));
+            // The cleanup keeps the optional "0x" prefix of the lambda id, so
+            // the normalized suffix is "0/0" for decimal ids and "0/0x0" for
+            // hexadecimal ones. Both forms must be accepted here.
+            assertThat(className).matches(".*\\$Lambda\\$0/(0x)?0");
+        }
+        return lambdas
+                + node.getChildren().stream().mapToInt(this::verifyRecursively).sum();
+    }
+
+    private Map<ExecutionAttemptID, Collection<ThreadInfoSample>> 
generateSamples() {
+        ThreadInfoSample sample1 = 
ThreadInfoSample.from(getStackTraceWithLambda()).get();
+
+        List<ThreadInfoSample> samples = new ArrayList<>();
+        samples.add(sample1);
+
+        ExecutionAttemptID executionAttemptID =
+                new ExecutionAttemptID(
+                        new ExecutionGraphID(), new ExecutionVertexID(new 
JobVertexID(), 0), 0);
+
+        Map<ExecutionAttemptID, Collection<ThreadInfoSample>> result = new 
HashMap<>();
+        result.put(executionAttemptID, samples);
+
+        return result;
+    }
+
+    private ThreadInfo getStackTraceWithLambda() {
+        Supplier<ThreadInfo> r1 =
+                () ->
+                        ManagementFactory.getThreadMXBean()
+                                .getThreadInfo(Thread.currentThread().getId(), 
Integer.MAX_VALUE);
+        Supplier<ThreadInfo> r2 = () -> r1.get();
+        return r2.get();
+    }
+}

Reply via email to