This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c8323d1a7cc Add indexer task success and failure metrics (#16829)
c8323d1a7cc is described below
commit c8323d1a7cca9585ee20d24e3d0e6fff3e28c679
Author: Rushikesh Bankar <[email protected]>
AuthorDate: Mon Aug 5 16:21:27 2024 +0530
Add indexer task success and failure metrics (#16829)
This PR adds indexer-level task metrics:
"worker/task/failed/count"
"worker/task/success/count"
The existing "worker/task/completed/count" metric counts all completed tasks
irrespective of their success or failure status, so these metrics give more
visibility into the outcome of completed tasks.
---
docs/operations/metrics.md | 2 ++
.../main/resources/defaultMetricDimensions.json | 4 +--
.../druid/indexing/worker/WorkerTaskManager.java | 16 +++++++++
.../indexing/worker/WorkerTaskManagerTest.java | 25 ++++++++++++--
.../metrics/IndexerTaskCountStatsProvider.java | 4 +++
.../metrics/WorkerTaskCountStatsMonitor.java | 2 ++
.../metrics/WorkerTaskCountStatsMonitorTest.java | 40 +++++++++++++++++++++-
7 files changed, 88 insertions(+), 5 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 21267de9136..ec97f44fe39 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -299,6 +299,8 @@ If the JVM does not support CPU time measurement for the
current thread, `ingest
|`worker/taskSlot/used/count`|Number of busy task slots on the reporting
worker per emission period. This metric is only available if the
`WorkerTaskCountStatsMonitor` module is included.| `category`,
`workerVersion`|Varies|
|`worker/task/assigned/count`|Number of tasks assigned to an indexer per
emission period. This metric is only available if the
`WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/completed/count`|Number of tasks completed by an indexer per
emission period. This metric is only available if the
`WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
+|`worker/task/failed/count`|Number of tasks that failed on an indexer during
the emission period. This metric is only available if the
`WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
+|`worker/task/success/count`|Number of tasks that succeeded on an indexer
during the emission period. This metric is only available if the
`WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/running/count`|Number of tasks running on an indexer per
emission period. This metric is only available if the
`WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
## Shuffle metrics (Native parallel task)
diff --git
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index ad065c63d39..91d7c4c4abd 100644
---
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -74,8 +74,8 @@
"worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" :
"count" },
"worker/task/running/count" : { "dimensions" : ["dataSource"], "type" :
"count" },
"worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" :
"count" },
- "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"],
"type" : "count" },
- "worker/task/success/count" : { "dimensions" : ["category",
"workerVersion"], "type" : "count" },
+ "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion",
"dataSource"], "type" : "count" },
+ "worker/task/success/count" : { "dimensions" : ["category", "workerVersion",
"dataSource"], "type" : "count" },
"worker/taskSlot/idle/count" : { "dimensions" : ["category",
"workerVersion"], "type" : "gauge" },
"worker/taskSlot/total/count" : { "dimensions" : ["category",
"workerVersion"], "type" : "gauge" },
"worker/taskSlot/used/count" : { "dimensions" : ["category",
"workerVersion"], "type" : "gauge" },
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index 729ac1d1617..a1c131ad0f4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -640,6 +640,22 @@ public class WorkerTaskManager implements
IndexerTaskCountStatsProvider
return getNumTasksPerDatasource(this.getCompletedTasks().values(),
TaskAnnouncement::getTaskDataSource);
}
+ @Override
+ public Map<String, Long> getWorkerFailedTasks()
+ {
+ return getNumTasksPerDatasource(completedTasks.entrySet().stream()
+ .filter(entry -> entry.getValue().getTaskStatus().isFailure())
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
+ }
+
+ @Override
+ public Map<String, Long> getWorkerSuccessfulTasks()
+ {
+ return getNumTasksPerDatasource(completedTasks.entrySet().stream()
+ .filter(entry -> entry.getValue().getTaskStatus().isSuccess())
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
+ }
+
private static class TaskDetails
{
private final Task task;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 6b08be3a3c6..37839f8e077 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -455,6 +455,19 @@ public class WorkerTaskManagerTest
return new NoopTask(id, null, dataSource, 100, 0,
ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
}
+ private NoopTask createNoopFailingTask(String id, String dataSource)
+ {
+ return new NoopTask(id, null, dataSource, 100, 0,
ImmutableMap.of(Tasks.PRIORITY_KEY, 0))
+ {
+ @Override
+ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+ {
+ Thread.sleep(getRunTime());
+ return TaskStatus.failure(getId(), "Failed to complete the task");
+ }
+ };
+ }
+
/**
* Start the {@link #workerTaskManager}, submit a {@link NoopTask}, wait for
it to be complete. Common preamble
* for various tests of {@link WorkerTaskManager#doCompletedTasksCleanup()}.
@@ -494,7 +507,7 @@ public class WorkerTaskManagerTest
Task task1 = createNoopTask("task1", "wikipedia");
Task task2 = createNoopTask("task2", "wikipedia");
- Task task3 = createNoopTask("task3", "animals");
+ Task task3 = createNoopFailingTask("task3", "animals");
workerTaskManager.start();
// befor assigning tasks we should get no running tasks
@@ -517,11 +530,19 @@ public class WorkerTaskManagerTest
Thread.sleep(10);
} while (!runningTasks.isEmpty());
- // When running tasks are empty all task should be reported as completed
+ // When running tasks are empty all task should be reported as completed
and
+ // one of the task for animals datasource should fail and other 2 tasks in
+ // the wikipedia datasource should succeed
Assert.assertEquals(workerTaskManager.getWorkerCompletedTasks(),
ImmutableMap.of(
"wikipedia", 2L,
"animals", 1L
));
+ Assert.assertEquals(workerTaskManager.getWorkerFailedTasks(),
ImmutableMap.of(
+ "animals", 1L
+ ));
+ Assert.assertEquals(workerTaskManager.getWorkerSuccessfulTasks(),
ImmutableMap.of(
+ "wikipedia", 2L
+ ));
Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L);
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java
b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java
index 735bc27abb3..b38b461eb36 100644
---
a/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java
+++
b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java
@@ -41,4 +41,8 @@ public interface IndexerTaskCountStatsProvider
* Map from datasource name to the number of completed tasks by the Indexer.
*/
Map<String, Long> getWorkerCompletedTasks();
+
+ Map<String, Long> getWorkerFailedTasks();
+
+ Map<String, Long> getWorkerSuccessfulTasks();
}
diff --git
a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
index d07311c1a46..bc09e95b5ce 100644
---
a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
+++
b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
@@ -72,6 +72,8 @@ public class WorkerTaskCountStatsMonitor extends
AbstractMonitor
emit(emitter, "worker/task/running/count",
indexerStatsProvider.getWorkerRunningTasks());
emit(emitter, "worker/task/assigned/count",
indexerStatsProvider.getWorkerAssignedTasks());
emit(emitter, "worker/task/completed/count",
indexerStatsProvider.getWorkerCompletedTasks());
+ emit(emitter, "worker/task/failed/count",
indexerStatsProvider.getWorkerFailedTasks());
+ emit(emitter, "worker/task/success/count",
indexerStatsProvider.getWorkerSuccessfulTasks());
}
return true;
}
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
index ff9fcffb8d9..ad00e5e6dbd 100644
---
a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
+++
b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
@@ -120,6 +120,24 @@ public class WorkerTaskCountStatsMonitorTest
"metrics", 9L
);
}
+
+ @Override
+ public Map<String, Long> getWorkerFailedTasks()
+ {
+ return ImmutableMap.of(
+ "movies", 4L,
+ "games", 6L
+ );
+ }
+
+ @Override
+ public Map<String, Long> getWorkerSuccessfulTasks()
+ {
+ return ImmutableMap.of(
+ "games", 23L,
+ "inventory", 89L
+ );
+ }
};
nullStatsProvider = new WorkerTaskCountStatsProvider()
@@ -239,7 +257,7 @@ public class WorkerTaskCountStatsMonitorTest
new WorkerTaskCountStatsMonitor(injectorForIndexer,
ImmutableSet.of(NodeRole.INDEXER));
final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(6, emitter.getEvents().size());
+ Assert.assertEquals(10, emitter.getEvents().size());
emitter.verifyValue(
"worker/task/running/count",
ImmutableMap.of("dataSource", "wikipedia"),
@@ -270,6 +288,26 @@ public class WorkerTaskCountStatsMonitorTest
ImmutableMap.of("dataSource", "metrics"),
9L
);
+ emitter.verifyValue(
+ "worker/task/failed/count",
+ ImmutableMap.of("dataSource", "movies"),
+ 4L
+ );
+ emitter.verifyValue(
+ "worker/task/failed/count",
+ ImmutableMap.of("dataSource", "games"),
+ 6L
+ );
+ emitter.verifyValue(
+ "worker/task/success/count",
+ ImmutableMap.of("dataSource", "games"),
+ 23L
+ );
+ emitter.verifyValue(
+ "worker/task/success/count",
+ ImmutableMap.of("dataSource", "inventory"),
+ 89L
+ );
}
@Test
public void testMonitorWithNulls()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]