This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch group_by_stats_datasource_taskId
in repository https://gitbox.apache.org/repos/asf/druid.git

commit 72ed511daef827a96d17b8e29df5b88df47386e1
Author: Abhishek Balaji Radhakrishnan <[email protected]>
AuthorDate: Fri Oct 31 14:22:33 2025 -0700

    Add taskID and dataSource to GroupByStatsMonitor.
---
 .../initialization/jetty/JettyServerModule.java    |  3 +-
 .../server/metrics/DataSourceTaskIdHolder.java     |  4 ++
 .../druid/server/metrics/GroupByStatsMonitor.java  | 11 +++-
 .../druid/server/metrics/MonitorsConfig.java       |  3 +-
 .../server/metrics/GroupByStatsMonitorTest.java    | 69 +++++++++++++++++++---
 5 files changed, 78 insertions(+), 12 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
 
b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
index 5d8ee33bf96..1ff1d08deba 100644
--- 
a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
+++ 
b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
@@ -77,6 +77,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
 
+import javax.annotation.Nullable;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManager;
@@ -526,7 +527,7 @@ public class JettyServerModule extends JerseyServletModule
   {
     private final Map<String, String[]> dimensions;
 
-    public JettyMonitor(String dataSource, String taskId)
+    public JettyMonitor(@Nullable String dataSource, @Nullable String taskId)
     {
       this.dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(dataSource, 
taskId);
     }
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java
 
b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java
index 87002a5157f..df32251b495 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java
@@ -24,6 +24,8 @@ import com.google.inject.name.Named;
 import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 
+import javax.annotation.Nullable;
+
 public class DataSourceTaskIdHolder
 {
   public static final String DATA_SOURCE_BINDING = "druidDataSource";
@@ -46,11 +48,13 @@ public class DataSourceTaskIdHolder
   @Inject(optional = true)
   BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec = 
BroadcastDatasourceLoadingSpec.ALL;
 
+  @Nullable
   public String getDataSource()
   {
     return dataSource;
   }
 
+  @Nullable
   public String getTaskId()
   {
     return taskId;
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java 
b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
index 10985b4b4d3..87a8c20fad9 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
@@ -27,9 +27,11 @@ import org.apache.druid.guice.annotations.Merging;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.java.util.metrics.MonitorUtils;
 import org.apache.druid.query.groupby.GroupByStatsProvider;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 @LoadScope(roles = {
     NodeRole.BROKER_JSON_NAME,
@@ -41,21 +43,28 @@ public class GroupByStatsMonitor extends AbstractMonitor
 {
   private final GroupByStatsProvider groupByStatsProvider;
   private final BlockingPool<ByteBuffer> mergeBufferPool;
+  private final Map<String, String[]> dimensions;
 
   @Inject
   public GroupByStatsMonitor(
       GroupByStatsProvider groupByStatsProvider,
-      @Merging BlockingPool<ByteBuffer> mergeBufferPool
+      @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+      DataSourceTaskIdHolder dataSourceTaskIdHolder
   )
   {
     this.groupByStatsProvider = groupByStatsProvider;
     this.mergeBufferPool = mergeBufferPool;
+    this.dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(
+            dataSourceTaskIdHolder.getDataSource(),
+            dataSourceTaskIdHolder.getTaskId()
+    );
   }
 
   @Override
   public boolean doMonitor(ServiceEmitter emitter)
   {
     final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder();
+    MonitorUtils.addDimensionsToBuilder(builder, dimensions);
 
     emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", 
mergeBufferPool.getPendingRequests()));
 
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java 
b/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java
index 25e124c1983..6405d54e16f 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.metrics.Monitor;
 import org.apache.druid.query.DruidMetrics;
 
+import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -79,7 +80,7 @@ public class MonitorsConfig
            '}';
   }
 
-  public static Map<String, String[]> mapOfDatasourceAndTaskID(final String 
datasource, final String taskId)
+  public static Map<String, String[]> mapOfDatasourceAndTaskID(@Nullable final 
String datasource, @Nullable final String taskId)
   {
     final ImmutableMap.Builder<String, String[]> builder = 
ImmutableMap.builder();
     if (datasource != null) {
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
 
b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
index aa5955cd914..c983dcf4016 100644
--- 
a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
@@ -19,10 +19,18 @@
 
 package org.apache.druid.server.metrics;
 
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.name.Names;
 import org.apache.druid.collections.BlockingPool;
 import org.apache.druid.collections.DefaultBlockingPool;
+import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.initialization.Initialization;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.groupby.GroupByStatsProvider;
+import org.apache.druid.server.DruidNode;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -31,6 +39,7 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -52,12 +61,12 @@ public class GroupByStatsMonitorTest
       public synchronized AggregateStats getStatsSince()
       {
         return new AggregateStats(
-                1L,
-                100L,
-                2L,
-                200L,
-                300L
-            );
+            1L,
+            100L,
+            2L,
+            200L,
+            300L
+        );
       }
     };
 
@@ -75,7 +84,7 @@ public class GroupByStatsMonitorTest
   public void testMonitor()
   {
     final GroupByStatsMonitor monitor =
-        new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+        new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new 
DataSourceTaskIdHolder());
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
     monitor.doMonitor(emitter);
     emitter.flush();
@@ -92,6 +101,48 @@ public class GroupByStatsMonitorTest
     emitter.verifyValue("groupBy/mergeDictionarySize", 300L);
   }
 
+  @Test
+  public void testMonitorWithDatasourceAndTaskIdDimensions()
+  {
+    final String dataSource = "fooDs";
+    final String taskId = "taskId";
+    final Injector injector = Initialization.makeInjectorWithModules(
+        GuiceInjectors.makeStartupInjector(),
+        List.of(binder -> {
+          JsonConfigProvider.bindInstance(
+              binder,
+              Key.get(DruidNode.class, Self.class),
+              new DruidNode("test-inject", null, false, null, null, true, 
false)
+          );
+          binder.bind(Key.get(String.class, 
Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING)))
+                .toInstance(dataSource);
+          binder.bind(Key.get(String.class, 
Names.named(DataSourceTaskIdHolder.TASK_ID_BINDING)))
+                .toInstance(taskId);
+        })
+    );
+    final DataSourceTaskIdHolder dimensionIdHolder = new 
DataSourceTaskIdHolder();
+    injector.injectMembers(dimensionIdHolder);
+
+    final GroupByStatsMonitor monitor =
+        new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, 
dimensionIdHolder);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    monitor.doMonitor(emitter);
+    emitter.flush();
+    // Trigger metric emission
+    monitor.doMonitor(emitter);
+
+    final Map<String, Object> dimFilters = Map.of("id", List.of(taskId), 
"dataSource", List.of(dataSource));
+    Assert.assertEquals(7, emitter.getNumEmittedEvents());
+    emitter.verifyValue("mergeBuffer/pendingRequests", dimFilters, 0L);
+    emitter.verifyValue("mergeBuffer/used", dimFilters, 0L);
+    emitter.verifyValue("mergeBuffer/queries", dimFilters, 1L);
+    emitter.verifyValue("mergeBuffer/acquisitionTimeNs", dimFilters, 100L);
+    emitter.verifyValue("groupBy/spilledQueries", dimFilters, 2L);
+    emitter.verifyValue("groupBy/spilledBytes", dimFilters, 200L);
+    emitter.verifyValue("groupBy/mergeDictionarySize", dimFilters, 300L);
+  }
+
+
   @Test
   public void testMonitoringMergeBuffer_acquiredCount()
       throws ExecutionException, InterruptedException, TimeoutException
@@ -101,7 +152,7 @@ public class GroupByStatsMonitorTest
     }).get(20, TimeUnit.SECONDS);
 
     final GroupByStatsMonitor monitor =
-        new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+        new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new 
DataSourceTaskIdHolder());
     final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", 
"DummyHost");
     boolean ret = monitor.doMonitor(emitter);
     Assert.assertTrue(ret);
@@ -130,7 +181,7 @@ public class GroupByStatsMonitorTest
       }
 
       final GroupByStatsMonitor monitor =
-          new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+          new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new 
DataSourceTaskIdHolder());
       final StubServiceEmitter emitter = new 
StubServiceEmitter("DummyService", "DummyHost");
       boolean ret = monitor.doMonitor(emitter);
       Assert.assertTrue(ret);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to