This is an automated email from the ASF dual-hosted git repository.
tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new db2b81eb0a9 Add task/waiting/time metric (#18735)
db2b81eb0a9 is described below
commit db2b81eb0a936864370f27e2fe0832c665e8613a
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Nov 11 00:22:49 2025 -0800
Add task/waiting/time metric (#18735)
Adds a metric `task/waiting/time` that measures the time it takes for a
task to be placed onto the task runner for scheduling and running. This time
encompasses initial persistent storage insertion, lock acquisition time, any
task queue-related latencies, noisy-neighbor-related slowness, and any other
delays between when a task is first added and when it is scheduled for running
on the task runner. This metric is useful for debugging issues related to task
startup.
---
docs/operations/metrics.md | 3 +-
.../apache/druid/indexing/overlord/TaskQueue.java | 22 ++++++++++-
.../druid/indexing/overlord/TaskQueueTest.java | 46 ++++++++++++++++++++++
3 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index d16246ab6d4..aa1c25a693a 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -302,7 +302,8 @@ If the JVM does not support CPU time measurement for the
current thread, `ingest
|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`,
`taskType`, `groupId`, `taskStatus`, `description`, `tags`|Varies|
-|`task/pending/time`|Milliseconds taken for a task to wait for running.|
`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Varies|
+|`task/pending/time`|Milliseconds taken for a task to start running after
being scheduled by the Overlord.| `dataSource`, `taskId`, `taskType`,
`groupId`, `tags`|Varies|
+|`task/waiting/time`|Milliseconds taken for a task to be scheduled to run
after being submitted to the Overlord.| `dataSource`, `taskId`, `taskType`,
`groupId`, `tags`|Varies|
|`task/action/run/time`|Milliseconds taken to execute a task action.|
`dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies
from subsecond to a few seconds, based on action type.|
|`task/action/success/count`|Number of task actions that were executed
successfully during the emission period. Currently only being emitted for
[batched `segmentAllocate`
actions](../ingestion/tasks.md#batching-segmentallocate-actions).|
`dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies|
|`task/action/failed/count`|Number of task actions that failed during the
emission period. Currently only being emitted for [batched `segmentAllocate`
actions](../ingestion/tasks.md#batching-segmentallocate-actions).|
`dataSource`, `taskId`, `taskType`, `groupId`, `taskActionType`, `tags`|Varies|
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index c923e70fcad..985579b923a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -69,6 +69,7 @@ import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
+import org.joda.time.Duration;
import java.util.Collection;
import java.util.HashMap;
@@ -454,6 +455,12 @@ public class TaskQueue
if (taskIsReady) {
log.info("Asking taskRunner to run task[%s]", task.getId());
runnerTaskFuture = taskRunner.run(task);
+
+ // Emit the waiting time for the task
+ final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+ final long waitDurationMillis = new
Duration(entry.getTaskSubmittedTime(), DateTimes.nowUtc()).getMillis();
+ emitter.emit(metricBuilder.setMetric("task/waiting/time",
waitDurationMillis));
} else {
// Task.isReady() can internally lock intervals or segments.
// We should release them if the task is not ready.
@@ -564,7 +571,7 @@ public class TaskQueue
prevEntry -> {
if (prevEntry == null) {
added.set(true);
- return new TaskEntry(taskInfo);
+ return new TaskEntry(taskInfo, updateTime);
} else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) {
prevEntry.updateStatus(taskInfo.getStatus(), updateTime);
}
@@ -1163,13 +1170,16 @@ public class TaskQueue
{
private TaskInfo taskInfo;
+ // Approximate time this task was submitted to Overlord
+ private final DateTime taskSubmittedTime;
private DateTime lastUpdatedTime;
private ListenableFuture<TaskStatus> future = null;
private boolean isComplete = false;
- TaskEntry(TaskInfo taskInfo)
+ TaskEntry(TaskInfo taskInfo, DateTime taskSubmittedTime)
{
this.taskInfo = taskInfo;
+ this.taskSubmittedTime = taskSubmittedTime;
this.lastUpdatedTime = DateTimes.nowUtc();
}
@@ -1190,6 +1200,14 @@ public class TaskQueue
this.taskInfo = this.taskInfo.withStatus(status);
this.lastUpdatedTime = updateTime;
}
+
+ /**
+ * Returns the approximate time the task referenced by this {@link
TaskEntry} was submitted to the Overlord.
+ */
+ DateTime getTaskSubmittedTime()
+ {
+ return taskSubmittedTime;
+ }
}
private static RowKey getMetricKey(final Task task)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 3ee1fa065b2..71441862bff 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -690,6 +690,52 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertEquals(failedStatus,
getTaskStorage().getStatus(task.getId()).get());
}
+ @Test
+ public void testTaskWaitingTimeMetricNotEmittedWhenTaskNotReady() throws
Exception
+ {
+ // task1 acquires a lock that will block task2
+ final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M"));
+ prepareTaskForLocking(task1);
+ Assert.assertTrue(task1.isReady(actionClientFactory.create(task1)));
+
+ // task2 will not be ready because of task1's lock
+ final TestTask task2 = new TestTask("t2", Intervals.of("2021-01-31/P1M"));
+ taskQueue.add(task2);
+ taskQueue.manageQueuedTasks();
+
+ Thread.sleep(100);
+
+ // Verify that task/waiting/time was not emitted for task2 since it's not
ready
+ serviceEmitter.verifyNotEmitted("task/waiting/time");
+
+ // Now release task1's lock
+ shutdownTask(task1);
+
+ // task2 should now be ready and run
+ taskQueue.manageQueuedTasks();
+ Thread.sleep(100);
+ serviceEmitter.verifyEmitted("task/waiting/time", 1);
+ serviceEmitter.verifyEmitted("task/run/time", 1);
+ }
+
+ @Test
+ public void testTaskWaitingTimeMetricEmittedForMultipleTasks() throws
Exception
+ {
+ final TestTask task1 = new TestTask("multi-wait-task-1",
Intervals.of("2021-01-01/P1D"));
+ final TestTask task2 = new TestTask("multi-wait-task-2",
Intervals.of("2021-01-02/P1D"));
+ final TestTask task3 = new TestTask("multi-wait-task-3",
Intervals.of("2021-01-03/P1D"));
+
+ taskQueue.add(task1);
+ taskQueue.add(task2);
+ taskQueue.add(task3);
+ taskQueue.manageQueuedTasks();
+
+ Thread.sleep(100);
+
+ serviceEmitter.verifyEmitted("task/waiting/time", 3);
+ serviceEmitter.verifyEmitted("task/run/time", 3);
+ }
+
private HttpRemoteTaskRunner createHttpRemoteTaskRunner()
{
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]