This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bb5b41b188 Adding task pending time in MSQ reports (#15966)
5bb5b41b188 is described below

commit 5bb5b41b188b83778ebc459a1ad036e06f7a558a
Author: Karan Kumar <[email protected]>
AuthorDate: Tue Feb 27 14:41:28 2024 +0530

    Adding task pending time in MSQ reports (#15966)
    
    
        Added a new field pendingMs in MSQ task reports. This helps in figuring 
out the exact run time of the MSQ worker tasks.
        Fixed data races.
---
 docs/api-reference/sql-ingestion-api.md            |  13 +-
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  | 172 +++++++++++++--------
 2 files changed, 119 insertions(+), 66 deletions(-)

diff --git a/docs/api-reference/sql-ingestion-api.md 
b/docs/api-reference/sql-ingestion-api.md
index 5492c3ea46d..988f860a85d 100644
--- a/docs/api-reference/sql-ingestion-api.md
+++ b/docs/api-reference/sql-ingestion-api.md
@@ -287,6 +287,16 @@ The response shows an example report for a query.
         "status": "SUCCESS",
         "startTime": "2022-09-14T22:12:09.266Z",
         "durationMs": 28227,
+        "workers": {
+          "0": [
+            {
+              "workerId": 
"query-3dc0c45d-34d7-4b15-86c9-cdb2d3ebfc4e-worker0_0",
+              "state": "SUCCESS",
+              "durationMs": 15511,
+              "pendingMs": 137
+            }
+          ]
+        },
         "pendingTasks": 0,
         "runningTasks": 2,
         "segmentLoadStatus": {
@@ -607,7 +617,8 @@ The following table describes the response fields when you 
retrieve a report for
 | `multiStageQuery.payload.status.workers.<workerNumber>` | Array of worker 
tasks including retries. |
 | `multiStageQuery.payload.status.workers.<workerNumber>[].workerId` | Id of 
the worker task. |
 | `multiStageQuery.payload.status.workers.<workerNumber>[].state` | RUNNING, 
SUCCESS, or FAILED. |
-| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` | 
Milliseconds elapsed after the worker task started running. It is -1 for worker 
tasks with status RUNNING.|
+| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` | 
Milliseconds elapsed between when the worker task was first requested and when 
it finished. It is -1 for worker tasks with status RUNNING.|
+| `multiStageQuery.payload.status.workers.<workerNumber>[].pendingMs` | 
Milliseconds elapsed between when the worker task was first requested and when 
it fully started RUNNING. For finished worker tasks, actual work time can be 
calculated using `actualWorkTimeMs = durationMs - pendingMs`.|
 | `multiStageQuery.payload.status.pendingTasks` | Number of tasks that are not 
fully started. -1 denotes that the number is currently unknown. |
 | `multiStageQuery.payload.status.runningTasks` | Number of currently running 
tasks. Should be at least 1 since the controller is included. |
 | `multiStageQuery.payload.status.segmentLoadStatus` | Segment loading 
container. Only present after the segments have been published. |
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index c2092e7f24a..a485831532f 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -306,7 +306,6 @@ public class MSQWorkerTaskLauncher
    * Blocks the call until the worker tasks are ready to be contacted for 
work.
    *
    * @param workerSet
-   *
    * @throws InterruptedException
    */
   public void waitUntilWorkersReady(Set<Integer> workerSet) throws 
InterruptedException
@@ -353,45 +352,6 @@ public class MSQWorkerTaskLauncher
     }
   }
 
-  public static class WorkerStats
-  {
-    String workerId;
-    TaskState state;
-    long duration;
-
-    /**
-     * For JSON deserialization only
-     */
-    public WorkerStats()
-    {
-    }
-
-    public WorkerStats(String workerId, TaskState state, long duration)
-    {
-      this.workerId = workerId;
-      this.state = state;
-      this.duration = duration;
-    }
-
-    @JsonProperty
-    public String getWorkerId()
-    {
-      return workerId;
-    }
-
-    @JsonProperty
-    public TaskState getState()
-    {
-      return state;
-    }
-
-    @JsonProperty("durationMs")
-    public long getDuration()
-    {
-      return duration;
-    }
-  }
-
   public Map<Integer, List<WorkerStats>> getWorkerStats()
   {
     final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>();
@@ -400,14 +360,17 @@ public class MSQWorkerTaskLauncher
 
       TaskTracker taskTracker = taskEntry.getValue();
 
+      TaskStatus taskStatus = taskTracker.statusRef.get();
       workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new 
ArrayList<>())
-                 .add(new WorkerStats(taskEntry.getKey(),
-                                      taskTracker.status.getStatusCode(),
-                                      // getDuration() returns -1 for running 
tasks.
-                                      // It's not calculated on-the-fly here 
since
-                                      // taskTracker.startTimeMillis marks task
-                                      // submission time rather than the 
actual start.
-                                      taskTracker.status.getDuration()
+                 .add(new WorkerStats(
+                     taskEntry.getKey(),
+                     taskStatus.getStatusCode(),
+                     // getDuration() returns -1 for running tasks.
+                     // It's not calculated on-the-fly here since
+                     // taskTracker.startTimeMillis marks task
+                     // submission time rather than the actual start.
+                     taskStatus.getDuration(),
+                     taskTracker.taskPendingTimeInMs()
                  ));
     }
 
@@ -576,18 +539,20 @@ public class MSQWorkerTaskLauncher
       for (Map.Entry<String, TaskStatus> statusEntry : statuses.entrySet()) {
         final String taskId = statusEntry.getKey();
         final TaskTracker tracker = taskTrackers.get(taskId);
-        tracker.status = statusEntry.getValue();
+        tracker.updateStatus(statusEntry.getValue());
+        TaskStatus status = tracker.statusRef.get();
 
-        if (!tracker.status.getStatusCode().isComplete() && 
tracker.unknownLocation()) {
+        if (!status.getStatusCode().isComplete() && tracker.unknownLocation()) 
{
           // Look up location if not known. Note: this location is not used to 
actually contact the task. For that,
           // we have SpecificTaskServiceLocator. This location is only used to 
determine if a task has started up.
-          tracker.initialLocation = workerManager.location(taskId);
+          tracker.setLocation(workerManager.location(taskId));
         }
 
-        if (tracker.status.getStatusCode() == TaskState.RUNNING && 
!tracker.unknownLocation()) {
+        if (status.getStatusCode() == TaskState.RUNNING && 
!tracker.unknownLocation()) {
           synchronized (taskIds) {
             if (fullyStartedTasks.add(tracker.workerNumber)) {
               
recentFullyStartedWorkerTimeInMillis.set(System.currentTimeMillis());
+              tracker.setFullyStartedTime(System.currentTimeMillis());
             }
             taskIds.notifyAll();
           }
@@ -616,7 +581,7 @@ public class MSQWorkerTaskLauncher
         continue;
       }
 
-      if (tracker.status == null) {
+      if (tracker.statusRef.get() == null) {
         removeWorkerFromFullyStartedWorkers(tracker);
         final String errorMessage = StringUtils.format("Task [%s] status 
missing", taskId);
         log.info(errorMessage + ". Trying to relaunch the worker");
@@ -635,9 +600,10 @@ public class MSQWorkerTaskLauncher
         ));
       } else if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) {
         removeWorkerFromFullyStartedWorkers(tracker);
-        log.info("Task[%s] failed because %s. Trying to relaunch the worker", 
taskId, tracker.status.getErrorMsg());
+        TaskStatus taskStatus = tracker.statusRef.get();
+        log.info("Task[%s] failed because %s. Trying to relaunch the worker", 
taskId, taskStatus.getErrorMsg());
         tracker.enableRetrying();
-        retryTask.retry(tracker.msqWorkerTask, new WorkerFailedFault(taskId, 
tracker.status.getErrorMsg()));
+        retryTask.retry(tracker.msqWorkerTask, new WorkerFailedFault(taskId, 
taskStatus.getErrorMsg()));
       }
     }
   }
@@ -717,7 +683,7 @@ public class MSQWorkerTaskLauncher
           Limits.PER_WORKER_RELAUNCH_LIMIT,
           relaunchTask.getId(),
           relaunchTask.getWorkerNumber(),
-          tracker.status.getErrorMsg()
+          tracker.statusRef.get().getErrorMsg()
       ));
     }
     if (currentRelaunchCount > Limits.TOTAL_RELAUNCH_LIMIT) {
@@ -725,7 +691,7 @@ public class MSQWorkerTaskLauncher
           Limits.TOTAL_RELAUNCH_LIMIT,
           currentRelaunchCount,
           relaunchTask.getId(),
-          tracker.status.getErrorMsg()
+          tracker.statusRef.get().getErrorMsg()
       ));
     }
   }
@@ -737,8 +703,9 @@ public class MSQWorkerTaskLauncher
     for (final Map.Entry<String, TaskTracker> taskEntry : 
taskTrackers.entrySet()) {
       final String taskId = taskEntry.getKey();
       final TaskTracker tracker = taskEntry.getValue();
-      if (!canceledWorkerTasks.contains(taskId)
-          && (tracker.status == null || 
!tracker.status.getStatusCode().isComplete())) {
+      if ((!canceledWorkerTasks.contains(taskId))
+          &&
+          (!tracker.isComplete())) {
         canceledWorkerTasks.add(taskId);
         context.workerManager().cancel(taskId);
       }
@@ -831,11 +798,12 @@ public class MSQWorkerTaskLauncher
   {
     private final int workerNumber;
     private final long startTimeMillis = System.currentTimeMillis();
+    private final AtomicLong taskFullyStartedTimeRef = new AtomicLong();
     private final MSQWorkerTask msqWorkerTask;
-    private TaskStatus status;
-    private TaskLocation initialLocation;
+    private final AtomicReference<TaskStatus> statusRef = new 
AtomicReference<>();
+    private final AtomicReference<TaskLocation> initialLocationRef = new 
AtomicReference<>();
 
-    private boolean isRetrying = false;
+    private final AtomicBoolean isRetryingRef = new AtomicBoolean(false);
 
     public TaskTracker(int workerNumber, MSQWorkerTask msqWorkerTask)
     {
@@ -845,16 +813,19 @@ public class MSQWorkerTaskLauncher
 
     public boolean unknownLocation()
     {
+      TaskLocation initialLocation = initialLocationRef.get();
       return initialLocation == null || 
TaskLocation.unknown().equals(initialLocation);
     }
 
     public boolean isComplete()
     {
+      TaskStatus status = statusRef.get();
       return status != null && status.getStatusCode().isComplete();
     }
 
     public boolean didFail()
     {
+      TaskStatus status = statusRef.get();
       return status != null && status.getStatusCode().isFailure();
     }
 
@@ -869,6 +840,7 @@ public class MSQWorkerTaskLauncher
     public boolean didRunTimeOut(final long maxTaskStartDelayMillis)
     {
       long currentTimeMillis = System.currentTimeMillis();
+      TaskStatus status = statusRef.get();
       return (status == null || status.getStatusCode() == TaskState.RUNNING)
              && unknownLocation()
              && currentTimeMillis - recentFullyStartedWorkerTimeInMillis.get() 
> maxTaskStartDelayMillis
@@ -880,17 +852,87 @@ public class MSQWorkerTaskLauncher
      */
     public void enableRetrying()
     {
-      isRetrying = true;
+      isRetryingRef.set(true);
     }
 
     /**
     * Checks if the task is retrying.
-     *
-     * @return
      */
     public boolean isRetrying()
     {
-      return isRetrying;
+      return isRetryingRef.get();
+    }
+
+    public void setLocation(TaskLocation taskLocation)
+    {
+      initialLocationRef.set(taskLocation);
+    }
+
+    public void updateStatus(TaskStatus taskStatus)
+    {
+      statusRef.set(taskStatus);
+    }
+
+    public void setFullyStartedTime(long currentTimeMillis)
+    {
+      taskFullyStartedTimeRef.set(currentTimeMillis);
+    }
+
+    public long taskPendingTimeInMs()
+    {
+      long currentFullyStartingTime = taskFullyStartedTimeRef.get();
+      if (currentFullyStartingTime == 0) {
+        return System.currentTimeMillis() - startTimeMillis;
+      } else {
+        return Math.max(0, currentFullyStartingTime - startTimeMillis);
+      }
+    }
+  }
+
+  public static class WorkerStats
+  {
+    String workerId;
+    TaskState state;
+    long duration;
+    long pendingTimeInMs;
+
+    /**
+     * For JSON deserialization only
+     */
+    public WorkerStats()
+    {
+    }
+
+    public WorkerStats(String workerId, TaskState state, long duration, long 
pendingTimeInMs)
+    {
+      this.workerId = workerId;
+      this.state = state;
+      this.duration = duration;
+      this.pendingTimeInMs = pendingTimeInMs;
+    }
+
+    @JsonProperty
+    public String getWorkerId()
+    {
+      return workerId;
+    }
+
+    @JsonProperty
+    public TaskState getState()
+    {
+      return state;
+    }
+
+    @JsonProperty("durationMs")
+    public long getDuration()
+    {
+      return duration;
+    }
+
+    @JsonProperty("pendingMs")
+    public long getPendingTimeInMs()
+    {
+      return pendingTimeInMs;
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to