This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a7dae26cf6d MSQ: Add workerDesc to WorkerStats. (#19171)
a7dae26cf6d is described below
commit a7dae26cf6d9ac64ef08c4fa6509cef7f8a4973b
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Mar 18 09:12:33 2026 -0700
MSQ: Add workerDesc to WorkerStats. (#19171)
The existing field workerId is designed for MSQ itself to uniquely
identify a worker. The new field workerDesc is for users (and the web
console) to more easily identify where a worker is running. Its javadoc
describes it as generally either a task ID or a host:port.
This patch also updates the web console to use workerDesc, if available,
to show the task (with a link to the tasks view) or the server. This
replaces some older, more brittle logic that was constructing a task ID
from the query ID and worker number. The approach in this patch better
handles task relaunches, in which case the worker number -> task ID
mapping changes.
---
.../msq/dart/controller/DartControllerContext.java | 8 ++++-
.../msq/dart/controller/DartWorkerManager.java | 5 +++-
.../org/apache/druid/msq/exec/WorkerStats.java | 34 +++++++++++++++++----
.../druid/msq/indexing/MSQWorkerTaskLauncher.java | 10 +++----
.../msq/dart/controller/DartWorkerManagerTest.java | 10 +++++--
.../src/druid-models/execution/execution.spec.ts | 12 ++++++++
.../src/druid-models/execution/execution.ts | 6 ++++
web-console/src/druid-models/task/task.ts | 1 +
.../execution-details-pane.spec.tsx.snap | 30 +++++++++++++++++++
.../execution-stages-pane.tsx | 35 +++++++++++++++-------
10 files changed, 125 insertions(+), 26 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index 9276c11916b..b7e1b3c94bc 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -52,6 +52,7 @@ import org.apache.druid.server.coordination.ServerType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Dart implementation of {@link ControllerContext}.
@@ -210,7 +211,12 @@ public class DartControllerContext implements
ControllerContext
{
// We're ignoring WorkerFailureListener. Dart worker failures are routed
into the controller by
// ControllerMessageListener, which receives a notification when a worker
goes offline.
- return new DartWorkerManager(queryKernelConfig.getWorkerIds(),
workerClient);
+ final List<String> workerIds = queryKernelConfig.getWorkerIds();
+ return new DartWorkerManager(
+ workerIds,
+ workerIds.stream().map(id ->
WorkerId.fromString(id).getHostAndPort()).collect(Collectors.toList()),
+ workerClient
+ );
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
index 8d479671a94..05a26c39344 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
@@ -59,6 +59,7 @@ public class DartWorkerManager implements WorkerManager
private static final Logger log = new Logger(DartWorkerManager.class);
private final List<String> workerIds;
+ private final List<String> workerDescs;
private final DartWorkerClient workerClient;
private final Object2IntMap<String> workerIdToNumber;
private final AtomicReference<State> state = new
AtomicReference<>(State.NEW);
@@ -73,10 +74,12 @@ public class DartWorkerManager implements WorkerManager
public DartWorkerManager(
final List<String> workerIds,
+ final List<String> workerDescs,
final DartWorkerClient workerClient
)
{
this.workerIds = workerIds;
+ this.workerDescs = workerDescs;
this.workerClient = workerClient;
this.workerIdToNumber = new Object2IntOpenHashMap<>();
this.workerIdToNumber.defaultReturnValue(UNKNOWN_WORKER_NUMBER);
@@ -158,7 +161,7 @@ public class DartWorkerManager implements WorkerManager
final Int2ObjectMap<List<WorkerStats>> retVal = new
Int2ObjectAVLTreeMap<>();
for (int i = 0; i < workerIds.size(); i++) {
- retVal.put(i, Collections.singletonList(new
WorkerStats(workerIds.get(i), TaskState.RUNNING, -1, -1)));
+ retVal.put(i, Collections.singletonList(new
WorkerStats(workerIds.get(i), workerDescs.get(i), TaskState.RUNNING, -1, -1)));
}
return retVal;
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerStats.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerStats.java
index 831ea645e40..8011c115281 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerStats.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerStats.java
@@ -20,38 +20,60 @@
package org.apache.druid.msq.exec;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.TaskState;
+import javax.annotation.Nullable;
import java.util.Objects;
public class WorkerStats
{
private final String workerId;
+ @Nullable
+ private final String workerDesc;
private final TaskState state;
private final long durationMs;
private final long pendingMs;
@JsonCreator
public WorkerStats(
- @JsonProperty("workerId") String workerId,
- @JsonProperty("state") TaskState state,
- @JsonProperty("durationMs") long durationMs,
- @JsonProperty("pendingMs") long pendingMs
+ @JsonProperty("workerId") final String workerId,
+ @JsonProperty("workerDesc") @Nullable final String workerDesc,
+ @JsonProperty("state") final TaskState state,
+ @JsonProperty("durationMs") final long durationMs,
+ @JsonProperty("pendingMs") final long pendingMs
)
{
this.workerId = workerId;
+ this.workerDesc = workerDesc;
this.state = state;
this.durationMs = durationMs;
this.pendingMs = pendingMs;
}
+ /**
+ * Unique worker ID, same as {@link Worker#id()}.
+ */
@JsonProperty
public String getWorkerId()
{
return workerId;
}
+ /**
+ * Worker description. Used by the web console to more easily find where a
worker is running. Generally this is
+ * either a task ID or a host:port. It may not be unique; for example, with
Dart, if multiple queries run on the
+ * same host:port then they would have unique workerId but would have the
same workerDesc.
+ */
+ @Nullable
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public String getWorkerDesc()
+ {
+ return workerDesc;
+ }
+
@JsonProperty
public TaskState getState()
{
@@ -83,13 +105,14 @@ public class WorkerStats
return durationMs == that.durationMs
&& pendingMs == that.pendingMs
&& Objects.equals(workerId, that.workerId)
+ && Objects.equals(workerDesc, that.workerDesc)
&& state == that.state;
}
@Override
public int hashCode()
{
- return Objects.hash(workerId, state, durationMs, pendingMs);
+ return Objects.hash(workerId, workerDesc, state, durationMs, pendingMs);
}
@Override
@@ -97,6 +120,7 @@ public class WorkerStats
{
return "WorkerStats{" +
"workerId='" + workerId + '\'' +
+ ", workerDesc='" + workerDesc + '\'' +
", state=" + state +
", durationMs=" + durationMs +
", pendingMs=" + pendingMs +
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index d2babe86101..d773f225fac 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -280,7 +280,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
throws InterruptedException
{
Set<IntObjectPair<MSQFault>> failedWorkers = new HashSet<>();
-
+
synchronized (taskIds) {
retryInactiveTasksIfNeeded(workerCount);
@@ -294,7 +294,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
FutureUtils.getUnchecked(stopFuture, false);
throw new ISE("Stopped");
}
-
+
// Check for failed workers and collect them
for (TaskTracker taskTracker : taskTrackers.values()) {
if (taskTracker.isRetryCandidate()) {
@@ -310,7 +310,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
taskIds.wait(taskIdsLockTimeout);
}
}
-
+
// this should always be empty
return Collections.emptySet();
}
@@ -355,7 +355,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
throws InterruptedException
{
Set<IntObjectPair<MSQFault>> failedWorkers = new HashSet<>();
-
+
synchronized (taskIds) {
while (!fullyStartedTasks.containsAll(workerNumbers)) {
if (stopFuture.isDone() || stopFuture.isCancelled()) {
@@ -425,7 +425,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
final long duration = taskStatus != null ? taskStatus.getDuration() : -1;
workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new
ArrayList<>())
- .add(new WorkerStats(taskEntry.getKey(), statusCode,
duration, taskTracker.taskPendingTimeInMs()));
+ .add(new WorkerStats(taskEntry.getKey(), taskEntry.getKey(),
statusCode, duration, taskTracker.taskPendingTimeInMs()));
}
for (List<WorkerStats> workerStatsList : workerStats.values()) {
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
index 4dc928e66b2..41eff775542 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
@@ -62,7 +62,11 @@ public class DartWorkerManagerTest
public void setUp()
{
mockCloser = MockitoAnnotations.openMocks(this);
- workerManager = new DartWorkerManager(WORKERS, workerClient);
+ workerManager = new DartWorkerManager(
+ WORKERS,
+ List.of("localhost:1001", "localhost:1002"),
+ workerClient
+ );
}
@AfterEach
@@ -90,8 +94,8 @@ public class DartWorkerManagerTest
final Map<Integer, List<WorkerStats>> stats =
workerManager.getWorkerStats();
Assertions.assertEquals(
ImmutableMap.of(
- 0, Collections.singletonList(new WorkerStats(WORKERS.get(0),
TaskState.RUNNING, -1, -1)),
- 1, Collections.singletonList(new WorkerStats(WORKERS.get(1),
TaskState.RUNNING, -1, -1))
+ 0, Collections.singletonList(new WorkerStats(WORKERS.get(0),
"localhost:1001", TaskState.RUNNING, -1, -1)),
+ 1, Collections.singletonList(new WorkerStats(WORKERS.get(1),
"localhost:1002", TaskState.RUNNING, -1, -1))
),
stats
);
diff --git a/web-console/src/druid-models/execution/execution.spec.ts
b/web-console/src/druid-models/execution/execution.spec.ts
index 12e590bbb7d..15dc15bf8eb 100644
--- a/web-console/src/druid-models/execution/execution.spec.ts
+++ b/web-console/src/druid-models/execution/execution.spec.ts
@@ -602,6 +602,16 @@ describe('Execution', () => {
"runningTasks": 1,
},
"warnings": undefined,
+ "workers": {
+ "0": [
+ {
+ "durationMs": 8789,
+ "pendingMs": 123,
+ "state": "SUCCESS",
+ "workerId":
"query-346b9ac6-4912-46e4-9b98-75f11071af87-worker0_0",
+ },
+ ],
+ },
}
`);
});
@@ -1112,6 +1122,7 @@ describe('Execution', () => {
"status": "SUCCESS",
"usageInfo": undefined,
"warnings": undefined,
+ "workers": undefined,
}
`);
});
@@ -1512,6 +1523,7 @@ describe('Execution', () => {
"taskId": "query-ea3e36df-ad67-4870-b136-f5616b17d9c4-worker0_0",
},
],
+ "workers": undefined,
}
`);
});
diff --git a/web-console/src/druid-models/execution/execution.ts
b/web-console/src/druid-models/execution/execution.ts
index 0c327b1d064..b4fc89fa025 100644
--- a/web-console/src/druid-models/execution/execution.ts
+++ b/web-console/src/druid-models/execution/execution.ts
@@ -38,6 +38,7 @@ import type {
MsqTaskReportResponse,
SegmentLoadWaiterStatus,
TaskStatus,
+ WorkerState,
} from '../task/task';
const IGNORE_CONTEXT_KEYS = [
@@ -188,6 +189,7 @@ export interface ExecutionValue {
error?: ExecutionError;
warnings?: ExecutionError[];
capacityInfo?: CapacityInfo;
+ workers?: Record<string, WorkerState[]>;
_payload?: MsqTaskPayloadResponse;
segmentStatus?: SegmentLoadWaiterStatus;
}
@@ -348,6 +350,7 @@ export class Execution {
stages: Array.isArray(stages)
? new Stages(stages, deepGet(taskReport,
'multiStageQuery.payload.counters'))
: undefined,
+ workers: deepGet(taskReport, 'multiStageQuery.payload.status.workers'),
error,
warnings: Array.isArray(warnings) ? warnings : undefined,
result,
@@ -406,6 +409,7 @@ export class Execution {
public readonly error?: ExecutionError;
public readonly warnings?: ExecutionError[];
public readonly capacityInfo?: CapacityInfo;
+ public readonly workers?: Record<string, WorkerState[]>;
public readonly segmentStatus?: SegmentLoadWaiterStatus;
public readonly _payload?: { payload: any; task: string };
@@ -428,6 +432,7 @@ export class Execution {
this.error = value.error;
this.warnings = nonEmptyArray(value.warnings) ? value.warnings : undefined;
this.capacityInfo = value.capacityInfo;
+ this.workers = value.workers;
this.segmentStatus = value.segmentStatus;
this._payload = value._payload;
@@ -451,6 +456,7 @@ export class Execution {
error: this.error,
warnings: this.warnings,
capacityInfo: this.capacityInfo,
+ workers: this.workers,
segmentStatus: this.segmentStatus,
_payload: this._payload,
diff --git a/web-console/src/druid-models/task/task.ts
b/web-console/src/druid-models/task/task.ts
index 0d6cd07266f..77150730030 100644
--- a/web-console/src/druid-models/task/task.ts
+++ b/web-console/src/druid-models/task/task.ts
@@ -79,6 +79,7 @@ export interface MsqTaskPayloadResponse {
export interface WorkerState {
workerId: string;
+ workerDesc?: string;
state: string;
durationMs: number;
pendingMs: number;
diff --git
a/web-console/src/views/workbench-view/execution-details-pane/__snapshots__/execution-details-pane.spec.tsx.snap
b/web-console/src/views/workbench-view/execution-details-pane/__snapshots__/execution-details-pane.spec.tsx.snap
index 4a27ae98ca2..eff030e9654 100644
---
a/web-console/src/views/workbench-view/execution-details-pane/__snapshots__/execution-details-pane.spec.tsx.snap
+++
b/web-console/src/views/workbench-view/execution-details-pane/__snapshots__/execution-details-pane.spec.tsx.snap
@@ -722,6 +722,16 @@ Caused by:
com.fasterxml.jackson.databind.exc.MismatchedInputException: No conte
"taskId": "query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
},
],
+ "workers": {
+ "0": [
+ {
+ "durationMs": -1,
+ "pendingMs": -1,
+ "state": "FAILED",
+ "workerId":
"query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
+ },
+ ],
+ },
}
}
/>
@@ -1354,6 +1364,16 @@ Caused by:
com.fasterxml.jackson.databind.exc.MismatchedInputException: No conte
"taskId": "query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
},
],
+ "workers": {
+ "0": [
+ {
+ "durationMs": -1,
+ "pendingMs": -1,
+ "state": "FAILED",
+ "workerId":
"query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
+ },
+ ],
+ },
}
}
goToTask={[Function]}
@@ -2031,6 +2051,16 @@ Caused by:
com.fasterxml.jackson.databind.exc.MismatchedInputException: No conte
"taskId": "query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
},
],
+ "workers": {
+ "0": [
+ {
+ "durationMs": -1,
+ "pendingMs": -1,
+ "state": "FAILED",
+ "workerId":
"query-26d490c6-c06d-4cd2-938f-bc5f7f982754-worker0_0",
+ },
+ ],
+ },
}
}
/>
diff --git
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
index ff43e3f8f99..74bff4380da 100644
---
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
+++
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
@@ -264,17 +264,30 @@ export const ExecutionStagesPane = React.memo(function
ExecutionStagesPane(
className: goToTask ? undefined : 'padded',
width: 95,
Cell({ value }) {
- if (!goToTask) return `Worker${value}`;
- const taskId = `${execution.id}-worker${value}_0`;
- return (
- <TableClickableCell
- hoverIcon={IconNames.SHARE}
- tooltip={`Go to task: ${taskId}`}
- onClick={() => {
- goToTask(taskId);
- }}
- >{`Worker${value}`}</TableClickableCell>
- );
+ const workerStates = execution.workers?.[String(value)];
+ const workerState = workerStates?.[workerStates.length - 1];
+ const label = `Worker${value}`;
+
+ const workerRef = workerState?.workerDesc ||
workerState?.workerId;
+ if (goToTask && workerRef && !/:\d+$/.test(workerRef)) {
+ return (
+ <TableClickableCell
+ hoverIcon={IconNames.SHARE}
+ tooltip={`Go to task: ${workerRef}`}
+ onClick={() => {
+ goToTask(workerRef);
+ }}
+ >
+ {label}
+ </TableClickableCell>
+ );
+ }
+
+ if (workerRef) {
+ return <span data-tooltip={workerRef}>{label}</span>;
+ }
+
+ return label;
},
} as Column<SimpleWideCounter>,
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]