This is an automated email from the ASF dual-hosted git repository.
rohangarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 274ccbfd85c Reset buffer aggregators when resetting Groupers. (#16296)
274ccbfd85c is described below
commit 274ccbfd85c86a1333344c4ac2aedce241027728
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Apr 24 02:39:24 2024 -0700
Reset buffer aggregators when resetting Groupers. (#16296)
Buffer aggregators can contain some cached objects within them, such as
Memory references or HLL Unions. Prior to this patch, various Grouper
implementations were not releasing this state when resetting their own
internal state, which could lead to excessive memory use.
This patch renames AggregatorAdapters#close to "reset", and updates
Grouper implementations to call this reset method whenever they reset
their internal state.
The base method on BufferAggregator and VectorAggregator remains named
"close", for compatibility with existing extensions, but the contract
is adjusted to say that the aggregator may be reused after the method
is called. All existing implementations in core already adhere to this
new contract, except for the ArrayOfDoubles build flavors, which are
updated in this patch to adhere.
Additionally, this patch harmonizes the buffer sketch helpers so that the
method which clears their cached state is consistently named "clear", rather
than a mix of "clear" and "close". (The other helpers already used "clear".)
---
.../hll/HllSketchMergeBufferAggregator.java | 2 +-
.../hll/HllSketchMergeBufferAggregatorHelper.java | 2 +-
.../hll/HllSketchMergeVectorAggregator.java | 2 +-
.../datasketches/theta/SketchBufferAggregator.java | 2 +-
.../theta/SketchBufferAggregatorHelper.java | 4 ++--
.../datasketches/theta/SketchVectorAggregator.java | 2 +-
.../tuple/ArrayOfDoublesSketchBuildAggregator.java | 16 +++++++++++-----
.../ArrayOfDoublesSketchBuildBufferAggregator.java | 6 ++++--
.../druid/query/aggregation/AggregatorAdapters.java | 20 +++++++++-----------
.../druid/query/aggregation/BufferAggregator.java | 6 +++++-
.../druid/query/aggregation/VectorAggregator.java | 6 +++++-
.../epinephelinae/AbstractBufferHashGrouper.java | 2 +-
.../groupby/epinephelinae/BufferArrayGrouper.java | 3 ++-
.../groupby/epinephelinae/BufferHashGrouper.java | 1 +
.../groupby/epinephelinae/HashVectorGrouper.java | 3 ++-
.../epinephelinae/LimitedBufferHashGrouper.java | 1 +
.../query/timeseries/TimeseriesQueryEngine.java | 6 +++---
.../apache/druid/query/topn/BaseTopNAlgorithm.java | 6 +++---
.../druid/query/topn/HeapBasedTopNAlgorithm.java | 2 +-
.../apache/druid/query/topn/PooledTopNAlgorithm.java | 2 +-
.../query/topn/TimeExtractionTopNAlgorithm.java | 2 +-
.../groupby/epinephelinae/HashVectorGrouperTest.java | 2 +-
22 files changed, 58 insertions(+), 40 deletions(-)
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
index 8e3bfffa073..0458b5084c4 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
@@ -75,7 +75,7 @@ public class HllSketchMergeBufferAggregator implements
BufferAggregator
@Override
public void close()
{
- helper.close();
+ helper.clear();
}
@Override
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
index 22653019772..1fa9ee4c9a3 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
@@ -142,7 +142,7 @@ public class HllSketchMergeBufferAggregatorHelper
}
}
- public void close()
+ public void clear()
{
unions.clear();
memCache.clear();
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
index 5fec9b94ba2..31ad26cb5d7 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
@@ -102,7 +102,7 @@ public class HllSketchMergeVectorAggregator implements
VectorAggregator
@Override
public void close()
{
- helper.close();
+ helper.clear();
}
@Override
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
index 34aae3f36e1..60d83f4e0a7 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
@@ -85,7 +85,7 @@ public class SketchBufferAggregator implements
BufferAggregator
@Override
public void close()
{
- helper.close();
+ helper.clear();
}
@Override
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java
index 49856c9e80d..e2f699012a1 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java
@@ -95,7 +95,7 @@ final class SketchBufferAggregatorHelper
/**
* Returns a {@link Union} associated with a particular buffer location.
*
- * The Union object will be cached in this helper until {@link #close()} is
called.
+ * The Union object will be cached in this helper until {@link #clear()} is
called.
*/
public Union getOrCreateUnion(ByteBuffer buf, int position)
{
@@ -122,7 +122,7 @@ final class SketchBufferAggregatorHelper
return union;
}
- public void close()
+ public void clear()
{
unions.clear();
memCache.clear();
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
index a862265d561..7d10bc30fb5 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
@@ -107,6 +107,6 @@ public class SketchVectorAggregator implements
VectorAggregator
@Override
public void close()
{
- helper.close();
+ helper.clear();
}
}
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
index 7ca1061889d..b093e730f0b 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
@@ -27,7 +27,6 @@ import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
-
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
@@ -48,6 +47,7 @@ public class ArrayOfDoublesSketchBuildAggregator implements
Aggregator
@Nullable
private ArrayOfDoublesUpdatableSketch sketch;
+ private final int nominalEntries;
private final boolean canLookupUtf8;
private final boolean canCacheById;
private final LinkedHashMap<Integer, Object> stringCache = new
LinkedHashMap<Integer, Object>()
@@ -67,10 +67,7 @@ public class ArrayOfDoublesSketchBuildAggregator implements
Aggregator
{
this.keySelector = keySelector;
this.valueSelectors = valueSelectors.toArray(new
BaseDoubleColumnValueSelector[0]);
- values = new double[valueSelectors.size()];
- sketch = new
ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries)
-
.setNumberOfValues(valueSelectors.size()).build();
-
+ this.nominalEntries = nominalEntries;
this.canCacheById = this.keySelector.nameLookupPossibleInAdvance();
this.canLookupUtf8 = this.keySelector.supportsLookupNameUtf8();
}
@@ -83,6 +80,15 @@ public class ArrayOfDoublesSketchBuildAggregator implements
Aggregator
@Override
public void aggregate()
{
+ if (values == null) {
+ values = new double[valueSelectors.length];
+ }
+
+ if (sketch == null) {
+ sketch = new
ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries)
+
.setNumberOfValues(valueSelectors.length).build();
+ }
+
final IndexedInts keys = keySelector.getRow();
for (int i = 0; i < valueSelectors.length; i++) {
if (valueSelectors[i].isNull()) {
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
index 18906d12936..b925220c89f 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
@@ -73,8 +73,6 @@ public class ArrayOfDoublesSketchBuildBufferAggregator
implements BufferAggregat
this.valueSelectors = valueSelectors.toArray(new
BaseDoubleColumnValueSelector[0]);
this.nominalEntries = nominalEntries;
this.maxIntermediateSize = maxIntermediateSize;
- values = new double[valueSelectors.size()];
-
this.canCacheById = this.keySelector.nameLookupPossibleInAdvance();
this.canLookupUtf8 = this.keySelector.supportsLookupNameUtf8();
}
@@ -92,6 +90,10 @@ public class ArrayOfDoublesSketchBuildBufferAggregator
implements BufferAggregat
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
+ if (values == null) {
+ values = new double[valueSelectors.length];
+ }
+
for (int i = 0; i < valueSelectors.length; i++) {
if (valueSelectors[i].isNull()) {
return;
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java
b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java
index 8ae7a33b08d..25c9102bcf7 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java
@@ -26,7 +26,6 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
-import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
@@ -42,7 +41,7 @@ import java.util.stream.Collectors;
* (2) Query engines are freed from the need to manage how much space each
individual aggregator needs. They only
* need to allocate a block of size "spaceNeeded".
*/
-public class AggregatorAdapters implements Closeable
+public class AggregatorAdapters
{
private static final Logger log = new Logger(AggregatorAdapters.class);
@@ -230,14 +229,14 @@ public class AggregatorAdapters implements Closeable
}
/**
- * Close all of our aggregators.
+ * Reset all of our aggregators, releasing resources held by them. After
this, this instance may be reused or
+ * it may be discarded.
*/
- @Override
- public void close()
+ public void reset()
{
for (Adapter adapter : adapters) {
try {
- adapter.close();
+ adapter.reset();
}
catch (Exception e) {
log.warn(e, "Could not close aggregator [%s], skipping.",
adapter.getFactory().getName());
@@ -250,7 +249,7 @@ public class AggregatorAdapters implements Closeable
* BufferAggregator and VectorAggregator. Private, since it doesn't escape
this class and the
* only two implementations are private static classes below.
*/
- private interface Adapter extends Closeable
+ private interface Adapter
{
void init(ByteBuffer buf, int position);
@@ -259,8 +258,7 @@ public class AggregatorAdapters implements Closeable
void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer,
ByteBuffer newBuffer);
- @Override
- void close();
+ void reset();
AggregatorFactory getFactory();
@@ -293,7 +291,7 @@ public class AggregatorAdapters implements Closeable
}
@Override
- public void close()
+ public void reset()
{
aggregator.close();
}
@@ -352,7 +350,7 @@ public class AggregatorAdapters implements Closeable
}
@Override
- public void close()
+ public void reset()
{
aggregator.close();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
index e9fdbeaa061..20d13491b0f 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
@@ -158,7 +158,11 @@ public interface BufferAggregator extends HotLoopCallee
}
/**
- * Release any resources used by the aggregator
+ * Release any resources used by the aggregator. The aggregator may be
reused after this call, by calling
+ * {@link #init(ByteBuffer, int)} followed by other methods as normal.
+ *
+ * This call would be more properly named "reset", but we use the name
"close" to improve compatibility with
+ * existing aggregator implementations in extensions.
*/
void close();
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java
index befff12ba6e..a3e506e59c8 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java
@@ -83,7 +83,11 @@ public interface VectorAggregator
}
/**
- * Release any resources used by the aggregator.
+ * Release any resources used by the aggregator. The aggregator may be
reused after this call, by calling
+ * {@link #init(ByteBuffer, int)} followed by other methods as normal.
+ *
+ * This call would be more properly named "reset", but we use the name
"close" to improve compatibility with
+ * existing aggregator implementations in extensions.
*/
void close();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
index f3bc195dcbd..70cf5832cf3 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
@@ -170,7 +170,7 @@ public abstract class AbstractBufferHashGrouper<KeyType>
implements Grouper<KeyT
public void close()
{
keySerde.reset();
- aggregators.close();
+ aggregators.reset();
}
/**
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
index 0fcb4ddeb2e..616ac190dd8 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
@@ -269,6 +269,7 @@ public class BufferArrayGrouper implements VectorGrouper,
IntGrouper
{
// Clear the entire usedFlagBuffer
usedFlagMemory.clear();
+ aggregators.reset();
}
@Override
@@ -280,7 +281,7 @@ public class BufferArrayGrouper implements VectorGrouper,
IntGrouper
@Override
public void close()
{
- aggregators.close();
+ aggregators.reset();
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
index 167b322b9d4..c4d04697716 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
@@ -158,6 +158,7 @@ public class BufferHashGrouper<KeyType> extends
AbstractBufferHashGrouper<KeyTyp
offsetList.reset();
hashTable.reset();
keySerde.reset();
+ aggregators.reset();
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
index ad12d0503f8..e5c2801c3b7 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
@@ -205,6 +205,7 @@ public class HashVectorGrouper implements VectorGrouper
}
this.hashTable = createTable(buffer, tableStart, numBuckets);
+ this.aggregators.reset();
}
@Override
@@ -256,7 +257,7 @@ public class HashVectorGrouper implements VectorGrouper
@Override
public void close()
{
- aggregators.close();
+ aggregators.reset();
}
@VisibleForTesting
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
index 756a8227f5e..90a0e1e250d 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
@@ -185,6 +185,7 @@ public class LimitedBufferHashGrouper<KeyType> extends
AbstractBufferHashGrouper
hashTable.reset();
keySerde.reset();
offsetHeap.reset();
+ aggregators.reset();
heapIndexUpdater.setHashTableBuffer(hashTable.getTableBuffer());
hasIterated = false;
offsetHeapIterableSize = 0;
diff --git
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
index 7ae290dd7d4..c5e83b84e87 100644
---
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
+++
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
@@ -164,9 +164,9 @@ public class TimeseriesQueryEngine
}
final VectorColumnSelectorFactory columnSelectorFactory =
cursor.getColumnSelectorFactory();
- final AggregatorAdapters aggregators = closer.register(
- AggregatorAdapters.factorizeVector(columnSelectorFactory,
query.getAggregatorSpecs())
- );
+ final AggregatorAdapters aggregators =
+ AggregatorAdapters.factorizeVector(columnSelectorFactory,
query.getAggregatorSpecs());
+ closer.register(aggregators::reset);
final ResourceHolder<ByteBuffer> bufferHolder =
closer.register(bufferPool.take());
diff --git
a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
index 843d248221e..f34464a49d0 100644
---
a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
+++
b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
@@ -120,7 +120,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector,
DimValAggregateStore, Pa
updateResults(params, theDimValSelector, aggregatesStore, resultBuilder);
- closeAggregators(aggregatesStore);
+ resetAggregators(aggregatesStore);
numProcessed += numToProcess;
params.getCursor().reset();
@@ -151,7 +151,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector,
DimValAggregateStore, Pa
}
long processedRows = scanAndAggregate(params, null, aggregatesStore);
updateResults(params, null, aggregatesStore, resultBuilder);
- closeAggregators(aggregatesStore);
+ resetAggregators(aggregatesStore);
params.getCursor().reset();
if (queryMetrics != null) {
queryMetrics.addProcessedRows(processedRows);
@@ -199,7 +199,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector,
DimValAggregateStore, Pa
TopNResultBuilder resultBuilder
);
- protected abstract void closeAggregators(
+ protected abstract void resetAggregators(
DimValAggregateStore dimValAggregateStore
);
diff --git
a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
index 14f3b729e1e..ba5fbf25108 100644
---
a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
+++
b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
@@ -112,7 +112,7 @@ public class HeapBasedTopNAlgorithm
}
@Override
- protected void closeAggregators(TopNColumnAggregatesProcessor processor)
+ protected void resetAggregators(TopNColumnAggregatesProcessor processor)
{
processor.closeAggregators();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
index 6ddda5eb1be..d0c0fb064e0 100644
---
a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
+++
b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
@@ -768,7 +768,7 @@ public class PooledTopNAlgorithm
}
@Override
- protected void closeAggregators(BufferAggregator[] bufferAggregators)
+ protected void resetAggregators(BufferAggregator[] bufferAggregators)
{
for (BufferAggregator agg : bufferAggregators) {
agg.close();
diff --git
a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
index 70e01e49aed..3b60bb65ee1 100644
---
a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
+++
b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
@@ -135,7 +135,7 @@ public class TimeExtractionTopNAlgorithm extends
BaseTopNAlgorithm<int[], Map<Ob
}
@Override
- protected void closeAggregators(Map<Object, Aggregator[]> stringMap)
+ protected void resetAggregators(Map<Object, Aggregator[]> stringMap)
{
for (Aggregator[] aggregators : stringMap.values()) {
for (Aggregator agg : aggregators) {
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java
index d5a863a7542..fd0314a607c 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java
@@ -45,7 +45,7 @@ public class HashVectorGrouperTest
);
grouper.initVectorized(512);
grouper.close();
- Mockito.verify(aggregatorAdapters, Mockito.times(1)).close();
+ Mockito.verify(aggregatorAdapters, Mockito.times(2)).reset();
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]