This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 039b05585cb Add worker status and duration metrics in live and task
reports (#15180)
039b05585cb is described below
commit 039b05585cb0984fa773393b9bf3b303796513af
Author: Vishesh Garg <[email protected]>
AuthorDate: Mon Oct 30 09:43:22 2023 +0530
Add worker status and duration metrics in live and task reports (#15180)
Add worker status and duration metrics in live and task reports for
tracking.
---
docs/api-reference/sql-ingestion-api.md | 5 ++
.../org/apache/druid/msq/exec/ControllerImpl.java | 3 +
.../druid/msq/indexing/MSQWorkerTaskLauncher.java | 77 ++++++++++++++++++++--
.../druid/msq/indexing/report/MSQStatusReport.java | 13 ++++
.../msq/indexing/report/MSQTaskReportTest.java | 7 +-
.../sql/resources/SqlStatementResourceTest.java | 2 +
.../msq/util/SqlStatementResourceHelperTest.java | 5 ++
7 files changed, 105 insertions(+), 7 deletions(-)
diff --git a/docs/api-reference/sql-ingestion-api.md
b/docs/api-reference/sql-ingestion-api.md
index 3daadfa5085..5492c3ea46d 100644
--- a/docs/api-reference/sql-ingestion-api.md
+++ b/docs/api-reference/sql-ingestion-api.md
@@ -603,6 +603,11 @@ The following table describes the response fields when you
retrieve a report for
| `multiStageQuery.payload.status.status` | RUNNING, SUCCESS, or FAILED. |
| `multiStageQuery.payload.status.startTime` | Start time of the query in ISO
format. Only present if the query has started running. |
| `multiStageQuery.payload.status.durationMs` | Milliseconds elapsed after the
query has started running. -1 denotes that the query hasn't started running
yet. |
+| `multiStageQuery.payload.status.workers` | Map from worker number to the worker tasks launched by the controller task. |
+| `multiStageQuery.payload.status.workers.<workerNumber>` | Array of worker
tasks including retries. |
+| `multiStageQuery.payload.status.workers.<workerNumber>[].workerId` | ID of
the worker task. |
+| `multiStageQuery.payload.status.workers.<workerNumber>[].state` | RUNNING,
SUCCESS, or FAILED. |
+| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` |
Duration of the worker task in milliseconds. It is -1 for worker tasks that are
still RUNNING. |
| `multiStageQuery.payload.status.pendingTasks` | Number of tasks that are not
fully started. -1 denotes that the number is currently unknown. |
| `multiStageQuery.payload.status.runningTasks` | Number of currently running
tasks. Should be at least 1 since the controller is included. |
| `multiStageQuery.payload.status.segmentLoadStatus` | Segment loading
container. Only present after the segments have been published. |
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index f2260b055a9..1ae3cdc5a6e 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -2262,11 +2262,13 @@ public class ControllerImpl implements Controller
{
int pendingTasks = -1;
int runningTasks = 1;
+ Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStatsMap = new
HashMap<>();
if (taskLauncher != null) {
WorkerCount workerTaskCount = taskLauncher.getWorkerTaskCount();
pendingTasks = workerTaskCount.getPendingWorkerCount();
runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To
account for controller.
+ workerStatsMap = taskLauncher.getWorkerStats();
}
SegmentLoadStatusFetcher.SegmentLoadWaiterStatus status =
segmentLoadWaiter == null ? null : segmentLoadWaiter.status();
@@ -2277,6 +2279,7 @@ public class ControllerImpl implements Controller
errorReports,
queryStartTime,
queryDuration,
+ workerStatsMap,
pendingTasks,
runningTasks,
status
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index dcc81d86864..c2092e7f24a 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -19,6 +19,7 @@
package org.apache.druid.msq.indexing;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -47,15 +48,18 @@ import
org.apache.druid.msq.indexing.error.WorkerFailedFault;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -108,10 +112,11 @@ public class MSQWorkerTaskLauncher
@GuardedBy("taskIds")
private final IntSet fullyStartedTasks = new IntOpenHashSet();
- // Mutable state accessible only to the main loop. LinkedHashMap since order
of key set matters. Tasks are added
- // here once they are submitted for running, but before they are fully
started up.
+ // Mutable state accessed by mainLoop, ControllerImpl, and jetty
(/liveReports) threads.
+ // Tasks are added here once they are submitted for running, but before they
are fully started up.
// taskId -> taskTracker
- private final Map<String, TaskTracker> taskTrackers = new LinkedHashMap<>();
+ private final ConcurrentMap<String, TaskTracker> taskTrackers = new
ConcurrentSkipListMap<>(Comparator.comparingInt(
+ MSQTasks::workerFromTaskId)); // NOTE(review): the comparator orders AND identifies keys by worker number only, so two different task ids for the same worker number are treated as the same key (put() replaces the value but keeps the old key) -- confirm a retried task's old tracker is always removed before the new one is put
// Set of tasks which are issued a cancel request by the controller.
private final Set<String> canceledWorkerTasks =
ConcurrentHashMap.newKeySet();
@@ -348,6 +353,70 @@ public class MSQWorkerTaskLauncher
}
}
+ public static class WorkerStats // One task entry of the "workers" section of MSQStatusReport
+ {
+ String workerId; // id of the worker task (the taskTrackers key when built by getWorkerStats())
+ TaskState state; // serialized under the JSON key "state" via getState()
+ long duration; // serialized under the JSON key "durationMs"; -1 while the task is still running
+
+ /**
+ * For JSON deserialization only
+ */
+ public WorkerStats()
+ {
+ }
+
+ public WorkerStats(String workerId, TaskState state, long duration) // used by getWorkerStats() to snapshot a tracked task
+ {
+ this.workerId = workerId;
+ this.state = state;
+ this.duration = duration;
+ }
+
+ @JsonProperty
+ public String getWorkerId()
+ {
+ return workerId;
+ }
+
+ @JsonProperty
+ public TaskState getState()
+ {
+ return state;
+ }
+
+ @JsonProperty("durationMs")
+ public long getDuration()
+ {
+ return duration;
+ }
+ }
+
+ public Map<Integer, List<WorkerStats>> getWorkerStats() // worker number -> stats of the tasks currently tracked for that worker
+ {
+ final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>(); // TreeMap so the report lists worker numbers in ascending order
+
+ for (Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) { // weakly consistent iteration; may run while mainLoop mutates taskTrackers
+
+ TaskTracker taskTracker = taskEntry.getValue();
+
+ workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new
ArrayList<>())
+ .add(new WorkerStats(taskEntry.getKey(),
+ taskTracker.status.getStatusCode(), // NOTE(review): assumes status is non-null for every tracked task -- confirm it is set before a tracker is added
+ // getDuration() returns -1 for running
tasks.
+ // It's not calculated on-the-fly here
since
+ // taskTracker.startTimeMillis marks task
+ // submission time rather than the
actual start.
+ taskTracker.status.getDuration()
+ ));
+ }
+
+ for (List<WorkerStats> workerStatsList : workerStats.values()) { // order each worker's task entries by task id
+ workerStatsList.sort(Comparator.comparing(WorkerStats::getWorkerId));
+ }
+ return workerStats;
+ }
+
private void mainLoop()
{
try {
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
index d3864498349..1dd7d658903 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
@@ -25,12 +25,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.msq.exec.SegmentLoadStatusFetcher;
+import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
public class MSQStatusReport
@@ -47,6 +50,8 @@ public class MSQStatusReport
private final long durationMs;
+ private final Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>>
workerStats;
+
private final int pendingTasks;
private final int runningTasks;
@@ -61,6 +66,7 @@ public class MSQStatusReport
@JsonProperty("warnings") Collection<MSQErrorReport> warningReports,
@JsonProperty("startTime") @Nullable DateTime startTime,
@JsonProperty("durationMs") long durationMs,
+ @JsonProperty("workers") Map<Integer,
List<MSQWorkerTaskLauncher.WorkerStats>> workerStats,
@JsonProperty("pendingTasks") int pendingTasks,
@JsonProperty("runningTasks") int runningTasks,
@JsonProperty("segmentLoadWaiterStatus") @Nullable
SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus
@@ -71,6 +77,7 @@ public class MSQStatusReport
this.warningReports = warningReports != null ? warningReports :
Collections.emptyList();
this.startTime = startTime;
this.durationMs = durationMs;
+ this.workerStats = workerStats;
this.pendingTasks = pendingTasks;
this.runningTasks = runningTasks;
this.segmentLoadWaiterStatus = segmentLoadWaiterStatus;
@@ -123,6 +130,12 @@ public class MSQStatusReport
return durationMs;
}
+ @JsonProperty("workers")
+ public Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> getWorkerStats() // worker number -> per-task stats; returned as passed to the constructor (no null check there), so may be null
+ {
+ return workerStats;
+ }
+
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
index 3b49572996e..4bc3d1075c1 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
@@ -55,6 +55,7 @@ import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -107,7 +108,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0, 1, 2, status),
+ new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -172,7 +173,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.FAILED, errorReport, new
ArrayDeque<>(), null, 0, 1, 2, status),
+ new MSQStatusReport(TaskState.FAILED, errorReport, new
ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -220,7 +221,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0, 1, 2, status),
+ new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index d6572801207..e3995a4a96c 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -244,6 +244,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new ArrayDeque<>(),
null,
0,
+ new HashMap<>(),
1,
2,
null
@@ -310,6 +311,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new ArrayDeque<>(),
null,
0,
+ new HashMap<>(),
1,
2,
null
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
index 806bd8ebe98..0254a61a2c7 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
@@ -67,6 +67,7 @@ public class SqlStatementResourceHelperTest
new ArrayDeque<>(),
null,
0,
+ new HashMap<>(),
1,
2,
null
@@ -105,6 +106,7 @@ public class SqlStatementResourceHelperTest
new ArrayDeque<>(),
null,
0,
+ new HashMap<>(),
1,
2,
null
@@ -144,6 +146,7 @@ public class SqlStatementResourceHelperTest
new ArrayDeque<>(),
null,
0,
+ new HashMap<>(),
1,
2,
null
@@ -181,6 +184,7 @@ public class SqlStatementResourceHelperTest
new ArrayDeque<>(),
null,
0,
+ new HashMap<>(),
1,
2,
null
@@ -220,6 +224,7 @@ public class SqlStatementResourceHelperTest
new ArrayDeque<>(),
null,
0,
+ new HashMap<>(),
1,
2,
null
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]