This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9c7e771d22 Fixes filter aggs perf (#10356)
9c7e771d22 is described below
commit 9c7e771d22a0cbd3391593e9976669fff4d4b442
Author: Evan Galpin <[email protected]>
AuthorDate: Wed Mar 1 15:53:47 2023 -0800
Fixes filter aggs perf (#10356)
---
.../operator/filter/CombinedFilterOperator.java | 3 +++
.../function/AggregationFunctionUtils.java | 18 +++++++++------
.../pinot/queries/FilteredAggregationsTest.java | 27 ++++++++++++++++++++++
...erSegmentAggregationSingleValueQueriesTest.java | 12 +++++-----
...terSegmentAggregationMultiValueQueriesTest.java | 2 +-
...SegmentAggregationMultiValueRawQueriesTest.java | 2 +-
6 files changed, 49 insertions(+), 15 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
index c0fa2fa04f..9b1b0e0672 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
@@ -59,6 +59,9 @@ public class CombinedFilterOperator extends BaseFilterOperator {
@Override
protected FilterBlock getNextBlock() {
Tracing.activeRecording().setNumChildren(2);
+ if (_mainFilterOperator instanceof MatchAllFilterOperator) {
+ return _subFilterOperator.nextBlock();
+ }
BlockDocIdSet mainFilterDocIdSet = _mainFilterOperator.nextBlock().getNonScanFilterBLockDocIdSet();
BlockDocIdSet subFilterDocIdSet = _subFilterOperator.nextBlock().getBlockDocIdSet();
return new FilterBlock(new AndDocIdSet(Arrays.asList(mainFilterDocIdSet, subFilterDocIdSet), _queryOptions));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 89e9d3d8e5..99393d30f1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -223,10 +223,11 @@ public class AggregationFunctionUtils {
// If it is, populate the corresponding filter operator and corresponding transform operator
assert aggregationFunctions != null;
for (Pair<AggregationFunction, FilterContext> inputPair : aggregationFunctions) {
- if (inputPair.getLeft() != null) {
- FilterContext currentFilterExpression = inputPair.getRight();
+ AggregationFunction aggFunc = inputPair.getLeft();
+ FilterContext currentFilterExpression = inputPair.getRight();
+ if (currentFilterExpression != null) {
if (filterContextToAggFuncsMap.get(currentFilterExpression) != null) {
- filterContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(inputPair.getLeft());
+ filterContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(aggFunc);
continue;
}
Pair<FilterPlanNode, BaseFilterOperator> filterPlanOpPair =
@@ -241,10 +242,10 @@ public class AggregationFunctionUtils {
// fetching the relevant TransformOperator when resolving blocks
during aggregation
// execution
List<AggregationFunction> aggFunctionList = new ArrayList<>();
- aggFunctionList.add(inputPair.getLeft());
+ aggFunctionList.add(aggFunc);
filterContextToAggFuncsMap.put(currentFilterExpression,
Pair.of(aggFunctionList, newTransformOperator));
} else {
- nonFilteredAggregationFunctions.add(inputPair.getLeft());
+ nonFilteredAggregationFunctions.add(aggFunc);
}
}
// Convert to array since FilteredGroupByOperator expects it
@@ -255,8 +256,11 @@ public class AggregationFunctionUtils {
}
aggToTransformOpList.add(Pair.of(aggregationFunctionList.toArray(new AggregationFunction[0]), pair.getRight()));
}
- aggToTransformOpList.add(
- Pair.of(nonFilteredAggregationFunctions.toArray(new AggregationFunction[0]), mainTransformOperator));
+
+ if (!nonFilteredAggregationFunctions.isEmpty()) {
+ aggToTransformOpList.add(
+ Pair.of(nonFilteredAggregationFunctions.toArray(new AggregationFunction[0]), mainTransformOperator));
+ }
return aggToTransformOpList;
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
index 2ea664ec67..d3636e4fc9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
@@ -94,6 +94,7 @@ public class FilteredAggregationsTest extends BaseQueriesTest {
invertedIndexCols.add(INT_COL_NAME);
indexLoadingConfig.setInvertedIndexColumns(invertedIndexCols);
+ indexLoadingConfig.setRangeIndexColumns(invertedIndexCols);
ImmutableSegment firstImmutableSegment =
ImmutableSegmentLoader.load(new File(INDEX_DIR, FIRST_SEGMENT_NAME), indexLoadingConfig);
ImmutableSegment secondImmutableSegment =
@@ -446,4 +447,30 @@ public class FilteredAggregationsTest extends BaseQueriesTest {
+ "ORDER BY testAvg";
testQuery(filterQuery, nonFilterQuery);
}
+
+ @Test
+ public void testSameNumScannedFilteredAggMatchAll() {
+ // For a single filtered aggregation, the same number of docs should be scanned regardless of which portions of
+ // the filter are in the filter expression Vs. the main predicate i.e. the applied filters are commutative.
+ String filterQuery =
+ "SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable";
+ String nonFilterQuery =
+ "SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000";
+ long filterQueryDocsScanned = getBrokerResponse(filterQuery).getNumDocsScanned();
+ long nonFilterQueryDocsScanned = getBrokerResponse(nonFilterQuery).getNumDocsScanned();
+ assertEquals(filterQueryDocsScanned, nonFilterQueryDocsScanned);
+ }
+
+ @Test
+ public void testSameNumScannedFilteredAgg() {
+ // For a single filtered aggregation, the same number of docs should be scanned regardless of which portions of
+ // the filter are in the filter expression Vs. the main predicate i.e. the applied filters are commutative.
+ String filterQuery =
+ "SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable WHERE INT_COL < 1000000";
+ String nonFilterQuery =
+ "SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000 AND INT_COL < 1000000";
+ long filterQueryDocsScanned = getBrokerResponse(filterQuery).getNumDocsScanned();
+ long nonFilterQueryDocsScanned = getBrokerResponse(nonFilterQuery).getNumDocsScanned();
+ assertEquals(filterQueryDocsScanned, nonFilterQueryDocsScanned);
+ }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
index 8d0e187521..a59008be4c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
@@ -66,8 +66,8 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
+ "FROM testTable WHERE column3 > 0";
FilteredAggregationOperator aggregationOperator = getOperator(query);
AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 180000L, 0L,
- 540000L, 30000L);
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 150000L, 0L,
+ 450000L, 30000L);
QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 22266008882250L, 30000, 2147419555,
32289159189150L, 28175373944314L, 30000L);
@@ -76,8 +76,8 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
+ "FROM testTable";
aggregationOperator = getOperator(query);
resultsBlock = aggregationOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 180000L, 0L,
- 540000L, 30000L);
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 150000L, 0L,
+ 450000L, 30000L);
QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 22266008882250L, 30000, 2147419555,
32289159189150L, 28175373944314L, 30000L);
@@ -86,8 +86,8 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
+ "SUM(column3), AVG(column7) FILTER(WHERE column7 > 0 AND column7 < 100) FROM testTable";
aggregationOperator = getOperator(query);
resultsBlock = aggregationOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 150000L, 0L,
- 450000L, 30000L);
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 120000L, 0L,
+ 360000L, 30000L);
QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 22266008882250L, 30000, 2147419555,
32289159189150L, 0L, 0L);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index 4d32358814..c4abd67d4e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -595,7 +595,7 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
ResultTable expectedResultTable =
new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{370236L}));
- QueriesTestUtils.testInterSegmentsResult(brokerResponse, 740472L, 400000L, 0L, 400000L, expectedResultTable);
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 370236L, 0L, 0L, 400000L, expectedResultTable);
}
@Test
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
index 06d89e6573..df2dbb4e4e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
@@ -534,7 +534,7 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
ResultTable expectedResultTable =
new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{370236L}));
- QueriesTestUtils.testInterSegmentsResult(brokerResponse, 740472L, 400000L, 0L, 400000L, expectedResultTable);
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 370236L, 0L, 0L, 400000L, expectedResultTable);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]