This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 039b05585cb Add worker status and duration metrics in live and task 
reports  (#15180)
039b05585cb is described below

commit 039b05585cb0984fa773393b9bf3b303796513af
Author: Vishesh Garg <[email protected]>
AuthorDate: Mon Oct 30 09:43:22 2023 +0530

    Add worker status and duration metrics in live and task reports  (#15180)
    
    Add worker status and duration metrics in live and task reports for 
tracking.
---
 docs/api-reference/sql-ingestion-api.md            |  5 ++
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  3 +
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  | 77 ++++++++++++++++++++--
 .../druid/msq/indexing/report/MSQStatusReport.java | 13 ++++
 .../msq/indexing/report/MSQTaskReportTest.java     |  7 +-
 .../sql/resources/SqlStatementResourceTest.java    |  2 +
 .../msq/util/SqlStatementResourceHelperTest.java   |  5 ++
 7 files changed, 105 insertions(+), 7 deletions(-)

diff --git a/docs/api-reference/sql-ingestion-api.md 
b/docs/api-reference/sql-ingestion-api.md
index 3daadfa5085..5492c3ea46d 100644
--- a/docs/api-reference/sql-ingestion-api.md
+++ b/docs/api-reference/sql-ingestion-api.md
@@ -603,6 +603,11 @@ The following table describes the response fields when you 
retrieve a report for
 | `multiStageQuery.payload.status.status` | RUNNING, SUCCESS, or FAILED. |
 | `multiStageQuery.payload.status.startTime` | Start time of the query in ISO 
format. Only present if the query has started running. |
 | `multiStageQuery.payload.status.durationMs` | Milliseconds elapsed after the 
query has started running. -1 denotes that the query hasn't started running 
yet. |
+| `multiStageQuery.payload.status.workers` | Map from worker number to the worker tasks the controller task launched for that worker number. |
+| `multiStageQuery.payload.status.workers.<workerNumber>` | Array of worker 
tasks including retries. |
+| `multiStageQuery.payload.status.workers.<workerNumber>[].workerId` | ID of the worker task. |
+| `multiStageQuery.payload.status.workers.<workerNumber>[].state` | RUNNING, SUCCESS, or FAILED. |
+| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` | Duration of the worker task in milliseconds, as reported by the task's status. It is -1 while the worker task is still RUNNING. |
 | `multiStageQuery.payload.status.pendingTasks` | Number of tasks that are not 
fully started. -1 denotes that the number is currently unknown. |
 | `multiStageQuery.payload.status.runningTasks` | Number of currently running 
tasks. Should be at least 1 since the controller is included. |
 | `multiStageQuery.payload.status.segmentLoadStatus` | Segment loading 
container. Only present after the segments have been published. |
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index f2260b055a9..1ae3cdc5a6e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -2262,11 +2262,13 @@ public class ControllerImpl implements Controller
   {
     int pendingTasks = -1;
     int runningTasks = 1;
+    Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStatsMap = new 
HashMap<>();
 
     if (taskLauncher != null) {
       WorkerCount workerTaskCount = taskLauncher.getWorkerTaskCount();
       pendingTasks = workerTaskCount.getPendingWorkerCount();
       runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To 
account for controller.
+      workerStatsMap = taskLauncher.getWorkerStats();
     }
 
     SegmentLoadStatusFetcher.SegmentLoadWaiterStatus status = 
segmentLoadWaiter == null ? null : segmentLoadWaiter.status();
@@ -2277,6 +2279,7 @@ public class ControllerImpl implements Controller
         errorReports,
         queryStartTime,
         queryDuration,
+        workerStatsMap,
         pendingTasks,
         runningTasks,
         status
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index dcc81d86864..c2092e7f24a 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.indexing;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -47,15 +48,18 @@ import 
org.apache.druid.msq.indexing.error.WorkerFailedFault;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -108,10 +112,11 @@ public class MSQWorkerTaskLauncher
   @GuardedBy("taskIds")
   private final IntSet fullyStartedTasks = new IntOpenHashSet();
 
-  // Mutable state accessible only to the main loop. LinkedHashMap since order 
of key set matters. Tasks are added
-  // here once they are submitted for running, but before they are fully 
started up.
+  // Mutable state accessed by mainLoop, ControllerImpl, and jetty 
(/liveReports) threads.
+  // Tasks are added here once they are submitted for running, but before they 
are fully started up.
   // taskId -> taskTracker
-  private final Map<String, TaskTracker> taskTrackers = new LinkedHashMap<>();
+  private final ConcurrentMap<String, TaskTracker> taskTrackers = new 
ConcurrentSkipListMap<>(Comparator.comparingInt(
+      MSQTasks::workerFromTaskId));
 
   // Set of tasks which are issued a cancel request by the controller.
   private final Set<String> canceledWorkerTasks = 
ConcurrentHashMap.newKeySet();
@@ -348,6 +353,70 @@ public class MSQWorkerTaskLauncher
     }
   }
 
+  /**
+   * One entry of the per-worker list reported under {@code workers.<workerNumber>} in live and task reports.
+   * Holds the worker task's ID, its {@link TaskState}, and its duration in milliseconds.
+   */
+  public static class WorkerStats
+  {
+    // Left non-final and package-private: the no-arg constructor below exists for JSON deserialization,
+    // so these presumably get populated after construction.
+    String workerId;
+    TaskState state;
+    long duration;
+
+    /**
+     * For JSON deserialization only
+     */
+    public WorkerStats()
+    {
+      // Same values the fields would default to: null, null and 0.
+      this(null, null, 0);
+    }
+
+    /**
+     * @param workerId ID of the worker task
+     * @param state    current state of the worker task
+     * @param duration duration of the worker task in milliseconds
+     */
+    public WorkerStats(String workerId, TaskState state, long duration)
+    {
+      this.workerId = workerId;
+      this.state = state;
+      this.duration = duration;
+    }
+
+    /**
+     * ID of the worker task; serialized as {@code workerId}.
+     */
+    @JsonProperty
+    public String getWorkerId()
+    {
+      return this.workerId;
+    }
+
+    /**
+     * State of the worker task; serialized as {@code state}.
+     */
+    @JsonProperty
+    public TaskState getState()
+    {
+      return this.state;
+    }
+
+    /**
+     * Duration of the worker task in milliseconds; serialized as {@code durationMs}.
+     */
+    @JsonProperty("durationMs")
+    public long getDuration()
+    {
+      return this.duration;
+    }
+  }
+
+  /**
+   * Returns a fresh snapshot of the launched worker tasks, grouped by worker number in ascending order. Each worker
+   * number maps to the list of tasks launched for it, sorted by task ID.
+   *
+   * The task trackers are shared with the main loop and other threads, so each tracker's status is read exactly once
+   * into a local. That way the reported state and duration come from the same status object.
+   * NOTE(review): TaskTracker's status field is not visible here; confirm it is safely published across threads.
+   */
+  public Map<Integer, List<WorkerStats>> getWorkerStats()
+  {
+    final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>();
+
+    for (Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) {
+      final TaskTracker taskTracker = taskEntry.getValue();
+      final TaskStatus taskStatus = taskTracker.status;
+
+      // Tolerate a tracker whose status has not been populated yet (presumably a freshly submitted task before its
+      // first status poll -- confirm) instead of throwing a NullPointerException from the report path.
+      final TaskState state = taskStatus == null ? null : taskStatus.getStatusCode();
+
+      // getDuration() returns -1 for running tasks. It's not calculated on-the-fly here since
+      // taskTracker.startTimeMillis marks task submission time rather than the actual start.
+      final long duration = taskStatus == null ? -1 : taskStatus.getDuration();
+
+      workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new ArrayList<>())
+                 .add(new WorkerStats(taskEntry.getKey(), state, duration));
+    }
+
+    for (List<WorkerStats> workerStatsList : workerStats.values()) {
+      workerStatsList.sort(Comparator.comparing(WorkerStats::getWorkerId));
+    }
+    return workerStats;
+  }
+
   private void mainLoop()
   {
     try {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
index d3864498349..1dd7d658903 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
@@ -25,12 +25,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.msq.exec.SegmentLoadStatusFetcher;
+import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
 import org.apache.druid.msq.indexing.error.MSQErrorReport;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 public class MSQStatusReport
@@ -47,6 +50,8 @@ public class MSQStatusReport
 
   private final long durationMs;
 
+  private final Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> 
workerStats;
+
   private final int pendingTasks;
 
   private final int runningTasks;
@@ -61,6 +66,7 @@ public class MSQStatusReport
       @JsonProperty("warnings") Collection<MSQErrorReport> warningReports,
       @JsonProperty("startTime") @Nullable DateTime startTime,
       @JsonProperty("durationMs") long durationMs,
+      @JsonProperty("workers") Map<Integer, 
List<MSQWorkerTaskLauncher.WorkerStats>> workerStats,
       @JsonProperty("pendingTasks") int pendingTasks,
       @JsonProperty("runningTasks") int runningTasks,
       @JsonProperty("segmentLoadWaiterStatus") @Nullable 
SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus
@@ -71,6 +77,7 @@ public class MSQStatusReport
     this.warningReports = warningReports != null ? warningReports : 
Collections.emptyList();
     this.startTime = startTime;
     this.durationMs = durationMs;
+    this.workerStats = workerStats;
     this.pendingTasks = pendingTasks;
     this.runningTasks = runningTasks;
     this.segmentLoadWaiterStatus = segmentLoadWaiterStatus;
@@ -123,6 +130,12 @@ public class MSQStatusReport
     return durationMs;
   }
 
+  /**
+   * Worker tasks launched by the controller, keyed by worker number; each list holds the tasks launched for that
+   * worker number. Never returns null: when no worker stats were supplied (for example, a report deserialized
+   * without a {@code workers} field), an empty map is returned, mirroring how warnings default to an empty list.
+   */
+  @JsonProperty("workers")
+  public Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> getWorkerStats()
+  {
+    return workerStats == null ? Collections.emptyMap() : workerStats;
+  }
+
   @Nullable
   @JsonProperty
   @JsonInclude(JsonInclude.Include.NON_NULL)
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
index 3b49572996e..4bc3d1075c1 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
@@ -55,6 +55,7 @@ import java.io.File;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -107,7 +108,7 @@ public class MSQTaskReportTest
     final MSQTaskReport report = new MSQTaskReport(
         TASK_ID,
         new MSQTaskReportPayload(
-            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0, 1, 2, status),
+            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0, new HashMap<>(), 1, 2, status),
             MSQStagesReport.create(
                 QUERY_DEFINITION,
                 ImmutableMap.of(),
@@ -172,7 +173,7 @@ public class MSQTaskReportTest
     final MSQTaskReport report = new MSQTaskReport(
         TASK_ID,
         new MSQTaskReportPayload(
-            new MSQStatusReport(TaskState.FAILED, errorReport, new 
ArrayDeque<>(), null, 0, 1, 2, status),
+            new MSQStatusReport(TaskState.FAILED, errorReport, new 
ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
             MSQStagesReport.create(
                 QUERY_DEFINITION,
                 ImmutableMap.of(),
@@ -220,7 +221,7 @@ public class MSQTaskReportTest
     final MSQTaskReport report = new MSQTaskReport(
         TASK_ID,
         new MSQTaskReportPayload(
-            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0, 1, 2, status),
+            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0, new HashMap<>(), 1, 2, status),
             MSQStagesReport.create(
                 QUERY_DEFINITION,
                 ImmutableMap.of(),
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index d6572801207..e3995a4a96c 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -244,6 +244,7 @@ public class SqlStatementResourceTest extends MSQTestBase
               new ArrayDeque<>(),
               null,
               0,
+              new HashMap<>(),
               1,
               2,
               null
@@ -310,6 +311,7 @@ public class SqlStatementResourceTest extends MSQTestBase
               new ArrayDeque<>(),
               null,
               0,
+              new HashMap<>(),
               1,
               2,
               null
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
index 806bd8ebe98..0254a61a2c7 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
@@ -67,6 +67,7 @@ public class SqlStatementResourceHelperTest
         new ArrayDeque<>(),
         null,
         0,
+        new HashMap<>(),
         1,
         2,
         null
@@ -105,6 +106,7 @@ public class SqlStatementResourceHelperTest
         new ArrayDeque<>(),
         null,
         0,
+        new HashMap<>(),
         1,
         2,
         null
@@ -144,6 +146,7 @@ public class SqlStatementResourceHelperTest
         new ArrayDeque<>(),
         null,
         0,
+        new HashMap<>(),
         1,
         2,
         null
@@ -181,6 +184,7 @@ public class SqlStatementResourceHelperTest
         new ArrayDeque<>(),
         null,
         0,
+        new HashMap<>(),
         1,
         2,
         null
@@ -220,6 +224,7 @@ public class SqlStatementResourceHelperTest
         new ArrayDeque<>(),
         null,
         0,
+        new HashMap<>(),
         1,
         2,
         null


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to