This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c8323d1a7cc Add indexer task success and failure metrics  (#16829)
c8323d1a7cc is described below

commit c8323d1a7cca9585ee20d24e3d0e6fff3e28c679
Author: Rushikesh Bankar <[email protected]>
AuthorDate: Mon Aug 5 16:21:27 2024 +0530

    Add indexer task success and failure metrics  (#16829)
    
    This PR adds indexer-level task metrics:
    
    "worker/task/failed/count"
    "worker/task/success/count"
    
    The existing "worker/task/completed/count" metric counts all completed
    tasks regardless of whether they succeeded or failed, so these metrics
    give more visibility into the outcome of completed tasks.
---
 docs/operations/metrics.md                         |  2 ++
 .../main/resources/defaultMetricDimensions.json    |  4 +--
 .../druid/indexing/worker/WorkerTaskManager.java   | 16 +++++++++
 .../indexing/worker/WorkerTaskManagerTest.java     | 25 ++++++++++++--
 .../metrics/IndexerTaskCountStatsProvider.java     |  4 +++
 .../metrics/WorkerTaskCountStatsMonitor.java       |  2 ++
 .../metrics/WorkerTaskCountStatsMonitorTest.java   | 40 +++++++++++++++++++++-
 7 files changed, 88 insertions(+), 5 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 21267de9136..ec97f44fe39 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -299,6 +299,8 @@ If the JVM does not support CPU time measurement for the 
current thread, `ingest
 |`worker/taskSlot/used/count`|Number of busy task slots on the reporting 
worker per emission period. This metric is only available if the 
`WorkerTaskCountStatsMonitor` module is included.| `category`, 
`workerVersion`|Varies|
 |`worker/task/assigned/count`|Number of tasks assigned to an indexer per 
emission period. This metric is only available if the 
`WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
 |`worker/task/completed/count`|Number of tasks completed by an indexer per 
emission period. This metric is only available if the 
`WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
+|`worker/task/failed/count`|Number of tasks that failed on an indexer during 
the emission period. This metric is only available if the 
`WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
+|`worker/task/success/count`|Number of tasks that succeeded on an indexer 
during the emission period. This metric is only available if the 
`WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
 |`worker/task/running/count`|Number of tasks running on an indexer per 
emission period. This metric is only available if the 
`WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
 
 ## Shuffle metrics (Native parallel task)
diff --git 
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
 
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index ad065c63d39..91d7c4c4abd 100644
--- 
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ 
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -74,8 +74,8 @@
   "worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" : 
"count" },
   "worker/task/running/count" : { "dimensions" : ["dataSource"], "type" : 
"count" },
   "worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" : 
"count" },
-  "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"], 
"type" : "count" },
-  "worker/task/success/count" : { "dimensions" : ["category", 
"workerVersion"], "type" : "count" },
+  "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion", 
"dataSource"], "type" : "count" },
+  "worker/task/success/count" : { "dimensions" : ["category", "workerVersion", 
"dataSource"], "type" : "count" },
   "worker/taskSlot/idle/count" : { "dimensions" : ["category", 
"workerVersion"], "type" : "gauge" },
   "worker/taskSlot/total/count" : { "dimensions" : ["category", 
"workerVersion"], "type" : "gauge" },
   "worker/taskSlot/used/count" : { "dimensions" : ["category", 
"workerVersion"], "type" : "gauge" },
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index 729ac1d1617..a1c131ad0f4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -640,6 +640,22 @@ public class WorkerTaskManager implements 
IndexerTaskCountStatsProvider
     return getNumTasksPerDatasource(this.getCompletedTasks().values(), 
TaskAnnouncement::getTaskDataSource);
   }
 
+  @Override
+  public Map<String, Long> getWorkerFailedTasks()
+  {
+    return getNumTasksPerDatasource(completedTasks.entrySet().stream()
+            .filter(entry -> entry.getValue().getTaskStatus().isFailure())
+            .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
+  }
+
+  @Override
+  public Map<String, Long> getWorkerSuccessfulTasks()
+  {
+    return getNumTasksPerDatasource(completedTasks.entrySet().stream()
+            .filter(entry -> entry.getValue().getTaskStatus().isSuccess())
+            .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
+  }
+
   private static class TaskDetails
   {
     private final Task task;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 6b08be3a3c6..37839f8e077 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -455,6 +455,19 @@ public class WorkerTaskManagerTest
     return new NoopTask(id, null, dataSource, 100, 0, 
ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
   }
 
+  private NoopTask createNoopFailingTask(String id, String dataSource)
+  {
+    return new NoopTask(id, null, dataSource, 100, 0, 
ImmutableMap.of(Tasks.PRIORITY_KEY, 0))
+    {
+      @Override
+      public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+      {
+        Thread.sleep(getRunTime());
+        return TaskStatus.failure(getId(), "Failed to complete the task");
+      }
+    };
+  }
+
   /**
    * Start the {@link #workerTaskManager}, submit a {@link NoopTask}, wait for 
it to be complete. Common preamble
    * for various tests of {@link WorkerTaskManager#doCompletedTasksCleanup()}.
@@ -494,7 +507,7 @@ public class WorkerTaskManagerTest
 
     Task task1 = createNoopTask("task1", "wikipedia");
     Task task2 = createNoopTask("task2", "wikipedia");
-    Task task3 = createNoopTask("task3", "animals");
+    Task task3 = createNoopFailingTask("task3", "animals");
 
     workerTaskManager.start();
     // befor assigning tasks we should get no running tasks
@@ -517,11 +530,19 @@ public class WorkerTaskManagerTest
       Thread.sleep(10);
     } while (!runningTasks.isEmpty());
 
-    // When running tasks are empty all task should be reported as completed
+    // When running tasks are empty all task should be reported as completed 
and
+    // one of the task for animals datasource should fail and other 2 tasks in
+    // the wikipedia datasource should succeed
     Assert.assertEquals(workerTaskManager.getWorkerCompletedTasks(), 
ImmutableMap.of(
         "wikipedia", 2L,
         "animals", 1L
     ));
+    Assert.assertEquals(workerTaskManager.getWorkerFailedTasks(), 
ImmutableMap.of(
+            "animals", 1L
+    ));
+    Assert.assertEquals(workerTaskManager.getWorkerSuccessfulTasks(), 
ImmutableMap.of(
+            "wikipedia", 2L
+    ));
     Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L);
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java
 
b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java
index 735bc27abb3..b38b461eb36 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java
@@ -41,4 +41,8 @@ public interface IndexerTaskCountStatsProvider
    * Map from datasource name to the number of completed tasks by the Indexer.
    */
   Map<String, Long> getWorkerCompletedTasks();
+
+  Map<String, Long> getWorkerFailedTasks();
+
+  Map<String, Long> getWorkerSuccessfulTasks();
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
 
b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
index d07311c1a46..bc09e95b5ce 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
@@ -72,6 +72,8 @@ public class WorkerTaskCountStatsMonitor extends 
AbstractMonitor
       emit(emitter, "worker/task/running/count", 
indexerStatsProvider.getWorkerRunningTasks());
       emit(emitter, "worker/task/assigned/count", 
indexerStatsProvider.getWorkerAssignedTasks());
       emit(emitter, "worker/task/completed/count", 
indexerStatsProvider.getWorkerCompletedTasks());
+      emit(emitter, "worker/task/failed/count", 
indexerStatsProvider.getWorkerFailedTasks());
+      emit(emitter, "worker/task/success/count", 
indexerStatsProvider.getWorkerSuccessfulTasks());
     }
     return true;
   }
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
 
b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
index ff9fcffb8d9..ad00e5e6dbd 100644
--- 
a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
@@ -120,6 +120,24 @@ public class WorkerTaskCountStatsMonitorTest
             "metrics", 9L
         );
       }
+
+      @Override
+      public Map<String, Long> getWorkerFailedTasks()
+      {
+        return ImmutableMap.of(
+                "movies", 4L,
+                "games", 6L
+        );
+      }
+
+      @Override
+      public Map<String, Long> getWorkerSuccessfulTasks()
+      {
+        return ImmutableMap.of(
+                "games", 23L,
+                "inventory", 89L
+        );
+      }
     };
 
     nullStatsProvider = new WorkerTaskCountStatsProvider()
@@ -239,7 +257,7 @@ public class WorkerTaskCountStatsMonitorTest
         new WorkerTaskCountStatsMonitor(injectorForIndexer, 
ImmutableSet.of(NodeRole.INDEXER));
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(6, emitter.getEvents().size());
+    Assert.assertEquals(10, emitter.getEvents().size());
     emitter.verifyValue(
         "worker/task/running/count",
         ImmutableMap.of("dataSource", "wikipedia"),
@@ -270,6 +288,26 @@ public class WorkerTaskCountStatsMonitorTest
         ImmutableMap.of("dataSource", "metrics"),
         9L
     );
+    emitter.verifyValue(
+            "worker/task/failed/count",
+            ImmutableMap.of("dataSource", "movies"),
+            4L
+    );
+    emitter.verifyValue(
+            "worker/task/failed/count",
+            ImmutableMap.of("dataSource", "games"),
+            6L
+    );
+    emitter.verifyValue(
+            "worker/task/success/count",
+            ImmutableMap.of("dataSource", "games"),
+            23L
+    );
+    emitter.verifyValue(
+            "worker/task/success/count",
+            ImmutableMap.of("dataSource", "inventory"),
+            89L
+    );
   }
   @Test
   public void testMonitorWithNulls()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to