LakshSingla commented on code in PR #17360:
URL: https://github.com/apache/druid/pull/17360#discussion_r1827505139
##########
docs/operations/metrics.md:
##########
@@ -86,6 +85,13 @@ Most metric values reset each emission period, as specified
in `druid.monitoring
|`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be
materialized as frames due other reasons.|This metric is only available if the
`SubqueryCountStatsMonitor` module is included.| |
|`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery
results exceeded the given row limit|This metric is only available if the
`SubqueryCountStatsMonitor` module is included.| |
|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery
results exceeded the given byte limit|This metric is only available if the
`SubqueryCountStatsMonitor` module is included.| |
+|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch
of buffers from the merge buffer pool.|This metric is only available if the
`GroupByStatsMonitor` module is included.|Should be 0.|
+|`mergeBuffer/usedCount`|Number of merge buffers used from the merge buffer
pool.|This metric is only available if the `GroupByStatsMonitor` module is
included.|Depends on the number of groupBy queries needing merge buffers.|
+|`mergeBuffer/acquisitionCount`|Number of times groupBy queries acquired merge
buffers.|This metric is only available if the `GroupByStatsMonitor` module is
included.|Depends on the number of groupBy queries needing merge buffers.|
Review Comment:
> Number of times groupBy queries acquired merge buffers.|
Is it cumulative or per query? A query should acquire its merge buffers only
once. Rewording the metric name or the documentation would make this clear to
users.
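To make the question concrete, here is a minimal sketch of the "cumulative per
emission period" reading: every acquisition by any query bumps one shared
counter, and the monitor drains it when it emits. `AcquisitionCounter`,
`recordAcquisition` and `drain` are hypothetical names used only for
illustration; this is not the PR's implementation, and whether the PR behaves
this way is exactly what the comment asks.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not the PR's GroupByStatsProvider.
final class AcquisitionCounter
{
  private final AtomicLong acquisitions = new AtomicLong();

  // Called once each time a query obtains its batch of merge buffers.
  void recordAcquisition()
  {
    acquisitions.incrementAndGet();
  }

  // Called by the monitor once per emission period: returns the count
  // accumulated since the previous call and resets it to zero.
  long drain()
  {
    return acquisitions.getAndSet(0);
  }
}

final class AcquisitionCounterDemo
{
  public static void main(String[] args)
  {
    AcquisitionCounter counter = new AcquisitionCounter();
    counter.recordAcquisition(); // query A
    counter.recordAcquisition(); // query B
    System.out.println(counter.drain()); // prints 2
    System.out.println(counter.drain()); // prints 0
  }
}
```

Under this reading the emitted value is a count of acquisitions across all
queries in the period, so a description along the lines of "number of merge
buffer acquisitions during the emission period" may be less ambiguous.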
##########
docs/operations/metrics.md:
##########
@@ -103,7 +109,13 @@ Most metric values reset each emission period, as
specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available
if the `QueryCountStatsMonitor` module is included.||
|`query/interrupted/count`|Number of queries interrupted due to
cancellation.|This metric is only available if the `QueryCountStatsMonitor`
module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only
available if the `QueryCountStatsMonitor` module is included.||
-|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch
of buffers from the merge buffer pool.|This metric is only available if the
`QueryCountStatsMonitor` module is included.||
+|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch
of buffers from the merge buffer pool.|This metric is only available if the
`GroupByStatsMonitor` module is included.|Should be 0.|
+|`mergeBuffer/usedCount`|Number of merge buffers used from the merge buffer
pool.|This metric is only available if the `GroupByStatsMonitor` module is
included.|Depends on the number of groupBy queries needing merge buffers.|
+|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge
buffer for groupBy queries.|This metric is only available if the
`GroupByStatsMonitor` module is included.|Should be as low as possible.|
+|`mergeBuffer/acquisitionCount`|Number of times groupBy queries acquired merge
buffers.|This metric is only available if the `GroupByStatsMonitor` module is
included.|Depends on the number of groupBy queries needing merge buffers.|
+|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the
disk.|This metric is only available if the `GroupByStatsMonitor` module is
included.|Varies|
+|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy
queries.|This metric is only available if the `GroupByStatsMonitor` module is
included.|Varies|
+|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This
metric is only available if the `GroupByStatsMonitor` module is
included.|Varies|
Review Comment:
I wonder if there's a way to reference all of these in a single place instead.
It seems wasteful to maintain two copies of the same documentation. Let me know
if any existing metrics already do that.
##########
docs/configuration/index.md:
##########
@@ -408,6 +408,7 @@ Metric monitoring is an essential part of Druid operations.
The following monito
|`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics
about task slot usage per emission period.|
|`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how
many ingestion tasks are currently running/pending/waiting, the number of
successful/failed tasks, and metrics about task slot usage for the reporting
worker, per emission period. Only supported by Middle Manager node types.|
|`org.apache.druid.server.metrics.ServiceStatusMonitor`|Reports a heartbeat
for the service.|
+|`org.apache.druid.server.metrics.GroupByStatsMonitor`|Report statistics for
groupBy queries like disk spill, merge buffer usage. |
Review Comment:
nit: "disk spill" can be worded better.
```suggestion
|`org.apache.druid.server.metrics.GroupByStatsMonitor`|Reports metrics for
groupBy queries like disk and merge buffer utilized by them. |
```
##########
docs/operations/metrics.md:
##########
@@ -86,6 +85,13 @@ Most metric values reset each emission period, as specified
in `druid.monitoring
|`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be
materialized as frames due other reasons.|This metric is only available if the
`SubqueryCountStatsMonitor` module is included.| |
|`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery
results exceeded the given row limit|This metric is only available if the
`SubqueryCountStatsMonitor` module is included.| |
|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery
results exceeded the given byte limit|This metric is only available if the
`SubqueryCountStatsMonitor` module is included.| |
+|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch
of buffers from the merge buffer pool.|This metric is only available if the
`GroupByStatsMonitor` module is included.|Should be 0.|
Review Comment:
The recommended value doesn't seem correct. It is fine for queries to wait on
the merge buffer even during normal operation. It could say: ideally 0, but a
non-zero value is acceptable (after rewording).
##########
server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java:
##########
@@ -72,9 +75,11 @@ public boolean doMonitor(ServiceEmitter emitter)
}
}
- long pendingQueries = this.mergeBufferPool.getPendingRequests();
- emitter.emit(builder.setMetric("mergeBuffer/pendingRequests",
pendingQueries));
+ if (emitMergeBufferPendingRequests) {
+ long pendingQueries = this.mergeBufferPool.getPendingRequests();
+ emitter.emit(builder.setMetric("mergeBuffer/pendingRequests",
pendingQueries));
+ }
+
Review Comment:
Why is this required? Isn't it cleaner to not emit the metric from the
GroupByStatsMonitor at all and keep it as is?
##########
docs/operations/metrics.md:
##########
@@ -86,6 +85,13 @@ Most metric values reset each emission period, as specified
in `druid.monitoring
|`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be
materialized as frames due other reasons.|This metric is only available if the
`SubqueryCountStatsMonitor` module is included.| |
|`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery
results exceeded the given row limit|This metric is only available if the
`SubqueryCountStatsMonitor` module is included.| |
|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery
results exceeded the given byte limit|This metric is only available if the
`SubqueryCountStatsMonitor` module is included.| |
+|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch
of buffers from the merge buffer pool.|This metric is only available if the
`GroupByStatsMonitor` module is included.|Should be 0.|
+|`mergeBuffer/usedCount`|Number of merge buffers used from the merge buffer
pool.|This metric is only available if the `GroupByStatsMonitor` module is
included.|Depends on the number of groupBy queries needing merge buffers.|
+|`mergeBuffer/acquisitionCount`|Number of times groupBy queries acquired merge
buffers.|This metric is only available if the `GroupByStatsMonitor` module is
included.|Depends on the number of groupBy queries needing merge buffers.|
+|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge
buffer for groupBy queries.|This metric is only available if the
`GroupByStatsMonitor` module is included.|Should be as low as possible.|
Review Comment:
```suggestion
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge
buffer for groupBy queries.|This metric is only available if the
`GroupByStatsMonitor` module is included.|Varies.|
```
##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java:
##########
@@ -13682,4 +13755,16 @@ private void cannotVectorize()
expectedException.expectMessage("Cannot vectorize!");
}
}
+
+ private void verifyMetrics(long queries, boolean skipMergeDictionary)
Review Comment:
What is `skipMergeDictionary`, and when is it required?
##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.query.QueryResourceId;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Metrics collector for groupBy queries like spilled bytes, merge buffer
acquistion time, dictionary size.
+ */
+@LazySingleton
Review Comment:
Should it be registered in a Module if this is a LazySingleton? (I am not
very familiar with this; just curious whether it works as expected.)
##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.query.QueryResourceId;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Metrics collector for groupBy queries like spilled bytes, merge buffer
acquistion time, dictionary size.
+ */
+@LazySingleton
+public class GroupByStatsProvider
+{
+ private final Map<QueryResourceId, PerQueryStats> perQueryStats;
+ private final AggregateStats aggregateStatsContainer;
+
+ public GroupByStatsProvider()
+ {
+ this.perQueryStats = new ConcurrentHashMap<>();
+ this.aggregateStatsContainer = new AggregateStats();
+ }
+
+ public PerQueryStats getPerQueryStatsContainer(QueryResourceId resourceId)
+ {
+ if (resourceId == null) {
+ return null;
+ }
+ return perQueryStats.computeIfAbsent(resourceId, value -> new
PerQueryStats());
+ }
+
+ public synchronized void closeQuery(QueryResourceId resourceId)
+ {
+ if (resourceId == null || !perQueryStats.containsKey(resourceId)) {
+ return;
+ }
+ PerQueryStats container = perQueryStats.remove(resourceId);
+ aggregateStatsContainer.addQueryStats(container);
+ }
Review Comment:
This is not intuitive. Callers won't remember to call `closeQuery()`. Is it
possible to model it as a Closer/Closeable? `PerQueryStats` can implement
`Closeable`, and `close()` can do that.
It is still not ideal, since closing "Stats" seems odd in the first place.
Perhaps renaming `PerQueryStats` to `PerQueryStatsContainer` would also help.
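A rough sketch of the Closeable shape suggested above, under stated
assumptions: `StatsProviderSketch`, `open`, `addSpilledBytes` and
`totalSpilledBytes` are hypothetical names, a plain `String` stands in for
`QueryResourceId`, and only one statistic is tracked. It is meant to show the
calling pattern, not to replace the PR's class.

```java
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; String stands in for QueryResourceId.
final class StatsProviderSketch
{
  private final Map<String, PerQueryStatsContainer> perQuery = new ConcurrentHashMap<>();
  private final AtomicLong totalSpilledBytes = new AtomicLong();

  // Returns the container for a query, creating it on first use.
  PerQueryStatsContainer open(String resourceId)
  {
    return perQuery.computeIfAbsent(resourceId, id -> new PerQueryStatsContainer(id));
  }

  long totalSpilledBytes()
  {
    return totalSpilledBytes.get();
  }

  int openQueries()
  {
    return perQuery.size();
  }

  final class PerQueryStatsContainer implements Closeable
  {
    private final String resourceId;
    private final AtomicLong spilledBytes = new AtomicLong();

    private PerQueryStatsContainer(String resourceId)
    {
      this.resourceId = resourceId;
    }

    void addSpilledBytes(long bytes)
    {
      spilledBytes.addAndGet(bytes);
    }

    @Override
    public void close()
    {
      // remove(key, value) succeeds only for the first close, so a repeated
      // close cannot fold the same query into the aggregate twice.
      if (perQuery.remove(resourceId, this)) {
        totalSpilledBytes.addAndGet(spilledBytes.get());
      }
    }
  }
}

final class StatsProviderSketchDemo
{
  public static void main(String[] args)
  {
    StatsProviderSketch provider = new StatsProviderSketch();
    // try-with-resources guarantees the per-query stats are folded into the
    // aggregate even if the query body throws.
    try (StatsProviderSketch.PerQueryStatsContainer stats = provider.open("query-1")) {
      stats.addSpilledBytes(200);
    }
    System.out.println(provider.totalSpilledBytes()); // prints 200
    System.out.println(provider.openQueries());       // prints 0
  }
}
```

With this shape the container can be registered with whatever closer already
tracks the query's resources, so no caller has to remember a separate
`closeQuery()` call.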
##########
server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.collections.DefaultBlockingPool;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.query.groupby.GroupByStatsProvider;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+public class GroupByStatsMonitorTest
+{
+ private GroupByStatsProvider groupByStatsProvider;
+ private BlockingPool<ByteBuffer> mergeBufferPool;
+ private ExecutorService executorService;
+
+ @Before
+ public void setUp()
+ {
+ groupByStatsProvider = new GroupByStatsProvider()
+ {
+ @Override
+ public synchronized AggregateStats getStatsSince()
+ {
+ return new AggregateStats(
+ 1L,
+ 100L,
+ 2L,
+ 200L,
+ 300L
+ );
+ }
+ };
+
+ mergeBufferPool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024),
5);
+ executorService = Executors.newSingleThreadExecutor();
+ }
+
+ @After
+ public void tearDown()
+ {
+ executorService.shutdown();
+ }
+
+ @Test
+ public void testMonitor()
+ {
+ final GroupByStatsMonitor monitor =
+ new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+ final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
+ monitor.doMonitor(emitter);
+ emitter.flush();
+ // Trigger metric emission
+ monitor.doMonitor(emitter);
+ Map<String, Long> resultMap = emitter.getEvents()
+ .stream()
+ .collect(Collectors.toMap(
+ event -> (String)
event.toMap().get("metric"),
+ event -> (Long)
event.toMap().get("value")
+ ));
+ Assert.assertEquals(7, resultMap.size());
+ Assert.assertEquals(0, (long)
resultMap.get("mergeBuffer/pendingRequests"));
+ Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/usedCount"));
+ Assert.assertEquals(1, (long)
resultMap.get("mergeBuffer/acquisitionCount"));
+ Assert.assertEquals(100, (long)
resultMap.get("mergeBuffer/acquisitionTimeNs"));
+ Assert.assertEquals(2, (long) resultMap.get("groupBy/spilledQueries"));
+ Assert.assertEquals(200, (long) resultMap.get("groupBy/spilledBytes"));
+ Assert.assertEquals(300, (long)
resultMap.get("groupBy/mergeDictionarySize"));
+ }
+
+ @Test
+ public void testMonitoringMergeBuffer_acquiredCount()
+ throws ExecutionException, InterruptedException, TimeoutException
+ {
+ executorService.submit(() -> {
+ mergeBufferPool.takeBatch(4);
+ }).get(20, TimeUnit.SECONDS);
+
+ final GroupByStatsMonitor monitor =
+ new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+ final StubServiceEmitter emitter = new StubServiceEmitter("DummyService",
"DummyHost");
+ boolean ret = monitor.doMonitor(emitter);
+ Assert.assertTrue(ret);
+
+ List<Number> numbers = emitter.getMetricValues("mergeBuffer/usedCount",
Collections.emptyMap());
+ Assert.assertEquals(1, numbers.size());
+ Assert.assertEquals(4, numbers.get(0).intValue());
+ }
+
+ @Test(timeout = 2_000L)
+ public void testMonitoringMergeBuffer_pendingRequests()
+ {
+ executorService.submit(() -> {
+ mergeBufferPool.takeBatch(10);
+ });
+
+ int count = 0;
+ try {
+ // wait at most 10 secs for the executor thread to block
Review Comment:
Is the "10 secs" figure stale? The test timeout is 2 seconds.
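One way to keep such a comment from going stale is to derive the wait from a
single named constant rather than repeating the number in prose. A minimal
sketch, assuming a polling helper; `BoundedWait`, `await` and
`MAX_WAIT_MILLIS` are hypothetical names and are not part of the test under
review:

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper; not part of GroupByStatsMonitorTest.
final class BoundedWait
{
  // Single source of truth for the wait bound; keep it below the test timeout.
  static final long MAX_WAIT_MILLIS = 1_000L;

  // Polls the condition until it holds or maxWaitMillis elapses.
  // Returns true if the condition was observed, false on timeout or interrupt.
  static boolean await(BooleanSupplier condition, long maxWaitMillis)
  {
    final long deadline = System.nanoTime() + maxWaitMillis * 1_000_000L;
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(10);
      }
      catch (InterruptedException e) {
        // Restore the interrupt flag and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }
}
```

A test could then call something like
`BoundedWait.await(() -> pool.getPendingRequests() > 0, BoundedWait.MAX_WAIT_MILLIS)`,
where `pool` is whatever pool the test holds, and the comment would no longer
need to state a number.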
##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.query.QueryResourceId;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Metrics collector for groupBy queries like spilled bytes, merge buffer
acquistion time, dictionary size.
+ */
+@LazySingleton
+public class GroupByStatsProvider
+{
+ private final Map<QueryResourceId, PerQueryStats> perQueryStats;
+ private final AggregateStats aggregateStatsContainer;
+
+ public GroupByStatsProvider()
+ {
+ this.perQueryStats = new ConcurrentHashMap<>();
+ this.aggregateStatsContainer = new AggregateStats();
+ }
+
+ public PerQueryStats getPerQueryStatsContainer(QueryResourceId resourceId)
+ {
+ if (resourceId == null) {
+ return null;
+ }
+ return perQueryStats.computeIfAbsent(resourceId, value -> new
PerQueryStats());
+ }
+
+ public synchronized void closeQuery(QueryResourceId resourceId)
+ {
+ if (resourceId == null || !perQueryStats.containsKey(resourceId)) {
+ return;
+ }
+ PerQueryStats container = perQueryStats.remove(resourceId);
+ aggregateStatsContainer.addQueryStats(container);
+ }
+
+ public synchronized AggregateStats getStatsSince()
+ {
+ return aggregateStatsContainer.reset();
+ }
+
+ public static class AggregateStats
Review Comment:
Why are some stats AggregateStats, and some PerQuery? Wouldn't the monitor
aggregate them all?
##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java:
##########
@@ -12383,6 +12448,10 @@ public void
testGroupByOnNullableDoubleNoLimitPushdown()
Iterable<ResultRow> results =
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
+
+ if (config.equals(V2_SMALL_BUFFER_CONFIG)) {
+ verifyMetrics(1, true);
+ }
Review Comment:
Why are the assertions being done on a single config?