This is an automated email from the ASF dual-hosted git repository. abhishekrb pushed a commit to branch group_by_stats_datasource_taskId in repository https://gitbox.apache.org/repos/asf/druid.git
commit 72ed511daef827a96d17b8e29df5b88df47386e1 Author: Abhishek Balaji Radhakrishnan <[email protected]> AuthorDate: Fri Oct 31 14:22:33 2025 -0700 Add taskID and dataSource to GroupByStatsMonitor. --- .../initialization/jetty/JettyServerModule.java | 3 +- .../server/metrics/DataSourceTaskIdHolder.java | 4 ++ .../druid/server/metrics/GroupByStatsMonitor.java | 11 +++- .../druid/server/metrics/MonitorsConfig.java | 3 +- .../server/metrics/GroupByStatsMonitorTest.java | 69 +++++++++++++++++++--- 5 files changed, 78 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java index 5d8ee33bf96..1ff1d08deba 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java @@ -77,6 +77,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; +import javax.annotation.Nullable; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLEngine; import javax.net.ssl.TrustManager; @@ -526,7 +527,7 @@ public class JettyServerModule extends JerseyServletModule { private final Map<String, String[]> dimensions; - public JettyMonitor(String dataSource, String taskId) + public JettyMonitor(@Nullable String dataSource, @Nullable String taskId) { this.dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(dataSource, taskId); } diff --git a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java index 87002a5157f..df32251b495 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java +++ 
b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java @@ -24,6 +24,8 @@ import com.google.inject.name.Named; import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; +import javax.annotation.Nullable; + public class DataSourceTaskIdHolder { public static final String DATA_SOURCE_BINDING = "druidDataSource"; @@ -46,11 +48,13 @@ public class DataSourceTaskIdHolder @Inject(optional = true) BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec = BroadcastDatasourceLoadingSpec.ALL; + @Nullable public String getDataSource() { return dataSource; } + @Nullable public String getTaskId() { return taskId; diff --git a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java index 10985b4b4d3..87a8c20fad9 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java @@ -27,9 +27,11 @@ import org.apache.druid.guice.annotations.Merging; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.AbstractMonitor; +import org.apache.druid.java.util.metrics.MonitorUtils; import org.apache.druid.query.groupby.GroupByStatsProvider; import java.nio.ByteBuffer; +import java.util.Map; @LoadScope(roles = { NodeRole.BROKER_JSON_NAME, @@ -41,21 +43,28 @@ public class GroupByStatsMonitor extends AbstractMonitor { private final GroupByStatsProvider groupByStatsProvider; private final BlockingPool<ByteBuffer> mergeBufferPool; + private final Map<String, String[]> dimensions; @Inject public GroupByStatsMonitor( GroupByStatsProvider groupByStatsProvider, - @Merging BlockingPool<ByteBuffer> mergeBufferPool + @Merging BlockingPool<ByteBuffer> 
mergeBufferPool, + DataSourceTaskIdHolder dataSourceTaskIdHolder ) { this.groupByStatsProvider = groupByStatsProvider; this.mergeBufferPool = mergeBufferPool; + this.dimensions = MonitorsConfig.mapOfDatasourceAndTaskID( + dataSourceTaskIdHolder.getDataSource(), + dataSourceTaskIdHolder.getTaskId() + ); } @Override public boolean doMonitor(ServiceEmitter emitter) { final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", mergeBufferPool.getPendingRequests())); diff --git a/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java b/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java index 25e124c1983..6405d54e16f 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.metrics.Monitor; import org.apache.druid.query.DruidMetrics; +import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.util.ArrayList; import java.util.HashMap; @@ -79,7 +80,7 @@ public class MonitorsConfig '}'; } - public static Map<String, String[]> mapOfDatasourceAndTaskID(final String datasource, final String taskId) + public static Map<String, String[]> mapOfDatasourceAndTaskID(@Nullable final String datasource, @Nullable final String taskId) { final ImmutableMap.Builder<String, String[]> builder = ImmutableMap.builder(); if (datasource != null) { diff --git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java index aa5955cd914..c983dcf4016 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java +++ 
b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java @@ -19,10 +19,18 @@ package org.apache.druid.server.metrics; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.name.Names; import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.DefaultBlockingPool; +import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.initialization.Initialization; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.groupby.GroupByStatsProvider; +import org.apache.druid.server.DruidNode; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -31,6 +39,7 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -52,12 +61,12 @@ public class GroupByStatsMonitorTest public synchronized AggregateStats getStatsSince() { return new AggregateStats( - 1L, - 100L, - 2L, - 200L, - 300L - ); + 1L, + 100L, + 2L, + 200L, + 300L + ); } }; @@ -75,7 +84,7 @@ public class GroupByStatsMonitorTest public void testMonitor() { final GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); + new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new DataSourceTaskIdHolder()); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); emitter.flush(); @@ -92,6 +101,48 @@ public class GroupByStatsMonitorTest emitter.verifyValue("groupBy/mergeDictionarySize", 300L); } + @Test + public void testMonitorWithDatasourceAndTaskIdDimensions() + { + final String dataSource = "fooDs"; + final String taskId = "taskId"; + final Injector injector = 
Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + List.of(binder -> { + JsonConfigProvider.bindInstance( + binder, + Key.get(DruidNode.class, Self.class), + new DruidNode("test-inject", null, false, null, null, true, false) + ); + binder.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING))) + .toInstance(dataSource); + binder.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.TASK_ID_BINDING))) + .toInstance(taskId); + }) + ); + final DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder(); + injector.injectMembers(dimensionIdHolder); + + final GroupByStatsMonitor monitor = + new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, dimensionIdHolder); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + monitor.doMonitor(emitter); + emitter.flush(); + // Trigger metric emission + monitor.doMonitor(emitter); + + final Map<String, Object> dimFilters = Map.of("id", List.of(taskId), "dataSource", List.of(dataSource)); + Assert.assertEquals(7, emitter.getNumEmittedEvents()); + emitter.verifyValue("mergeBuffer/pendingRequests", dimFilters, 0L); + emitter.verifyValue("mergeBuffer/used", dimFilters, 0L); + emitter.verifyValue("mergeBuffer/queries", dimFilters, 1L); + emitter.verifyValue("mergeBuffer/acquisitionTimeNs", dimFilters, 100L); + emitter.verifyValue("groupBy/spilledQueries", dimFilters, 2L); + emitter.verifyValue("groupBy/spilledBytes", dimFilters, 200L); + emitter.verifyValue("groupBy/mergeDictionarySize", dimFilters, 300L); + } + + @Test public void testMonitoringMergeBuffer_acquiredCount() throws ExecutionException, InterruptedException, TimeoutException @@ -101,7 +152,7 @@ public class GroupByStatsMonitorTest }).get(20, TimeUnit.SECONDS); final GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); + new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new DataSourceTaskIdHolder()); 
final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); boolean ret = monitor.doMonitor(emitter); Assert.assertTrue(ret); @@ -130,7 +181,7 @@ public class GroupByStatsMonitorTest } final GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); + new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new DataSourceTaskIdHolder()); final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); boolean ret = monitor.doMonitor(emitter); Assert.assertTrue(ret); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For additional commands, e-mail: commits-help@druid.apache.org
