This is an automated email from the ASF dual-hosted git repository.

tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new db2b81eb0a9 Add task/waiting/time metric (#18735)
db2b81eb0a9 is described below

commit db2b81eb0a936864370f27e2fe0832c665e8613a
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Nov 11 00:22:49 2025 -0800

    Add task/waiting/time metric (#18735)
    
    Adds a metric `task/waiting/time` that measures the time it takes for a 
task to be placed onto the task runner for scheduling and running. This time 
encompasses initial persistent storage insertion, lock acquisition time, any 
task queue-related latencies, noisy-neighbor-related slowness, and any other 
delays between when a task is first added and when it is scheduled for running 
on the task runner. This metric is useful for debugging issues related to task 
startup.
---
 docs/operations/metrics.md                         |  3 +-
 .../apache/druid/indexing/overlord/TaskQueue.java  | 22 ++++++++++-
 .../druid/indexing/overlord/TaskQueueTest.java     | 46 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index d16246ab6d4..aa1c25a693a 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -302,7 +302,8 @@ If the JVM does not support CPU time measurement for the 
current thread, `ingest
 |Metric|Description|Dimensions|Normal value|
 |------|-----------|----------|------------|
 |`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, 
`taskType`, `groupId`, `taskStatus`, `description`, `tags`|Varies|
-|`task/pending/time`|Milliseconds taken for a task to wait for running.| 
`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Varies|
+|`task/pending/time`|Milliseconds taken for a task to start running after 
being scheduled by the Overlord.| `dataSource`, `taskId`, `taskType`, 
`groupId`, `tags`|Varies|
+|`task/waiting/time`|Milliseconds taken for a task to be scheduled to run 
after being submitted to the Overlord.| `dataSource`, `taskId`, `taskType`, 
`groupId`, `tags`|Varies|
 |`task/action/run/time`|Milliseconds taken to execute a task action.| 
`dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies 
from subsecond to a few seconds, based on action type.|
 |`task/action/success/count`|Number of task actions that were executed 
successfully during the emission period. Currently only being emitted for 
[batched `segmentAllocate` 
actions](../ingestion/tasks.md#batching-segmentallocate-actions).| 
`dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies|
 |`task/action/failed/count`|Number of task actions that failed during the 
emission period. Currently only being emitted for [batched `segmentAllocate` 
actions](../ingestion/tasks.md#batching-segmentallocate-actions).| 
`dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies|
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index c923e70fcad..985579b923a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -69,6 +69,7 @@ import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
+import org.joda.time.Duration;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -454,6 +455,12 @@ public class TaskQueue
           if (taskIsReady) {
             log.info("Asking taskRunner to run task[%s]", task.getId());
             runnerTaskFuture = taskRunner.run(task);
+
+            // Emit the waiting time for the task
+            final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
+            IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+            final long waitDurationMillis = new 
Duration(entry.getTaskSubmittedTime(), DateTimes.nowUtc()).getMillis();
+            emitter.emit(metricBuilder.setMetric("task/waiting/time", 
waitDurationMillis));
           } else {
             // Task.isReady() can internally lock intervals or segments.
             // We should release them if the task is not ready.
@@ -564,7 +571,7 @@ public class TaskQueue
         prevEntry -> {
           if (prevEntry == null) {
             added.set(true);
-            return new TaskEntry(taskInfo);
+            return new TaskEntry(taskInfo, updateTime);
           } else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) {
             prevEntry.updateStatus(taskInfo.getStatus(), updateTime);
           }
@@ -1163,13 +1170,16 @@ public class TaskQueue
   {
     private TaskInfo taskInfo;
 
+    // Approximate time this task was submitted to Overlord
+    private final DateTime taskSubmittedTime;
     private DateTime lastUpdatedTime;
     private ListenableFuture<TaskStatus> future = null;
     private boolean isComplete = false;
 
-    TaskEntry(TaskInfo taskInfo)
+    TaskEntry(TaskInfo taskInfo, DateTime taskSubmittedTime)
     {
       this.taskInfo = taskInfo;
+      this.taskSubmittedTime = taskSubmittedTime;
       this.lastUpdatedTime = DateTimes.nowUtc();
     }
 
@@ -1190,6 +1200,14 @@ public class TaskQueue
       this.taskInfo = this.taskInfo.withStatus(status);
       this.lastUpdatedTime = updateTime;
     }
+
+    /**
+     * Returns the approximate time the task referenced by this {@link 
TaskEntry} was submitted to the Overlord.
+     */
+    DateTime getTaskSubmittedTime()
+    {
+      return taskSubmittedTime;
+    }
   }
 
   private static RowKey getMetricKey(final Task task)
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 3ee1fa065b2..71441862bff 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -690,6 +690,52 @@ public class TaskQueueTest extends IngestionTestBase
     Assert.assertEquals(failedStatus, 
getTaskStorage().getStatus(task.getId()).get());
   }
 
+  @Test
+  public void testTaskWaitingTimeMetricNotEmittedWhenTaskNotReady() throws 
Exception
+  {
+    // task1 acquires a lock that will block task2
+    final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M"));
+    prepareTaskForLocking(task1);
+    Assert.assertTrue(task1.isReady(actionClientFactory.create(task1)));
+
+    // task2 will not be ready because of task1's lock
+    final TestTask task2 = new TestTask("t2", Intervals.of("2021-01-31/P1M"));
+    taskQueue.add(task2);
+    taskQueue.manageQueuedTasks();
+
+    Thread.sleep(100);
+
+    // Verify that task/waiting/time was not emitted for task2 since it's not 
ready
+    serviceEmitter.verifyNotEmitted("task/waiting/time");
+
+    // Now release task1's lock
+    shutdownTask(task1);
+
+    // task2 should now be ready and run
+    taskQueue.manageQueuedTasks();
+    Thread.sleep(100);
+    serviceEmitter.verifyEmitted("task/waiting/time", 1);
+    serviceEmitter.verifyEmitted("task/run/time", 1);
+  }
+
+  @Test
+  public void testTaskWaitingTimeMetricEmittedForMultipleTasks() throws 
Exception
+  {
+    final TestTask task1 = new TestTask("multi-wait-task-1", 
Intervals.of("2021-01-01/P1D"));
+    final TestTask task2 = new TestTask("multi-wait-task-2", 
Intervals.of("2021-01-02/P1D"));
+    final TestTask task3 = new TestTask("multi-wait-task-3", 
Intervals.of("2021-01-03/P1D"));
+
+    taskQueue.add(task1);
+    taskQueue.add(task2);
+    taskQueue.add(task3);
+    taskQueue.manageQueuedTasks();
+
+    Thread.sleep(100);
+
+    serviceEmitter.verifyEmitted("task/waiting/time", 3);
+    serviceEmitter.verifyEmitted("task/run/time", 3);
+  }
+
   private HttpRemoteTaskRunner createHttpRemoteTaskRunner()
   {
     final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to