This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6b438091b4d Extract trimSize calculation logic to QueryContext (#16526)
6b438091b4d is described below
commit 6b438091b4d92ae61b2f3bf08738a6cc7f730aed
Author: Song Fu <[email protected]>
AuthorDate: Wed Aug 13 14:09:36 2025 -0700
Extract trimSize calculation logic to QueryContext (#16526)
---
.../operator/query/FilteredGroupByOperator.java | 6 ++----
.../pinot/core/operator/query/GroupByOperator.java | 14 +-------------
.../core/query/request/context/QueryContext.java | 22 ++++++++++++++++++++++
3 files changed, 25 insertions(+), 17 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
index af438a5eb30..554539a3721 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
@@ -48,7 +48,6 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
-import org.apache.pinot.core.util.GroupByUtils;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -188,9 +187,8 @@ public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> {
// - There are more groups than the trim size
// TODO: Currently the groups are not trimmed if there is no ordering specified. Consider ordering on group-by
// columns if no ordering is specified.
- int minGroupTrimSize = _queryContext.getMinSegmentGroupTrimSize();
- if (_queryContext.getOrderByExpressions() != null && minGroupTrimSize > 0) {
- int trimSize = GroupByUtils.getTableCapacity(_queryContext.getLimit(), minGroupTrimSize);
+ int trimSize = _queryContext.getEffectiveSegmentGroupTrimSize();
+ if (trimSize > 0) {
if (groupKeyGenerator.getNumKeys() > trimSize) {
TableResizer tableResizer = new TableResizer(_dataSchema, _queryContext);
Collection<IntermediateRecord> intermediateRecords =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
index 1a15f58e71f..7af165b5da4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
@@ -27,7 +27,6 @@ import java.util.stream.Collectors;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.table.IntermediateRecord;
@@ -44,7 +43,6 @@ import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
-import org.apache.pinot.core.util.GroupByUtils;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,17 +138,7 @@ public class GroupByOperator extends BaseOperator<GroupByResultsBlock> {
// - There are more groups than the trim size
// TODO: Currently the groups are not trimmed if there is no ordering specified. Consider ordering on group-by
// columns if no ordering is specified.
- int minGroupTrimSize = _queryContext.getMinSegmentGroupTrimSize();
- int trimSize = -1;
- List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
- if (!_queryContext.isUnsafeTrim()) {
- // if orderby key is groupby key, and there's no having clause
- // keep at most `limit` rows only
- trimSize = _queryContext.getLimit();
- } else if (orderByExpressions != null && minGroupTrimSize > 0) {
- // max(minSegmentGroupTrimSize, 5 * LIMIT)
- trimSize = GroupByUtils.getTableCapacity(_queryContext.getLimit(), minGroupTrimSize);
- }
+ int trimSize = _queryContext.getEffectiveSegmentGroupTrimSize();
if (trimSize > 0) {
if (groupByExecutor.getNumGroups() > trimSize) {
TableResizer tableResizer = new TableResizer(_dataSchema, _queryContext);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 64b7ac35648..9de96e29f8a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.util.GroupByUtils;
import org.apache.pinot.core.util.MemoizedClassAssociation;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.config.table.FieldConfig;
@@ -133,6 +134,8 @@ public class QueryContext {
private int _numThreadsExtractFinalResult = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT;
// Parallel chunk size for final reduce
private int _chunkSizeExtractFinalResult = InstancePlanMakerImplV2.DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT;
+ // Segment trim size for group by operator
+ private int _effectiveSegmentGroupTrimSize;
// Whether null handling is enabled
private boolean _nullHandlingEnabled;
// Whether server returns the final result
@@ -439,6 +442,7 @@ public class QueryContext {
public void setMinSegmentGroupTrimSize(int minSegmentGroupTrimSize) {
_minSegmentGroupTrimSize = minSegmentGroupTrimSize;
+ _effectiveSegmentGroupTrimSize = calculateEffectiveSegmentGroupTrimSize();
}
public int getMinServerGroupTrimSize() {
@@ -520,6 +524,24 @@ public class QueryContext {
return ((ConcurrentHashMap<K, V>) _sharedValues.apply(type)).computeIfAbsent(key, mapper);
}
+ public int getEffectiveSegmentGroupTrimSize() {
+ return _effectiveSegmentGroupTrimSize;
+ }
+
+ private int calculateEffectiveSegmentGroupTrimSize() {
+ int minGroupTrimSize = getMinSegmentGroupTrimSize();
+ List<OrderByExpressionContext> orderByExpressions = getOrderByExpressions();
+ if (!isUnsafeTrim() && !hasFilteredAggregations()) {
+ // if orderby key is groupby key, and there's no having clause, and there's no filtered aggr,
+ // keep at most `limit` rows only
+ return getLimit();
+ } else if (orderByExpressions != null && minGroupTrimSize > 0) {
+ // otherwise trim to max(minSegmentGroupTrimSize, 5 * LIMIT)
+ return GroupByUtils.getTableCapacity(getLimit(), minGroupTrimSize);
+ }
+ return -1;
+ }
+
/**
* NOTE: For debugging only.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]