This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4378f10333b perf: Optimize aggregators for groupBy queries. (#19423)
4378f10333b is described below
commit 4378f10333b598ed3d3777f96ccea009fc327bab
Author: Gian Merlino <[email protected]>
AuthorDate: Thu May 7 01:13:41 2026 -0700
perf: Optimize aggregators for groupBy queries. (#19423)
Previously aggregators were only optimized for timeseries and topN.
This patch adds optimization for groupBy too. It also adds a context
parameter "optimizeAggregators" that can be used to switch off
optimization, in case this is useful for debugging purposes.
---
.../querykit/groupby/GroupByPreShuffleFrameProcessor.java | 6 +++++-
.../druid/query/PerSegmentQueryOptimizationContext.java | 15 +--------------
.../main/java/org/apache/druid/query/QueryContext.java | 8 ++++++++
.../main/java/org/apache/druid/query/QueryContexts.java | 15 ++++++++++++---
.../query/aggregation/FilteredAggregatorFactory.java | 2 +-
.../java/org/apache/druid/query/groupby/GroupByQuery.java | 14 ++++++++++++++
.../apache/druid/query/timeseries/TimeseriesQuery.java | 3 +++
.../main/java/org/apache/druid/query/topn/TopNQuery.java | 3 +++
8 files changed, 47 insertions(+), 19 deletions(-)
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index ea342efdb6f..e960e9cff79 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -49,6 +49,7 @@ import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessor;
import org.apache.druid.msq.querykit.ReadableInput;
import org.apache.druid.msq.querykit.SegmentReferenceHolder;
+import org.apache.druid.query.PerSegmentQueryOptimizationContext;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.groupby.GroupByQuery;
@@ -193,8 +194,11 @@ public class GroupByPreShuffleFrameProcessor extends
BaseLeafFrameProcessor
rowSequence =
Sequences.simple(List.of(GroupByTimeBoundaryUtils.computeTimeBoundaryResult(query,
tbi)));
} else {
// Resolve this query using a cursor.
+ final GroupByQuery segmentQuery = (GroupByQuery) query
+ .withQuerySegmentSpec(new SpecificSegmentSpec(segmentHolder.getDescriptor()))
+ .optimizeForSegment(new PerSegmentQueryOptimizationContext(segmentHolder.getDescriptor()));
rowSequence = groupingEngine.process(
- query.withQuerySegmentSpec(new SpecificSegmentSpec(segmentHolder.getDescriptor())),
+ segmentQuery,
Objects.requireNonNull(segment.as(CursorFactory.class)),
tbi,
bufferPool,
diff --git a/processing/src/main/java/org/apache/druid/query/PerSegmentQueryOptimizationContext.java b/processing/src/main/java/org/apache/druid/query/PerSegmentQueryOptimizationContext.java
index 46a58776fa3..0d1257cbe46 100644
--- a/processing/src/main/java/org/apache/druid/query/PerSegmentQueryOptimizationContext.java
+++ b/processing/src/main/java/org/apache/druid/query/PerSegmentQueryOptimizationContext.java
@@ -25,19 +25,6 @@ package org.apache.druid.query;
*
* @see PerSegmentOptimizingQueryRunner
*/
-public class PerSegmentQueryOptimizationContext
+public record PerSegmentQueryOptimizationContext(SegmentDescriptor segmentDescriptor)
{
- private final SegmentDescriptor segmentDescriptor;
-
- public PerSegmentQueryOptimizationContext(
- SegmentDescriptor segmentDescriptor
- )
- {
- this.segmentDescriptor = segmentDescriptor;
- }
-
- public SegmentDescriptor getSegmentDescriptor()
- {
- return segmentDescriptor;
- }
}
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 42ed66978ff..39f325d1089 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -461,6 +461,14 @@ public class QueryContext
);
}
+ public boolean isOptimizeAggregators()
+ {
+ return getBoolean(
+ QueryContexts.OPTIMIZE_AGGREGATORS_KEY,
+ QueryContexts.DEFAULT_OPTIMIZE_AGGREGATORS
+ );
+ }
+
public long getMaxQueuedBytes(long defaultValue)
{
return getLong(QueryContexts.MAX_QUEUED_BYTES_KEY, defaultValue);
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 6df392d9571..1cb8aa24cf4 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.AggregatorFactory;
import javax.annotation.Nullable;
import java.math.BigDecimal;
@@ -66,9 +67,16 @@ public class QueryContexts
public static final String MAX_NUMERIC_IN_FILTERS = "maxNumericInFilters";
public static final String CURSOR_AUTO_ARRANGE_FILTERS =
"cursorAutoArrangeFilters";
public static final String CLONE_QUERY_MODE = "cloneQueryMode";
- // This flag controls whether a SQL join query with left scan should be attempted to be run as direct table access
- // instead of being wrapped inside a query. With direct table access enabled, Druid can push down the join operation to
- // data servers.
+ /**
+ * This flag controls whether {@link AggregatorFactory#optimizeForSegment(PerSegmentQueryOptimizationContext)}
+ * is used. It is undocumented because its main purpose is to help developers debug issues with the optimizations.
+ */
+ public static final String OPTIMIZE_AGGREGATORS_KEY = "optimizeAggregators";
+ /**
+ * This flag controls whether a SQL join query with left scan should be attempted to be run as direct table access
+ * instead of being wrapped inside a query. With direct table access enabled, Druid can push down the join operation to
+ * data servers.
+ */
public static final String SQL_JOIN_LEFT_SCAN_DIRECT =
"enableJoinLeftTableScanDirect";
public static final String USE_FILTER_CNF_KEY = "useFilterCNF";
public static final String NUM_RETRIES_ON_MISSING_SEGMENTS_KEY =
"numRetriesOnMissingSegments";
@@ -176,6 +184,7 @@ public class QueryContexts
public static final boolean DEFAULT_CATALOG_VALIDATION_ENABLED = true;
public static final boolean DEFAULT_USE_NESTED_FOR_UNKNOWN_TYPE_IN_SUBQUERY
= false;
public static final boolean DEFAULT_EXTENDED_FILTERED_SUM_REWRITE_ENABLED =
true;
+ public static final boolean DEFAULT_OPTIMIZE_AGGREGATORS = true;
public static final boolean DEFAULT_CTX_FULL_REPORT = false;
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java
index 9b8c0d20381..e6ed1cb3da3 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java
@@ -234,7 +234,7 @@ public class FilteredAggregatorFactory extends
AggregatorFactory
return this;
}
- Interval segmentInterval = optimizationContext.getSegmentDescriptor().getInterval();
+ Interval segmentInterval = optimizationContext.segmentDescriptor().getInterval();
List<Interval> filterIntervals = intervalDimFilter.getIntervals();
List<Interval> excludedFilterIntervals = new ArrayList<>();
List<Interval> effectiveFilterIntervals = new ArrayList<>();
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index d31f130f665..64a8ca0b5f6 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DimensionComparisonUtils;
+import org.apache.druid.query.PerSegmentQueryOptimizationContext;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
@@ -874,6 +875,19 @@ public class GroupByQuery extends BaseQuery<ResultRow>
return new
Builder(this).setPostAggregatorSpecs(postAggregatorSpecs).build();
}
+ @Override
+ public Query<ResultRow> optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+ {
+ if (!context().isOptimizeAggregators()) {
+ return this;
+ }
+ final List<AggregatorFactory> optimizedAggs = new ArrayList<>(aggregatorSpecs.size());
+ for (AggregatorFactory aggregatorFactory : aggregatorSpecs) {
+ optimizedAggs.add(aggregatorFactory.optimizeForSegment(optimizationContext));
+ }
+ }
+ return withAggregatorSpecs(optimizedAggs);
+ }
+
private static void verifyOutputNames(
List<DimensionSpec> dimensions,
List<AggregatorFactory> aggregators,
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
index b2165cd5b70..7db72689d1b 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
@@ -230,6 +230,9 @@ public class TimeseriesQuery extends
BaseQuery<Result<TimeseriesResultValue>>
@Override
public Query<Result<TimeseriesResultValue>>
optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
{
+ if (!context().isOptimizeAggregators()) {
+ return this;
+ }
return
Druids.TimeseriesQueryBuilder.copy(this).aggregators(optimizeAggs(optimizationContext)).build();
}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java
index aee76989db3..679acd43e8c 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java
@@ -226,6 +226,9 @@ public class TopNQuery extends
BaseQuery<Result<TopNResultValue>>
@Override
public Query<Result<TopNResultValue>>
optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
{
+ if (!context().isOptimizeAggregators()) {
+ return this;
+ }
return new
TopNQueryBuilder(this).aggregators(optimizeAggs(optimizationContext)).build();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]