LakshSingla commented on code in PR #17360:
URL: https://github.com/apache/druid/pull/17360#discussion_r1827505139


##########
docs/operations/metrics.md:
##########
@@ -86,6 +85,13 @@ Most metric values reset each emission period, as specified 
in `druid.monitoring
 |`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be 
materialized as frames due other reasons.|This metric is only available if the 
`SubqueryCountStatsMonitor` module is included.| |
 |`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery 
results exceeded the given row limit|This metric is only available if the 
`SubqueryCountStatsMonitor` module is included.| |
 |`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery 
results exceeded the given byte limit|This metric is only available if the 
`SubqueryCountStatsMonitor` module is included.| |
+|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch 
of buffers from the merge buffer pool.|This metric is only available if the 
`GroupByStatsMonitor` module is included.|Should be 0.|
+|`mergeBuffer/usedCount`|Number of merge buffers used from the merge buffer 
pool.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Depends on the number of groupBy queries needing merge buffers.|
+|`mergeBuffer/acquisitionCount`|Number of times groupBy queries acquired merge 
buffers.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Depends on the number of groupBy queries needing merge buffers.|

Review Comment:
   > Number of times groupBy queries acquired merge buffers.|
   
   Is it cumulative or per query? A query should acquire a merge buffer only 
once. If we reword the metric name or the documentation, this will be 
clear to users. 
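
To make the "cumulative" reading concrete: a minimal sketch, assuming the count is a total across all groupBy queries that the monitor reads and resets once per emission period. The thread does not confirm this is how the PR behaves, and the class and method names here are hypothetical, not the PR's.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not the PR's GroupByStatsProvider.
public class AcquisitionCounterSketch
{
  // Total acquisitions since the last emission (assumed semantics).
  private final AtomicLong acquisitionCount = new AtomicLong();

  // Called once each time a query acquires its merge buffers.
  public void recordAcquisition()
  {
    acquisitionCount.incrementAndGet();
  }

  // Called by the monitor once per emission period: returns the count
  // accumulated since the previous call and resets it to zero atomically.
  public long snapshotAndReset()
  {
    return acquisitionCount.getAndSet(0);
  }

  public static void main(String[] args)
  {
    AcquisitionCounterSketch counter = new AcquisitionCounterSketch();
    counter.recordAcquisition();
    counter.recordAcquisition();
    counter.recordAcquisition();
    System.out.println(counter.snapshotAndReset()); // prints 3
    System.out.println(counter.snapshotAndReset()); // prints 0
  }
}
```

Under this reading the emitted value is "acquisitions per emission period", which the metric description could state explicitly.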



##########
docs/operations/metrics.md:
##########
@@ -103,7 +109,13 @@ Most metric values reset each emission period, as 
specified in `druid.monitoring
 |`query/failed/count`|Number of failed queries.|This metric is only available 
if the `QueryCountStatsMonitor` module is included.||
 |`query/interrupted/count`|Number of queries interrupted due to 
cancellation.|This metric is only available if the `QueryCountStatsMonitor` 
module is included.||
 |`query/timeout/count`|Number of timed out queries.|This metric is only 
available if the `QueryCountStatsMonitor` module is included.||
-|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch 
of buffers from the merge buffer pool.|This metric is only available if the 
`QueryCountStatsMonitor` module is included.||
+|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch 
of buffers from the merge buffer pool.|This metric is only available if the 
`GroupByStatsMonitor` module is included.|Should be 0.|
+|`mergeBuffer/usedCount`|Number of merge buffers used from the merge buffer 
pool.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Depends on the number of groupBy queries needing merge buffers.|
+|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge 
buffer for groupBy queries.|This metric is only available if the 
`GroupByStatsMonitor` module is included.|Should be as low as possible.|
+|`mergeBuffer/acquisitionCount`|Number of times groupBy queries acquired merge 
buffers.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Depends on the number of groupBy queries needing merge buffers.|
+|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the 
disk.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
+|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy 
queries.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
+|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This 
metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|

Review Comment:
   I wonder if there's a way to reference all these in a single place instead. 
It seems wasteful to maintain two copies of the same documentation. LMK 
if any existing metrics already do that. 



##########
docs/configuration/index.md:
##########
@@ -408,6 +408,7 @@ Metric monitoring is an essential part of Druid operations. 
The following monito
 |`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics 
about task slot usage per emission period.|
 |`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how 
many ingestion tasks are currently running/pending/waiting, the number of 
successful/failed tasks, and metrics about task slot usage for the reporting 
worker, per emission period. Only supported by Middle Manager node types.|
 |`org.apache.druid.server.metrics.ServiceStatusMonitor`|Reports a heartbeat 
for the service.|
+|`org.apache.druid.server.metrics.GroupByStatsMonitor`|Report statistics for 
groupBy queries like disk spill, merge buffer usage. |

Review Comment:
   nit: "disk spill" can be worded better. 
   ```suggestion
   |`org.apache.druid.server.metrics.GroupByStatsMonitor`|Reports metrics for 
groupBy queries like disk and merge buffer utilized by them. |
   ```
   



##########
docs/operations/metrics.md:
##########
@@ -86,6 +85,13 @@ Most metric values reset each emission period, as specified 
in `druid.monitoring
 |`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be 
materialized as frames due other reasons.|This metric is only available if the 
`SubqueryCountStatsMonitor` module is included.| |
 |`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery 
results exceeded the given row limit|This metric is only available if the 
`SubqueryCountStatsMonitor` module is included.| |
 |`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery 
results exceeded the given byte limit|This metric is only available if the 
`SubqueryCountStatsMonitor` module is included.| |
+|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch 
of buffers from the merge buffer pool.|This metric is only available if the 
`GroupByStatsMonitor` module is included.|Should be 0.|

Review Comment:
   The recommended value doesn't seem correct. It is fine for queries to 
wait on the merge buffer even during normal operation. After rewording, it 
could say: ideally 0, but a non-zero value is acceptable. 



##########
server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java:
##########
@@ -72,9 +75,11 @@ public boolean doMonitor(ServiceEmitter emitter)
       }
     }
 
-    long pendingQueries = this.mergeBufferPool.getPendingRequests();
-    emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", 
pendingQueries));
+    if (emitMergeBufferPendingRequests) {
+      long pendingQueries = this.mergeBufferPool.getPendingRequests();
+      emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", 
pendingQueries));
+    }
+

Review Comment:
   Why is this required? Isn't it cleaner to not emit the metric from the 
GroupByStatsMonitor at all and keep it as is? 



##########
docs/operations/metrics.md:
##########
@@ -86,6 +85,13 @@ Most metric values reset each emission period, as specified 
in `druid.monitoring
 |`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be 
materialized as frames due other reasons.|This metric is only available if the 
`SubqueryCountStatsMonitor` module is included.| |
 |`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery 
results exceeded the given row limit|This metric is only available if the 
`SubqueryCountStatsMonitor` module is included.| |
 |`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery 
results exceeded the given byte limit|This metric is only available if the 
`SubqueryCountStatsMonitor` module is included.| |
+|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch 
of buffers from the merge buffer pool.|This metric is only available if the 
`GroupByStatsMonitor` module is included.|Should be 0.|
+|`mergeBuffer/usedCount`|Number of merge buffers used from the merge buffer 
pool.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Depends on the number of groupBy queries needing merge buffers.|
+|`mergeBuffer/acquisitionCount`|Number of times groupBy queries acquired merge 
buffers.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Depends on the number of groupBy queries needing merge buffers.|
+|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge 
buffer for groupBy queries.|This metric is only available if the 
`GroupByStatsMonitor` module is included.|Should be as low as possible.|

Review Comment:
   ```suggestion
   |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge 
buffer for groupBy queries.|This metric is only available if the 
`GroupByStatsMonitor` module is included.|Varies.|
   ```



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java:
##########
@@ -13682,4 +13755,16 @@ private void cannotVectorize()
       expectedException.expectMessage("Cannot vectorize!");
     }
   }
+
+  private void verifyMetrics(long queries, boolean skipMergeDictionary)

Review Comment:
   What is skipMergeDictionary and when is it required? 



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.query.QueryResourceId;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Metrics collector for groupBy queries like spilled bytes, merge buffer 
acquistion time, dictionary size.
+ */
+@LazySingleton

Review Comment:
   Should it be registered in a Module if this is a LazySingleton? (I am not 
very familiar with this, just curious whether it works as expected.)



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.query.QueryResourceId;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Metrics collector for groupBy queries like spilled bytes, merge buffer 
acquistion time, dictionary size.
+ */
+@LazySingleton
+public class GroupByStatsProvider
+{
+  private final Map<QueryResourceId, PerQueryStats> perQueryStats;
+  private final AggregateStats aggregateStatsContainer;
+
+  public GroupByStatsProvider()
+  {
+    this.perQueryStats = new ConcurrentHashMap<>();
+    this.aggregateStatsContainer = new AggregateStats();
+  }
+
+  public PerQueryStats getPerQueryStatsContainer(QueryResourceId resourceId)
+  {
+    if (resourceId == null) {
+      return null;
+    }
+    return perQueryStats.computeIfAbsent(resourceId, value -> new 
PerQueryStats());
+  }
+
+  public synchronized void closeQuery(QueryResourceId resourceId)
+  {
+    if (resourceId == null || !perQueryStats.containsKey(resourceId)) {
+      return;
+    }
+    PerQueryStats container = perQueryStats.remove(resourceId);
+    aggregateStatsContainer.addQueryStats(container);
+  }

Review Comment:
   This is not intuitive. Callers won't remember to call `closeQuery()`. Is 
it possible to model it as a Closer/Closeable? `PerQueryStats` can implement 
`Closeable`, and `close()` can do that. 
   
   It is still not ideal, since closing "Stats" doesn't seem natural in the 
first place. Perhaps renaming `PerQueryStats` to `PerQueryStatsContainer` would 
also help. 
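
A minimal sketch of the Closeable idea above, assuming a per-query container that folds its values into the aggregate when closed. All names are hypothetical, a `String` stands in for `QueryResourceId`, and only one stat (spilled bytes) is tracked; this is not the PR's implementation.

```java
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not the PR's GroupByStatsProvider.
public class StatsProviderSketch
{
  // String keys stand in for QueryResourceId (assumption for brevity).
  private final Map<String, PerQueryStatsContainer> perQuery = new ConcurrentHashMap<>();
  private final AtomicLong totalSpilledBytes = new AtomicLong();

  public PerQueryStatsContainer forQuery(String resourceId)
  {
    return perQuery.computeIfAbsent(resourceId, id -> new PerQueryStatsContainer(id));
  }

  public long getTotalSpilledBytes()
  {
    return totalSpilledBytes.get();
  }

  public int openQueryCount()
  {
    return perQuery.size();
  }

  public class PerQueryStatsContainer implements Closeable
  {
    private final String resourceId;
    private final AtomicLong spilledBytes = new AtomicLong();

    private PerQueryStatsContainer(String resourceId)
    {
      this.resourceId = resourceId;
    }

    public void addSpilledBytes(long bytes)
    {
      spilledBytes.addAndGet(bytes);
    }

    @Override
    public void close()
    {
      // remove(key, value) succeeds only on the first close, so closing
      // twice does not add the same query's stats to the aggregate twice.
      if (perQuery.remove(resourceId, this)) {
        totalSpilledBytes.addAndGet(spilledBytes.get());
      }
    }
  }

  public static void main(String[] args)
  {
    StatsProviderSketch provider = new StatsProviderSketch();
    // try-with-resources closes the container, so callers need not
    // remember a separate closeQuery() call.
    try (PerQueryStatsContainer stats = provider.forQuery("query-1")) {
      stats.addSpilledBytes(128);
    }
    System.out.println(provider.getTotalSpilledBytes()); // prints 128
  }
}
```

With this shape the container can also be registered with whatever resource closer the query already uses, so aggregation happens wherever the query's other resources are released.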



##########
server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.collections.DefaultBlockingPool;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.query.groupby.GroupByStatsProvider;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+public class GroupByStatsMonitorTest
+{
+  private GroupByStatsProvider groupByStatsProvider;
+  private BlockingPool<ByteBuffer> mergeBufferPool;
+  private ExecutorService executorService;
+
+  @Before
+  public void setUp()
+  {
+    groupByStatsProvider = new GroupByStatsProvider()
+    {
+      @Override
+      public synchronized AggregateStats getStatsSince()
+      {
+        return new AggregateStats(
+                1L,
+                100L,
+                2L,
+                200L,
+                300L
+            );
+      }
+    };
+
+    mergeBufferPool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 
5);
+    executorService = Executors.newSingleThreadExecutor();
+  }
+
+  @After
+  public void tearDown()
+  {
+    executorService.shutdown();
+  }
+
+  @Test
+  public void testMonitor()
+  {
+    final GroupByStatsMonitor monitor =
+        new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    monitor.doMonitor(emitter);
+    emitter.flush();
+    // Trigger metric emission
+    monitor.doMonitor(emitter);
+    Map<String, Long> resultMap = emitter.getEvents()
+                                         .stream()
+                                         .collect(Collectors.toMap(
+                                             event -> (String) 
event.toMap().get("metric"),
+                                             event -> (Long) 
event.toMap().get("value")
+                                         ));
+    Assert.assertEquals(7, resultMap.size());
+    Assert.assertEquals(0, (long) 
resultMap.get("mergeBuffer/pendingRequests"));
+    Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/usedCount"));
+    Assert.assertEquals(1, (long) 
resultMap.get("mergeBuffer/acquisitionCount"));
+    Assert.assertEquals(100, (long) 
resultMap.get("mergeBuffer/acquisitionTimeNs"));
+    Assert.assertEquals(2, (long) resultMap.get("groupBy/spilledQueries"));
+    Assert.assertEquals(200, (long) resultMap.get("groupBy/spilledBytes"));
+    Assert.assertEquals(300, (long) 
resultMap.get("groupBy/mergeDictionarySize"));
+  }
+
+  @Test
+  public void testMonitoringMergeBuffer_acquiredCount()
+      throws ExecutionException, InterruptedException, TimeoutException
+  {
+    executorService.submit(() -> {
+      mergeBufferPool.takeBatch(4);
+    }).get(20, TimeUnit.SECONDS);
+
+    final GroupByStatsMonitor monitor =
+        new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+    final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", 
"DummyHost");
+    boolean ret = monitor.doMonitor(emitter);
+    Assert.assertTrue(ret);
+
+    List<Number> numbers = emitter.getMetricValues("mergeBuffer/usedCount", 
Collections.emptyMap());
+    Assert.assertEquals(1, numbers.size());
+    Assert.assertEquals(4, numbers.get(0).intValue());
+  }
+
+  @Test(timeout = 2_000L)
+  public void testMonitoringMergeBuffer_pendingRequests()
+  {
+    executorService.submit(() -> {
+      mergeBufferPool.takeBatch(10);
+    });
+
+    int count = 0;
+    try {
+      // wait at most 10 secs for the executor thread to block

Review Comment:
   Is the "10 secs" number in this comment stale? The test itself times out 
after 2 seconds (`timeout = 2_000L`). 



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.query.QueryResourceId;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Metrics collector for groupBy queries like spilled bytes, merge buffer 
acquistion time, dictionary size.
+ */
+@LazySingleton
+public class GroupByStatsProvider
+{
+  private final Map<QueryResourceId, PerQueryStats> perQueryStats;
+  private final AggregateStats aggregateStatsContainer;
+
+  public GroupByStatsProvider()
+  {
+    this.perQueryStats = new ConcurrentHashMap<>();
+    this.aggregateStatsContainer = new AggregateStats();
+  }
+
+  public PerQueryStats getPerQueryStatsContainer(QueryResourceId resourceId)
+  {
+    if (resourceId == null) {
+      return null;
+    }
+    return perQueryStats.computeIfAbsent(resourceId, value -> new 
PerQueryStats());
+  }
+
+  public synchronized void closeQuery(QueryResourceId resourceId)
+  {
+    if (resourceId == null || !perQueryStats.containsKey(resourceId)) {
+      return;
+    }
+    PerQueryStats container = perQueryStats.remove(resourceId);
+    aggregateStatsContainer.addQueryStats(container);
+  }
+
+  public synchronized AggregateStats getStatsSince()
+  {
+    return aggregateStatsContainer.reset();
+  }
+
+  public static class AggregateStats

Review Comment:
   Why are some stats AggregateStats, and some PerQuery? Wouldn't the monitor 
aggregate them all? 



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java:
##########
@@ -12383,6 +12448,10 @@ public void 
testGroupByOnNullableDoubleNoLimitPushdown()
 
     Iterable<ResultRow> results = 
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
     TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
+
+    if (config.equals(V2_SMALL_BUFFER_CONFIG)) {
+      verifyMetrics(1, true);
+    }

Review Comment:
   Why are the assertions being done on a single config? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
