This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4775427e2c Add task start status to worker report (#13263)
4775427e2c is described below

commit 4775427e2c77840c19e1d2458b7e0bb0d01e76d8
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Fri Oct 28 12:00:15 2022 +0530

    Add task start status to worker report (#13263)
    
    * Add task start status to worker report
    
    * Address review comments
    
    * Address review comments
    
    * Update documentation
    
    * Update spelling checks
---
 docs/multi-stage-query/api.md                      |  2 ++
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 27 ++++++++++++++++++----
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  | 14 +++++++++++
 .../druid/msq/indexing/report/MSQStatusReport.java | 21 ++++++++++++++++-
 .../msq/indexing/report/MSQTaskReportTest.java     |  8 ++++---
 website/.spelling                                  |  2 ++
 6 files changed, 66 insertions(+), 8 deletions(-)

diff --git a/docs/multi-stage-query/api.md b/docs/multi-stage-query/api.md
index 5de3ed1273..b81370c42a 100644
--- a/docs/multi-stage-query/api.md
+++ b/docs/multi-stage-query/api.md
@@ -553,6 +553,8 @@ The following table describes the response fields when you 
retrieve a report for
 |multiStageQuery.payload.status.status|RUNNING, SUCCESS, or FAILED.|
 |multiStageQuery.payload.status.startTime|Start time of the query in ISO 
format. Only present if the query has started running.|
 |multiStageQuery.payload.status.durationMs|Milliseconds elapsed after the 
query has started running. -1 denotes that the query hasn't started running 
yet.|
+|multiStageQuery.payload.status.pendingTasks|Number of tasks that are not 
fully started. -1 denotes that the number is currently unknown.|
+|multiStageQuery.payload.status.runningTasks|Number of currently running 
tasks. Should be at least 1 since the controller is included.|
 |multiStageQuery.payload.status.errorReport|Error object. Only present if 
there was an error.|
 |multiStageQuery.payload.status.errorReport.taskId|The task that reported the 
error, if known. May be a controller task or a worker task.|
 |multiStageQuery.payload.status.errorReport.host|The hostname and port of the 
task that reported the error, if known.|
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 68d73b2efd..8686abbc2d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -447,7 +447,8 @@ public class ControllerImpl implements Controller
               errorForReport,
               workerWarnings,
               queryStartTime,
-              new Interval(queryStartTime, 
DateTimes.nowUtc()).toDurationMillis()
+              new Interval(queryStartTime, 
DateTimes.nowUtc()).toDurationMillis(),
+              workerTaskLauncher
           ),
           stagesReport,
           countersSnapshot,
@@ -716,7 +717,8 @@ public class ControllerImpl implements Controller
                     null,
                     workerWarnings,
                     queryStartTime,
-                    queryStartTime == null ? -1L : new 
Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis()
+                    queryStartTime == null ? -1L : new 
Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
+                    workerTaskLauncher
                 ),
                 makeStageReport(
                     queryDef,
@@ -1807,10 +1809,27 @@ public class ControllerImpl implements Controller
       @Nullable final MSQErrorReport errorReport,
       final Queue<MSQErrorReport> errorReports,
       @Nullable final DateTime queryStartTime,
-      final long queryDuration
+      final long queryDuration,
+      MSQWorkerTaskLauncher taskLauncher
   )
   {
-    return new MSQStatusReport(taskState, errorReport, errorReports, 
queryStartTime, queryDuration);
+    int pendingTasks = -1;
+    int runningTasks = 1;
+
+    if (taskLauncher != null) {
+      Pair<Integer, Integer> workerTaskStatus = 
taskLauncher.getWorkerTaskStatus();
+      pendingTasks = workerTaskStatus.lhs;
+      runningTasks = workerTaskStatus.rhs + 1; // To account for controller.
+    }
+    return new MSQStatusReport(
+        taskState,
+        errorReport,
+        errorReports,
+        queryStartTime,
+        queryDuration,
+        pendingTasks,
+        runningTasks
+    );
   }
 
   private static InputSpecSlicerFactory makeInputSpecSlicerFactory(final 
DataSegmentTimelineView timelineView)
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index f84e067931..12c869d23c 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -345,6 +346,19 @@ public class MSQWorkerTaskLauncher
     }
   }
 
+  /**
+   * Returns a pair which contains the number of currently running worker 
tasks and the number of worker tasks that are
+   * not yet fully started as left and right respectively.
+   */
+  public Pair<Integer, Integer> getWorkerTaskStatus()
+  {
+    synchronized (taskIds) {
+      int runningTasks = fullyStartedTasks.size();
+      int pendingTasks = desiredTaskCount - runningTasks;
+      return Pair.of(runningTasks, pendingTasks);
+    }
+  }
+
   /**
    * Used by the main loop to update {@link #taskTrackers} and {@link 
#fullyStartedTasks}.
    */
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
index d2849d8b2c..3791bc82e1 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
@@ -46,6 +46,9 @@ public class MSQStatusReport
 
   private final long durationMs;
 
+  private final int pendingTasks;
+
+  private final int runningTasks;
 
   @JsonCreator
   public MSQStatusReport(
@@ -53,7 +56,9 @@ public class MSQStatusReport
       @JsonProperty("errorReport") @Nullable MSQErrorReport errorReport,
       @JsonProperty("warnings") Collection<MSQErrorReport> warningReports,
       @JsonProperty("startTime") @Nullable DateTime startTime,
-      @JsonProperty("durationMs") long durationMs
+      @JsonProperty("durationMs") long durationMs,
+      @JsonProperty("pendingTasks") int pendingTasks,
+      @JsonProperty("runningTasks") int runningTasks
   )
   {
     this.status = Preconditions.checkNotNull(status, "status");
@@ -61,6 +66,8 @@ public class MSQStatusReport
     this.warningReports = warningReports != null ? warningReports : 
Collections.emptyList();
     this.startTime = startTime;
     this.durationMs = durationMs;
+    this.pendingTasks = pendingTasks;
+    this.runningTasks = runningTasks;
   }
 
   @JsonProperty
@@ -92,6 +99,18 @@ public class MSQStatusReport
     return startTime;
   }
 
+  @JsonProperty
+  public int getPendingTasks()
+  {
+    return pendingTasks;
+  }
+
+  @JsonProperty
+  public int getRunningTasks()
+  {
+    return runningTasks;
+  }
+
   @JsonProperty
   public long getDurationMs()
   {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
index 2e4fc794cf..4902675509 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
@@ -91,7 +91,7 @@ public class MSQTaskReportTest
     final MSQTaskReport report = new MSQTaskReport(
         TASK_ID,
         new MSQTaskReportPayload(
-            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0),
+            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0, 1, 2),
             MSQStagesReport.create(
                 QUERY_DEFINITION,
                 ImmutableMap.of(),
@@ -119,6 +119,8 @@ public class MSQTaskReportTest
     Assert.assertEquals(TASK_ID, report2.getTaskId());
     Assert.assertEquals(report.getPayload().getStatus().getStatus(), 
report2.getPayload().getStatus().getStatus());
     Assert.assertNull(report2.getPayload().getStatus().getErrorReport());
+    Assert.assertEquals(report.getPayload().getStatus().getRunningTasks(), 
report2.getPayload().getStatus().getRunningTasks());
+    Assert.assertEquals(report.getPayload().getStatus().getPendingTasks(), 
report2.getPayload().getStatus().getPendingTasks());
     Assert.assertEquals(report.getPayload().getStages(), 
report2.getPayload().getStages());
 
     Yielder<Object[]> yielder = 
report2.getPayload().getResults().getResultYielder();
@@ -142,7 +144,7 @@ public class MSQTaskReportTest
     final MSQTaskReport report = new MSQTaskReport(
         TASK_ID,
         new MSQTaskReportPayload(
-            new MSQStatusReport(TaskState.FAILED, errorReport, new 
ArrayDeque<>(), null, 0),
+            new MSQStatusReport(TaskState.FAILED, errorReport, new 
ArrayDeque<>(), null, 0, 1, 2),
             MSQStagesReport.create(
                 QUERY_DEFINITION,
                 ImmutableMap.of(),
@@ -179,7 +181,7 @@ public class MSQTaskReportTest
     final MSQTaskReport report = new MSQTaskReport(
         TASK_ID,
         new MSQTaskReportPayload(
-            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0),
+            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0, 1, 2),
             MSQStagesReport.create(
                 QUERY_DEFINITION,
                 ImmutableMap.of(),
diff --git a/website/.spelling b/website/.spelling
index bbabc3160e..5f9e60ff58 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -632,6 +632,8 @@ multiStageQuery.payload.status
 multiStageQuery.payload.status.status
 multiStageQuery.payload.status.startTime 
 multiStageQuery.payload.status.durationMs
+multiStageQuery.payload.status.pendingTasks
+multiStageQuery.payload.status.runningTasks
 multiStageQuery.payload.status.errorReport
 multiStageQuery.payload.status.errorReport.taskId
 multiStageQuery.payload.status.errorReport.host


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to