This is an automated email from the ASF dual-hosted git repository.
gwphua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new de983e485c6 [Groupby Query Metrics] Add merge buffer tracking (#18731)
de983e485c6 is described below
commit de983e485c6b900605220c8d231dbec7c0698ef0
Author: Virushade <[email protected]>
AuthorDate: Wed Mar 4 10:34:00 2026 +0800
[Groupby Query Metrics] Add merge buffer tracking (#18731)
* Add byte buffer tracking for underlying hash tables
* Byte buffer tracking for underlying offset handlers
* Fix tests
* Fix quidem tests
* Documentation
* bytesUsed naming
* Add max metrics
* Add missing calculation in BufferHashGrouper
* Checkstyle
* Checkstyle
* GroupByStatsProvider javadocs
* Fix GroupByStatsProviderTest comments
* Fix doc order for GroupByStatsProvider metrics
* Fix test for GroupByStatsMonitorTest
* Update server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
Co-authored-by: Abhishek Radhakrishnan <[email protected]>
* Revert stylistic changes in BufferHashGrouper
* Rename mergeBufferUsage to mergeBufferUsedBytes
* Order of maxAcquisitionTimeNs
* Track the open addressing hash table
* Remove max metrics, push them in another PR...
* Remove max metrics in GroupByStatsProviderTest
* LimitedBufferHashGrouper to use parent method to report maxTableBufferUsedBytes
* Standardised merge buffer names
* Tests for buffer hash grouper
* Address multiplication cast
* Javadocs for getMergeBufferUsedBytes
* Remix comments in test for peak calculations
* Clean up after merging conflicts
* Standardize maxMergeBufferUsedBytes
* Test duplicate buffer adds
* Test
* Add javadocs for update
---------
Co-authored-by: Abhishek Radhakrishnan <[email protected]>
---
docs/operations/metrics.md | 6 ++
.../druid/query/groupby/GroupByStatsProvider.java | 70 ++++++++++++++++------
.../epinephelinae/AbstractBufferHashGrouper.java | 12 ++++
.../groupby/epinephelinae/BufferHashGrouper.java | 14 ++++-
.../groupby/epinephelinae/ByteBufferHashTable.java | 28 +++++++++
.../groupby/epinephelinae/ByteBufferIntList.java | 9 +++
.../epinephelinae/ByteBufferMinMaxOffsetHeap.java | 13 +++-
.../groupby/epinephelinae/ConcurrentGrouper.java | 4 +-
.../epinephelinae/LimitedBufferHashGrouper.java | 14 +++++
.../groupby/epinephelinae/SpillingGrouper.java | 15 ++++-
.../query/groupby/GroupByStatsProviderTest.java | 13 +++-
.../epinephelinae/BufferHashGrouperTest.java | 55 +++++++++++++++++
.../LimitedBufferHashGrouperTest.java | 56 +++++++++++++++++
.../druid/server/metrics/GroupByStatsMonitor.java | 13 ++--
.../server/metrics/GroupByStatsMonitorTest.java | 30 ++++++----
15 files changed, 303 insertions(+), 49 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 523ea15185b..4e422696fc8 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -90,6 +90,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
+|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
+|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
@@ -117,6 +119,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
+|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
+|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
@@ -147,6 +151,8 @@ to represent the task ID are deprecated and will be removed in a future release.
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
+|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
+|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java
index 51f56400555..f6b92a7b62c 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java
@@ -27,7 +27,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
- * Metrics collector for groupBy queries like spilled bytes, merge buffer acquistion time, dictionary size.
+ * Collects groupBy query metrics (spilled bytes, merge buffer usage, dictionary size) per query, then
+ * aggregates them when queries complete. Stats are retrieved and reset periodically via {@link #getStatsSince()}.
*/
@LazySingleton
public class GroupByStatsProvider
@@ -60,7 +61,9 @@ public class GroupByStatsProvider
public synchronized AggregateStats getStatsSince()
{
- return aggregateStatsContainer.reset();
+ AggregateStats aggregateStats = new AggregateStats(aggregateStatsContainer);
+ aggregateStatsContainer.reset();
+ return aggregateStats;
}
public static class AggregateStats
@@ -68,6 +71,8 @@ public class GroupByStatsProvider
private long mergeBufferQueries = 0;
private long mergeBufferAcquisitionTimeNs = 0;
private long maxMergeBufferAcquisitionTimeNs = 0;
+ private long totalMergeBufferUsedBytes = 0;
+ private long maxMergeBufferUsedBytes = 0;
private long spilledQueries = 0;
private long spilledBytes = 0;
private long maxSpilledBytes = 0;
@@ -78,10 +83,28 @@ public class GroupByStatsProvider
{
}
+ public AggregateStats(AggregateStats aggregateStats)
+ {
+ this(
+ aggregateStats.mergeBufferQueries,
+ aggregateStats.mergeBufferAcquisitionTimeNs,
+ aggregateStats.maxMergeBufferAcquisitionTimeNs,
+ aggregateStats.totalMergeBufferUsedBytes,
+ aggregateStats.maxMergeBufferUsedBytes,
+ aggregateStats.spilledQueries,
+ aggregateStats.spilledBytes,
+ aggregateStats.maxSpilledBytes,
+ aggregateStats.mergeDictionarySize,
+ aggregateStats.maxMergeDictionarySize
+ );
+ }
+
public AggregateStats(
long mergeBufferQueries,
long mergeBufferAcquisitionTimeNs,
long maxMergeBufferAcquisitionTimeNs,
+ long totalMergeBufferUsedBytes,
+ long maxMergeBufferUsedBytes,
long spilledQueries,
long spilledBytes,
long maxSpilledBytes,
@@ -92,6 +115,8 @@ public class GroupByStatsProvider
this.mergeBufferQueries = mergeBufferQueries;
this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs;
this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs;
+ this.totalMergeBufferUsedBytes = totalMergeBufferUsedBytes;
+ this.maxMergeBufferUsedBytes = maxMergeBufferUsedBytes;
this.spilledQueries = spilledQueries;
this.spilledBytes = spilledBytes;
this.maxSpilledBytes = maxSpilledBytes;
@@ -114,6 +139,16 @@ public class GroupByStatsProvider
return maxMergeBufferAcquisitionTimeNs;
}
+ public long getTotalMergeBufferUsedBytes()
+ {
+ return totalMergeBufferUsedBytes;
+ }
+
+ public long getMaxMergeBufferUsedBytes()
+ {
+ return maxMergeBufferUsedBytes;
+ }
+
public long getSpilledQueries()
{
return spilledQueries;
@@ -148,6 +183,8 @@ public class GroupByStatsProvider
maxMergeBufferAcquisitionTimeNs,
perQueryStats.getMergeBufferAcquisitionTimeNs()
);
+ totalMergeBufferUsedBytes += perQueryStats.getMaxMergeBufferUsedBytes();
+ maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, perQueryStats.getMaxMergeBufferUsedBytes());
}
if (perQueryStats.getSpilledBytes() > 0) {
@@ -160,36 +197,25 @@ public class GroupByStatsProvider
maxMergeDictionarySize = Math.max(maxMergeDictionarySize, perQueryStats.getMergeDictionarySize());
}
- public AggregateStats reset()
+ public void reset()
{
- AggregateStats aggregateStats =
- new AggregateStats(
- mergeBufferQueries,
- mergeBufferAcquisitionTimeNs,
- maxMergeBufferAcquisitionTimeNs,
- spilledQueries,
- spilledBytes,
- maxSpilledBytes,
- mergeDictionarySize,
- maxMergeDictionarySize
- );
-
this.mergeBufferQueries = 0;
this.mergeBufferAcquisitionTimeNs = 0;
this.maxMergeBufferAcquisitionTimeNs = 0;
+ this.totalMergeBufferUsedBytes = 0;
+ this.maxMergeBufferUsedBytes = 0;
this.spilledQueries = 0;
this.spilledBytes = 0;
this.maxSpilledBytes = 0;
this.mergeDictionarySize = 0;
this.maxMergeDictionarySize = 0;
-
- return aggregateStats;
}
}
public static class PerQueryStats
{
private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
+ private final AtomicLong maxMergeBufferUsedBytes = new AtomicLong(0);
private final AtomicLong spilledBytes = new AtomicLong(0);
private final AtomicLong mergeDictionarySize = new AtomicLong(0);
@@ -198,6 +224,11 @@ public class GroupByStatsProvider
mergeBufferAcquisitionTimeNs.addAndGet(delay);
}
+ public void maxMergeBufferUsedBytes(long bytes)
+ {
+ maxMergeBufferUsedBytes.addAndGet(bytes);
+ }
+
public void spilledBytes(long bytes)
{
spilledBytes.addAndGet(bytes);
@@ -213,6 +244,11 @@ public class GroupByStatsProvider
return mergeBufferAcquisitionTimeNs.get();
}
+ public long getMaxMergeBufferUsedBytes()
+ {
+ return maxMergeBufferUsedBytes.get();
+ }
+
public long getSpilledBytes()
{
return spilledBytes.get();
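The hunk above replaces the old `reset()`-returns-snapshot pattern with a copy constructor plus a void `reset()`. A minimal, self-contained sketch of that snapshot-then-reset idea (the `StatsAccumulator` class and its fields are illustrative stand-ins, not the actual Druid types):

```java
// Hedged sketch of the pattern GroupByStatsProvider.getStatsSince() now follows:
// copy the running totals under the lock, then zero the live counters.
public class StatsAccumulator
{
  private long totalBytes;
  private long maxBytes;

  // Fold one query's usage into the running totals.
  public synchronized void accumulate(long bytes)
  {
    totalBytes += bytes;
    maxBytes = Math.max(maxBytes, bytes);
  }

  // Return a copy of the current totals, then reset them for the next emission period.
  public synchronized long[] snapshotAndReset()
  {
    long[] snapshot = {totalBytes, maxBytes};
    totalBytes = 0;
    maxBytes = 0;
    return snapshot;
  }
}
```

Returning a copy before clearing keeps snapshot construction separate from the reset, which is the split the diff makes between the new copy constructor and the now-void `reset()`.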
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
index 70cf5832cf3..a5edb38cfa4 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
@@ -173,6 +173,18 @@ public abstract class AbstractBufferHashGrouper<KeyType> implements Grouper<KeyT
aggregators.reset();
}
+ /**
+ * Retrieves the size of the merge buffers used for this groupBy query. This value is retrieved when
+ * {@link SpillingGrouper#close()} is called.
+ * <p></p>
+ * This method returns the highest memory value used, which is helpful for reporting the peak number of
+ * bytes used across the entire query lifecycle.
+ */
+ public long getMaxMergeBufferUsedBytes()
+ {
+ return hashTable.getMaxMergeBufferUsedBytes();
+ }
+
/**
* Populate a {@link ReusableEntry} with values from a particular bucket.
*/
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
index 4970ebe9e83..670a03cb2de 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
@@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
-import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.AbstractList;
import java.util.Collections;
@@ -50,7 +49,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
// to get a comparator that uses the ordering defined by the OrderByColumnSpec of a query.
private final boolean useDefaultSorting;
- @Nullable
private ByteBufferIntList offsetList;
public BufferHashGrouper(
@@ -154,6 +152,18 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
aggregators.reset();
}
+ @Override
+ public long getMaxMergeBufferUsedBytes()
+ {
+ if (!initialized) {
+ return 0L;
+ }
+
+ long hashTableUsage = hashTable.getMaxMergeBufferUsedBytes();
+ long offSetListUsage = offsetList.getMaxMergeBufferUsedBytes();
+ return hashTableUsage + offSetListUsage;
+ }
+
@Override
public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
{
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java
index 9de58ca1b14..0f64d08613c 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java
@@ -26,6 +26,14 @@ import javax.annotation.Nullable;
import java.nio.ByteBuffer;
+/**
+ * A fixed-width, open-addressing hash table that lives inside a caller-provided byte buffer.
+ * <p>
+ * The table uses a contiguous slice of the input {@link ByteBuffer} as its backing store. Each bucket holds
+ * at most one entry and occupies {@code bucketSizeWithHash} bytes. Collisions are resolved by probing
+ * subsequent buckets until an empty one is found for the new entry. The current table view {@code tableBuffer}
+ * is maintained as a {@link ByteBuffer} slice that moves and grows within the arena as the table expands.
+ */
public class ByteBufferHashTable
{
public static int calculateTableArenaSizeWithPerBucketAdditionalSize(
@@ -79,6 +87,9 @@ public class ByteBufferHashTable
@Nullable
protected BucketUpdateHandler bucketUpdateHandler;
+ // Tracks maximum bytes used for the entire lifecycle of this hash table.
+ protected long maxMergeBufferUsedBytes;
+
public ByteBufferHashTable(
float maxLoadFactor,
int initialBuckets,
@@ -97,6 +108,7 @@ public class ByteBufferHashTable
this.maxSizeForTesting = maxSizeForTesting;
this.tableArenaSize = buffer.capacity();
this.bucketUpdateHandler = bucketUpdateHandler;
+ this.maxMergeBufferUsedBytes = 0;
}
public void reset()
@@ -139,6 +151,7 @@ public class ByteBufferHashTable
bufferDup.position(tableStart);
bufferDup.limit(tableStart + maxBuckets * bucketSizeWithHash);
tableBuffer = bufferDup.slice();
+ updateMaxMergeBufferUsedBytes();
// Clear used bits of new table
for (int i = 0; i < maxBuckets; i++) {
@@ -245,6 +258,7 @@ public class ByteBufferHashTable
tableBuffer.putInt(Groupers.getUsedFlag(keyHash));
tableBuffer.put(keyBuffer);
size++;
+ updateMaxMergeBufferUsedBytes();
if (bucketUpdateHandler != null) {
bucketUpdateHandler.handleNewBucket(offset);
@@ -427,6 +441,20 @@ public class ByteBufferHashTable
return growthCount;
}
+ /**
+ * To keep the per-query maximum bytes used accurate, this method must be called immediately
+ * whenever either {@link #size} or {@link #bucketSizeWithHash} changes.
+ */
+ protected void updateMaxMergeBufferUsedBytes()
+ {
+ maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, (long) size * bucketSizeWithHash);
+ }
+
+ public long getMaxMergeBufferUsedBytes()
+ {
+ return maxMergeBufferUsedBytes;
+ }
+
public interface BucketUpdateHandler
{
void handleNewBucket(int bucketOffset);
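The `updateMaxMergeBufferUsedBytes`/getter pair added above maintains a monotonically increasing high-water mark. The same bookkeeping in isolation (the `PeakTracker` class name is hypothetical; the `(long)` cast mirrors the multiplication-cast fix noted in the commit log):

```java
// Hedged sketch of the peak-usage bookkeeping ByteBufferHashTable adds: whenever
// the entry count or bucket width changes, fold the current footprint into a running max.
public class PeakTracker
{
  private long maxUsedBytes = 0;

  // Cast before multiplying so size * bucketSizeWithHash cannot overflow int.
  public void update(int size, int bucketSizeWithHash)
  {
    maxUsedBytes = Math.max(maxUsedBytes, (long) size * bucketSizeWithHash);
  }

  public long getMaxUsedBytes()
  {
    return maxUsedBytes;
  }
}
```

Because the tracker only ever takes a max, a later `reset()` that drops `size` back to 0 does not lower the reported value, which is what lets close-time reporting observe the lifetime peak.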
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java
index 28de255c13a..33a79451993 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java
@@ -30,6 +30,8 @@ public class ByteBufferIntList
private final int maxElements;
private int numElements;
+ private int maxMergeBufferUsedBytes;
+
public ByteBufferIntList(
ByteBuffer buffer,
int maxElements
@@ -38,6 +40,7 @@ public class ByteBufferIntList
this.buffer = buffer;
this.maxElements = maxElements;
this.numElements = 0;
+ this.maxMergeBufferUsedBytes = 0;
if (buffer.capacity() < (maxElements * Integer.BYTES)) {
throw new IAE(
@@ -55,6 +58,7 @@ public class ByteBufferIntList
}
buffer.putInt(numElements * Integer.BYTES, val);
numElements++;
+ maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, numElements * Integer.BYTES);
}
public void set(int index, int val)
@@ -71,4 +75,9 @@ public class ByteBufferIntList
{
numElements = 0;
}
+
+ public int getMaxMergeBufferUsedBytes()
+ {
+ return maxMergeBufferUsedBytes;
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java
index cfa7295e6b4..ff2746bca29 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java
@@ -44,6 +44,7 @@ public class ByteBufferMinMaxOffsetHeap
private int heapSize;
private int maxHeapSize;
+ private int maxMergeBufferUsedBytes;
public ByteBufferMinMaxOffsetHeap(
ByteBuffer buf,
@@ -55,6 +56,7 @@ public class ByteBufferMinMaxOffsetHeap
this.buf = buf;
this.limit = limit;
this.heapSize = 0;
+ this.maxMergeBufferUsedBytes = 0;
this.minComparator = minComparator;
this.maxComparator = Ordering.from(minComparator).reverse();
this.heapIndexUpdater = heapIndexUpdater;
@@ -71,9 +73,9 @@ public class ByteBufferMinMaxOffsetHeap
int pos = heapSize;
buf.putInt(pos * Integer.BYTES, offset);
heapSize++;
- if (heapSize > maxHeapSize) {
- maxHeapSize = heapSize;
- }
+
+ maxHeapSize = Math.max(maxHeapSize, heapSize);
+ maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, maxHeapSize * Integer.BYTES);
if (heapIndexUpdater != null) {
heapIndexUpdater.updateHeapIndexForOffset(offset, pos);
@@ -226,6 +228,11 @@ public class ByteBufferMinMaxOffsetHeap
return heapSize;
}
+ public int getMaxMergeBufferUsedBytes()
+ {
+ return maxMergeBufferUsedBytes;
+ }
+
private void bubbleUp(int pos)
{
if (isEvenLevel(pos)) {
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
index 8242c9d8cf5..b4b4cb34701 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
@@ -332,7 +332,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
throw new ISE("Grouper is closed");
}
- groupers.forEach(Grouper::reset);
+ groupers.forEach(SpillingGrouper::reset);
}
@Override
@@ -496,7 +496,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
{
if (!closed) {
closed = true;
- groupers.forEach(Grouper::close);
+ groupers.forEach(SpillingGrouper::close);
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
index 3c80de8f31e..2bb544d0cda 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
@@ -458,6 +458,18 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
}
}
+ @Override
+ public long getMaxMergeBufferUsedBytes()
+ {
+ if (!initialized) {
+ return 0L;
+ }
+
+ long hashTableUsage = super.getMaxMergeBufferUsedBytes();
+ long offSetHeapUsage = offsetHeap.getMaxMergeBufferUsedBytes();
+ return hashTableUsage + offSetHeapUsage;
+ }
+
private class AlternatingByteBufferHashTable extends ByteBufferHashTable
{
// The base buffer is split into two alternating halves, with one sub-buffer in use at a given time.
@@ -509,6 +521,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
public void reset()
{
size = 0;
+ updateMaxMergeBufferUsedBytes();
growthCount = 0;
// clear the used bits of the first buffer
for (int i = 0; i < maxBuckets; i++) {
@@ -570,6 +583,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
}
size = numCopied;
+ updateMaxMergeBufferUsedBytes();
tableBuffer = newTableBuffer;
growthCount++;
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index fadcfa02c95..688c9f06566 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -68,7 +68,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
"Not enough disk space to execute this query. Try raising druid.query.groupBy.maxOnDiskStorage."
);
- private final Grouper<KeyType> grouper;
+ private final AbstractBufferHashGrouper<KeyType> grouper;
private final KeySerde<KeyType> keySerde;
private final LimitedTemporaryStorage temporaryStorage;
private final ObjectMapper spillMapper;
@@ -218,12 +218,23 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
@Override
public void close()
{
- perQueryStats.dictionarySize(keySerde.getDictionarySize());
+ perQueryStats.dictionarySize(getDictionarySizeEstimate());
+ perQueryStats.maxMergeBufferUsedBytes(getMaxMergeBufferUsedBytes());
grouper.close();
keySerde.reset();
deleteFiles();
}
+ private long getMaxMergeBufferUsedBytes()
+ {
+ return grouper.isInitialized() ? grouper.getMaxMergeBufferUsedBytes() : 0L;
+ }
+
+ private long getDictionarySizeEstimate()
+ {
+ return keySerde.getDictionarySize();
+ }
+
/**
* Returns a dictionary of string keys added to this grouper. Note that the dictionary of keySerde is spilled on
* local storage whenever the inner grouper is spilled. If there are spilled dictionaries, this method loads them
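`SpillingGrouper.close()` above reports the peak merge-buffer usage only when the inner grouper was actually initialized. A tiny sketch of that guard (the `CloseReporter` class is an illustrative stand-in, not a Druid type):

```java
// Hedged sketch of the initialization check in SpillingGrouper#getMaxMergeBufferUsedBytes:
// a grouper that was never initialized reports 0 rather than reading its hash table.
public class CloseReporter
{
  private final boolean initialized;
  private final long peakBytes;

  public CloseReporter(boolean initialized, long peakBytes)
  {
    this.initialized = initialized;
    this.peakBytes = peakBytes;
  }

  // Mirrors the ternary in the diff: initialized ? peak : 0L.
  public long reportedBytes()
  {
    return initialized ? peakBytes : 0L;
  }
}
```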
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java
index 4a6cfb83d10..207b37d65f4 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java
@@ -35,6 +35,7 @@ public class GroupByStatsProviderTest
stats1.mergeBufferAcquisitionTime(300);
stats1.mergeBufferAcquisitionTime(400);
+ stats1.maxMergeBufferUsedBytes(50);
stats1.spilledBytes(200);
stats1.spilledBytes(400);
stats1.dictionarySize(100);
@@ -45,6 +46,7 @@ public class GroupByStatsProviderTest
stats2.mergeBufferAcquisitionTime(500);
stats2.mergeBufferAcquisitionTime(600);
+ stats2.maxMergeBufferUsedBytes(100);
stats2.spilledBytes(400);
stats2.spilledBytes(600);
stats2.dictionarySize(300);
@@ -54,6 +56,8 @@ public class GroupByStatsProviderTest
Assert.assertEquals(0L, aggregateStats.getMergeBufferQueries());
Assert.assertEquals(0L, aggregateStats.getMergeBufferAcquisitionTimeNs());
Assert.assertEquals(0L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs());
+ Assert.assertEquals(0L, aggregateStats.getTotalMergeBufferUsedBytes());
+ Assert.assertEquals(0L, aggregateStats.getMaxMergeBufferUsedBytes());
Assert.assertEquals(0L, aggregateStats.getSpilledQueries());
Assert.assertEquals(0L, aggregateStats.getSpilledBytes());
Assert.assertEquals(0L, aggregateStats.getMaxSpilledBytes());
@@ -67,6 +71,8 @@ public class GroupByStatsProviderTest
Assert.assertEquals(2, aggregateStats.getMergeBufferQueries());
Assert.assertEquals(1800L, aggregateStats.getMergeBufferAcquisitionTimeNs());
Assert.assertEquals(1100L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs());
+ Assert.assertEquals(150L, aggregateStats.getTotalMergeBufferUsedBytes());
+ Assert.assertEquals(100L, aggregateStats.getMaxMergeBufferUsedBytes());
Assert.assertEquals(2L, aggregateStats.getSpilledQueries());
Assert.assertEquals(1600L, aggregateStats.getSpilledBytes());
Assert.assertEquals(1000L, aggregateStats.getMaxSpilledBytes());
@@ -74,7 +80,6 @@ public class GroupByStatsProviderTest
Assert.assertEquals(700L, aggregateStats.getMaxMergeDictionarySize());
}
-
@Test
public void testMetricsWithMultipleQueries()
{
@@ -83,24 +88,28 @@ public class GroupByStatsProviderTest
QueryResourceId r1 = new QueryResourceId("r1");
GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(r1);
stats1.mergeBufferAcquisitionTime(2000);
+ stats1.maxMergeBufferUsedBytes(50);
stats1.spilledBytes(100);
stats1.dictionarySize(200);
QueryResourceId r2 = new QueryResourceId("r2");
GroupByStatsProvider.PerQueryStats stats2 = statsProvider.getPerQueryStatsContainer(r2);
stats2.mergeBufferAcquisitionTime(100);
+ stats2.maxMergeBufferUsedBytes(500);
stats2.spilledBytes(150);
stats2.dictionarySize(250);
QueryResourceId r3 = new QueryResourceId("r3");
GroupByStatsProvider.PerQueryStats stats3 = statsProvider.getPerQueryStatsContainer(r3);
stats3.mergeBufferAcquisitionTime(200);
+ stats3.maxMergeBufferUsedBytes(100);
stats3.spilledBytes(3000);
stats3.dictionarySize(300);
QueryResourceId r4 = new QueryResourceId("r4");
GroupByStatsProvider.PerQueryStats stats4 = statsProvider.getPerQueryStatsContainer(r4);
stats4.mergeBufferAcquisitionTime(300);
+ stats4.maxMergeBufferUsedBytes(75);
stats4.spilledBytes(200);
stats4.dictionarySize(1500);
@@ -112,11 +121,13 @@ public class GroupByStatsProviderTest
GroupByStatsProvider.AggregateStats aggregateStats = statsProvider.getStatsSince();
Assert.assertEquals(2000L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs());
+ Assert.assertEquals(500L, aggregateStats.getMaxMergeBufferUsedBytes());
Assert.assertEquals(3000L, aggregateStats.getMaxSpilledBytes());
Assert.assertEquals(1500L, aggregateStats.getMaxMergeDictionarySize());
Assert.assertEquals(4L, aggregateStats.getMergeBufferQueries());
Assert.assertEquals(2600L, aggregateStats.getMergeBufferAcquisitionTimeNs());
+ Assert.assertEquals(725L, aggregateStats.getTotalMergeBufferUsedBytes());
Assert.assertEquals(4L, aggregateStats.getSpilledQueries());
Assert.assertEquals(3450L, aggregateStats.getSpilledBytes());
Assert.assertEquals(2250L, aggregateStats.getMergeDictionarySize());
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java
index c96bc50dd78..ec5d64794cc 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java
@@ -156,6 +156,61 @@ public class BufferHashGrouperTest extends InitializedNullHandlingTest
}
}
+ @Test
+ public void testMaxMergeBufferUsedBytes()
+ {
+ final GroupByTestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
+ final BufferHashGrouper<IntKey> grouper = new BufferHashGrouper<>(
+ Suppliers.ofInstance(ByteBuffer.allocate(1000)),
+ GrouperTestUtil.intKeySerde(),
+ AggregatorAdapters.factorizeBuffered(
+ columnSelectorFactory,
+ ImmutableList.of(
+ new LongSumAggregatorFactory("valueSum", "value"),
+ new CountAggregatorFactory("count")
+ )
+ ),
+ Integer.MAX_VALUE,
+ 0,
+ 0,
+ true
+ );
+ grouper.init();
+
+ long initialUsage = grouper.getMaxMergeBufferUsedBytes();
+ Assert.assertEquals(0L, initialUsage);
+
+ columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
+
+ grouper.aggregate(new IntKey(1));
+ final long expectedBucketSize = grouper.getMaxMergeBufferUsedBytes();
+
+ grouper.aggregate(new IntKey(2));
+ grouper.aggregate(new IntKey(3));
+
+ Assert.assertEquals(3L * expectedBucketSize, grouper.getMaxMergeBufferUsedBytes());
+
+ grouper.aggregate(new IntKey(4));
+ grouper.aggregate(new IntKey(5));
+
+ Assert.assertEquals(5L * expectedBucketSize, grouper.getMaxMergeBufferUsedBytes());
+
+ grouper.reset();
+ Assert.assertEquals(0, grouper.getSize());
+ Assert.assertEquals(5L * expectedBucketSize, grouper.getMaxMergeBufferUsedBytes());
+
+ grouper.aggregate(new IntKey(1));
+ grouper.aggregate(new IntKey(6));
+ grouper.aggregate(new IntKey(7));
+ grouper.aggregate(new IntKey(8));
+ grouper.aggregate(new IntKey(9));
+ grouper.aggregate(new IntKey(10));
+
+ Assert.assertEquals(6L * expectedBucketSize, grouper.getMaxMergeBufferUsedBytes());
+
+ grouper.close();
+ }
+
private ResourceHolder<Grouper<IntKey>> makeGrouper(
GroupByTestColumnSelectorFactory columnSelectorFactory,
int bufferSize,
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java
index 07631840231..df9851ba829 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java
@@ -307,6 +307,62 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
Assert.assertEquals(LIMIT, i);
}
+ @Test
+ public void testMaxMergeBufferUsedBytesTracksMaxUsageAfterReset()
+ {
+ final GroupByTestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
+ final LimitedBufferHashGrouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 20000);
+
+ Assert.assertEquals(0L, grouper.getMaxMergeBufferUsedBytes());
+ columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
+
+ Assert.assertTrue(String.valueOf(KEY_BASE), grouper.aggregate(new IntKey(KEY_BASE)).isOk());
+ final long usagePerEntry = grouper.getMaxMergeBufferUsedBytes();
+
+ grouper.reset();
+ Assert.assertEquals(0, grouper.getSize());
+ Assert.assertEquals(usagePerEntry, grouper.getMaxMergeBufferUsedBytes());
+
+ // Add 10 entries after reset
+ for (int i = 0; i < 10; i++) {
+ Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(new IntKey(i + KEY_BASE)).isOk());
+ }
+
+ Assert.assertEquals(10 * usagePerEntry, grouper.getMaxMergeBufferUsedBytes());
+ }
+
+ @Test
+ public void testMaxMergeBufferUsedBytesAfterBufferSwap()
+ {
+ // This test closely follows the flow of testLimitAndBufferSwapping().
+ final GroupByTestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
+ final LimitedBufferHashGrouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 20000);
+
+ columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
+
+ // Calculate usage per entry from first entry
+ Assert.assertTrue(String.valueOf(KEY_BASE), grouper.aggregate(new IntKey(KEY_BASE)).isOk());
+ final long usagePerEntry = grouper.getMaxMergeBufferUsedBytes();
+
+ // This results in 13 swaps and a final size of 116 (100 keys + 16 new keys after the last swap)
+ for (int i = 1; i < NUM_ROWS; i++) {
+ Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(new IntKey(i + KEY_BASE)).isOk());
+ }
+
+ Assert.assertEquals(13, grouper.getGrowthCount());
+ Assert.assertEquals(116, grouper.getSize());
+ Assert.assertEquals(168, grouper.getMaxSize());
+
+ final long bucketSizeWithHash = usagePerEntry - Integer.BYTES;
+ final long hashTablePeak = grouper.getMaxSize() * bucketSizeWithHash;
+ // Heap can temporarily have LIMIT + 1 before removing one
+ final long heapPeak = ((long) LIMIT + 1) * Integer.BYTES;
+ // Peak usage is the sum of the hash table peak and the heap peak, which peak at different sizes
+ final long expectedPeakUsage = hashTablePeak + heapPeak;
+
+ Assert.assertEquals(expectedPeakUsage, grouper.getMaxMergeBufferUsedBytes());
+ }
+
private static LimitedBufferHashGrouper<IntKey> makeGrouper(
GroupByTestColumnSelectorFactory columnSelectorFactory,
int bufferSize
diff --git a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
index ecb702cc70d..e5f46020fe0 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
@@ -58,21 +58,16 @@ public class GroupByStatsMonitor extends AbstractMonitor
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", mergeBufferPool.getPendingRequests()));
-
emitter.emit(builder.setMetric("mergeBuffer/used", mergeBufferPool.getUsedResourcesCount()));
GroupByStatsProvider.AggregateStats statsContainer = groupByStatsProvider.getStatsSince();
if (statsContainer.getMergeBufferQueries() > 0) {
emitter.emit(builder.setMetric("mergeBuffer/queries", statsContainer.getMergeBufferQueries()));
- emitter.emit(builder.setMetric(
- "mergeBuffer/acquisitionTimeNs",
- statsContainer.getMergeBufferAcquisitionTimeNs()
- ));
- emitter.emit(builder.setMetric(
- "mergeBuffer/maxAcquisitionTimeNs",
- statsContainer.getMaxMergeBufferAcquisitionTimeNs()
- ));
+ emitter.emit(builder.setMetric("mergeBuffer/acquisitionTimeNs", statsContainer.getMergeBufferAcquisitionTimeNs()));
+ emitter.emit(builder.setMetric("mergeBuffer/maxAcquisitionTimeNs", statsContainer.getMaxMergeBufferAcquisitionTimeNs()));
+ emitter.emit(builder.setMetric("mergeBuffer/bytesUsed", statsContainer.getTotalMergeBufferUsedBytes()));
+ emitter.emit(builder.setMetric("mergeBuffer/maxBytesUsed", statsContainer.getMaxMergeBufferUsedBytes()));
}
if (statsContainer.getSpilledQueries() > 0) {
diff --git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
index 8c9d4dc4c81..eaca043e02e 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
@@ -61,6 +61,8 @@ public class GroupByStatsMonitorTest
1L,
100L,
100L,
+ 200L,
+ 200L,
2L,
200L,
200L,
@@ -70,7 +72,7 @@ public class GroupByStatsMonitorTest
}
};
- mergeBufferPool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 5);
+ mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 5);
executorService = Executors.newSingleThreadExecutor();
}
@@ -83,8 +85,7 @@ public class GroupByStatsMonitorTest
@Test
public void testMonitor()
{
- final GroupByStatsMonitor monitor =
- new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+ final GroupByStatsMonitor monitor = new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
emitter.start();
monitor.doMonitor(emitter);
@@ -92,12 +93,14 @@ public class GroupByStatsMonitorTest
// Trigger metric emission
monitor.doMonitor(emitter);
- Assert.assertEquals(10, emitter.getNumEmittedEvents());
+ Assert.assertEquals(12, emitter.getNumEmittedEvents());
emitter.verifyValue("mergeBuffer/pendingRequests", 0L);
emitter.verifyValue("mergeBuffer/used", 0L);
emitter.verifyValue("mergeBuffer/queries", 1L);
emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 100L);
emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 100L);
+ emitter.verifyValue("mergeBuffer/bytesUsed", 200L);
+ emitter.verifyValue("mergeBuffer/maxBytesUsed", 200L);
emitter.verifyValue("groupBy/spilledQueries", 2L);
emitter.verifyValue("groupBy/spilledBytes", 200L);
emitter.verifyValue("groupBy/maxSpilledBytes", 200L);
@@ -112,15 +115,11 @@ public class GroupByStatsMonitorTest
final String taskId = "taskId1";
final String groupId = "test_groupid";
final String taskType = "test_tasktype";
- final GroupByStatsMonitor monitor = new GroupByStatsMonitor(
- groupByStatsProvider,
- mergeBufferPool
- );
+ final GroupByStatsMonitor monitor = new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host", new TestTaskHolder(dataSource, taskId, taskType, groupId));
emitter.start();
monitor.doMonitor(emitter);
emitter.flush();
- // Trigger metric emission
monitor.doMonitor(emitter);
final Map<String, Object> dimFilters = Map.of(
@@ -136,6 +135,8 @@ public class GroupByStatsMonitorTest
verifyMetricValue(emitter, "mergeBuffer/queries", dimFilters, 1L);
verifyMetricValue(emitter, "mergeBuffer/acquisitionTimeNs", dimFilters, 100L);
verifyMetricValue(emitter, "mergeBuffer/maxAcquisitionTimeNs", dimFilters, 100L);
+ verifyMetricValue(emitter, "mergeBuffer/bytesUsed", dimFilters, 200L);
+ verifyMetricValue(emitter, "mergeBuffer/maxBytesUsed", dimFilters, 200L);
verifyMetricValue(emitter, "groupBy/spilledQueries", dimFilters, 2L);
verifyMetricValue(emitter, "groupBy/spilledBytes", dimFilters, 200L);
verifyMetricValue(emitter, "groupBy/maxSpilledBytes", dimFilters, 200L);
@@ -151,8 +152,7 @@ public class GroupByStatsMonitorTest
mergeBufferPool.takeBatch(4);
}).get(20, TimeUnit.SECONDS);
- final GroupByStatsMonitor monitor =
- new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+ final GroupByStatsMonitor monitor = new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost");
boolean ret = monitor.doMonitor(emitter);
Assert.assertTrue(ret);
@@ -180,8 +180,7 @@ public class GroupByStatsMonitorTest
}
}
- final GroupByStatsMonitor monitor =
- new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
+ final GroupByStatsMonitor monitor = new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost");
boolean ret = monitor.doMonitor(emitter);
Assert.assertTrue(ret);
@@ -203,18 +202,21 @@ public class GroupByStatsMonitorTest
QueryResourceId r1 = new QueryResourceId("r1");
GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(r1);
stats1.mergeBufferAcquisitionTime(100);
+ stats1.maxMergeBufferUsedBytes(50);
stats1.spilledBytes(200);
stats1.dictionarySize(100);
QueryResourceId r2 = new QueryResourceId("r2");
GroupByStatsProvider.PerQueryStats stats2 = statsProvider.getPerQueryStatsContainer(r2);
stats2.mergeBufferAcquisitionTime(500);
+ stats2.maxMergeBufferUsedBytes(30);
stats2.spilledBytes(100);
stats2.dictionarySize(300);
QueryResourceId r3 = new QueryResourceId("r3");
GroupByStatsProvider.PerQueryStats stats3 = statsProvider.getPerQueryStatsContainer(r3);
stats3.mergeBufferAcquisitionTime(200);
+ stats3.maxMergeBufferUsedBytes(150);
stats3.spilledBytes(800);
stats3.dictionarySize(200);
@@ -230,11 +232,13 @@ public class GroupByStatsMonitorTest
emitter.verifyValue("mergeBuffer/queries", 3L);
emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 800L);
+ emitter.verifyValue("mergeBuffer/bytesUsed", 230L);
emitter.verifyValue("groupBy/spilledQueries", 3L);
emitter.verifyValue("groupBy/spilledBytes", 1100L);
emitter.verifyValue("groupBy/mergeDictionarySize", 600L);
emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 500L);
+ emitter.verifyValue("mergeBuffer/maxBytesUsed", 150L);
emitter.verifyValue("groupBy/maxSpilledBytes", 800L);
emitter.verifyValue("groupBy/maxMergeDictionarySize", 300L);
}
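The assertions in the hunk above show how per-query buffer stats roll up into the two emitted metrics: `mergeBuffer/bytesUsed` is the sum of each query's `maxMergeBufferUsedBytes` (50 + 30 + 150 = 230), while `mergeBuffer/maxBytesUsed` is the largest single-query value (150). A minimal standalone sketch of that roll-up, using hypothetical names rather than the actual Druid classes:

```java
import java.util.List;

// Illustrative sketch of the sum/max roll-up the test above verifies.
// MergeBufferRollup is a hypothetical name, not a Druid class.
class MergeBufferRollup
{
  // mergeBuffer/bytesUsed: sum of each query's peak merge-buffer usage
  static long total(List<Long> perQueryMaxBytes)
  {
    return perQueryMaxBytes.stream().mapToLong(Long::longValue).sum();
  }

  // mergeBuffer/maxBytesUsed: largest peak across all queries in the window
  static long max(List<Long> perQueryMaxBytes)
  {
    return perQueryMaxBytes.stream().mapToLong(Long::longValue).max().orElse(0L);
  }

  public static void main(String[] args)
  {
    // Per-query peaks taken from the test above: stats1=50, stats2=30, stats3=150
    List<Long> peaks = List.of(50L, 30L, 150L);
    System.out.println(total(peaks)); // 230
    System.out.println(max(peaks));   // 150
  }
}
```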