This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a7dae26cf6d MSQ: Add workerDesc to WorkerStats. (#19171)
a7dae26cf6d is described below

commit a7dae26cf6d9ac64ef08c4fa6509cef7f8a4973b
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Mar 18 09:12:33 2026 -0700

    MSQ: Add workerDesc to WorkerStats. (#19171)
    
    The existing field workerId is designed for MSQ itself to uniquely
    identify a worker. The new field workerDesc is for users (and the web
    console) to more easily identify where a worker is running. Its Javadoc
    specifies it as either a task ID or a host:port.
    
    This patch also updates the web console to use workerDesc, if available,
    to show the task (with link to the tasks view) or show the server. This
    replaces some older, more brittle logic that was constructing a task ID
    from the query ID and worker number. The approach in this patch better
    handles task relaunches, in which case the worker number -> task ID
    mapping changes.
---
 .../msq/dart/controller/DartControllerContext.java |  8 ++++-
 .../msq/dart/controller/DartWorkerManager.java     |  5 +++-
 .../org/apache/druid/msq/exec/WorkerStats.java     | 34 +++++++++++++++++----
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  | 10 +++----
 .../msq/dart/controller/DartWorkerManagerTest.java | 10 +++++--
 .../src/druid-models/execution/execution.spec.ts   | 12 ++++++++
 .../src/druid-models/execution/execution.ts        |  6 ++++
 web-console/src/druid-models/task/task.ts          |  1 +
 .../execution-details-pane.spec.tsx.snap           | 30 +++++++++++++++++++
 .../execution-stages-pane.tsx                      | 35 +++++++++++++++-------
 10 files changed, 125 insertions(+), 26 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index 9276c11916b..b7e1b3c94bc 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -52,6 +52,7 @@ import org.apache.druid.server.coordination.ServerType;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Dart implementation of {@link ControllerContext}.
@@ -210,7 +211,12 @@ public class DartControllerContext implements 
ControllerContext
   {
     // We're ignoring WorkerFailureListener. Dart worker failures are routed 
into the controller by
     // ControllerMessageListener, which receives a notification when a worker 
goes offline.
-    return new DartWorkerManager(queryKernelConfig.getWorkerIds(), 
workerClient);
+    final List<String> workerIds = queryKernelConfig.getWorkerIds();
+    return new DartWorkerManager(
+        workerIds,
+        workerIds.stream().map(id -> 
WorkerId.fromString(id).getHostAndPort()).collect(Collectors.toList()),
+        workerClient
+    );
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
index 8d479671a94..05a26c39344 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
@@ -59,6 +59,7 @@ public class DartWorkerManager implements WorkerManager
   private static final Logger log = new Logger(DartWorkerManager.class);
 
   private final List<String> workerIds;
+  private final List<String> workerDescs;
   private final DartWorkerClient workerClient;
   private final Object2IntMap<String> workerIdToNumber;
   private final AtomicReference<State> state = new 
AtomicReference<>(State.NEW);
@@ -73,10 +74,12 @@ public class DartWorkerManager implements WorkerManager
 
   public DartWorkerManager(
       final List<String> workerIds,
+      final List<String> workerDescs,
       final DartWorkerClient workerClient
   )
   {
     this.workerIds = workerIds;
+    this.workerDescs = workerDescs;
     this.workerClient = workerClient;
     this.workerIdToNumber = new Object2IntOpenHashMap<>();
     this.workerIdToNumber.defaultReturnValue(UNKNOWN_WORKER_NUMBER);
@@ -158,7 +161,7 @@ public class DartWorkerManager implements WorkerManager
     final Int2ObjectMap<List<WorkerStats>> retVal = new 
Int2ObjectAVLTreeMap<>();
 
     for (int i = 0; i < workerIds.size(); i++) {
-      retVal.put(i, Collections.singletonList(new 
WorkerStats(workerIds.get(i), TaskState.RUNNING, -1, -1)));
+      retVal.put(i, Collections.singletonList(new 
WorkerStats(workerIds.get(i), workerDescs.get(i), TaskState.RUNNING, -1, -1)));
     }
 
     return retVal;
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerStats.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerStats.java
index 831ea645e40..8011c115281 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerStats.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerStats.java
@@ -20,38 +20,60 @@
 package org.apache.druid.msq.exec;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexer.TaskState;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 
 public class WorkerStats
 {
   private final String workerId;
+  @Nullable
+  private final String workerDesc;
   private final TaskState state;
   private final long durationMs;
   private final long pendingMs;
 
   @JsonCreator
   public WorkerStats(
-      @JsonProperty("workerId") String workerId,
-      @JsonProperty("state") TaskState state,
-      @JsonProperty("durationMs") long durationMs,
-      @JsonProperty("pendingMs") long pendingMs
+      @JsonProperty("workerId") final String workerId,
+      @JsonProperty("workerDesc") @Nullable final String workerDesc,
+      @JsonProperty("state") final TaskState state,
+      @JsonProperty("durationMs") final long durationMs,
+      @JsonProperty("pendingMs") final long pendingMs
   )
   {
     this.workerId = workerId;
+    this.workerDesc = workerDesc;
     this.state = state;
     this.durationMs = durationMs;
     this.pendingMs = pendingMs;
   }
 
+  /**
+   * Unique worker ID, same as {@link Worker#id()}.
+   */
   @JsonProperty
   public String getWorkerId()
   {
     return workerId;
   }
 
+  /**
+   * Worker description. Used by the web console to more easily find where a 
worker is running. Generally this is
+   * either a task ID or a host:port. It may not be unique; for example, with 
Dart, if multiple queries run on the
+   * same host:port then they would have unique workerId but would have the 
same workerDesc.
+   */
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getWorkerDesc()
+  {
+    return workerDesc;
+  }
+
   @JsonProperty
   public TaskState getState()
   {
@@ -83,13 +105,14 @@ public class WorkerStats
     return durationMs == that.durationMs
            && pendingMs == that.pendingMs
            && Objects.equals(workerId, that.workerId)
+           && Objects.equals(workerDesc, that.workerDesc)
            && state == that.state;
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(workerId, state, durationMs, pendingMs);
+    return Objects.hash(workerId, workerDesc, state, durationMs, pendingMs);
   }
 
   @Override
@@ -97,6 +120,7 @@ public class WorkerStats
   {
     return "WorkerStats{" +
            "workerId='" + workerId + '\'' +
+           ", workerDesc='" + workerDesc + '\'' +
            ", state=" + state +
            ", durationMs=" + durationMs +
            ", pendingMs=" + pendingMs +
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index d2babe86101..d773f225fac 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -280,7 +280,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
       throws InterruptedException
   {
     Set<IntObjectPair<MSQFault>> failedWorkers = new HashSet<>();
-    
+
     synchronized (taskIds) {
       retryInactiveTasksIfNeeded(workerCount);
 
@@ -294,7 +294,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
           FutureUtils.getUnchecked(stopFuture, false);
           throw new ISE("Stopped");
         }
-        
+
         // Check for failed workers and collect them
         for (TaskTracker taskTracker : taskTrackers.values()) {
           if (taskTracker.isRetryCandidate()) {
@@ -310,7 +310,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
         taskIds.wait(taskIdsLockTimeout);
       }
     }
-    
+
     // this should always be empty
     return Collections.emptySet();
   }
@@ -355,7 +355,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
       throws InterruptedException
   {
     Set<IntObjectPair<MSQFault>> failedWorkers = new HashSet<>();
-    
+
     synchronized (taskIds) {
       while (!fullyStartedTasks.containsAll(workerNumbers)) {
         if (stopFuture.isDone() || stopFuture.isCancelled()) {
@@ -425,7 +425,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
       final long duration = taskStatus != null ? taskStatus.getDuration() : -1;
 
       workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new 
ArrayList<>())
-                 .add(new WorkerStats(taskEntry.getKey(), statusCode, 
duration, taskTracker.taskPendingTimeInMs()));
+                 .add(new WorkerStats(taskEntry.getKey(), taskEntry.getKey(), 
statusCode, duration, taskTracker.taskPendingTimeInMs()));
     }
 
     for (List<WorkerStats> workerStatsList : workerStats.values()) {
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
index 4dc928e66b2..41eff775542 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
@@ -62,7 +62,11 @@ public class DartWorkerManagerTest
   public void setUp()
   {
     mockCloser = MockitoAnnotations.openMocks(this);
-    workerManager = new DartWorkerManager(WORKERS, workerClient);
+    workerManager = new DartWorkerManager(
+        WORKERS,
+        List.of("localhost:1001", "localhost:1002"),
+        workerClient
+    );
   }
 
   @AfterEach
@@ -90,8 +94,8 @@ public class DartWorkerManagerTest
     final Map<Integer, List<WorkerStats>> stats = 
workerManager.getWorkerStats();
     Assertions.assertEquals(
         ImmutableMap.of(
-            0, Collections.singletonList(new WorkerStats(WORKERS.get(0), 
TaskState.RUNNING, -1, -1)),
-            1, Collections.singletonList(new WorkerStats(WORKERS.get(1), 
TaskState.RUNNING, -1, -1))
+            0, Collections.singletonList(new WorkerStats(WORKERS.get(0), 
"localhost:1001", TaskState.RUNNING, -1, -1)),
+            1, Collections.singletonList(new WorkerStats(WORKERS.get(1), 
"localhost:1002", TaskState.RUNNING, -1, -1))
         ),
         stats
     );
diff --git a/web-console/src/druid-models/execution/execution.spec.ts 
b/web-console/src/druid-models/execution/execution.spec.ts
index 12e590bbb7d..15dc15bf8eb 100644
--- a/web-console/src/druid-models/execution/execution.spec.ts
+++ b/web-console/src/druid-models/execution/execution.spec.ts
@@ -602,6 +602,16 @@ describe('Execution', () => {
             "runningTasks": 1,
           },
           "warnings": undefined,
+          "workers": {
+            "0": [
+              {
+                "durationMs": 8789,
+                "pendingMs": 123,
+                "state": "SUCCESS",
+                "workerId": 
"query-346b9ac6-4912-46e4-9b98-75f11071af87-worker0_0",
+              },
+            ],
+          },
         }
       `);
     });
@@ -1112,6 +1122,7 @@ describe('Execution', () => {
           "status": "SUCCESS",
           "usageInfo": undefined,
           "warnings": undefined,
+          "workers": undefined,
         }
       `);
     });
@@ -1512,6 +1523,7 @@ describe('Execution', () => {
               "taskId": "query-ea3e36df-ad67-4870-b136-f5616b17d9c4-worker0_0",
             },
           ],
+          "workers": undefined,
         }
       `);
     });
diff --git a/web-console/src/druid-models/execution/execution.ts 
b/web-console/src/druid-models/execution/execution.ts
index 0c327b1d064..b4fc89fa025 100644
--- a/web-console/src/druid-models/execution/execution.ts
+++ b/web-console/src/druid-models/execution/execution.ts
@@ -38,6 +38,7 @@ import type {
   MsqTaskReportResponse,
   SegmentLoadWaiterStatus,
   TaskStatus,
+  WorkerState,
 } from '../task/task';
 
 const IGNORE_CONTEXT_KEYS = [
@@ -188,6 +189,7 @@ export interface ExecutionValue {
   error?: ExecutionError;
   warnings?: ExecutionError[];
   capacityInfo?: CapacityInfo;
+  workers?: Record<string, WorkerState[]>;
   _payload?: MsqTaskPayloadResponse;
   segmentStatus?: SegmentLoadWaiterStatus;
 }
@@ -348,6 +350,7 @@ export class Execution {
       stages: Array.isArray(stages)
         ? new Stages(stages, deepGet(taskReport, 
'multiStageQuery.payload.counters'))
         : undefined,
+      workers: deepGet(taskReport, 'multiStageQuery.payload.status.workers'),
       error,
       warnings: Array.isArray(warnings) ? warnings : undefined,
       result,
@@ -406,6 +409,7 @@ export class Execution {
   public readonly error?: ExecutionError;
   public readonly warnings?: ExecutionError[];
   public readonly capacityInfo?: CapacityInfo;
+  public readonly workers?: Record<string, WorkerState[]>;
   public readonly segmentStatus?: SegmentLoadWaiterStatus;
 
   public readonly _payload?: { payload: any; task: string };
@@ -428,6 +432,7 @@ export class Execution {
     this.error = value.error;
     this.warnings = nonEmptyArray(value.warnings) ? value.warnings : undefined;
     this.capacityInfo = value.capacityInfo;
+    this.workers = value.workers;
     this.segmentStatus = value.segmentStatus;
 
     this._payload = value._payload;
@@ -451,6 +456,7 @@ export class Execution {
       error: this.error,
       warnings: this.warnings,
       capacityInfo: this.capacityInfo,
+      workers: this.workers,
       segmentStatus: this.segmentStatus,
 
       _payload: this._payload,
diff --git a/web-console/src/druid-models/task/task.ts 
b/web-console/src/druid-models/task/task.ts
index 0d6cd07266f..77150730030 100644
--- a/web-console/src/druid-models/task/task.ts
+++ b/web-console/src/druid-models/task/task.ts
@@ -79,6 +79,7 @@ export interface MsqTaskPayloadResponse {
 
 export interface WorkerState {
   workerId: string;
+  workerDesc?: string;
   state: string;
   durationMs: number;
   pendingMs: number;
diff --git 
a/web-console/src/views/workbench-view/execution-details-pane/__snapshots__/execution-details-pane.spec.tsx.snap
 
b/web-console/src/views/workbench-view/execution-details-pane/__snapshots__/execution-details-pane.spec.tsx.snap
index 4a27ae98ca2..eff030e9654 100644
--- 
a/web-console/src/views/workbench-view/execution-details-pane/__snapshots__/execution-details-pane.spec.tsx.snap
+++ 
b/web-console/src/views/workbench-view/execution-details-pane/__snapshots__/execution-details-pane.spec.tsx.snap
@@ -722,6 +722,16 @@ Caused by: 
com.fasterxml.jackson.databind.exc.MismatchedInputException: No conte
               "taskId": "query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
             },
           ],
+          "workers": {
+            "0": [
+              {
+                "durationMs": -1,
+                "pendingMs": -1,
+                "state": "FAILED",
+                "workerId": 
"query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
+              },
+            ],
+          },
         }
       }
     />
@@ -1354,6 +1364,16 @@ Caused by: 
com.fasterxml.jackson.databind.exc.MismatchedInputException: No conte
               "taskId": "query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
             },
           ],
+          "workers": {
+            "0": [
+              {
+                "durationMs": -1,
+                "pendingMs": -1,
+                "state": "FAILED",
+                "workerId": 
"query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
+              },
+            ],
+          },
         }
       }
       goToTask={[Function]}
@@ -2031,6 +2051,16 @@ Caused by: 
com.fasterxml.jackson.databind.exc.MismatchedInputException: No conte
             "taskId": "query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
           },
         ],
+        "workers": {
+          "0": [
+            {
+              "durationMs": -1,
+              "pendingMs": -1,
+              "state": "FAILED",
+              "workerId": 
"query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
+            },
+          ],
+        },
       }
     }
   />
diff --git 
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
 
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
index ff43e3f8f99..74bff4380da 100644
--- 
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
+++ 
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
@@ -264,17 +264,30 @@ export const ExecutionStagesPane = React.memo(function 
ExecutionStagesPane(
             className: goToTask ? undefined : 'padded',
             width: 95,
             Cell({ value }) {
-              if (!goToTask) return `Worker${value}`;
-              const taskId = `${execution.id}-worker${value}_0`;
-              return (
-                <TableClickableCell
-                  hoverIcon={IconNames.SHARE}
-                  tooltip={`Go to task: ${taskId}`}
-                  onClick={() => {
-                    goToTask(taskId);
-                  }}
-                >{`Worker${value}`}</TableClickableCell>
-              );
+              const workerStates = execution.workers?.[String(value)];
+              const workerState = workerStates?.[workerStates.length - 1];
+              const label = `Worker${value}`;
+
+              const workerRef = workerState?.workerDesc || 
workerState?.workerId;
+              if (goToTask && workerRef && !/:\d+$/.test(workerRef)) {
+                return (
+                  <TableClickableCell
+                    hoverIcon={IconNames.SHARE}
+                    tooltip={`Go to task: ${workerRef}`}
+                    onClick={() => {
+                      goToTask(workerRef);
+                    }}
+                  >
+                    {label}
+                  </TableClickableCell>
+                );
+              }
+
+              if (workerRef) {
+                return <span data-tooltip={workerRef}>{label}</span>;
+              }
+
+              return label;
             },
           } as Column<SimpleWideCounter>,
           {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to