AHeise commented on code in PR #19228:
URL: https://github.com/apache/flink/pull/19228#discussion_r862691155


##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##########
@@ -67,11 +67,15 @@ public static Optional<ThreadInfoSample> from(@Nullable ThreadInfo threadInfo) {
      * @param threadInfos {@link ThreadInfo} array where the data will be copied from.
      * @return a Collection of the corresponding {@link ThreadInfoSample}s
      */
-    public static Collection<ThreadInfoSample> from(ThreadInfo[] threadInfos) {

Review Comment:
   ```
       public StackTraceElement[] getStackTrace() {
           return stackTrace.clone();
       }
   ```
   Why do we clone here?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java:
##########
@@ -227,10 +229,17 @@ private static CompletableFuture<TaskExecutorThreadInfoGateway> createMockTaskMa
         final CompletableFuture<TaskThreadInfoResponse> responseFuture = new CompletableFuture<>();
         switch (completionType) {
             case SUCCESSFULLY:
-                ThreadInfoSample sample =
-                        JvmUtils.createThreadInfoSample(Thread.currentThread().getId(), 100).get();
+                List<IdleTestTask> tasks = new ArrayList<>();
+                tasks.add(new IdleTestTask());
+                tasks.add(new IdleTestTask());
+                List<Long> threadIds =
+                        tasks.stream()
+                                .map(t -> t.getExecutingThread().getId())
+                                .collect(Collectors.toList());
+                Collection<ThreadInfoSample> threadInfoSample =
+                        JvmUtils.createThreadInfoSample(threadIds, 100);
                 responseFuture.complete(
-                        new TaskThreadInfoResponse(Collections.singletonList(sample)));
+                        new TaskThreadInfoResponse(new ArrayList<>(threadInfoSample)));

Review Comment:
   Can you align the types so that a conversion is not necessary (either always `Collection` or always `List`)?
   A conversion from collection to list smells: either you need the order or you don't, but once the order is lost there is no way back.
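
A minimal sketch of what aligned types could look like, assuming `List` is chosen end to end. `createSamples` and `Response` are hypothetical stand-ins for `JvmUtils.createThreadInfoSample` and `TaskThreadInfoResponse`, and strings stand in for `ThreadInfoSample`; none of this is the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class AlignedTypesSketch {

    /** Hypothetical response type that stores exactly the type the producer returns. */
    static final class Response {
        private final List<String> samples;

        Response(List<String> samples) {
            this.samples = samples;
        }

        List<String> getSamples() {
            return samples;
        }
    }

    /** Hypothetical producer: returns a List, so the request order is kept. */
    static List<String> createSamples(List<Long> threadIds) {
        List<String> result = new ArrayList<>(threadIds.size());
        for (Long id : threadIds) {
            result.add("sample-" + id);
        }
        return result;
    }
}
```

With both sides declared as `List`, the caller can write `new Response(createSamples(ids))` without wrapping the result in `new ArrayList<>(...)`.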



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -204,31 +206,33 @@ private void triggerThreadInfoSampleInternal(
         }
     }
 
-    private Map<Set<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
+    private Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>

Review Comment:
   I think using `ImmutableSet` for keys makes this much clearer, so I'd keep the change.
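
As background for why immutable keys are clearer: a `HashMap` locates an entry by the key's hash code, so a key set that is modified after insertion may no longer be found. The sketch below uses the JDK's `Collections.unmodifiableSet` as a stand-in for `ImmutableSet` (so it runs without Guava) and strings as stand-ins for `ExecutionAttemptID` and the gateway futures; all names are illustrative assumptions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ImmutableKeySketch {

    /** Copies the input and wraps the copy so that nobody can mutate the key later. */
    static Set<String> freeze(Set<String> attemptIds) {
        return Collections.unmodifiableSet(new HashSet<>(attemptIds));
    }

    /** Builds a map with one frozen key set; the values are placeholder gateway names. */
    static Map<Set<String>, String> gatewaysFor(Set<String> attemptIds) {
        Map<Set<String>, String> gateways = new HashMap<>();
        gateways.put(freeze(attemptIds), "gateway-A");
        return gateways;
    }
}
```

Because `freeze` copies before wrapping, later changes to the caller's set cannot affect the stored key, and any attempt to modify the key itself throws `UnsupportedOperationException`.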



##########
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java:
##########
@@ -265,15 +267,20 @@ private static CompletableFuture<TaskExecutorThreadInfoGateway> createMockTaskMa
         return CompletableFuture.completedFuture(executorGateway);
     }
 
-    private static Map<Set<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
+    private static Map<
+                    ImmutableSet<ExecutionAttemptID>,
+                    CompletableFuture<TaskExecutorThreadInfoGateway>>
             createMockSubtaskWithGateways(CompletionType... completionTypes) {
-        final Map<Set<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
+        final Map<
+                        ImmutableSet<ExecutionAttemptID>,
+                        CompletableFuture<TaskExecutorThreadInfoGateway>>
                 result = new HashMap<>();
         for (CompletionType completionType : completionTypes) {
             Set<ExecutionAttemptID> attemptIds = new HashSet<>();

Review Comment:
   You should be able to avoid `attemptIds` entirely. Can't you use `ImmutableSet.of(new ExecutionAttemptID(), new ExecutionAttemptID())`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -204,31 +206,33 @@ private void triggerThreadInfoSampleInternal(
         }
     }
 
-    private Map<Set<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
+    private Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
             matchExecutionsWithGateways(
                     AccessExecutionVertex[] executionVertices,
                     ResourceManagerGateway resourceManagerGateway) {
 
         // Group executions by their TaskManagerLocation to be able to issue one sampling
         // request per TaskManager for all relevant tasks at once
-        final Map<TaskManagerLocation, Set<ExecutionAttemptID>> executionsByLocation =
+        final Map<TaskManagerLocation, ImmutableSet<ExecutionAttemptID>> executionsByLocation =

Review Comment:
   Strictly speaking, this is not needed, but it may ease things, so I'm fine either way. I just wanted to make it explicit. It's certainly better than using `? extends Set` here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##########
@@ -67,11 +67,15 @@ public static Optional<ThreadInfoSample> from(@Nullable ThreadInfo threadInfo) {
      * @param threadInfos {@link ThreadInfo} array where the data will be copied from.
      * @return a Collection of the corresponding {@link ThreadInfoSample}s
      */
-    public static Collection<ThreadInfoSample> from(ThreadInfo[] threadInfos) {
+    public static Collection<ThreadInfoSample> from(
+            ThreadInfo[] threadInfos, long[] requestedThreadIds) {
         Collection<ThreadInfoSample> result = new ArrayList<>();
-        for (ThreadInfo threadInfo : threadInfos) {
+        for (int i = 0; i < threadInfos.length; i++) {
+            ThreadInfo threadInfo = threadInfos[i];
             if (threadInfo == null) {
-                LOG.warn("Missing thread info.");
+                LOG.warn(
+                        "FlameGraphs: thread {} is not alive or does not exist.",
+                        requestedThreadIds[i]);
             } else {

Review Comment:
   I'd still prefer handling this at the call site and keeping the invariant that every `ThreadInfo` is non-null. Arrays with nulls are a nightmare, and I don't see why they are necessary here (this is not a concurrent section where nulls sometimes cannot be avoided).
   
   Btw, do we really want to log warnings? I'd imagine that this is a common case for bounded applications, and it would spam the log in larger setups.
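
One way to read the call-site suggestion, as a hedged sketch: `ThreadMXBean.getThreadInfo(long[], int)` returns a `null` element for each thread that is not alive or does not exist, so the caller can drop those entries once and hand only non-null values to the converter. The class and method names below are hypothetical, and the name-extracting converter merely stands in for `ThreadInfoSample.from`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

public class NonNullThreadInfoSketch {

    /** Call site: filters out the null entries returned for dead or unknown threads. */
    static List<ThreadInfo> liveThreadInfos(long[] threadIds, int maxDepth) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        ThreadInfo[] infos = bean.getThreadInfo(threadIds, maxDepth);
        List<ThreadInfo> live = new ArrayList<>(infos.length);
        for (ThreadInfo info : infos) {
            if (info != null) {
                live.add(info);
            }
        }
        return live;
    }

    /** Converter stand-in: relies on the invariant that every element is non-null. */
    static List<String> toThreadNames(List<ThreadInfo> infos) {
        List<String> names = new ArrayList<>(infos.size());
        for (ThreadInfo info : infos) {
            names.add(info.getThreadName());
        }
        return names;
    }
}
```

If a missing thread still needs to be reported, the call site is also where a single low-severity log line could go, since it knows which IDs were requested.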



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/IdleTestTask.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/** The test task that creates an idle (sleeping) thread. */
+public class IdleTestTask implements SampleableTask {
+
+    private final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+    private final Thread thread;
+
+    /** Instantiates a new idle test task with default sleep duration (10s). */
+    public IdleTestTask() {
+        this(10000L);
+    }

Review Comment:
   Inline so the tests become easier to read?
   Also, should you wait indefinitely in case of Azure hiccups?
   What happens with spurious wakeups? Do tests fail then?
   
   A clean solution would be to have a `run` like this:
   ```java
   while (!stopped) {
       try {
           Thread.sleep(100);
       } catch (InterruptedException e) {
           // ignore and re-check the stopped flag
       }
   }
   ```
   
   Then your test code is responsible for setting the `volatile boolean stopped` and shutting the thread down (ideally in a `finally` clause).
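
Put together, the suggested loop and shutdown could look roughly like the sketch below. The class name, the `start`/`stop` methods, and the thread name are assumptions for illustration; only the loop and the `volatile boolean stopped` flag come from the comment above.

```java
public class StoppableIdleTaskSketch {

    /** Hypothetical idle task whose thread sleeps until the test stops it explicitly. */
    static final class IdleTask {
        private volatile boolean stopped;
        private final Thread thread = new Thread(this::run, "idle-test-task");

        void start() {
            thread.start();
        }

        private void run() {
            while (!stopped) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // ignore and re-check the stopped flag
                }
            }
        }

        /** Sets the flag, wakes the sleeper, and waits for the thread to finish. */
        void stop() {
            stopped = true;
            thread.interrupt();
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        Thread getExecutingThread() {
            return thread;
        }
    }
}
```

A test would call `start()` before sampling and `stop()` in a `finally` clause, so the thread stays idle for as long as the test needs it, regardless of how slow the CI machine is.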



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
