afedulov commented on code in PR #19228:
URL: https://github.com/apache/flink/pull/19228#discussion_r853540712


##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleService.java:
##########
@@ -71,34 +74,44 @@
     }
 
     private void requestThreadInfoSamples(
-            final SampleableTask task,
+            final Collection<SampleableTask> tasks,
             final int numSamples,
             final Duration delayBetweenSamples,
             final int maxStackTraceDepth,
             final List<ThreadInfoSample> currentTraces,
             final CompletableFuture<List<ThreadInfoSample>> resultFuture) {
 
-        final long threadId = task.getExecutingThread().getId();
-        final Optional<ThreadInfoSample> threadInfoSample =
-                JvmUtils.createThreadInfoSample(threadId, maxStackTraceDepth);
+        final Collection<Long> threadIds =
+                tasks.stream()
+                        .map(t -> t.getExecutingThread().getId())
+                        .collect(Collectors.toList());
 
-        if (threadInfoSample.isPresent()) {
-            currentTraces.add(threadInfoSample.get());
+        final Collection<ThreadInfoSample> threadInfoSample =
+                JvmUtils.createThreadInfoSample(threadIds, maxStackTraceDepth);
+
+        if (!threadInfoSample.isEmpty()) {
+            currentTraces.addAll(threadInfoSample);
         } else if (!currentTraces.isEmpty()) {
+            // Requested tasks are not running anymore, completing with whatever was collected by
+            // now.
             resultFuture.complete(currentTraces);
         } else {
+            final String ids =
+                    tasks.stream()
+                            .map(SampleableTask::getExecutionId)
+                            .map(e -> e == null ? "unknown" : e.toString())

Review Comment:
   There are no concrete details on when this can be `null`, but the Javadoc of `ExecutionAttemptID` says the following:
   
   > Unique identifier for the attempt to execute a task. Multiple attempts happen in cases of failures and recovery.
   
   This makes me think that the `ExecutionAttemptID` is not stable over the lifetime of a Task. This branch is the exceptional case where we do not get the expected thread info samples, so I can see it being related to failures: the respective threads are no longer running, and the tasks may no longer have an execution attempt ID assigned. Filtering in advance is not really possible here, because a task can end up in this state at any time, potentially in the middle of the method's execution.
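   To make this failure mode concrete, here is a minimal, self-contained sketch (not Flink's code). It samples several threads in one call to the JDK's `ThreadMXBean.getThreadInfo(long[], int)`, which returns `null` entries for threads that are not alive, and formats execution IDs for the error message while tolerating missing ones. `ThreadSamplingSketch`, `SampleTarget`, `sample`, `describe` and `target` are hypothetical stand-ins for `SampleableTask` and the logic in the diff; the execution ID is typed as `Object` here instead of `ExecutionAttemptID`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class ThreadSamplingSketch {

    /** Hypothetical stand-in for SampleableTask; not Flink's actual interface. */
    interface SampleTarget {
        Thread getExecutingThread();

        /** Assumed to return null when no execution attempt is assigned. */
        Object getExecutionId();
    }

    /** Samples all requested threads with one ThreadMXBean call, skipping threads that are not alive. */
    static List<ThreadInfo> sample(List<? extends SampleTarget> targets, int maxDepth) {
        long[] ids = targets.stream()
                .mapToLong(t -> t.getExecutingThread().getId())
                .toArray();
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // Per the JDK docs, entries for threads that are not alive (or do not exist) are null.
        return Arrays.stream(bean.getThreadInfo(ids, maxDepth))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    /** Builds the ID list for an error message, using "unknown" when an ID is missing. */
    static String describe(List<? extends SampleTarget> targets) {
        return targets.stream()
                .map(SampleTarget::getExecutionId)
                .map(id -> id == null ? "unknown" : id.toString())
                .collect(Collectors.joining(", "));
    }

    /** Hypothetical helper that wraps a thread and an (optional) execution ID. */
    static SampleTarget target(Thread thread, Object executionId) {
        return new SampleTarget() {
            @Override
            public Thread getExecutingThread() {
                return thread;
            }

            @Override
            public Object getExecutionId() {
                return executionId;
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        Thread finished = new Thread(() -> {});
        finished.start();
        finished.join(); // no longer running, like a task that failed mid-sampling

        List<SampleTarget> targets = List.of(
                target(Thread.currentThread(), "attempt-1"),
                target(finished, null));

        List<ThreadInfo> samples = sample(targets, 8);
        if (samples.isEmpty()) {
            // Mirrors the branch under review: nothing was sampled, so report the IDs.
            throw new IllegalStateException("Cannot sample tasks " + describe(targets));
        }
        System.out.println(samples.size() + " sample(s); ids: " + describe(targets));
    }
}
```

   Because all threads are sampled in one call, a thread that dies between collecting the IDs and taking the sample simply shows up as a missing entry. That is why the empty-result branch has to tolerate missing IDs at formatting time rather than filter the tasks up front.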
    



-- 
This is an automated message from the Apache Git Service.