This is an automated email from the ASF dual-hosted git repository.

gwphua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new de983e485c6 [Groupby Query Metrics] Add merge buffer tracking (#18731)
de983e485c6 is described below

commit de983e485c6b900605220c8d231dbec7c0698ef0
Author: Virushade <[email protected]>
AuthorDate: Wed Mar 4 10:34:00 2026 +0800

    [Groupby Query Metrics] Add merge buffer tracking (#18731)
    
    * Add byte buffer tracking for underlying hash tables
    
    * Byte buffer tracking for underlying offset handlers
    
    * Fix tests
    
    * Fix quidem tests
    
    * Documentation
    
    * bytesUsed naming
    
    * Add max metrics
    
    * Add missing calculation in BufferHashGrouper
    
    * Checkstyle
    
    * Checkstyle
    
    * GroupByStatsProvider javadocs
    
    * Fix GroupByStatsProviderTest comments
    
    * Fix doc order for GroupByStatsProvider metrics
    
    * Fix test for GroupByStatsMonitorTest
    
    * Update 
server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
    
    Co-authored-by: Abhishek Radhakrishnan <[email protected]>
    
    * Revert stylistic changes in BufferHashGrouper
    
    * Rename mergeBufferUsage to mergeBufferUsedBytes
    
    * Order of maxAcquisitionTimeNs
    
    * Track the open addressing hash table
    
    * Remove max metrics, push them in another PR...
    
    * Remove max metrics in GroupByStatsProviderTest
    
    * LimitedBufferHashGrouper to use parent method to report 
maxTableBufferUsedBytes
    
    * Standardised merge buffer names
    
    * Tests for buffer hash grouper
    
    Tests for buffer hash grouper
    
    * Address multiplication cast
    
    * Javadocs for getMergeBufferUsedBytes
    
    * Remix comments in test for peak calculations
    
    * Clean up after merging conflicts
    
    * Standardize maxMergeBufferUsedBytes
    
    * Test duplicate buffer adds
    
    * Test
    
    * Add javadocs for update
    
    ---------
    
    Co-authored-by: Abhishek Radhakrishnan <[email protected]>
---
 docs/operations/metrics.md                         |  6 ++
 .../druid/query/groupby/GroupByStatsProvider.java  | 70 ++++++++++++++++------
 .../epinephelinae/AbstractBufferHashGrouper.java   | 12 ++++
 .../groupby/epinephelinae/BufferHashGrouper.java   | 14 ++++-
 .../groupby/epinephelinae/ByteBufferHashTable.java | 28 +++++++++
 .../groupby/epinephelinae/ByteBufferIntList.java   |  9 +++
 .../epinephelinae/ByteBufferMinMaxOffsetHeap.java  | 13 +++-
 .../groupby/epinephelinae/ConcurrentGrouper.java   |  4 +-
 .../epinephelinae/LimitedBufferHashGrouper.java    | 14 +++++
 .../groupby/epinephelinae/SpillingGrouper.java     | 15 ++++-
 .../query/groupby/GroupByStatsProviderTest.java    | 13 +++-
 .../epinephelinae/BufferHashGrouperTest.java       | 55 +++++++++++++++++
 .../LimitedBufferHashGrouperTest.java              | 56 +++++++++++++++++
 .../druid/server/metrics/GroupByStatsMonitor.java  | 13 ++--
 .../server/metrics/GroupByStatsMonitorTest.java    | 30 ++++++----
 15 files changed, 303 insertions(+), 49 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 523ea15185b..4e422696fc8 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -90,6 +90,8 @@ Most metric values reset each emission period, as specified 
in `druid.monitoring
 |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of 
buffers from the merge buffer pool.|This metric is only available if the 
`GroupByStatsMonitor` module is included.|Depends on the number of groupBy 
queries needing merge buffers.|
 |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge 
buffer for groupBy queries.|This metric is only available if the 
`GroupByStatsMonitor` module is included.|Varies|
 |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire 
merge buffer for any single groupBy query within the emission period.|This 
metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
+|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process 
groupBy queries.|This metric is only available if the `GroupByStatsMonitor` 
module is included.|Varies|
+|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for 
any single groupBy query within the emission period.|This metric is only 
available if the `GroupByStatsMonitor` module is included.|Varies|
 |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the 
disk.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
 |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy 
queries.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
 |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any 
single groupBy query within the emission period.|This metric is only available 
if the `GroupByStatsMonitor` module is included.|Varies|
@@ -117,6 +119,8 @@ Most metric values reset each emission period, as specified 
in `druid.monitoring
 |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of 
buffers from the merge buffer pool.|This metric is only available if the 
`GroupByStatsMonitor` module is included.|Depends on the number of groupBy 
queries needing merge buffers.|
 |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge 
buffer for groupBy queries.|This metric is only available if the 
`GroupByStatsMonitor` module is included.|Varies|
 |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire 
merge buffer for any single groupBy query within the emission period.|This 
metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
+|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process 
groupBy queries.|This metric is only available if the `GroupByStatsMonitor` 
module is included.|Varies|
+|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for 
any single groupBy query within the emission period.|This metric is only 
available if the `GroupByStatsMonitor` module is included.|Varies|
 |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the 
disk.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
 |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy 
queries.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
 |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any 
single groupBy query within the emission period.|This metric is only available 
if the `GroupByStatsMonitor` module is included.|Varies|
@@ -147,6 +151,8 @@ to represent the task ID are deprecated and will be removed 
in a future release.
 |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of 
buffers from the merge buffer pool. This metric is only available if the 
`GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the 
number of groupBy queries needing merge buffers.|
 |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge 
buffer for groupBy queries. This metric is only available if the 
`GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
 |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire 
merge buffer for any single groupBy query within the emission period. This 
metric is only available if the `GroupByStatsMonitor` module is 
included.|`dataSource`, `taskId`|Varies|
+|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process 
groupBy queries. This metric is only available if the `GroupByStatsMonitor` 
module is included.|`dataSource`, `taskId`|Varies|
+|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for 
any single groupBy query within the emission period. This metric is only 
available if the `GroupByStatsMonitor` module is included.|`dataSource`, 
`taskId`|Varies|
 |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the 
disk. This metric is only available if the `GroupByStatsMonitor` module is 
included.|`dataSource`, `taskId`|Varies|
 |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy 
queries. This metric is only available if the `GroupByStatsMonitor` module is 
included.|`dataSource`, `taskId`|Varies|
 |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any 
single groupBy query within the emission period. This metric is only available 
if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java
index 51f56400555..f6b92a7b62c 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java
@@ -27,7 +27,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Metrics collector for groupBy queries like spilled bytes, merge buffer 
acquistion time, dictionary size.
+ * Collects groupBy query metrics (spilled bytes, merge buffer usage, 
dictionary size) per-query, then
+ * aggregates them when queries complete. Stats are retrieved and reset 
periodically via {@link #getStatsSince()}.
  */
 @LazySingleton
 public class GroupByStatsProvider
@@ -60,7 +61,9 @@ public class GroupByStatsProvider
 
   public synchronized AggregateStats getStatsSince()
   {
-    return aggregateStatsContainer.reset();
+    AggregateStats aggregateStats = new 
AggregateStats(aggregateStatsContainer);
+    aggregateStatsContainer.reset();
+    return aggregateStats;
   }
 
   public static class AggregateStats
@@ -68,6 +71,8 @@ public class GroupByStatsProvider
     private long mergeBufferQueries = 0;
     private long mergeBufferAcquisitionTimeNs = 0;
     private long maxMergeBufferAcquisitionTimeNs = 0;
+    private long totalMergeBufferUsedBytes = 0;
+    private long maxMergeBufferUsedBytes = 0;
     private long spilledQueries = 0;
     private long spilledBytes = 0;
     private long maxSpilledBytes = 0;
@@ -78,10 +83,28 @@ public class GroupByStatsProvider
     {
     }
 
+    public AggregateStats(AggregateStats aggregateStats)
+    {
+      this(
+          aggregateStats.mergeBufferQueries,
+          aggregateStats.mergeBufferAcquisitionTimeNs,
+          aggregateStats.maxMergeBufferAcquisitionTimeNs,
+          aggregateStats.totalMergeBufferUsedBytes,
+          aggregateStats.maxMergeBufferUsedBytes,
+          aggregateStats.spilledQueries,
+          aggregateStats.spilledBytes,
+          aggregateStats.maxSpilledBytes,
+          aggregateStats.mergeDictionarySize,
+          aggregateStats.maxMergeDictionarySize
+      );
+    }
+
     public AggregateStats(
         long mergeBufferQueries,
         long mergeBufferAcquisitionTimeNs,
         long maxMergeBufferAcquisitionTimeNs,
+        long totalMergeBufferUsedBytes,
+        long maxMergeBufferUsedBytes,
         long spilledQueries,
         long spilledBytes,
         long maxSpilledBytes,
@@ -92,6 +115,8 @@ public class GroupByStatsProvider
       this.mergeBufferQueries = mergeBufferQueries;
       this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs;
       this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs;
+      this.totalMergeBufferUsedBytes = totalMergeBufferUsedBytes;
+      this.maxMergeBufferUsedBytes = maxMergeBufferUsedBytes;
       this.spilledQueries = spilledQueries;
       this.spilledBytes = spilledBytes;
       this.maxSpilledBytes = maxSpilledBytes;
@@ -114,6 +139,16 @@ public class GroupByStatsProvider
       return maxMergeBufferAcquisitionTimeNs;
     }
 
+    public long getTotalMergeBufferUsedBytes()
+    {
+      return totalMergeBufferUsedBytes;
+    }
+
+    public long getMaxMergeBufferUsedBytes()
+    {
+      return maxMergeBufferUsedBytes;
+    }
+
     public long getSpilledQueries()
     {
       return spilledQueries;
@@ -148,6 +183,8 @@ public class GroupByStatsProvider
             maxMergeBufferAcquisitionTimeNs,
             perQueryStats.getMergeBufferAcquisitionTimeNs()
         );
+        totalMergeBufferUsedBytes += 
perQueryStats.getMaxMergeBufferUsedBytes();
+        maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, 
perQueryStats.getMaxMergeBufferUsedBytes());
       }
 
       if (perQueryStats.getSpilledBytes() > 0) {
@@ -160,36 +197,25 @@ public class GroupByStatsProvider
       maxMergeDictionarySize = Math.max(maxMergeDictionarySize, 
perQueryStats.getMergeDictionarySize());
     }
 
-    public AggregateStats reset()
+    public void reset()
     {
-      AggregateStats aggregateStats =
-          new AggregateStats(
-              mergeBufferQueries,
-              mergeBufferAcquisitionTimeNs,
-              maxMergeBufferAcquisitionTimeNs,
-              spilledQueries,
-              spilledBytes,
-              maxSpilledBytes,
-              mergeDictionarySize,
-              maxMergeDictionarySize
-          );
-
       this.mergeBufferQueries = 0;
       this.mergeBufferAcquisitionTimeNs = 0;
       this.maxMergeBufferAcquisitionTimeNs = 0;
+      this.totalMergeBufferUsedBytes = 0;
+      this.maxMergeBufferUsedBytes = 0;
       this.spilledQueries = 0;
       this.spilledBytes = 0;
       this.maxSpilledBytes = 0;
       this.mergeDictionarySize = 0;
       this.maxMergeDictionarySize = 0;
-
-      return aggregateStats;
     }
   }
 
   public static class PerQueryStats
   {
     private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
+    private final AtomicLong maxMergeBufferUsedBytes = new AtomicLong(0);
     private final AtomicLong spilledBytes = new AtomicLong(0);
     private final AtomicLong mergeDictionarySize = new AtomicLong(0);
 
@@ -198,6 +224,11 @@ public class GroupByStatsProvider
       mergeBufferAcquisitionTimeNs.addAndGet(delay);
     }
 
+    public void maxMergeBufferUsedBytes(long bytes)
+    {
+      maxMergeBufferUsedBytes.addAndGet(bytes);
+    }
+
     public void spilledBytes(long bytes)
     {
       spilledBytes.addAndGet(bytes);
@@ -213,6 +244,11 @@ public class GroupByStatsProvider
       return mergeBufferAcquisitionTimeNs.get();
     }
 
+    public long getMaxMergeBufferUsedBytes()
+    {
+      return maxMergeBufferUsedBytes.get();
+    }
+
     public long getSpilledBytes()
     {
       return spilledBytes.get();
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
index 70cf5832cf3..a5edb38cfa4 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
@@ -173,6 +173,18 @@ public abstract class AbstractBufferHashGrouper<KeyType> 
implements Grouper<KeyT
     aggregators.reset();
   }
 
+  /**
+   * Retrieves the size of the merge buffers used for this groupby query. This 
value is retrieved when
+   * {@link SpillingGrouper#close()} is called.
+   * <p>
+   * This method returns the highest memory value used, which is especially 
helpful in
+   * reporting the highest number of bytes used throughout the entire query 
lifecycle.
+   */
+  public long getMaxMergeBufferUsedBytes()
+  {
+    return hashTable.getMaxMergeBufferUsedBytes();
+  }
+
   /**
    * Populate a {@link ReusableEntry} with values from a particular bucket.
    */
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
index 4970ebe9e83..670a03cb2de 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
@@ -26,7 +26,6 @@ import 
org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.query.aggregation.AggregatorAdapters;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 
-import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.AbstractList;
 import java.util.Collections;
@@ -50,7 +49,6 @@ public class BufferHashGrouper<KeyType> extends 
AbstractBufferHashGrouper<KeyTyp
   // to get a comparator that uses the ordering defined by the 
OrderByColumnSpec of a query.
   private final boolean useDefaultSorting;
 
-  @Nullable
   private ByteBufferIntList offsetList;
 
   public BufferHashGrouper(
@@ -154,6 +152,18 @@ public class BufferHashGrouper<KeyType> extends 
AbstractBufferHashGrouper<KeyTyp
     aggregators.reset();
   }
 
+  @Override
+  public long getMaxMergeBufferUsedBytes()
+  {
+    if (!initialized) {
+      return 0L;
+    }
+
+    long hashTableUsage = hashTable.getMaxMergeBufferUsedBytes();
+    long offSetListUsage = offsetList.getMaxMergeBufferUsedBytes();
+    return hashTableUsage + offSetListUsage;
+  }
+
   @Override
   public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
   {
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java
index 9de58ca1b14..0f64d08613c 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java
@@ -26,6 +26,14 @@ import javax.annotation.Nullable;
 
 import java.nio.ByteBuffer;
 
+/**
+ * A fixed-width, open-addressing hash table that lives inside a 
caller-provided byte buffer.
+ * <p>
+ * The table uses a contiguous slice of the input {@link ByteBuffer} as its 
backing store. Each bucket holds
+ * at most one entry, and occupies {@code bucketSizeWithHash} number of bytes. 
Collisions are resolved by continuously
+ * probing the next bucket to find an empty bucket to slot the new entry. The 
current table view {@code tableBuffer}
+ * is maintained as a {@link ByteBuffer} slice that moves and grows within the 
arena as the table expands.
+ */
 public class ByteBufferHashTable
 {
   public static int calculateTableArenaSizeWithPerBucketAdditionalSize(
@@ -79,6 +87,9 @@ public class ByteBufferHashTable
   @Nullable
   protected BucketUpdateHandler bucketUpdateHandler;
 
+  // Tracks maximum bytes used for the entire lifecycle of this hash table.
+  protected long maxMergeBufferUsedBytes;
+
   public ByteBufferHashTable(
       float maxLoadFactor,
       int initialBuckets,
@@ -97,6 +108,7 @@ public class ByteBufferHashTable
     this.maxSizeForTesting = maxSizeForTesting;
     this.tableArenaSize = buffer.capacity();
     this.bucketUpdateHandler = bucketUpdateHandler;
+    this.maxMergeBufferUsedBytes = 0;
   }
 
   public void reset()
@@ -139,6 +151,7 @@ public class ByteBufferHashTable
     bufferDup.position(tableStart);
     bufferDup.limit(tableStart + maxBuckets * bucketSizeWithHash);
     tableBuffer = bufferDup.slice();
+    updateMaxMergeBufferUsedBytes();
 
     // Clear used bits of new table
     for (int i = 0; i < maxBuckets; i++) {
@@ -245,6 +258,7 @@ public class ByteBufferHashTable
     tableBuffer.putInt(Groupers.getUsedFlag(keyHash));
     tableBuffer.put(keyBuffer);
     size++;
+    updateMaxMergeBufferUsedBytes();
 
     if (bucketUpdateHandler != null) {
       bucketUpdateHandler.handleNewBucket(offset);
@@ -427,6 +441,20 @@ public class ByteBufferHashTable
     return growthCount;
   }
 
+  /**
+   * To maintain an accurate tracking of the maximum bytes used per query, 
this function is to be called immediately
+   * whenever either of {@link #size} or {@link #bucketSizeWithHash} is 
changed.
+   */
+  protected void updateMaxMergeBufferUsedBytes()
+  {
+    maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, (long) size * 
bucketSizeWithHash);
+  }
+
+  public long getMaxMergeBufferUsedBytes()
+  {
+    return maxMergeBufferUsedBytes;
+  }
+
   public interface BucketUpdateHandler
   {
     void handleNewBucket(int bucketOffset);
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java
index 28de255c13a..33a79451993 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java
@@ -30,6 +30,8 @@ public class ByteBufferIntList
   private final int maxElements;
   private int numElements;
 
+  private int maxMergeBufferUsedBytes;
+
   public ByteBufferIntList(
       ByteBuffer buffer,
       int maxElements
@@ -38,6 +40,7 @@ public class ByteBufferIntList
     this.buffer = buffer;
     this.maxElements = maxElements;
     this.numElements = 0;
+    this.maxMergeBufferUsedBytes = 0;
 
     if (buffer.capacity() < (maxElements * Integer.BYTES)) {
       throw new IAE(
@@ -55,6 +58,7 @@ public class ByteBufferIntList
     }
     buffer.putInt(numElements * Integer.BYTES, val);
     numElements++;
+    maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, numElements * 
Integer.BYTES);
   }
 
   public void set(int index, int val)
@@ -71,4 +75,9 @@ public class ByteBufferIntList
   {
     numElements = 0;
   }
+
+  public int getMaxMergeBufferUsedBytes()
+  {
+    return maxMergeBufferUsedBytes;
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java
index cfa7295e6b4..ff2746bca29 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java
@@ -44,6 +44,7 @@ public class ByteBufferMinMaxOffsetHeap
 
   private int heapSize;
   private int maxHeapSize;
+  private int maxMergeBufferUsedBytes;
 
   public ByteBufferMinMaxOffsetHeap(
       ByteBuffer buf,
@@ -55,6 +56,7 @@ public class ByteBufferMinMaxOffsetHeap
     this.buf = buf;
     this.limit = limit;
     this.heapSize = 0;
+    this.maxMergeBufferUsedBytes = 0;
     this.minComparator = minComparator;
     this.maxComparator = Ordering.from(minComparator).reverse();
     this.heapIndexUpdater = heapIndexUpdater;
@@ -71,9 +73,9 @@ public class ByteBufferMinMaxOffsetHeap
     int pos = heapSize;
     buf.putInt(pos * Integer.BYTES, offset);
     heapSize++;
-    if (heapSize > maxHeapSize) {
-      maxHeapSize = heapSize;
-    }
+
+    maxHeapSize = Math.max(maxHeapSize, heapSize);
+    maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, maxHeapSize * 
Integer.BYTES);
 
     if (heapIndexUpdater != null) {
       heapIndexUpdater.updateHeapIndexForOffset(offset, pos);
@@ -226,6 +228,11 @@ public class ByteBufferMinMaxOffsetHeap
     return heapSize;
   }
 
+  public int getMaxMergeBufferUsedBytes()
+  {
+    return maxMergeBufferUsedBytes;
+  }
+
   private void bubbleUp(int pos)
   {
     if (isEvenLevel(pos)) {
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
index 8242c9d8cf5..b4b4cb34701 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
@@ -332,7 +332,7 @@ public class ConcurrentGrouper<KeyType> implements 
Grouper<KeyType>
       throw new ISE("Grouper is closed");
     }
 
-    groupers.forEach(Grouper::reset);
+    groupers.forEach(SpillingGrouper::reset);
   }
 
   @Override
@@ -496,7 +496,7 @@ public class ConcurrentGrouper<KeyType> implements 
Grouper<KeyType>
   {
     if (!closed) {
       closed = true;
-      groupers.forEach(Grouper::close);
+      groupers.forEach(SpillingGrouper::close);
     }
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
index 3c80de8f31e..2bb544d0cda 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
@@ -458,6 +458,18 @@ public class LimitedBufferHashGrouper<KeyType> extends 
AbstractBufferHashGrouper
     }
   }
 
+  @Override
+  public long getMaxMergeBufferUsedBytes()
+  {
+    if (!initialized) {
+      return 0L;
+    }
+
+    long hashTableUsage = super.getMaxMergeBufferUsedBytes();
+    long offSetHeapUsage = offsetHeap.getMaxMergeBufferUsedBytes();
+    return hashTableUsage + offSetHeapUsage;
+  }
+
   private class AlternatingByteBufferHashTable extends ByteBufferHashTable
   {
     // The base buffer is split into two alternating halves, with one 
sub-buffer in use at a given time.
@@ -509,6 +521,7 @@ public class LimitedBufferHashGrouper<KeyType> extends 
AbstractBufferHashGrouper
     public void reset()
     {
       size = 0;
+      updateMaxMergeBufferUsedBytes();
       growthCount = 0;
       // clear the used bits of the first buffer
       for (int i = 0; i < maxBuckets; i++) {
@@ -570,6 +583,7 @@ public class LimitedBufferHashGrouper<KeyType> extends 
AbstractBufferHashGrouper
       }
 
       size = numCopied;
+      updateMaxMergeBufferUsedBytes();
       tableBuffer = newTableBuffer;
       growthCount++;
     }
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index fadcfa02c95..688c9f06566 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -68,7 +68,7 @@ public class SpillingGrouper<KeyType> implements 
Grouper<KeyType>
       "Not enough disk space to execute this query. Try raising 
druid.query.groupBy.maxOnDiskStorage."
   );
 
-  private final Grouper<KeyType> grouper;
+  private final AbstractBufferHashGrouper<KeyType> grouper;
   private final KeySerde<KeyType> keySerde;
   private final LimitedTemporaryStorage temporaryStorage;
   private final ObjectMapper spillMapper;
@@ -218,12 +218,23 @@ public class SpillingGrouper<KeyType> implements 
Grouper<KeyType>
   @Override
   public void close()
   {
-    perQueryStats.dictionarySize(keySerde.getDictionarySize());
+    perQueryStats.dictionarySize(getDictionarySizeEstimate());
+    perQueryStats.maxMergeBufferUsedBytes(getMaxMergeBufferUsedBytes());
     grouper.close();
     keySerde.reset();
     deleteFiles();
   }
 
+  private long getMaxMergeBufferUsedBytes()
+  {
+    return grouper.isInitialized() ? grouper.getMaxMergeBufferUsedBytes() : 0L;
+  }
+
+  private long getDictionarySizeEstimate()
+  {
+    return keySerde.getDictionarySize();
+  }
+
   /**
    * Returns a dictionary of string keys added to this grouper.  Note that the 
dictionary of keySerde is spilled on
    * local storage whenever the inner grouper is spilled.  If there are 
spilled dictionaries, this method loads them
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java
index 4a6cfb83d10..207b37d65f4 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java
@@ -35,6 +35,7 @@ public class GroupByStatsProviderTest
 
     stats1.mergeBufferAcquisitionTime(300);
     stats1.mergeBufferAcquisitionTime(400);
+    stats1.maxMergeBufferUsedBytes(50);
     stats1.spilledBytes(200);
     stats1.spilledBytes(400);
     stats1.dictionarySize(100);
@@ -45,6 +46,7 @@ public class GroupByStatsProviderTest
 
     stats2.mergeBufferAcquisitionTime(500);
     stats2.mergeBufferAcquisitionTime(600);
+    stats2.maxMergeBufferUsedBytes(100);
     stats2.spilledBytes(400);
     stats2.spilledBytes(600);
     stats2.dictionarySize(300);
@@ -54,6 +56,8 @@ public class GroupByStatsProviderTest
     Assert.assertEquals(0L, aggregateStats.getMergeBufferQueries());
     Assert.assertEquals(0L, aggregateStats.getMergeBufferAcquisitionTimeNs());
     Assert.assertEquals(0L, 
aggregateStats.getMaxMergeBufferAcquisitionTimeNs());
+    Assert.assertEquals(0L, aggregateStats.getTotalMergeBufferUsedBytes());
+    Assert.assertEquals(0L, aggregateStats.getMaxMergeBufferUsedBytes());
     Assert.assertEquals(0L, aggregateStats.getSpilledQueries());
     Assert.assertEquals(0L, aggregateStats.getSpilledBytes());
     Assert.assertEquals(0L, aggregateStats.getMaxSpilledBytes());
@@ -67,6 +71,8 @@ public class GroupByStatsProviderTest
     Assert.assertEquals(2, aggregateStats.getMergeBufferQueries());
     Assert.assertEquals(1800L, 
aggregateStats.getMergeBufferAcquisitionTimeNs());
     Assert.assertEquals(1100L, 
aggregateStats.getMaxMergeBufferAcquisitionTimeNs());
+    Assert.assertEquals(150L, aggregateStats.getTotalMergeBufferUsedBytes());
+    Assert.assertEquals(100L, aggregateStats.getMaxMergeBufferUsedBytes());
     Assert.assertEquals(2L, aggregateStats.getSpilledQueries());
     Assert.assertEquals(1600L, aggregateStats.getSpilledBytes());
     Assert.assertEquals(1000L, aggregateStats.getMaxSpilledBytes());
@@ -74,7 +80,6 @@ public class GroupByStatsProviderTest
     Assert.assertEquals(700L, aggregateStats.getMaxMergeDictionarySize());
   }
 
-
   @Test
   public void testMetricsWithMultipleQueries()
   {
@@ -83,24 +88,28 @@ public class GroupByStatsProviderTest
     QueryResourceId r1 = new QueryResourceId("r1");
     GroupByStatsProvider.PerQueryStats stats1 = 
statsProvider.getPerQueryStatsContainer(r1);
     stats1.mergeBufferAcquisitionTime(2000);
+    stats1.maxMergeBufferUsedBytes(50);
     stats1.spilledBytes(100);
     stats1.dictionarySize(200);
 
     QueryResourceId r2 = new QueryResourceId("r2");
     GroupByStatsProvider.PerQueryStats stats2 = 
statsProvider.getPerQueryStatsContainer(r2);
     stats2.mergeBufferAcquisitionTime(100);
+    stats2.maxMergeBufferUsedBytes(500);
     stats2.spilledBytes(150);
     stats2.dictionarySize(250);
 
     QueryResourceId r3 = new QueryResourceId("r3");
     GroupByStatsProvider.PerQueryStats stats3 = 
statsProvider.getPerQueryStatsContainer(r3);
     stats3.mergeBufferAcquisitionTime(200);
+    stats3.maxMergeBufferUsedBytes(100);
     stats3.spilledBytes(3000);
     stats3.dictionarySize(300);
 
     QueryResourceId r4 = new QueryResourceId("r4");
     GroupByStatsProvider.PerQueryStats stats4 = 
statsProvider.getPerQueryStatsContainer(r4);
     stats4.mergeBufferAcquisitionTime(300);
+    stats4.maxMergeBufferUsedBytes(75);
     stats4.spilledBytes(200);
     stats4.dictionarySize(1500);
 
@@ -112,11 +121,13 @@ public class GroupByStatsProviderTest
     GroupByStatsProvider.AggregateStats aggregateStats = 
statsProvider.getStatsSince();
 
     Assert.assertEquals(2000L, 
aggregateStats.getMaxMergeBufferAcquisitionTimeNs());
+    Assert.assertEquals(500L, aggregateStats.getMaxMergeBufferUsedBytes());
     Assert.assertEquals(3000L, aggregateStats.getMaxSpilledBytes());
     Assert.assertEquals(1500L, aggregateStats.getMaxMergeDictionarySize());
 
     Assert.assertEquals(4L, aggregateStats.getMergeBufferQueries());
     Assert.assertEquals(2600L, 
aggregateStats.getMergeBufferAcquisitionTimeNs());
+    Assert.assertEquals(725L, aggregateStats.getTotalMergeBufferUsedBytes());
     Assert.assertEquals(4L, aggregateStats.getSpilledQueries());
     Assert.assertEquals(3450L, aggregateStats.getSpilledBytes());
     Assert.assertEquals(2250L, aggregateStats.getMergeDictionarySize());
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java
index c96bc50dd78..ec5d64794cc 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java
@@ -156,6 +156,61 @@ public class BufferHashGrouperTest extends 
InitializedNullHandlingTest
     }
   }
 
+  @Test
+  public void testMaxMergeBufferUsedBytes()
+  {
+    final GroupByTestColumnSelectorFactory columnSelectorFactory = 
GrouperTestUtil.newColumnSelectorFactory();
+    final BufferHashGrouper<IntKey> grouper = new BufferHashGrouper<>(
+        Suppliers.ofInstance(ByteBuffer.allocate(1000)),
+        GrouperTestUtil.intKeySerde(),
+        AggregatorAdapters.factorizeBuffered(
+            columnSelectorFactory,
+            ImmutableList.of(
+                new LongSumAggregatorFactory("valueSum", "value"),
+                new CountAggregatorFactory("count")
+            )
+        ),
+        Integer.MAX_VALUE,
+        0,
+        0,
+        true
+    );
+    grouper.init();
+
+    long initialUsage = grouper.getMaxMergeBufferUsedBytes();
+    Assert.assertEquals(0L, initialUsage);
+
+    columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 
10L)));
+
+    grouper.aggregate(new IntKey(1));
+    final long expectedBucketSize = grouper.getMaxMergeBufferUsedBytes();
+
+    grouper.aggregate(new IntKey(2));
+    grouper.aggregate(new IntKey(3));
+
+    Assert.assertEquals(3L * expectedBucketSize, 
grouper.getMaxMergeBufferUsedBytes());
+
+    grouper.aggregate(new IntKey(4));
+    grouper.aggregate(new IntKey(5));
+
+    Assert.assertEquals(5L * expectedBucketSize, 
grouper.getMaxMergeBufferUsedBytes());
+
+    grouper.reset();
+    Assert.assertEquals(0, grouper.getSize());
+    Assert.assertEquals(5L * expectedBucketSize, 
grouper.getMaxMergeBufferUsedBytes());
+
+    grouper.aggregate(new IntKey(1));
+    grouper.aggregate(new IntKey(6));
+    grouper.aggregate(new IntKey(7));
+    grouper.aggregate(new IntKey(8));
+    grouper.aggregate(new IntKey(9));
+    grouper.aggregate(new IntKey(10));
+
+    Assert.assertEquals(6L * expectedBucketSize, 
grouper.getMaxMergeBufferUsedBytes());
+
+    grouper.close();
+  }
+
   private ResourceHolder<Grouper<IntKey>> makeGrouper(
       GroupByTestColumnSelectorFactory columnSelectorFactory,
       int bufferSize,
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java
index 07631840231..df9851ba829 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java
@@ -307,6 +307,62 @@ public class LimitedBufferHashGrouperTest extends 
InitializedNullHandlingTest
     Assert.assertEquals(LIMIT, i);
   }
 
+  @Test
+  public void testMaxMergeBufferUsedBytesTracksMaxUsageAfterReset()
+  {
+    final GroupByTestColumnSelectorFactory columnSelectorFactory = 
GrouperTestUtil.newColumnSelectorFactory();
+    final LimitedBufferHashGrouper<IntKey> grouper = 
makeGrouper(columnSelectorFactory, 20000);
+
+    Assert.assertEquals(0L, grouper.getMaxMergeBufferUsedBytes());
+    columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 
10L)));
+
+    Assert.assertTrue(String.valueOf(KEY_BASE), grouper.aggregate(new 
IntKey(KEY_BASE)).isOk());
+    final long usagePerEntry = grouper.getMaxMergeBufferUsedBytes();
+
+    grouper.reset();
+    Assert.assertEquals(0, grouper.getSize());
+    Assert.assertEquals(usagePerEntry, grouper.getMaxMergeBufferUsedBytes());
+
+    // Add 10 entries after reset
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(new 
IntKey(i + KEY_BASE)).isOk());
+    }
+
+    Assert.assertEquals(10 * usagePerEntry, 
grouper.getMaxMergeBufferUsedBytes());
+  }
+
+  @Test
+  public void testMaxMergeBufferUsedBytesAfterBufferSwap()
+  {
+    // This test closely follows the flow of testLimitAndBufferSwapping().
+    final GroupByTestColumnSelectorFactory columnSelectorFactory = 
GrouperTestUtil.newColumnSelectorFactory();
+    final LimitedBufferHashGrouper<IntKey> grouper = 
makeGrouper(columnSelectorFactory, 20000);
+
+    columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 
10L)));
+
+    // Calculate usage per entry from first entry
+    Assert.assertTrue(String.valueOf(KEY_BASE), grouper.aggregate(new 
IntKey(KEY_BASE)).isOk());
+    final long usagePerEntry = grouper.getMaxMergeBufferUsedBytes();
+
+    // This results in 13 swaps and a final size of 116 (100 keys + 16 new keys 
after the last swap)
+    for (int i = 1; i < NUM_ROWS; i++) {
+      Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(new 
IntKey(i + KEY_BASE)).isOk());
+    }
+
+    Assert.assertEquals(13, grouper.getGrowthCount());
+    Assert.assertEquals(116, grouper.getSize());
+    Assert.assertEquals(168, grouper.getMaxSize());
+
+    final long bucketSizeWithHash = usagePerEntry - Integer.BYTES;
+    final long hashTablePeak = grouper.getMaxSize() * bucketSizeWithHash;
+    // The heap can temporarily hold LIMIT + 1 entries before one is removed
+    final long heapPeak = ((long) LIMIT + 1) * Integer.BYTES;
+    // Peak usage is the sum of the hash table peak and the heap peak; the two 
peak at different sizes.
+    final long expectedPeakUsage = hashTablePeak + heapPeak;
+
+    Assert.assertEquals(expectedPeakUsage, 
grouper.getMaxMergeBufferUsedBytes());
+  }
+
   private static LimitedBufferHashGrouper<IntKey> makeGrouper(
       GroupByTestColumnSelectorFactory columnSelectorFactory,
       int bufferSize
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java 
b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
index ecb702cc70d..e5f46020fe0 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
@@ -58,21 +58,16 @@ public class GroupByStatsMonitor extends AbstractMonitor
     final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder();
 
     emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", 
mergeBufferPool.getPendingRequests()));
-
     emitter.emit(builder.setMetric("mergeBuffer/used", 
mergeBufferPool.getUsedResourcesCount()));
 
     GroupByStatsProvider.AggregateStats statsContainer = 
groupByStatsProvider.getStatsSince();
 
     if (statsContainer.getMergeBufferQueries() > 0) {
       emitter.emit(builder.setMetric("mergeBuffer/queries", 
statsContainer.getMergeBufferQueries()));
-      emitter.emit(builder.setMetric(
-          "mergeBuffer/acquisitionTimeNs",
-          statsContainer.getMergeBufferAcquisitionTimeNs()
-      ));
-      emitter.emit(builder.setMetric(
-          "mergeBuffer/maxAcquisitionTimeNs",
-          statsContainer.getMaxMergeBufferAcquisitionTimeNs()
-      ));
+      emitter.emit(builder.setMetric("mergeBuffer/acquisitionTimeNs", 
statsContainer.getMergeBufferAcquisitionTimeNs()));
+      emitter.emit(builder.setMetric("mergeBuffer/maxAcquisitionTimeNs", 
statsContainer.getMaxMergeBufferAcquisitionTimeNs()));
+      emitter.emit(builder.setMetric("mergeBuffer/bytesUsed", 
statsContainer.getTotalMergeBufferUsedBytes()));
+      emitter.emit(builder.setMetric("mergeBuffer/maxBytesUsed", 
statsContainer.getMaxMergeBufferUsedBytes()));
     }
 
     if (statsContainer.getSpilledQueries() > 0) {
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
 
b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
index 8c9d4dc4c81..eaca043e02e 100644
--- 
a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
@@ -61,6 +61,8 @@ public class GroupByStatsMonitorTest
             1L,
             100L,
             100L,
+            200L,
+            200L,
             2L,
             200L,
             200L,
@@ -70,7 +72,7 @@ public class GroupByStatsMonitorTest
       }
     };
 
-    mergeBufferPool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 
5);
+    mergeBufferPool = new DefaultBlockingPool<>(() -> 
ByteBuffer.allocate(1024), 5);
     executorService = Executors.newSingleThreadExecutor();
   }
 
@@ -83,8 +85,7 @@ public class GroupByStatsMonitorTest
   @Test
   public void testMonitor()
   {
-    final GroupByStatsMonitor monitor =
-        new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+    final GroupByStatsMonitor monitor = new 
GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
     emitter.start();
     monitor.doMonitor(emitter);
@@ -92,12 +93,14 @@ public class GroupByStatsMonitorTest
     // Trigger metric emission
     monitor.doMonitor(emitter);
 
-    Assert.assertEquals(10, emitter.getNumEmittedEvents());
+    Assert.assertEquals(12, emitter.getNumEmittedEvents());
     emitter.verifyValue("mergeBuffer/pendingRequests", 0L);
     emitter.verifyValue("mergeBuffer/used", 0L);
     emitter.verifyValue("mergeBuffer/queries", 1L);
     emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 100L);
     emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 100L);
+    emitter.verifyValue("mergeBuffer/bytesUsed", 200L);
+    emitter.verifyValue("mergeBuffer/maxBytesUsed", 200L);
     emitter.verifyValue("groupBy/spilledQueries", 2L);
     emitter.verifyValue("groupBy/spilledBytes", 200L);
     emitter.verifyValue("groupBy/maxSpilledBytes", 200L);
@@ -112,15 +115,11 @@ public class GroupByStatsMonitorTest
     final String taskId = "taskId1";
     final String groupId = "test_groupid";
     final String taskType = "test_tasktype";
-    final GroupByStatsMonitor monitor = new GroupByStatsMonitor(
-        groupByStatsProvider,
-        mergeBufferPool
-    );
+    final GroupByStatsMonitor monitor = new 
GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host", new TestTaskHolder(dataSource, taskId, taskType, groupId));
     emitter.start();
     monitor.doMonitor(emitter);
     emitter.flush();
-    // Trigger metric emission
     monitor.doMonitor(emitter);
 
     final Map<String, Object> dimFilters = Map.of(
@@ -136,6 +135,8 @@ public class GroupByStatsMonitorTest
     verifyMetricValue(emitter, "mergeBuffer/queries", dimFilters, 1L);
     verifyMetricValue(emitter, "mergeBuffer/acquisitionTimeNs", dimFilters, 
100L);
     verifyMetricValue(emitter, "mergeBuffer/maxAcquisitionTimeNs", dimFilters, 
100L);
+    verifyMetricValue(emitter, "mergeBuffer/bytesUsed", dimFilters, 200L);
+    verifyMetricValue(emitter, "mergeBuffer/maxBytesUsed", dimFilters, 200L);
     verifyMetricValue(emitter, "groupBy/spilledQueries", dimFilters, 2L);
     verifyMetricValue(emitter, "groupBy/spilledBytes", dimFilters, 200L);
     verifyMetricValue(emitter, "groupBy/maxSpilledBytes", dimFilters, 200L);
@@ -151,8 +152,7 @@ public class GroupByStatsMonitorTest
       mergeBufferPool.takeBatch(4);
     }).get(20, TimeUnit.SECONDS);
 
-    final GroupByStatsMonitor monitor =
-        new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+    final GroupByStatsMonitor monitor = new 
GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
     final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", 
"DummyHost");
     boolean ret = monitor.doMonitor(emitter);
     Assert.assertTrue(ret);
@@ -180,8 +180,7 @@ public class GroupByStatsMonitorTest
         }
       }
 
-      final GroupByStatsMonitor monitor =
-          new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+      final GroupByStatsMonitor monitor = new 
GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
       final StubServiceEmitter emitter = new 
StubServiceEmitter("DummyService", "DummyHost");
       boolean ret = monitor.doMonitor(emitter);
       Assert.assertTrue(ret);
@@ -203,18 +202,21 @@ public class GroupByStatsMonitorTest
     QueryResourceId r1 = new QueryResourceId("r1");
     GroupByStatsProvider.PerQueryStats stats1 = 
statsProvider.getPerQueryStatsContainer(r1);
     stats1.mergeBufferAcquisitionTime(100);
+    stats1.maxMergeBufferUsedBytes(50);
     stats1.spilledBytes(200);
     stats1.dictionarySize(100);
 
     QueryResourceId r2 = new QueryResourceId("r2");
     GroupByStatsProvider.PerQueryStats stats2 = 
statsProvider.getPerQueryStatsContainer(r2);
     stats2.mergeBufferAcquisitionTime(500);
+    stats2.maxMergeBufferUsedBytes(30);
     stats2.spilledBytes(100);
     stats2.dictionarySize(300);
 
     QueryResourceId r3 = new QueryResourceId("r3");
     GroupByStatsProvider.PerQueryStats stats3 = 
statsProvider.getPerQueryStatsContainer(r3);
     stats3.mergeBufferAcquisitionTime(200);
+    stats3.maxMergeBufferUsedBytes(150);
     stats3.spilledBytes(800);
     stats3.dictionarySize(200);
 
@@ -230,11 +232,13 @@ public class GroupByStatsMonitorTest
 
     emitter.verifyValue("mergeBuffer/queries", 3L);
     emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 800L);
+    emitter.verifyValue("mergeBuffer/bytesUsed", 230L);
     emitter.verifyValue("groupBy/spilledQueries", 3L);
     emitter.verifyValue("groupBy/spilledBytes", 1100L);
     emitter.verifyValue("groupBy/mergeDictionarySize", 600L);
 
     emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 500L);
+    emitter.verifyValue("mergeBuffer/maxBytesUsed", 150L);
     emitter.verifyValue("groupBy/maxSpilledBytes", 800L);
     emitter.verifyValue("groupBy/maxMergeDictionarySize", 300L);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to