fapaul commented on code in PR #19228:
URL: https://github.com/apache/flink/pull/19228#discussion_r880228717


##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##########
@@ -18,18 +18,25 @@
 
 package org.apache.flink.runtime.messages;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.lang.management.ThreadInfo;
+import java.util.Collection;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * A serializable wrapper container for transferring parts of the {@link
  * java.lang.management.ThreadInfo}.
  */
 public class ThreadInfoSample implements Serializable {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(ThreadInfoSample.class);

Review Comment:
   Seems to be unused



##########
flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java:
##########
@@ -56,6 +64,39 @@ public static Optional<ThreadInfoSample> 
createThreadInfoSample(
         return ThreadInfoSample.from(threadMxBean.getThreadInfo(threadId, 
maxStackTraceDepth));
     }
 
+    /**
+     * Creates a {@link ThreadInfoSample} for a specific thread. Contains 
thread traces if
+     * maxStackTraceDepth > 0.
+     *
+     * @param threadIds The IDs of the threads to create the thread dump for.
+     * @param maxStackTraceDepth The maximum number of entries in the stack 
trace to be collected.
+     * @return The thread information for the requested thread IDs.
+     */
+    public static Collection<ThreadInfoSample> createThreadInfoSample(
+            Collection<Long> threadIds, int maxStackTraceDepth) {
+        ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
+        long[] threadIdsArray = threadIds.stream().mapToLong(l -> l).toArray();
+
+        ThreadInfo[] threadInfo = threadMxBean.getThreadInfo(threadIdsArray, 
maxStackTraceDepth);
+
+        List<ThreadInfo> threadInfoNoNulls =
+                IntStream.range(0, threadIdsArray.length)
+                        .filter(
+                                i -> {
+                                    if (threadInfo[i] == null) {
+                                        LOG.debug(

Review Comment:
   Nit: Not use if the sampling path is considered performance-critical but you 
can add 
   ```java
   if (log.isDebugEnabled()) {
   ...
               }
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleServiceTest.java:
##########
@@ -19,34 +19,30 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.ThreadInfoSample;
 import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.time.Duration;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
-import static org.hamcrest.Matchers.arrayWithSize;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static 
org.apache.flink.runtime.taskexecutor.IdleTestTask.executeWithTerminationGuarantee;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link ThreadInfoSampleService}. */
+@Timeout(10)

Review Comment:
   Nit: Afaik it is discouraged to use timeouts for tests in the Flink codebase 
because it may swallow the information about the actual blocking location and 
introduce flakiness on slower machines. Since it has been like this before I am 
fine to keep it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to