This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2a5d5b7 Introduce in-Segment Trim for GroupBy OrderBy Query (#6991)
2a5d5b7 is described below
commit 2a5d5b7517a46ec76d73742b31c6d9e457920f21
Author: wuwenw <[email protected]>
AuthorDate: Thu Jun 10 17:15:06 2021 -0400
Introduce in-Segment Trim for GroupBy OrderBy Query (#6991)
One of the major bottlenecks for the current GroupBy OrderBy query on high
cardinality columns is the merge phase. Essentially every segment brings a
large number of intermediate results to a global concurrent map for further
aggregation and merge, which takes up a lot of space and is very
time-consuming. This PR introduces an optimization option that each segment
trims its intermediate results to a given size. The size is configurable by the
user and is guaranteed to be max(limit N * [...]
---
.../pinot/core/data/table/IntermediateRecord.java | 42 +++
.../apache/pinot/core/data/table/TableResizer.java | 58 +++--
.../operator/blocks/IntermediateResultsBlock.java | 29 ++-
.../combine/GroupByOrderByCombineOperator.java | 37 ++-
.../query/AggregationGroupByOrderByOperator.java | 29 ++-
.../plan/AggregationGroupByOrderByPlanNode.java | 13 +-
.../core/plan/maker/InstancePlanMakerImplV2.java | 40 ++-
.../groupby/DefaultGroupByExecutor.java | 13 +
.../groupby/DictionaryBasedGroupKeyGenerator.java | 36 +++
.../query/aggregation/groupby/GroupByExecutor.java | 18 ++
.../aggregation/groupby/GroupKeyGenerator.java | 5 +
.../NoDictionaryMultiColumnGroupKeyGenerator.java | 5 +
.../NoDictionarySingleColumnGroupKeyGenerator.java | 5 +
.../org/apache/pinot/core/util/GroupByUtils.java | 15 +-
.../pinot/core/data/table/TableResizerTest.java | 85 ++++++
.../groupby/GroupByInSegmentTrimTest.java | 284 +++++++++++++++++++++
.../InterSegmentOrderByMultiValueQueriesTest.java | 14 +
.../InterSegmentOrderBySingleValueQueriesTest.java | 16 ++
18 files changed, 693 insertions(+), 51 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
new file mode 100644
index 0000000..520c3e8
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+/**
+ * Helper class to store a subset of Record fields
+ * IntermediateRecord is derived from a Record
+ * Some of the main properties of an IntermediateRecord are:
+ *
+ * 1. Key in IntermediateRecord is expected to be identical to the one in the
Record
+ * 2. For values, IntermediateRecord should only have the columns needed for
order by
+ * 3. Inside the values, the columns should be ordered by the order by sequence
+ * 4. For order by on aggregations, final results are extracted
+ * 5. There is a mandatory field to store the original record to prevent
duplicate lookups
+ */
+public class IntermediateRecord {
+ public final Key _key;
+ public final Comparable[] _values;
+ public final Record _record;
+
+ IntermediateRecord(Key key, Comparable[] values, Record record) {
+ _key = key;
+ _values = values;
+ _record = record;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
index e306350..9f95bc0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
@@ -34,6 +35,8 @@ import
org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.core.query.postaggregation.PostAggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.spi.utils.ByteArray;
@@ -128,7 +131,7 @@ public class TableResizer {
for (int i = 0; i < _numOrderByExpressions; i++) {
intermediateRecordValues[i] = _orderByValueExtractors[i].extract(record);
}
- return new IntermediateRecord(key, intermediateRecordValues);
+ return new IntermediateRecord(key, intermediateRecordValues, record);
}
/**
@@ -166,7 +169,7 @@ public class TableResizer {
PriorityQueue<IntermediateRecord> priorityQueue =
convertToIntermediateRecordsPQ(recordsMap, trimToSize, comparator);
for (IntermediateRecord recordToRetain : priorityQueue) {
- trimmedRecordsMap.put(recordToRetain._key,
recordsMap.get(recordToRetain._key));
+ trimmedRecordsMap.put(recordToRetain._key, recordToRetain._record);
}
return trimmedRecordsMap;
}
@@ -203,34 +206,51 @@ public class TableResizer {
}
int numRecordsToRetain = Math.min(numRecords, trimToSize);
// make PQ of sorted records to retain
- PriorityQueue<IntermediateRecord> priorityQueue =
convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain,
_intermediateRecordComparator.reversed());
+ PriorityQueue<IntermediateRecord> priorityQueue =
+ convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain,
_intermediateRecordComparator.reversed());
Record[] sortedArray = new Record[numRecordsToRetain];
while (!priorityQueue.isEmpty()) {
IntermediateRecord intermediateRecord = priorityQueue.poll();
- Record record = recordsMap.get(intermediateRecord._key);
- sortedArray[--numRecordsToRetain] = record;
+ sortedArray[--numRecordsToRetain] = intermediateRecord._record;
}
return Arrays.asList(sortedArray);
}
/**
- * Helper class to store a subset of Record fields
- * IntermediateRecord is derived from a Record
- * Some of the main properties of an IntermediateRecord are:
- *
- * 1. Key in IntermediateRecord is expected to be identical to the one in
the Record
- * 2. For values, IntermediateRecord should only have the columns needed for
order by
- * 3. Inside the values, the columns should be ordered by the order by
sequence
- * 4. For order by on aggregations, final results should extracted if the
intermediate result is non-comparable
+ * Trims the aggregation results using a priority queue and returns the
priority queue.
+ * This method is to be called from individual segment if the intermediate
results need to be trimmed.
*/
- private static class IntermediateRecord {
- final Key _key;
- final Comparable[] _values;
+ public PriorityQueue<IntermediateRecord>
trimInSegmentResults(Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator,
+ GroupByResultHolder[] groupByResultHolders, int size) {
+ int numAggregationFunctions = _aggregationFunctions.length;
+ int numColumns = numAggregationFunctions + _numGroupByExpressions;
- IntermediateRecord(Key key, Comparable[] values) {
- _key = key;
- _values = values;
+ // Get comparator
+ Comparator<IntermediateRecord> comparator =
_intermediateRecordComparator.reversed();
+ PriorityQueue<IntermediateRecord> priorityQueue = new
PriorityQueue<>(size, comparator);
+ while (groupKeyIterator.hasNext()) {
+ // Iterate over keys
+ GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+ Object[] keys = groupKey._keys;
+ Object[] values = Arrays.copyOf(keys, numColumns);
+ int groupId = groupKey._groupId;
+ for (int i = 0; i < numAggregationFunctions; i++) {
+ values[_numGroupByExpressions + i] =
+
_aggregationFunctions[i].extractGroupByResult(groupByResultHolders[i],
groupId);
+ }
+ // {key, intermediate_record, record}
+ IntermediateRecord intermediateRecord = getIntermediateRecord(new
Key(keys), new Record(values));
+ if (priorityQueue.size() < size) {
+ priorityQueue.offer(intermediateRecord);
+ } else {
+ IntermediateRecord peek = priorityQueue.peek();
+ if (comparator.compare(peek, intermediateRecord) < 0) {
+ priorityQueue.poll();
+ priorityQueue.offer(intermediateRecord);
+ }
+ }
}
+ return priorityQueue;
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index b50a836..ac99fff 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.BlockDocIdValueSet;
import org.apache.pinot.core.common.BlockMetadata;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.Table;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -57,6 +58,7 @@ public class IntermediateResultsBlock implements Block {
private AggregationGroupByResult _aggregationGroupByResult;
private List<Map<String, Object>> _combinedAggregationGroupByResult;
private List<ProcessingException> _processingExceptions;
+ private Collection<IntermediateRecord> _intermediateRecords;
private long _numDocsScanned;
private long _numEntriesScannedInFilter;
private long _numEntriesScannedPostFilter;
@@ -117,6 +119,17 @@ public class IntermediateResultsBlock implements Block {
_dataSchema = dataSchema;
}
+ /**
+ * Constructor for aggregation group-by order-by result with {@link
AggregationGroupByResult} and
+ * with a collection of intermediate records.
+ */
+ public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions,
+ Collection<IntermediateRecord> intermediateRecords, DataSchema
dataSchema) {
+ _aggregationFunctions = aggregationFunctions;
+ _dataSchema = dataSchema;
+ _intermediateRecords = intermediateRecords;
+ }
+
public IntermediateResultsBlock(Table table) {
_table = table;
_dataSchema = table.getDataSchema();
@@ -210,14 +223,14 @@ public class IntermediateResultsBlock implements Block {
_executionThreadCpuTimeNs = executionThreadCpuTimeNs;
}
- public void setNumServerThreads(int numServerThreads) {
- _numServerThreads = numServerThreads;
- }
-
public int getNumServerThreads() {
return _numServerThreads;
}
+ public void setNumServerThreads(int numServerThreads) {
+ _numServerThreads = numServerThreads;
+ }
+
@VisibleForTesting
public long getNumDocsScanned() {
return _numDocsScanned;
@@ -281,6 +294,14 @@ public class IntermediateResultsBlock implements Block {
_numGroupsLimitReached = numGroupsLimitReached;
}
+ /**
+ * Get the collection of intermediate records
+ */
+ @Nullable
+ public Collection<IntermediateRecord> getIntermediateRecords() {
+ return _intermediateRecords;
+ }
+
public DataTable getDataTable()
throws Exception {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 071a682..a230561 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.operator.combine;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -34,6 +35,7 @@ import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
@@ -128,19 +130,30 @@ public class GroupByOrderByCombineOperator extends
BaseCombineOperator {
}
// Merge aggregation group-by result.
- AggregationGroupByResult aggregationGroupByResult =
intermediateResultsBlock.getAggregationGroupByResult();
- if (aggregationGroupByResult != null) {
- // Iterate over the group-by keys, for each key, update the group-by
result in the indexedTable
- Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator =
aggregationGroupByResult.getGroupKeyIterator();
- while (groupKeyIterator.hasNext()) {
- GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
- Object[] keys = groupKey._keys;
- Object[] values = Arrays.copyOf(keys, _numColumns);
- int groupId = groupKey._groupId;
- for (int i = 0; i < _numAggregationFunctions; i++) {
- values[_numGroupByExpressions + i] =
aggregationGroupByResult.getResultForGroupId(i, groupId);
+ // Iterate over the group-by keys, for each key, update the group-by
result in the indexedTable
+ Collection<IntermediateRecord> intermediateRecords =
intermediateResultsBlock.getIntermediateRecords();
+ // For now, only GroupBy OrderBy query has pre-constructed intermediate
records
+ if (intermediateRecords == null) {
+ // Merge aggregation group-by result.
+ AggregationGroupByResult aggregationGroupByResult =
intermediateResultsBlock.getAggregationGroupByResult();
+ if (aggregationGroupByResult != null) {
+ // Iterate over the group-by keys, for each key, update the group-by
result in the indexedTable
+ Iterator<GroupKeyGenerator.GroupKey> dicGroupKeyIterator =
aggregationGroupByResult.getGroupKeyIterator();
+ while (dicGroupKeyIterator.hasNext()) {
+ GroupKeyGenerator.GroupKey groupKey = dicGroupKeyIterator.next();
+ Object[] keys = groupKey._keys;
+ Object[] values = Arrays.copyOf(keys, _numColumns);
+ int groupId = groupKey._groupId;
+ for (int i = 0; i < _numAggregationFunctions; i++) {
+ values[_numGroupByExpressions + i] =
aggregationGroupByResult.getResultForGroupId(i, groupId);
+ }
+ _indexedTable.upsert(new Key(keys), new Record(values));
}
- _indexedTable.upsert(new Key(keys), new Record(values));
+ }
+ } else {
+ for (IntermediateRecord intermediateResult : intermediateRecords) {
+ //TODO: change upsert api so that it accepts intermediateRecord
directly
+ _indexedTable.upsert(intermediateResult._key,
intermediateResult._record);
}
}
} catch (EarlyTerminationException e) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
index 883d718..ebc6947 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
@@ -18,8 +18,11 @@
*/
package org.apache.pinot.core.operator.query;
+import java.util.Collection;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.data.table.TableResizer;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
@@ -28,8 +31,11 @@ import
org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
+import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
+import static org.apache.pinot.core.util.GroupByUtils.getTableCapacity;
+
/**
* The <code>AggregationGroupByOrderByOperator</code> class provides the
operator for aggregation group-by query on a
@@ -43,16 +49,19 @@ public class AggregationGroupByOrderByOperator extends
BaseOperator<Intermediate
private final ExpressionContext[] _groupByExpressions;
private final int _maxInitialResultHolderCapacity;
private final int _numGroupsLimit;
+ private final int _minSegmentTrimSize;
private final TransformOperator _transformOperator;
private final long _numTotalDocs;
private final boolean _useStarTree;
private final DataSchema _dataSchema;
+ private final QueryContext _queryContext;
private int _numDocsScanned = 0;
public AggregationGroupByOrderByOperator(AggregationFunction[]
aggregationFunctions,
ExpressionContext[] groupByExpressions, int
maxInitialResultHolderCapacity, int numGroupsLimit,
- TransformOperator transformOperator, long numTotalDocs, boolean
useStarTree) {
+ int minSegmentTrimSize, TransformOperator transformOperator, long
numTotalDocs, QueryContext queryContext,
+ boolean useStarTree) {
_aggregationFunctions = aggregationFunctions;
_groupByExpressions = groupByExpressions;
_maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
@@ -60,6 +69,8 @@ public class AggregationGroupByOrderByOperator extends
BaseOperator<Intermediate
_transformOperator = transformOperator;
_numTotalDocs = numTotalDocs;
_useStarTree = useStarTree;
+ _queryContext = queryContext;
+ _minSegmentTrimSize = minSegmentTrimSize;
// NOTE: The indexedTable expects that the the data schema will have group
by columns before aggregation columns
int numGroupByExpressions = groupByExpressions.length;
@@ -106,8 +117,20 @@ public class AggregationGroupByOrderByOperator extends
BaseOperator<Intermediate
groupByExecutor.process(transformBlock);
}
- // Build intermediate result block based on aggregation group-by result
from the executor
- return new IntermediateResultsBlock(_aggregationFunctions,
groupByExecutor.getResult(), _dataSchema);
+ // There is no OrderBy or minSegmentTrimSize is set to be negative or 0
+ if (_queryContext.getOrderByExpressions() == null || _minSegmentTrimSize
<= 0) {
+ // Build intermediate result block based on aggregation group-by result
from the executor
+ return new IntermediateResultsBlock(_aggregationFunctions,
groupByExecutor.getResult(), _dataSchema);
+ }
+ int trimSize = getTableCapacity(_queryContext.getLimit(),
_minSegmentTrimSize);
+ // Num of groups hasn't reached the threshold
+ if (groupByExecutor.getNumGroups() <= trimSize) {
+ return new IntermediateResultsBlock(_aggregationFunctions,
groupByExecutor.getResult(), _dataSchema);
+ }
+ // Trim
+ TableResizer tableResizer = new TableResizer(_dataSchema, _queryContext);
+ Collection<IntermediateRecord> intermediateRecords =
groupByExecutor.trimGroupByResult(trimSize, tableResizer);
+ return new IntermediateResultsBlock(_aggregationFunctions,
intermediateRecords, _dataSchema);
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
index a392948..915eb5e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
@@ -33,7 +33,6 @@ import org.apache.pinot.segment.spi.IndexSegment;
import
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
-
/**
* The <code>AggregationGroupByOrderByPlanNode</code> class provides the
execution plan for aggregation group-by order-by query on a
* single segment.
@@ -43,13 +42,15 @@ public class AggregationGroupByOrderByPlanNode implements
PlanNode {
private final IndexSegment _indexSegment;
private final int _maxInitialResultHolderCapacity;
private final int _numGroupsLimit;
+ private final int _minSegmentTrimSize;
private final AggregationFunction[] _aggregationFunctions;
private final ExpressionContext[] _groupByExpressions;
private final TransformPlanNode _transformPlanNode;
private final StarTreeTransformPlanNode _starTreeTransformPlanNode;
+ private final QueryContext _queryContext;
public AggregationGroupByOrderByPlanNode(IndexSegment indexSegment,
QueryContext queryContext,
- int maxInitialResultHolderCapacity, int numGroupsLimit) {
+ int maxInitialResultHolderCapacity, int numGroupsLimit, int
minSegmentTrimSize) {
_indexSegment = indexSegment;
_maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
_numGroupsLimit = numGroupsLimit;
@@ -58,6 +59,8 @@ public class AggregationGroupByOrderByPlanNode implements
PlanNode {
List<ExpressionContext> groupByExpressions =
queryContext.getGroupByExpressions();
assert groupByExpressions != null;
_groupByExpressions = groupByExpressions.toArray(new ExpressionContext[0]);
+ _queryContext = queryContext;
+ _minSegmentTrimSize = minSegmentTrimSize;
List<StarTreeV2> starTrees = indexSegment.getStarTrees();
if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) {
@@ -95,11 +98,13 @@ public class AggregationGroupByOrderByPlanNode implements
PlanNode {
if (_transformPlanNode != null) {
// Do not use star-tree
return new AggregationGroupByOrderByOperator(_aggregationFunctions,
_groupByExpressions,
- _maxInitialResultHolderCapacity, _numGroupsLimit,
_transformPlanNode.run(), numTotalDocs, false);
+ _maxInitialResultHolderCapacity, _numGroupsLimit,
_minSegmentTrimSize, _transformPlanNode.run(), numTotalDocs,
+ _queryContext, false);
} else {
// Use star-tree
return new AggregationGroupByOrderByOperator(_aggregationFunctions,
_groupByExpressions,
- _maxInitialResultHolderCapacity, _numGroupsLimit,
_starTreeTransformPlanNode.run(), numTotalDocs, true);
+ _maxInitialResultHolderCapacity, _numGroupsLimit,
_minSegmentTrimSize, _starTreeTransformPlanNode.run(),
+ numTotalDocs, _queryContext, true);
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 4c801e3..3f94d80 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -44,6 +44,7 @@ import
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils
import org.apache.pinot.core.query.config.QueryExecutorConfig;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
+import org.apache.pinot.core.util.GroupByUtils;
import org.apache.pinot.core.util.QueryOptions;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -56,28 +57,32 @@ import org.slf4j.LoggerFactory;
* The <code>InstancePlanMakerImplV2</code> class is the default
implementation of {@link PlanMaker}.
*/
public class InstancePlanMakerImplV2 implements PlanMaker {
- private static final Logger LOGGER =
LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
-
public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY =
"max.init.group.holder.capacity";
public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
public static final String NUM_GROUPS_LIMIT = "num.groups.limit";
public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
-
+ public static final String ENABLE_SEGMENT_GROUP_TRIM =
"enable.segment.group.trim";
+ public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false;
+ public static final String MIN_SEGMENT_GROUP_TRIM_SIZE =
"min.segment.group.trim.size";
+ public static final int DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE = -1;
// set as pinot.server.query.executor.groupby.trim.threshold
public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold";
public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
private final int _maxInitialResultHolderCapacity;
// Limit on number of groups stored for each segment, beyond which no new
group will be created
private final int _numGroupsLimit;
// Used for SQL GROUP BY (server combine)
private final int _groupByTrimThreshold;
+ private final int _minSegmentGroupTrimSize;
@VisibleForTesting
public InstancePlanMakerImplV2() {
_maxInitialResultHolderCapacity =
DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
_numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT;
_groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
+ _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE;
}
@VisibleForTesting
@@ -85,6 +90,15 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
_maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
_numGroupsLimit = numGroupsLimit;
_groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
+ _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE;
+ }
+
+ @VisibleForTesting
+ public InstancePlanMakerImplV2(int minSegmentGroupTrimSize) {
+ _maxInitialResultHolderCapacity =
DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
+ _numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT;
+ _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
+ _minSegmentGroupTrimSize = minSegmentGroupTrimSize;
}
/**
@@ -105,8 +119,22 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
Preconditions.checkState(_maxInitialResultHolderCapacity <=
_numGroupsLimit,
"Invalid configuration: maxInitialResultHolderCapacity: %d must be
smaller or equal to numGroupsLimit: %d",
_maxInitialResultHolderCapacity, _numGroupsLimit);
- LOGGER.info("Initializing plan maker with maxInitialResultHolderCapacity:
{}, numGroupsLimit: {}",
- _maxInitialResultHolderCapacity, _numGroupsLimit);
+ boolean enableSegmentGroupTrim =
+ queryExecutorConfig.getConfig().getProperty(ENABLE_SEGMENT_GROUP_TRIM,
DEFAULT_ENABLE_SEGMENT_GROUP_TRIM);
+ int minSegmentGroupTrimSize =
+
queryExecutorConfig.getConfig().getProperty(MIN_SEGMENT_GROUP_TRIM_SIZE,
DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE);
+
+ if (minSegmentGroupTrimSize > 0) {
+ _minSegmentGroupTrimSize = minSegmentGroupTrimSize;
+ } else if (enableSegmentGroupTrim) {
+ _minSegmentGroupTrimSize = GroupByUtils.DEFAULT_MIN_NUM_GROUPS;
+ } else {
+ _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE;
+ }
+ LOGGER.info(
+ "Initializing plan maker with maxInitialResultHolderCapacity: {},
numGroupsLimit: {}, enableSegmentTrim: {}, minSegmentGroupTrimSize: {}",
+ _maxInitialResultHolderCapacity, _numGroupsLimit,
minSegmentGroupTrimSize > 0 || enableSegmentGroupTrim,
+ _minSegmentGroupTrimSize);
}
@Override
@@ -138,7 +166,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
// new Combine operator only when GROUP_BY_MODE explicitly set to SQL
if (queryOptions.isGroupByModeSQL()) {
return new AggregationGroupByOrderByPlanNode(indexSegment,
queryContext, _maxInitialResultHolderCapacity,
- _numGroupsLimit);
+ _numGroupsLimit, _minSegmentGroupTrimSize);
}
return new AggregationGroupByPlanNode(indexSegment, queryContext,
_maxInitialResultHolderCapacity,
_numGroupsLimit);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
index d67b4f2..de9cfa9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
@@ -18,9 +18,12 @@
*/
package org.apache.pinot.core.query.aggregation.groupby;
+import java.util.Collection;
import java.util.Map;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.data.table.TableResizer;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
@@ -144,4 +147,14 @@ public class DefaultGroupByExecutor implements
GroupByExecutor {
public AggregationGroupByResult getResult() {
return new AggregationGroupByResult(_groupKeyGenerator,
_aggregationFunctions, _groupByResultHolders);
}
+
+ @Override
+ public int getNumGroups() {
+ return _groupKeyGenerator.getNumKeys();
+ }
+
+ @Override
+ public Collection<IntermediateRecord> trimGroupByResult(int trimSize,
TableResizer tableResizer) {
+ return
tableResizer.trimInSegmentResults(_groupKeyGenerator.getGroupKeys(),
_groupByResultHolders, trimSize);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
index 79b7afe..3c63a0d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
@@ -206,6 +206,9 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
return _rawKeyHolder.getStringGroupKeys();
}
+ @Override
+ public int getNumKeys() {
+ return _rawKeyHolder.getNumKeys();
+ }
+
private interface RawKeyHolder {
/**
@@ -240,11 +243,18 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
* Returns an iterator of {@link StringGroupKey}. Use this interface to
iterate through all the group keys.
*/
Iterator<StringGroupKey> getStringGroupKeys();
+
+ /**
+ * Returns current number of unique keys
+ */
+ int getNumKeys();
+
}
private class ArrayBasedHolder implements RawKeyHolder {
// TODO: using bitmap might better
private final boolean[] _flags = new boolean[_globalGroupIdUpperBound];
+ private int _numKeys = 0;
@Override
public void processSingleValue(int numDocs, int[] outGroupIds) {
@@ -254,6 +264,10 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
groupId = groupId * _cardinalities[j] + _singleValueDictIds[j][i];
}
outGroupIds[i] = groupId;
+ // if the flag is false, then increase the key num
+ if (!_flags[groupId]) {
+ _numKeys++;
+ }
_flags[groupId] = true;
}
}
@@ -263,6 +277,9 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
for (int i = 0; i < numDocs; i++) {
int[] groupIds = getIntRawKeys(i);
for (int groupId : groupIds) {
+ if (!_flags[groupId]) {
+ _numKeys++;
+ }
_flags[groupId] = true;
}
outGroupIds[i] = groupIds;
@@ -331,6 +348,11 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
}
};
}
+
+ @Override
+ public int getNumKeys() {
+ return _numKeys;
+ }
}
private class IntMapBasedHolder implements RawKeyHolder {
@@ -419,6 +441,11 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
}
};
}
+
+ @Override
+ public int getNumKeys() {
+ return _groupIdMap.size();
+ }
}
/**
@@ -634,6 +661,10 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
}
};
}
+ @Override
+ public int getNumKeys() {
+ return _groupIdMap.size();
+ }
}
/**
@@ -836,6 +867,11 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
}
};
}
+
+ @Override
+ public int getNumKeys() {
+ return _groupIdMap.size();
+ }
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java
index c1266c5..869ef5d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.core.query.aggregation.groupby;
+import java.util.Collection;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.data.table.TableResizer;
import org.apache.pinot.core.operator.blocks.TransformBlock;
@@ -40,4 +43,19 @@ public interface GroupByExecutor {
* @return Result of aggregation
*/
AggregationGroupByResult getResult();
+
+ /**
+ * Returns the number of generated results
+ *
+ * @return Number of results
+ */
+ int getNumGroups();
+
+ /**
+ * Trim the GroupBy result up to the threshold max(configurable_threshold *
5, minTrimSize)
+ * TODO: benchmark the performance of PQ vs. topK
+ * <p>Should be called after all transform blocks have been processed.
+ *
+ */
+ Collection<IntermediateRecord> trimGroupByResult(int trimSize, TableResizer
tableResizer);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java
index c9ceaea..592ad19 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java
@@ -78,6 +78,11 @@ public interface GroupKeyGenerator {
Iterator<StringGroupKey> getStringGroupKeys();
/**
+ * Returns the current number of unique keys
+ */
+ int getNumKeys();
+
+ /**
* This class encapsulates the integer group id and the group keys.
*/
class GroupKey {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
index d8c7c8e..9f9422a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
@@ -337,6 +337,11 @@ public class NoDictionaryMultiColumnGroupKeyGenerator
implements GroupKeyGenerat
}
}
+ @Override
+ public int getNumKeys() {
+ return _groupKeyMap.size();
+ }
+
/**
* Iterator for {@link GroupKey}.
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
index 12b827b..cfee809 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
@@ -200,6 +200,11 @@ public class NoDictionarySingleColumnGroupKeyGenerator
implements GroupKeyGenera
}
}
+ @Override
+ public int getNumKeys() {
+ return _groupKeyMap.size();
+ }
+
private int getKeyForValue(int value) {
Int2IntMap map = (Int2IntMap) _groupKeyMap;
int groupId = map.get(value);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
index dd29a29..bf0aa03 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
@@ -25,7 +25,7 @@ public final class GroupByUtils {
private GroupByUtils() {
}
- private static final int NUM_RESULTS_LOWER_LIMIT = 5000;
+ public static final int DEFAULT_MIN_NUM_GROUPS = 5000;
/**
* (For PQL semantic) Returns the capacity of the table required by the
given query.
@@ -33,7 +33,16 @@ public final class GroupByUtils {
* in PQL semantic.
*/
public static int getTableCapacity(int limit) {
- return Math.max(limit * 5, NUM_RESULTS_LOWER_LIMIT);
+ return Math.max(limit * 5, DEFAULT_MIN_NUM_GROUPS);
+ }
+
+ /**
+ * Returns the capacity of the table required by the given query.
+ * NOTE: It returns {@code max(limit * 5, minNumGroups)} where minNumGroups
is configured by the user
+ * (Default: 5000)
+ */
+ public static int getTableCapacity(int limit, int minNumGroups) {
+ return Math.max(limit * 5, minNumGroups);
}
/**
@@ -46,7 +55,7 @@ public final class GroupByUtils {
public static int getTableCapacity(QueryContext queryContext) {
int limit = queryContext.getLimit();
if (queryContext.getOrderByExpressions() != null ||
queryContext.getHavingFilter() != null) {
- return Math.max(limit * 5, NUM_RESULTS_LOWER_LIMIT);
+ return Math.max(limit * 5, DEFAULT_MIN_NUM_GROUPS);
} else {
return limit;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
index 0f8eb1a..bf85846 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
@@ -21,9 +21,15 @@ package org.apache.pinot.core.data.table;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import org.apache.pinot.common.utils.DataSchema;
+import
org.apache.pinot.core.query.aggregation.groupby.DoubleGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.segment.local.customobject.AvgPair;
import org.testng.annotations.BeforeClass;
@@ -43,10 +49,13 @@ public class TableResizerTest {
new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)",
"distinctcount(m3)", "avg(m4)"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE,
DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.OBJECT});
private static final int TRIM_TO_SIZE = 3;
+ private static final int NUM_RESULT_HOLDER = 4;
private Map<Key, Record> _recordsMap;
private List<Record> _records;
private List<Key> _keys;
+ private List<GroupKeyGenerator.GroupKey> _groupKeys;
+ private GroupByResultHolder[] _groupByResultHolders;
@BeforeClass
public void setUp() {
@@ -66,6 +75,35 @@ public class TableResizerTest {
new Key(new Object[]{"c", 300, 5.0})
);
//@formatter:on
+ List<Object[]> objectArray = Arrays.asList(
+ new Object[]{"a", 10, 1.0},
+ new Object[]{"b", 10, 2.0},
+ new Object[]{"c", 200, 3.0},
+ new Object[]{"c", 50, 4.0},
+ new Object[]{"c", 300, 5.0});
+
+ // Use _keys for _groupKeys
+ _groupKeys = new LinkedList<>();
+ for (int i = 0; i < _keys.size(); ++i) {
+ GroupKeyGenerator.GroupKey groupKey = new GroupKeyGenerator.GroupKey();
+ groupKey._keys = objectArray.get(i);
+ groupKey._groupId = i;
+ _groupKeys.add(groupKey);
+ }
+
+ // groupByResults are the same as _records
+ _groupByResultHolders = new GroupByResultHolder[NUM_RESULT_HOLDER];
+ _groupByResultHolders[0] = new
DoubleGroupByResultHolder(_groupKeys.size(), _groupKeys.size(), 0.0);
+ _groupByResultHolders[1] = new
DoubleGroupByResultHolder(_groupKeys.size(), _groupKeys.size(), 0.0);
+ _groupByResultHolders[2] = new
ObjectGroupByResultHolder(_groupKeys.size(), _groupKeys.size());
+ _groupByResultHolders[3] = new
ObjectGroupByResultHolder(_groupKeys.size(), _groupKeys.size());
+ for (int i = 0; i < _groupKeys.size(); ++i) {
+ _groupByResultHolders[0].setValueForKey(_groupKeys.get(i)._groupId,
(double)_records.get(i).getValues()[3]);
+ _groupByResultHolders[1].setValueForKey(_groupKeys.get(i)._groupId,
(double)_records.get(i).getValues()[4]);
+ _groupByResultHolders[2].setValueForKey(_groupKeys.get(i)._groupId,
_records.get(i).getValues()[5]);
+ _groupByResultHolders[3].setValueForKey(_groupKeys.get(i)._groupId,
_records.get(i).getValues()[6]);
+ }
+
_recordsMap = new HashMap<>();
int numRecords = _records.size();
for (int i = 0; i < numRecords; i++) {
@@ -289,4 +327,51 @@ public class TableResizerTest {
assertEquals(sortedRecords.get(1), _records.get(0));
assertEquals(sortedRecords.get(2), _records.get(3));
}
+
+ /**
+ * Tests in-segment trim from 15 records to 10 records
+ */
+ @Test
+ public void testInSegmentTrim() {
+ TableResizer tableResizer =
+ new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d3 DESC"));
+ PriorityQueue<IntermediateRecord> results =
+ tableResizer.trimInSegmentResults(_groupKeys.listIterator(),
_groupByResultHolders, TRIM_TO_SIZE);
+ assertEquals(results.size(), TRIM_TO_SIZE);
+ IntermediateRecord[] resultArray = new IntermediateRecord[results.size()];
+ for (int i = 0; i < TRIM_TO_SIZE; ++i) {
+ IntermediateRecord result = results.poll();
+ resultArray[i] = result;
+ }
+ // _records[4], _records[3], _records[2]
+ assertEquals(resultArray[0]._record, _records.get(2));
+ assertEquals(resultArray[1]._record, _records.get(3));
+ assertEquals(resultArray[2]._record, _records.get(4));
+
+ tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils
+ .getQueryContextFromSQL(QUERY_PREFIX + "SUM(m1) DESC, max(m2) DESC,
DISTINCTCOUNT(m3) DESC"));
+ results = tableResizer.trimInSegmentResults(_groupKeys.listIterator(),
_groupByResultHolders, TRIM_TO_SIZE);
+ assertEquals(results.size(), TRIM_TO_SIZE);
+ for (int i = 0; i < TRIM_TO_SIZE; ++i) {
+ IntermediateRecord result = results.poll();
+ resultArray[i] = result;
+ }
+ // _records[2], _records[3], _records[1]
+ assertEquals(resultArray[0]._record, _records.get(1));
+ assertEquals(resultArray[1]._record, _records.get(3));
+ assertEquals(resultArray[2]._record, _records.get(2));
+
+ tableResizer = new TableResizer(DATA_SCHEMA,
+ QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX +
"DISTINCTCOUNT(m3) DESC, AVG(m4) ASC"));
+ results = tableResizer.trimInSegmentResults(_groupKeys.listIterator(),
_groupByResultHolders, TRIM_TO_SIZE);
+ assertEquals(results.size(), TRIM_TO_SIZE);
+ for (int i = 0; i < TRIM_TO_SIZE; ++i) {
+ IntermediateRecord result = results.poll();
+ resultArray[i] = result;
+ }
+ // _records[4], _records[3], _records[1]
+ assertEquals(resultArray[0]._record, _records.get(1));
+ assertEquals(resultArray[1]._record, _records.get(3));
+ assertEquals(resultArray[2]._record, _records.get(4));
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java
new file mode 100644
index 0000000..c50ef8e
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.groupby;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator;
+import org.apache.pinot.core.plan.AggregationGroupByOrderByPlanNode;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.Pair;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static java.lang.Math.max;
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Unit test for GroupBy Trim functionalities.
+ * - Builds a segment with random data.
+ * - Uses AggregationGroupByOrderByPlanNode class to construct an
AggregationGroupByOrderByOperator
+ * - Perform aggregationGroupBy and OrderBy on the data
+ * - Also computes those results itself.
+ * - Asserts that the aggregation results returned by the class are the same as
+ * returned by the local computations.
+ *
+ * Currently tests 'max' functions, and can be easily extended to
+ * test other conditions such as GroupBy without OrderBy
+ */
+public class GroupByInSegmentTrimTest {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"GroupByInSegmentTrimTest");
+ private static final String SEGMENT_NAME = "TestGroupByInSegment";
+
+ private static final String METRIC_PREFIX = "metric_";
+ private static final int NUM_ROWS = 1000;
+ private static final int NUM_COLUMN = 2;
+ private static final int MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
+ private static final int NUM_GROUPS_LIMIT = 100_000;
+ private static IndexSegment _indexSegment;
+ private static String[] _columns;
+ private static double[][] _inputData;
+ private static Map<Double, Double> _resultMap;
+
+ /**
+ * Initializations prior to the test:
+ * - Build a segment with metric columns (that will be aggregated and
grouped) containing
+ * randomly generated data.
+ *
+ * @throws Exception
+ */
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ _resultMap = new HashMap<>();
+ // Current Schema: Columns: metrics_0(double), metrics_1(double)
+ _inputData = new double[NUM_COLUMN][NUM_ROWS];
+ _columns = new String[NUM_COLUMN];
+ setupSegment();
+ }
+
+ /**
+ * Test the GroupBy OrderBy query and compute the expected results to match
+ */
+ @Test(dataProvider = "QueryDataProvider")
+ void TestGroupByOrderByOperator(int trimSize, List<Pair<Double, Double>>
expectedResult, QueryContext queryContext) {
+ // Create a query plan
+ AggregationGroupByOrderByPlanNode aggregationGroupByOrderByPlanNode =
+ new AggregationGroupByOrderByPlanNode(_indexSegment, queryContext,
MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ NUM_GROUPS_LIMIT, trimSize);
+
+ // Get the query executor
+ AggregationGroupByOrderByOperator aggregationGroupByOrderByOperator =
aggregationGroupByOrderByPlanNode.run();
+
+ // Extract the execution result
+ IntermediateResultsBlock resultsBlock =
aggregationGroupByOrderByOperator.nextBlock();
+ ArrayList<Pair<Double, Double>> extractedResult =
extractTestResult(resultsBlock);
+
+ assertEquals(extractedResult, expectedResult);
+ }
+
+ /**
+ * Helper method to setup the index segment on which to perform aggregation
tests.
+ * - Generates a segment with {@link #NUM_COLUMN} and {@link #NUM_ROWS}
+ * - Random 'double' data filled in the metric columns. The data is also
populated
+ * into the _inputData[], so it can be used to test the results.
+ *
+ * @throws Exception
+ */
+ private void setupSegment()
+ throws Exception {
+ if (INDEX_DIR.exists()) {
+ FileUtils.deleteQuietly(INDEX_DIR);
+ }
+
+ // Segment Config
+ SegmentGeneratorConfig config =
+ new SegmentGeneratorConfig(new
TableConfigBuilder(TableType.OFFLINE).setTableName("test").build(),
+ buildSchema());
+ config.setSegmentName(SEGMENT_NAME);
+ config.setOutDir(INDEX_DIR.getAbsolutePath());
+
+ // Fill the data table
+ List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+ int baseValue = 10;
+ for (int i = 0; i < NUM_ROWS; i++) {
+ GenericRow genericRow = new GenericRow();
+
+ for (int j = 0; j < _columns.length; j++) {
+ String metricName = _columns[j];
+ double value = baseValue + i + j;
+ _inputData[j][i] = value;
+ genericRow.putValue(metricName, value);
+ }
+ // Compute the max result and insert into a grouped map
+ computeMaxResult(_inputData[0][i], _inputData[1][i]);
+ rows.add(genericRow);
+ baseValue += 10;
+ }
+
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(config, new GenericRowRecordReader(rows));
+ driver.build();
+
+ _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR,
driver.getSegmentName()), ReadMode.heap);
+ }
+
+ /**
+ * Helper method to build schema for the segment on which aggregation tests
will be run.
+ *
+ * @return table schema
+ */
+ private Schema buildSchema() {
+ Schema schema = new Schema();
+
+ for (int i = 0; i < NUM_COLUMN; i++) {
+ String metricName = METRIC_PREFIX + i;
+ MetricFieldSpec metricFieldSpec = new MetricFieldSpec(metricName,
FieldSpec.DataType.DOUBLE);
+ schema.addField(metricFieldSpec);
+ _columns[i] = metricName;
+ }
+ return schema;
+ }
+
+ /**
+ * Helper method to compute the aggregation result grouped by the key
+ *
+ */
+ private void computeMaxResult(Double key, Double result) {
+ if (_resultMap.get(key) == null || _resultMap.get(key) < result) {
+ _resultMap.put(key, result);
+ }
+ }
+
+ /**
+ * Helper method to extract the result from IntermediateResultsBlock
+ *
+ * @return A list of expected results
+ */
+ private ArrayList<Pair<Double, Double>>
extractTestResult(IntermediateResultsBlock resultsBlock) {
+ AggregationGroupByResult result =
resultsBlock.getAggregationGroupByResult();
+ if (result != null) {
+ // No trim
+ return extractAggregationResult(result);
+ } else {
+ // In case of trim
+ return extractIntermediateResult(resultsBlock.getIntermediateRecords());
+ }
+ }
+
+ /**
+ * Helper method to extract the result from AggregationGroupByResult
+ *
+ * @return A list of expected results
+ */
+ private ArrayList<Pair<Double, Double>>
extractAggregationResult(AggregationGroupByResult aggregationGroupByResult) {
+ ArrayList<Pair<Double, Double>> result = new ArrayList<>();
+ Iterator<GroupKeyGenerator.GroupKey> iterator =
aggregationGroupByResult.getGroupKeyIterator();
+ int i = 0;
+ while (iterator.hasNext()) {
+ GroupKeyGenerator.GroupKey groupKey = iterator.next();
+ Double key = (Double) groupKey._keys[0];
+ Double value = (Double) aggregationGroupByResult.getResultForGroupId(i,
groupKey._groupId);
+ result.add(new Pair<>(key, value));
+ }
+ result.sort((o1, o2) -> (int) (o2.getSecond() - o1.getSecond()));
+ return result;
+ }
+
+ /**
+ * Helper method to extract the result from Collection<IntermediateRecord>
+ *
+ * @return A list of expected results
+ */
+ private ArrayList<Pair<Double, Double>>
extractIntermediateResult(Collection<IntermediateRecord> intermediateRecord) {
+ ArrayList<Pair<Double, Double>> result = new ArrayList<>();
+ PriorityQueue<IntermediateRecord> resultPQ = new
PriorityQueue<>(intermediateRecord);
+ while (!resultPQ.isEmpty()) {
+ IntermediateRecord head = resultPQ.poll();
+ result.add(new Pair<>((Double) head._record.getValues()[0], (Double)
head._record.getValues()[1]));
+ }
+ Collections.reverse(result);
+ return result;
+ }
+
+ @DataProvider
+ public static Object[][] QueryDataProvider() {
+ List<Object[]> data = new ArrayList<>();
+ ArrayList<Pair<Double, Double>> expectedResult = computeExpectedResult();
+ // Testcase1: low limit + high trim size
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContextFromSQL(
+ "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0 ORDER
BY max(metric_1) DESC LIMIT 1");
+ int trimSize = 100;
+ int expectedSize = max(trimSize, 5 * queryContext.getLimit());
+ data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize),
queryContext});
+ // Testcase2: high limit + low trim size
+ queryContext = QueryContextConverterUtils.getQueryContextFromSQL(
+ "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0 ORDER
BY max(metric_1) DESC LIMIT 50");
+ trimSize = 10;
+ expectedSize = max(trimSize, 5 * queryContext.getLimit());
+ data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize),
queryContext});
+ // Testcase3: high limit + high trim size (No trim)
+ queryContext = QueryContextConverterUtils.getQueryContextFromSQL(
+ "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0 ORDER
BY max(metric_1) DESC LIMIT 500");
+ trimSize = 1000;
+ expectedSize = 1000;
+ data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize),
queryContext});
+
+ return data.toArray(new Object[data.size()][]);
+ }
+
+ /**
+ * Helper method to compute the expected result
+ *
+ * @return A list of expected results
+ */
+ private static ArrayList<Pair<Double, Double>> computeExpectedResult() {
+ ArrayList<Pair<Double, Double>> result = new ArrayList<>();
+ for (Map.Entry<Double, Double> entry : _resultMap.entrySet()) {
+ result.add(new Pair<>(entry.getKey(), entry.getValue()));
+ }
+ result.sort((o1, o2) -> (int) (o2.getSecond() - o1.getSecond()));
+ return result;
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
index c9ce9db..5dfcead 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -42,6 +43,19 @@ public class InterSegmentOrderByMultiValueQueriesTest
extends BaseMultiValueQuer
}
/**
+ * Tests the in-segment trim option for GroupBy OrderBy query. (No trim)
+ */
+ @Test(dataProvider = "orderByDataProvider")
+ public void testGroupByOrderByMVSegmentTrimSQLResults(String query,
List<Object[]> expectedResults,
+ long expectedNumEntriesScannedPostFilter, DataSchema expectedDataSchema)
{
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(1);
+ BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query,
planMaker);
+ QueriesTestUtils
+ .testInterSegmentResultTable(brokerResponse, 400000L, 0,
expectedNumEntriesScannedPostFilter, 400000L,
+ expectedResults, expectedResults.size(), expectedDataSchema);
+ }
+
+ /**
* Provides various combinations of order by.
* In order to calculate the expected results, the results from a group by
were taken, and then ordered accordingly.
*/
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java
index a10e542..ddd2d11 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java
@@ -30,6 +30,7 @@ import
org.apache.pinot.common.response.broker.AggregationResult;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.GroupByResult;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.testng.Assert;
@@ -73,6 +74,21 @@ public class InterSegmentOrderBySingleValueQueriesTest
extends BaseSingleValueQu
}
/**
+ * Tests the in-segment trim option for GroupBy OrderBy query. (No trim)
+ */
+ @Test(dataProvider = "orderBySQLResultTableProvider")
+ public void testGroupByOrderByMVSegmentTrimSQLResults(String query,
List<Object[]> expectedResults,
+ long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter,
long expectedNumEntriesScannedPostFilter,
+ long expectedNumTotalDocs, DataSchema expectedDataSchema) {
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(1);
+ BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query,
planMaker);
+ QueriesTestUtils
+ .testInterSegmentResultTable(brokerResponse, expectedNumDocsScanned,
expectedNumEntriesScannedInFilter,
+ expectedNumEntriesScannedPostFilter, expectedNumTotalDocs,
expectedResults, expectedResults.size(),
+ expectedDataSchema);
+ }
+
+ /**
* Tests the query options for groupByMode, responseFormat.
* pql, pql - does not execute order by, returns aggregationResults
* pql, sql - does not execute order by, returns aggregationResults
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]