This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8acdf17726f Add dataSource and taskId dimensions to GroupByStatsMonitor for peons (#18709)
8acdf17726f is described below
commit 8acdf17726f0d3805316714292cec79d0423bcf1
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Wed Nov 5 22:59:24 2025 -0800
Add dataSource and taskId dimensions to GroupByStatsMonitor for peons (#18709)
The GroupByStatsMonitor now includes the dataSource and taskId dimensions in
metrics emitted on peons.
The id dimension that JettyMonitor, OshiSysMonitor, JvmMonitor, JvmCpuMonitor,
JvmThreadsMonitor and SysMonitor emit on peons to represent the task ID is
deprecated and will be removed in a future release. Use the taskId dimension
instead.
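For a consumer of peon metrics, moving off the deprecated `id` dimension can be staged: read `taskId` first and fall back to `id` for events from peons that predate this change. The following is a minimal sketch, assuming event dimensions arrive as a flat `Map<String, String>`; `TaskIdResolver` is a hypothetical helper, not part of Druid.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical consumer-side helper (not part of Druid).
final class TaskIdResolver
{
  static final String TASK_ID = "taskId";
  static final String DEPRECATED_ID = "id"; // deprecated; scheduled for removal

  private TaskIdResolver()
  {
  }

  static Optional<String> resolveTaskId(Map<String, String> dimensions)
  {
    // Peons running this change emit "taskId"; prefer it.
    String taskId = dimensions.get(TASK_ID);
    if (taskId != null) {
      return Optional.of(taskId);
    }
    // Older peons only emit "id"; keep reading it until it is removed.
    return Optional.ofNullable(dimensions.get(DEPRECATED_ID));
  }

  public static void main(String[] args)
  {
    System.out.println(resolveTaskId(Map.of("taskId", "t1", "id", "t1"))); // Optional[t1]
    System.out.println(resolveTaskId(Map.of("id", "t1")));                 // Optional[t1]
    System.out.println(resolveTaskId(Map.of("dataSource", "wiki")));       // Optional.empty
  }
}
```

Once every peon in a cluster emits `taskId`, the fallback branch can be deleted without touching callers.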
---
docs/ingestion/native-batch.md | 2 +-
docs/operations/metrics.md | 19 +++---
.../initialization/jetty/JettyServerModule.java | 10 +++-
.../server/metrics/DataSourceTaskIdHolder.java | 14 +++++
.../druid/server/metrics/GroupByStatsMonitor.java | 11 +++-
.../druid/server/metrics/MonitorsConfig.java | 13 +++-
.../jetty/JettyServerModuleTest.java | 5 +-
.../server/metrics/GroupByStatsMonitorTest.java | 70 +++++++++++++++++++---
8 files changed, 121 insertions(+), 23 deletions(-)
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index fb6b4f0d3df..4d56d24fb9a 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -458,7 +458,7 @@ Range partitioning is not possible on multi-value dimensions. If one of the prov
|`type`|Set the value to `range`.|none|yes|
|`partitionDimensions`|An array of dimensions to partition on. Order the dimensions from most frequently queried to least frequently queried. For best results, limit your number of dimensions to between three and five dimensions.|none|yes|
|`targetRowsPerSegment`|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|none|either this or `maxRowsPerSegment`|
-|maxRowsPerSegment|Soft max for the number of rows to include in a partition.|none|either this or `targetRowsPerSegment`|
+|`maxRowsPerSegment`|Soft max for the number of rows to include in a partition.|none|either this or `targetRowsPerSegment`|
|`assumeGrouped`|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|false|no|
#### Benefits of range partitioning
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index a17e10c6299..d16246ab6d4 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -119,6 +119,11 @@ Most metric values reset each emission period, as specified in `druid.monitoring
### Real-time
+:::info
+Monitors on peons that previously emitted the `id` dimension from `JettyMonitor`, `OshiSysMonitor`, `JvmMonitor`, `JvmCpuMonitor`, `JvmThreadsMonitor` and `SysMonitor`
+to represent the task ID are deprecated and will be removed in a future release. Use the `taskId` dimension instead.
+:::
+
|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
|`query/time`|Milliseconds taken to complete a query.|<p>Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`, `statusCode`.</p><p> Aggregation Queries: `numMetrics`, `numComplexMetrics`.</p><p> GroupBy: `numDimensions`.</p><p> TopN: `threshold`, `dimension`.</p>|< 1s|
@@ -131,13 +136,13 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
-|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Should be ideally 0, though a higher number isn't representative of a problem.|
-|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
-|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
-|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
-|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
-|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
-|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
+|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Should be ideally 0, though a higher number isn't representative of a problem.|
+|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
+|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
+|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
+|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
+|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
+|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
### Jetty
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
index 5d8ee33bf96..6593e67947e 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
@@ -519,16 +519,20 @@ public class JettyServerModule extends JerseyServletModule
@LazySingleton
public JettyMonitor getJettyMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder)
{
- return new JettyMonitor(dataSourceTaskIdHolder.getDataSource(), dataSourceTaskIdHolder.getTaskId());
+ return new JettyMonitor(
+ MonitorsConfig.mapOfDatasourceAndTaskID(
+ dataSourceTaskIdHolder.getDataSource(), dataSourceTaskIdHolder.getTaskId()
+ )
+ );
}
public static class JettyMonitor extends AbstractMonitor
{
private final Map<String, String[]> dimensions;
- public JettyMonitor(String dataSource, String taskId)
+ public JettyMonitor(Map<String, String[]> dimensions)
{
- this.dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(dataSource, taskId);
+ this.dimensions = dimensions;
}
@Override
diff --git a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java
index 87002a5157f..d5eec77f87f 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java
@@ -24,6 +24,11 @@ import com.google.inject.name.Named;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
+import javax.annotation.Nullable;
+
+/**
+ * This holder is only applicable to {@code CliPeon} servers.
+ */
public class DataSourceTaskIdHolder
{
public static final String DATA_SOURCE_BINDING = "druidDataSource";
@@ -34,6 +39,7 @@ public class DataSourceTaskIdHolder
@Named(DATA_SOURCE_BINDING)
@Inject(optional = true)
String dataSource = null;
+
@Named(TASK_ID_BINDING)
@Inject(optional = true)
String taskId = null;
@@ -46,11 +52,19 @@ public class DataSourceTaskIdHolder
@Inject(optional = true)
BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec = BroadcastDatasourceLoadingSpec.ALL;
+ /**
+ * @return the dataSource for CliPeon servers; {@code null} for all other servers.
+ */
+ @Nullable
public String getDataSource()
{
return dataSource;
}
+ /**
+ * @return the taskId for CliPeon servers; {@code null} for all other servers.
+ */
+ @Nullable
public String getTaskId()
{
return taskId;
diff --git a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
index 10985b4b4d3..0f07bd2894b 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
@@ -27,9 +27,11 @@ import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.java.util.metrics.MonitorUtils;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import java.nio.ByteBuffer;
+import java.util.Map;
@LoadScope(roles = {
NodeRole.BROKER_JSON_NAME,
@@ -41,21 +43,28 @@ public class GroupByStatsMonitor extends AbstractMonitor
{
private final GroupByStatsProvider groupByStatsProvider;
private final BlockingPool<ByteBuffer> mergeBufferPool;
+ private final Map<String, String[]> dimensions;
@Inject
public GroupByStatsMonitor(
GroupByStatsProvider groupByStatsProvider,
- @Merging BlockingPool<ByteBuffer> mergeBufferPool
+ @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+ DataSourceTaskIdHolder dataSourceTaskIdHolder
)
{
this.groupByStatsProvider = groupByStatsProvider;
this.mergeBufferPool = mergeBufferPool;
+ this.dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(
+ dataSourceTaskIdHolder.getDataSource(),
+ dataSourceTaskIdHolder.getTaskId()
+ );
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+ MonitorUtils.addDimensionsToBuilder(builder, dimensions);
emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", mergeBufferPool.getPendingRequests()));
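GroupByStatsMonitor now resolves its dimension map once in the constructor and adds it to the `ServiceMetricEvent.Builder` at the start of `doMonitor`, so every metric emitted from that builder carries `dataSource` and `taskId` on peons. The sketch below models that pattern in plain Java under stated assumptions: `SketchEvent`, `SketchEventBuilder` and `SketchGroupByMonitor` are hypothetical stand-ins rather than Druid classes, and the loop only approximates what `MonitorUtils.addDimensionsToBuilder` is assumed to do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified event; NOT Druid's ServiceMetricEvent.
final class SketchEvent
{
  final String metric;
  final long value;
  final Map<String, List<String>> dimensions;

  SketchEvent(String metric, long value, Map<String, List<String>> dimensions)
  {
    this.metric = metric;
    this.value = value;
    this.dimensions = dimensions;
  }
}

// Hypothetical builder that accumulates dimensions shared by all events it builds.
final class SketchEventBuilder
{
  private final Map<String, List<String>> dimensions = new HashMap<>();

  SketchEventBuilder setDimension(String name, String[] values)
  {
    dimensions.put(name, List.copyOf(Arrays.asList(values)));
    return this;
  }

  // Each built event gets its own immutable copy of the dimensions set so far.
  SketchEvent build(String metric, long value)
  {
    return new SketchEvent(metric, value, Map.copyOf(dimensions));
  }
}

// Hypothetical monitor mirroring the constructor-time dimension resolution.
final class SketchGroupByMonitor
{
  // Resolved once; empty on servers that have no dataSource or taskId.
  private final Map<String, String[]> dimensions;

  SketchGroupByMonitor(Map<String, String[]> dimensions)
  {
    this.dimensions = dimensions;
  }

  List<SketchEvent> doMonitor(long pendingRequests, long used)
  {
    SketchEventBuilder builder = new SketchEventBuilder();
    // Add the shared dimensions before building any metric from this builder.
    for (Map.Entry<String, String[]> entry : dimensions.entrySet()) {
      builder.setDimension(entry.getKey(), entry.getValue());
    }
    List<SketchEvent> events = new ArrayList<>();
    events.add(builder.build("mergeBuffer/pendingRequests", pendingRequests));
    events.add(builder.build("mergeBuffer/used", used));
    return events;
  }
}
```

Resolving the map once keeps `doMonitor` free of holder lookups on every emission period, and an empty map leaves events from non-peon servers unchanged.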
diff --git a/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java b/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java
index 25e124c1983..ea578611b08 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.query.DruidMetrics;
+import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.HashMap;
@@ -79,13 +80,23 @@ public class MonitorsConfig
'}';
}
- public static Map<String, String[]> mapOfDatasourceAndTaskID(final String datasource, final String taskId)
+ /**
+ * @return a map of {@code datasource} and {@code taskId} dimensions if provided; otherwise, returns an empty map.
+ * When {@code taskId} is provided, both {@link DruidMetrics#ID} and {@link DruidMetrics#TASK_ID} dimensions are added
+ * to the map for backward compatibility. {@link DruidMetrics#ID} is deprecated because it's ambiguous and will be
+ * removed in a future release.
+ */
+ public static Map<String, String[]> mapOfDatasourceAndTaskID(
+ @Nullable final String datasource,
+ @Nullable final String taskId
+ )
{
final ImmutableMap.Builder<String, String[]> builder = ImmutableMap.builder();
if (datasource != null) {
builder.put(DruidMetrics.DATASOURCE, new String[]{datasource});
}
if (taskId != null) {
+ builder.put(DruidMetrics.TASK_ID, new String[]{taskId});
builder.put(DruidMetrics.ID, new String[]{taskId});
}
return builder.build();
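`mapOfDatasourceAndTaskID` returns an empty map when neither value is set (as on non-peon servers, per the new `DataSourceTaskIdHolder` Javadoc) and, when a task ID is present, records it under both `taskId` and the deprecated `id`. Below is a dependency-free sketch of the same logic using `java.util` instead of Guava's `ImmutableMap`. The key strings are assumed to match `DruidMetrics.DATASOURCE`, `TASK_ID` and `ID` because the new test filters on exactly those names; `DimensionMaps` is a hypothetical class name.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical re-implementation for illustration; not Druid's MonitorsConfig.
final class DimensionMaps
{
  // Assumed to match the DruidMetrics constants used in this commit.
  static final String DATASOURCE = "dataSource";
  static final String TASK_ID = "taskId";
  static final String ID = "id"; // deprecated alias kept for backward compatibility

  private DimensionMaps()
  {
  }

  static Map<String, String[]> mapOfDatasourceAndTaskId(String datasource, String taskId)
  {
    // LinkedHashMap keeps insertion order, like ImmutableMap.Builder.
    Map<String, String[]> dims = new LinkedHashMap<>();
    if (datasource != null) {
      dims.put(DATASOURCE, new String[]{datasource});
    }
    if (taskId != null) {
      dims.put(TASK_ID, new String[]{taskId});
      dims.put(ID, new String[]{taskId});
    }
    return Collections.unmodifiableMap(dims);
  }

  public static void main(String[] args)
  {
    System.out.println(mapOfDatasourceAndTaskId("wiki", "t1").keySet()); // [dataSource, taskId, id]
    System.out.println(mapOfDatasourceAndTaskId(null, null).isEmpty());  // true
  }
}
```

Dropping the deprecated alias later would amount to deleting the single `dims.put(ID, ...)` line.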
diff --git a/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java b/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java
index a0bb48af699..126f262062c 100644
--- a/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.TLSServerConfig;
+import org.apache.druid.server.metrics.MonitorsConfig;
import org.apache.druid.server.security.TLSCertificateChecker;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -54,7 +55,9 @@ public class JettyServerModuleTest
Mockito.when(jettyServerThreadPool.getQueueSize()).thenReturn(50);
Mockito.when(jettyServerThreadPool.getBusyThreads()).thenReturn(60);
- JettyServerModule.JettyMonitor jettyMonitor = new JettyServerModule.JettyMonitor("ds", "t0");
+ JettyServerModule.JettyMonitor jettyMonitor = new JettyServerModule.JettyMonitor(
+ MonitorsConfig.mapOfDatasourceAndTaskID("ds", "t0")
+ );
final StubServiceEmitter serviceEmitter = new StubServiceEmitter("service", "host");
jettyMonitor.doMonitor(serviceEmitter);
diff --git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
index aa5955cd914..5931fba677c 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
@@ -19,10 +19,18 @@
package org.apache.druid.server.metrics;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.name.Names;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
+import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.initialization.Initialization;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.groupby.GroupByStatsProvider;
+import org.apache.druid.server.DruidNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -31,6 +39,7 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -52,12 +61,12 @@ public class GroupByStatsMonitorTest
public synchronized AggregateStats getStatsSince()
{
return new AggregateStats(
- 1L,
- 100L,
- 2L,
- 200L,
- 300L
- );
+ 1L,
+ 100L,
+ 2L,
+ 200L,
+ 300L
+ );
}
};
@@ -75,7 +84,7 @@ public class GroupByStatsMonitorTest
public void testMonitor()
{
final GroupByStatsMonitor monitor =
- new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+ new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new DataSourceTaskIdHolder());
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
monitor.doMonitor(emitter);
emitter.flush();
@@ -92,6 +101,49 @@ public class GroupByStatsMonitorTest
emitter.verifyValue("groupBy/mergeDictionarySize", 300L);
}
+ @Test
+ public void testMonitorWithServiceDimensions()
+ {
+ final String dataSource = "fooDs";
+ final String taskId = "taskId1";
+ final Injector injector = Initialization.makeInjectorWithModules(
+ GuiceInjectors.makeStartupInjector(),
+ List.of(binder -> {
+ JsonConfigProvider.bindInstance(
+ binder,
+ Key.get(DruidNode.class, Self.class),
+ new DruidNode("test-inject", null, false, null, null, true, false)
+ );
+ binder.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING)))
+ .toInstance(dataSource);
+ binder.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.TASK_ID_BINDING)))
+ .toInstance(taskId);
+ })
+ );
+ final DataSourceTaskIdHolder dsTaskIdHolder = new DataSourceTaskIdHolder();
+ injector.injectMembers(dsTaskIdHolder);
+ final GroupByStatsMonitor monitor =
+ new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, dsTaskIdHolder);
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+ emitter.flush();
+ // Trigger metric emission
+ monitor.doMonitor(emitter);
+
+ final Map<String, Object> dimFilters = Map.of(
+ "taskId", List.of(taskId), "dataSource", List.of(dataSource), "id", List.of(taskId)
+ );
+ Assert.assertEquals(7, emitter.getNumEmittedEvents());
+ emitter.verifyValue("mergeBuffer/pendingRequests", dimFilters, 0L);
+ emitter.verifyValue("mergeBuffer/used", dimFilters, 0L);
+ emitter.verifyValue("mergeBuffer/queries", dimFilters, 1L);
+ emitter.verifyValue("mergeBuffer/acquisitionTimeNs", dimFilters, 100L);
+ emitter.verifyValue("groupBy/spilledQueries", dimFilters, 2L);
+ emitter.verifyValue("groupBy/spilledBytes", dimFilters, 200L);
+ emitter.verifyValue("groupBy/mergeDictionarySize", dimFilters, 300L);
+ }
+
+
@Test
public void testMonitoringMergeBuffer_acquiredCount()
throws ExecutionException, InterruptedException, TimeoutException
@@ -101,7 +153,7 @@ public class GroupByStatsMonitorTest
}).get(20, TimeUnit.SECONDS);
final GroupByStatsMonitor monitor =
- new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+ new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new DataSourceTaskIdHolder());
final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost");
boolean ret = monitor.doMonitor(emitter);
Assert.assertTrue(ret);
@@ -130,7 +182,7 @@ public class GroupByStatsMonitorTest
}
final GroupByStatsMonitor monitor =
- new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+ new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new DataSourceTaskIdHolder());
final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost");
boolean ret = monitor.doMonitor(emitter);
Assert.assertTrue(ret);
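The new `testMonitorWithServiceDimensions` passes a `dimFilters` map that includes the deprecated `id` alongside `taskId` and `dataSource`, so it asserts that both the new and the deprecated dimension are still emitted. The sketch below shows how such a filter is assumed to match: every filter entry must be present on the event with an equal value, and values are modeled as lists because the test compares against `List.of(...)`. `DimFilterSketch` and its methods are hypothetical and do not claim to reproduce how `StubServiceEmitter.verifyValue` is implemented.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical sketch of dimension-filtered verification; NOT StubServiceEmitter's code.
final class DimFilterSketch
{
  static final class Event
  {
    final String metric;
    final long value;
    final Map<String, Object> dimensions;

    Event(String metric, long value, Map<String, Object> dimensions)
    {
      this.metric = metric;
      this.value = value;
      this.dimensions = dimensions;
    }
  }

  private DimFilterSketch()
  {
  }

  // An event matches when every filter key is present with an equal value.
  static boolean matches(Event event, Map<String, Object> dimFilters)
  {
    for (Map.Entry<String, Object> filter : dimFilters.entrySet()) {
      if (!Objects.equals(event.dimensions.get(filter.getKey()), filter.getValue())) {
        return false;
      }
    }
    return true;
  }

  // Values of all events for the given metric whose dimensions match the filters.
  static List<Long> valuesFor(List<Event> events, String metric, Map<String, Object> dimFilters)
  {
    return events.stream()
                 .filter(e -> e.metric.equals(metric) && matches(e, dimFilters))
                 .map(e -> e.value)
                 .collect(Collectors.toList());
  }
}
```

Under this model, an event that carried only `taskId` and `dataSource` would not match the test's filter, which is how the assertion would catch an accidental early removal of `id`.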