This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8dc28e0 Allow Metadata and Dictionary Based Plans for No Op Filters
(#7563)
8dc28e0 is described below
commit 8dc28e010be4a9870db27da8765ec0b825c2cef9
Author: Atri Sharma <[email protected]>
AuthorDate: Tue Oct 26 06:27:48 2021 +0530
Allow Metadata and Dictionary Based Plans for No Op Filters (#7563)
When building aggregation plans, we should also account for the fact
that some filters may be no-ops on certain segments. For those segments,
we should consider using metadata- or dictionary-based plans.
---
.../plan/AggregationGroupByOrderByPlanNode.java | 12 ++-
.../core/plan/AggregationGroupByPlanNode.java | 12 ++-
.../pinot/core/plan/AggregationPlanNode.java | 15 ++--
.../apache/pinot/core/plan/DocIdSetPlanNode.java | 11 ++-
.../org/apache/pinot/core/plan/FilterPlanNode.java | 49 +++++++----
.../apache/pinot/core/plan/ProjectionPlanNode.java | 9 +-
.../apache/pinot/core/plan/TransformPlanNode.java | 11 ++-
.../apache/pinot/core/startree/StarTreeUtils.java | 21 +++--
...adataAndDictionaryAggregationPlanMakerTest.java | 53 +++++++-----
.../pinot/core/startree/v2/BaseStarTreeV2Test.java | 11 +--
.../pinot/queries/DistinctCountQueriesTest.java | 77 ++++++++++-------
...SegmentPartitionedDistinctCountQueriesTest.java | 97 ++++++++++++----------
12 files changed, 238 insertions(+), 140 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
index 81d3857..58118f4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -58,6 +59,9 @@ public class AggregationGroupByOrderByPlanNode implements
PlanNode {
AggregationFunction[] aggregationFunctions =
_queryContext.getAggregationFunctions();
ExpressionContext[] groupByExpressions =
_queryContext.getGroupByExpressions().toArray(new ExpressionContext[0]);
+ FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment,
_queryContext);
+ BaseFilterOperator filterOperator = filterPlanNode.run();
+
// Use star-tree to solve the query if possible
List<StarTreeV2> starTrees = _indexSegment.getStarTrees();
if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(_queryContext))
{
@@ -65,7 +69,8 @@ public class AggregationGroupByOrderByPlanNode implements
PlanNode {
StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions);
if (aggregationFunctionColumnPairs != null) {
Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
- StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment,
_queryContext.getFilter());
+ StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment,
_queryContext.getFilter(),
+ filterPlanNode.getPredicateEvaluatorMap());
if (predicateEvaluatorsMap != null) {
for (StarTreeV2 starTreeV2 : starTrees) {
if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(),
aggregationFunctionColumnPairs,
@@ -83,8 +88,9 @@ public class AggregationGroupByOrderByPlanNode implements
PlanNode {
Set<ExpressionContext> expressionsToTransform =
AggregationFunctionUtils.collectExpressionsToTransform(aggregationFunctions,
groupByExpressions);
- TransformOperator transformPlanNode = new TransformPlanNode(_indexSegment,
_queryContext, expressionsToTransform,
- DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+ TransformOperator transformPlanNode =
+ new TransformPlanNode(_indexSegment, _queryContext,
expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL,
+ filterOperator).run();
return new AggregationGroupByOrderByOperator(aggregationFunctions,
groupByExpressions, transformPlanNode,
numTotalDocs, _queryContext, false);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
index 33d83de..f9d0e09 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -58,6 +59,9 @@ public class AggregationGroupByPlanNode implements PlanNode {
AggregationFunction[] aggregationFunctions =
_queryContext.getAggregationFunctions();
ExpressionContext[] groupByExpressions =
_queryContext.getGroupByExpressions().toArray(new ExpressionContext[0]);
+ FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment,
_queryContext);
+ BaseFilterOperator filterOperator = filterPlanNode.run();
+
// Use star-tree to solve the query if possible
List<StarTreeV2> starTrees = _indexSegment.getStarTrees();
if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(_queryContext))
{
@@ -65,7 +69,8 @@ public class AggregationGroupByPlanNode implements PlanNode {
StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions);
if (aggregationFunctionColumnPairs != null) {
Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
- StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment,
_queryContext.getFilter());
+ StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment,
_queryContext.getFilter(),
+ filterPlanNode.getPredicateEvaluatorMap());
if (predicateEvaluatorsMap != null) {
for (StarTreeV2 starTreeV2 : starTrees) {
if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(),
aggregationFunctionColumnPairs,
@@ -83,8 +88,9 @@ public class AggregationGroupByPlanNode implements PlanNode {
Set<ExpressionContext> expressionsToTransform =
AggregationFunctionUtils.collectExpressionsToTransform(aggregationFunctions,
groupByExpressions);
- TransformOperator transformPlanNode = new TransformPlanNode(_indexSegment,
_queryContext, expressionsToTransform,
- DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+ TransformOperator transformPlanNode =
+ new TransformPlanNode(_indexSegment, _queryContext,
expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL,
+ filterOperator).run();
return new AggregationGroupByOperator(_queryContext, groupByExpressions,
transformPlanNode, numTotalDocs, false);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index c1cec0b..d7318d9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator;
import org.apache.pinot.core.operator.query.MetadataBasedAggregationOperator;
@@ -64,10 +65,12 @@ public class AggregationPlanNode implements PlanNode {
int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
AggregationFunction[] aggregationFunctions =
_queryContext.getAggregationFunctions();
+ FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment,
_queryContext);
+ BaseFilterOperator filterOperator = filterPlanNode.run();
+
// Use metadata/dictionary to solve the query if possible
- // NOTE: Skip the segment with valid doc index because the valid doc index
is equivalent to a filter
// TODO: Use the same operator for both of them so that COUNT(*), MAX(col)
can be optimized
- if (_queryContext.getFilter() == null && _indexSegment.getValidDocIds() ==
null) {
+ if (filterOperator.isResultMatchingAll()) {
if (isFitForMetadataBasedPlan(aggregationFunctions)) {
return new MetadataBasedAggregationOperator(aggregationFunctions,
_indexSegment.getSegmentMetadata(),
Collections.emptyMap());
@@ -88,7 +91,8 @@ public class AggregationPlanNode implements PlanNode {
StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions);
if (aggregationFunctionColumnPairs != null) {
Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
- StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment,
_queryContext.getFilter());
+ StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment,
_queryContext.getFilter(),
+ filterPlanNode.getPredicateEvaluatorMap());
if (predicateEvaluatorsMap != null) {
for (StarTreeV2 starTreeV2 : starTrees) {
if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(),
aggregationFunctionColumnPairs, null,
@@ -105,8 +109,9 @@ public class AggregationPlanNode implements PlanNode {
Set<ExpressionContext> expressionsToTransform =
AggregationFunctionUtils.collectExpressionsToTransform(aggregationFunctions,
null);
- TransformOperator transformOperator = new TransformPlanNode(_indexSegment,
_queryContext, expressionsToTransform,
- DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+ TransformOperator transformOperator =
+ new TransformPlanNode(_indexSegment, _queryContext,
expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL,
+ filterOperator).run();
return new AggregationOperator(aggregationFunctions, transformOperator,
numTotalDocs, false);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
index 6745c5f..fd58e8f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.core.plan;
+import javax.annotation.Nullable;
import org.apache.pinot.core.operator.DocIdSetOperator;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -29,17 +31,22 @@ public class DocIdSetPlanNode implements PlanNode {
private final IndexSegment _indexSegment;
private final QueryContext _queryContext;
private final int _maxDocPerCall;
+ private final BaseFilterOperator _filterOperator;
- public DocIdSetPlanNode(IndexSegment indexSegment, QueryContext
queryContext, int maxDocPerCall) {
+ public DocIdSetPlanNode(IndexSegment indexSegment, QueryContext
queryContext, int maxDocPerCall,
+ @Nullable BaseFilterOperator filterOperator) {
assert maxDocPerCall > 0 && maxDocPerCall <= MAX_DOC_PER_CALL;
_indexSegment = indexSegment;
_queryContext = queryContext;
_maxDocPerCall = maxDocPerCall;
+ _filterOperator = filterOperator;
}
@Override
public DocIdSetOperator run() {
- return new DocIdSetOperator(new FilterPlanNode(_indexSegment,
_queryContext).run(), _maxDocPerCall);
+ return new DocIdSetOperator(
+ _filterOperator != null ? _filterOperator : new
FilterPlanNode(_indexSegment, _queryContext).run(),
+ _maxDocPerCall);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index da818f1..276cc57 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -21,9 +21,9 @@ package org.apache.pinot.core.plan;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.FunctionContext;
@@ -60,6 +60,9 @@ public class FilterPlanNode implements PlanNode {
private final QueryContext _queryContext;
private final int _numDocs;
+ // Cache the predicate evaluators
+ private final Map<Predicate, PredicateEvaluator> _predicateEvaluatorMap =
new HashMap<>();
+
public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
_indexSegment = indexSegment;
_queryContext = queryContext;
@@ -74,7 +77,7 @@ public class FilterPlanNode implements PlanNode {
ThreadSafeMutableRoaringBitmap validDocIds =
_indexSegment.getValidDocIds();
boolean applyValidDocIds = validDocIds != null &&
!QueryOptionsUtils.isSkipUpsert(_queryContext.getQueryOptions());
if (filter != null) {
- BaseFilterOperator filterOperator = constructPhysicalOperator(filter,
_queryContext.getDebugOptions());
+ BaseFilterOperator filterOperator = constructPhysicalOperator(filter);
if (applyValidDocIds) {
BaseFilterOperator validDocFilter =
new
BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false,
_numDocs);
@@ -91,6 +94,13 @@ public class FilterPlanNode implements PlanNode {
}
/**
+ * Returns a map from predicates to their evaluators.
+ */
+ public Map<Predicate, PredicateEvaluator> getPredicateEvaluatorMap() {
+ return _predicateEvaluatorMap;
+ }
+
+ /**
* H3 index can be applied iff:
* <ul>
* <li>Predicate is of type RANGE</li>
@@ -126,14 +136,13 @@ public class FilterPlanNode implements PlanNode {
/**
* Helper method to build the operator tree from the filter.
*/
- private BaseFilterOperator constructPhysicalOperator(FilterContext filter,
- @Nullable Map<String, String> debugOptions) {
+ private BaseFilterOperator constructPhysicalOperator(FilterContext filter) {
switch (filter.getType()) {
case AND:
List<FilterContext> childFilters = filter.getChildren();
List<BaseFilterOperator> childFilterOperators = new
ArrayList<>(childFilters.size());
for (FilterContext childFilter : childFilters) {
- BaseFilterOperator childFilterOperator =
constructPhysicalOperator(childFilter, debugOptions);
+ BaseFilterOperator childFilterOperator =
constructPhysicalOperator(childFilter);
if (childFilterOperator.isResultEmpty()) {
// Return empty filter operator if any of the child filter
operator's result is empty
return EmptyFilterOperator.getInstance();
@@ -142,12 +151,13 @@ public class FilterPlanNode implements PlanNode {
childFilterOperators.add(childFilterOperator);
}
}
- return FilterOperatorUtils.getAndFilterOperator(childFilterOperators,
_numDocs, debugOptions);
+ return FilterOperatorUtils.getAndFilterOperator(childFilterOperators,
_numDocs,
+ _queryContext.getDebugOptions());
case OR:
childFilters = filter.getChildren();
childFilterOperators = new ArrayList<>(childFilters.size());
for (FilterContext childFilter : childFilters) {
- BaseFilterOperator childFilterOperator =
constructPhysicalOperator(childFilter, debugOptions);
+ BaseFilterOperator childFilterOperator =
constructPhysicalOperator(childFilter);
if (childFilterOperator.isResultMatchingAll()) {
// Return match all filter operator if any of the child filter
operator matches all records
return new MatchAllFilterOperator(_numDocs);
@@ -156,7 +166,7 @@ public class FilterPlanNode implements PlanNode {
childFilterOperators.add(childFilterOperator);
}
}
- return FilterOperatorUtils.getOrFilterOperator(childFilterOperators,
_numDocs, debugOptions);
+ return FilterOperatorUtils.getOrFilterOperator(childFilterOperators,
_numDocs, _queryContext.getDebugOptions());
case PREDICATE:
Predicate predicate = filter.getPredicate();
ExpressionContext lhs = predicate.getLhs();
@@ -170,6 +180,10 @@ public class FilterPlanNode implements PlanNode {
} else {
String column = lhs.getIdentifier();
DataSource dataSource = _indexSegment.getDataSource(column);
+ PredicateEvaluator predicateEvaluator =
_predicateEvaluatorMap.get(predicate);
+ if (predicateEvaluator != null) {
+ return
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource,
_numDocs);
+ }
switch (predicate.getType()) {
case TEXT_MATCH:
return new TextMatchFilterOperator(dataSource.getTextIndex(),
((TextMatchPredicate) predicate).getValue(),
@@ -183,19 +197,21 @@ public class FilterPlanNode implements PlanNode {
//
// Consuming segments: When FST is enabled, use
AutomatonBasedEvaluator so that regexp matching logic is
// similar to that of FSTBasedEvaluator, else use regular flow
of getting predicate evaluator.
- PredicateEvaluator evaluator;
if (dataSource.getFSTIndex() != null) {
- evaluator =
FSTBasedRegexpPredicateEvaluatorFactory.newFSTBasedEvaluator(dataSource.getFSTIndex(),
- dataSource.getDictionary(), ((RegexpLikePredicate)
predicate).getValue());
+ predicateEvaluator =
+
FSTBasedRegexpPredicateEvaluatorFactory.newFSTBasedEvaluator(dataSource.getFSTIndex(),
+ dataSource.getDictionary(), ((RegexpLikePredicate)
predicate).getValue());
} else if (dataSource instanceof MutableDataSource &&
((MutableDataSource) dataSource).isFSTEnabled()) {
- evaluator =
+ predicateEvaluator =
FSTBasedRegexpPredicateEvaluatorFactory.newAutomatonBasedEvaluator(dataSource.getDictionary(),
((RegexpLikePredicate) predicate).getValue());
} else {
- evaluator =
PredicateEvaluatorProvider.getPredicateEvaluator(predicate,
dataSource.getDictionary(),
- dataSource.getDataSourceMetadata().getDataType());
+ predicateEvaluator =
+
PredicateEvaluatorProvider.getPredicateEvaluator(predicate,
dataSource.getDictionary(),
+ dataSource.getDataSourceMetadata().getDataType());
}
- return FilterOperatorUtils.getLeafFilterOperator(evaluator,
dataSource, _numDocs);
+ _predicateEvaluatorMap.put(predicate, predicateEvaluator);
+ return
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource,
_numDocs);
case JSON_MATCH:
JsonIndexReader jsonIndex = dataSource.getJsonIndex();
Preconditions.checkState(jsonIndex != null, "Cannot apply
JSON_MATCH on column: %s without json index",
@@ -216,9 +232,10 @@ public class FilterPlanNode implements PlanNode {
return new MatchAllFilterOperator(_numDocs);
}
default:
- PredicateEvaluator predicateEvaluator =
+ predicateEvaluator =
PredicateEvaluatorProvider.getPredicateEvaluator(predicate,
dataSource.getDictionary(),
dataSource.getDataSourceMetadata().getDataType());
+ _predicateEvaluatorMap.put(predicate, predicateEvaluator);
return
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource,
_numDocs);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java
index 27db4bf..5f286b4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java
@@ -21,8 +21,10 @@ package org.apache.pinot.core.plan;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.pinot.core.operator.DocIdSetOperator;
import org.apache.pinot.core.operator.ProjectionOperator;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -37,13 +39,15 @@ public class ProjectionPlanNode implements PlanNode {
private final QueryContext _queryContext;
private final Set<String> _projectionColumns;
private final int _maxDocsPerCall;
+ private final BaseFilterOperator _filterOperator;
public ProjectionPlanNode(IndexSegment indexSegment, QueryContext
queryContext, Set<String> projectionColumns,
- int maxDocsPerCall) {
+ int maxDocsPerCall, @Nullable BaseFilterOperator filterOperator) {
_indexSegment = indexSegment;
_queryContext = queryContext;
_projectionColumns = projectionColumns;
_maxDocsPerCall = maxDocsPerCall;
+ _filterOperator = filterOperator;
}
@Override
@@ -54,7 +58,8 @@ public class ProjectionPlanNode implements PlanNode {
}
// NOTE: Skip creating DocIdSetOperator when maxDocsPerCall is 0 (for
selection query with LIMIT 0)
DocIdSetOperator docIdSetOperator =
- _maxDocsPerCall > 0 ? new DocIdSetPlanNode(_indexSegment,
_queryContext, _maxDocsPerCall).run() : null;
+ _maxDocsPerCall > 0 ? new DocIdSetPlanNode(_indexSegment,
_queryContext, _maxDocsPerCall, _filterOperator).run()
+ : null;
return new ProjectionOperator(dataSourceMap, docIdSetOperator);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
index 6a60adb..f6f750c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
@@ -21,8 +21,10 @@ package org.apache.pinot.core.plan;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.operator.ProjectionOperator;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.operator.transform.PassThroughTransformOperator;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -37,13 +39,20 @@ public class TransformPlanNode implements PlanNode {
private final QueryContext _queryContext;
private final Collection<ExpressionContext> _expressions;
private final int _maxDocsPerCall;
+ private final BaseFilterOperator _filterOperator;
public TransformPlanNode(IndexSegment indexSegment, QueryContext
queryContext,
Collection<ExpressionContext> expressions, int maxDocsPerCall) {
+ this(indexSegment, queryContext, expressions, maxDocsPerCall, null);
+ }
+
+ public TransformPlanNode(IndexSegment indexSegment, QueryContext
queryContext,
+ Collection<ExpressionContext> expressions, int maxDocsPerCall, @Nullable
BaseFilterOperator filterOperator) {
_indexSegment = indexSegment;
_queryContext = queryContext;
_expressions = expressions;
_maxDocsPerCall = maxDocsPerCall;
+ _filterOperator = filterOperator;
}
@Override
@@ -57,7 +66,7 @@ public class TransformPlanNode implements PlanNode {
}
}
ProjectionOperator projectionOperator =
- new ProjectionPlanNode(_indexSegment, _queryContext,
projectionColumns, _maxDocsPerCall).run();
+ new ProjectionPlanNode(_indexSegment, _queryContext,
projectionColumns, _maxDocsPerCall, _filterOperator).run();
if (hasNonIdentifierExpression) {
return new TransformOperator(projectionOperator, _expressions);
} else {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
index 0dee1e4..049da07 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
@@ -33,7 +33,6 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
-import
org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -90,10 +89,12 @@ public class StarTreeUtils {
* (d1 > 50 AND (d2 > 10 OR d2 < 35)).
* This method represents a list of CompositePredicates per dimension. For
each dimension, all CompositePredicates in
* the list are implicitly ANDed together. Any OR predicates are nested
within a CompositePredicate.
+ *
+ * A map from predicates to their evaluators is passed in to accelerate the
computation.
*/
@Nullable
public static Map<String, List<CompositePredicateEvaluator>>
extractPredicateEvaluatorsMap(IndexSegment indexSegment,
- @Nullable FilterContext filter) {
+ @Nullable FilterContext filter, Map<Predicate, PredicateEvaluator>
predicateEvaluatorMap) {
if (filter == null) {
return Collections.emptyMap();
}
@@ -108,7 +109,8 @@ public class StarTreeUtils {
queue.addAll(filterNode.getChildren());
break;
case OR:
- Pair<String, List<PredicateEvaluator>> pair =
isOrClauseValidForStarTree(indexSegment, filterNode);
+ Pair<String, List<PredicateEvaluator>> pair =
+ isOrClauseValidForStarTree(indexSegment, filterNode,
predicateEvaluatorMap);
if (pair == null) {
return null;
}
@@ -121,8 +123,9 @@ public class StarTreeUtils {
break;
case PREDICATE:
Predicate predicate = filterNode.getPredicate();
- PredicateEvaluator predicateEvaluator =
getPredicateEvaluatorForPredicate(indexSegment, predicate);
+ PredicateEvaluator predicateEvaluator =
getPredicateEvaluator(indexSegment, predicate, predicateEvaluatorMap);
if (predicateEvaluator == null) {
+ // The predicate cannot be solved with star-tree
return null;
}
if (!predicateEvaluator.isAlwaysTrue()) {
@@ -181,7 +184,7 @@ public class StarTreeUtils {
*/
@Nullable
private static Pair<String, List<PredicateEvaluator>>
isOrClauseValidForStarTree(IndexSegment indexSegment,
- FilterContext filter) {
+ FilterContext filter, Map<Predicate, PredicateEvaluator>
predicateEvaluatorMap) {
assert filter.getType() == FilterContext.Type.OR;
List<Predicate> predicates = new ArrayList<>();
@@ -190,7 +193,7 @@ public class StarTreeUtils {
String identifier = null;
List<PredicateEvaluator> predicateEvaluators = new ArrayList<>();
for (Predicate predicate : predicates) {
- PredicateEvaluator predicateEvaluator =
getPredicateEvaluatorForPredicate(indexSegment, predicate);
+ PredicateEvaluator predicateEvaluator =
getPredicateEvaluator(indexSegment, predicate, predicateEvaluatorMap);
if (predicateEvaluator == null) {
// The predicate cannot be solved with star-tree
return null;
@@ -246,7 +249,8 @@ public class StarTreeUtils {
* star-tree.
*/
@Nullable
- private static PredicateEvaluator
getPredicateEvaluatorForPredicate(IndexSegment indexSegment, Predicate
predicate) {
+ private static PredicateEvaluator getPredicateEvaluator(IndexSegment
indexSegment, Predicate predicate,
+ Map<Predicate, PredicateEvaluator> predicateEvaluatorMap) {
ExpressionContext lhs = predicate.getLhs();
if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
// Star-tree does not support non-identifier expression
@@ -271,7 +275,6 @@ public class StarTreeUtils {
default:
break;
}
- return PredicateEvaluatorProvider
- .getPredicateEvaluator(predicate, dictionary,
dataSource.getDataSourceMetadata().getDataType());
+ return predicateEvaluatorMap.get(predicate);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index d9ad71a..2b5b401 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -151,54 +151,63 @@ public class
MetadataAndDictionaryAggregationPlanMakerTest {
@DataProvider(name = "testPlanMakerDataProvider")
public Object[][] testPlanMakerDataProvider() {
List<Object[]> entries = new ArrayList<>();
+ // Selection
entries.add(new Object[]{
- "select * from testTable", /*selection query*/
- SelectionOnlyOperator.class, SelectionOnlyOperator.class
+ "select * from testTable", SelectionOnlyOperator.class,
SelectionOnlyOperator.class
});
+ // Selection
entries.add(new Object[]{
- "select column1,column5 from testTable", /*selection query*/
- SelectionOnlyOperator.class, SelectionOnlyOperator.class
+ "select column1,column5 from testTable", SelectionOnlyOperator.class,
SelectionOnlyOperator.class
});
+ // Selection with filter
entries.add(new Object[]{
- "select * from testTable where daysSinceEpoch > 100", /*selection
query with filters*/
- SelectionOnlyOperator.class, SelectionOnlyOperator.class
+ "select * from testTable where daysSinceEpoch > 100",
SelectionOnlyOperator.class, SelectionOnlyOperator.class
});
+ // COUNT from metadata
entries.add(new Object[]{
- "select count(*) from testTable", /*count(*) from metadata*/
- MetadataBasedAggregationOperator.class, AggregationOperator.class
+ "select count(*) from testTable",
MetadataBasedAggregationOperator.class, AggregationOperator.class
});
+ // COUNT from metadata with match all filter
entries.add(new Object[]{
- "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable", /*min
max from dictionary*/
- DictionaryBasedAggregationOperator.class, AggregationOperator.class
+ "select count(*) from testTable where column1 > 10",
MetadataBasedAggregationOperator.class,
+ AggregationOperator.class
});
+ // MIN/MAX from dictionary
entries.add(new Object[]{
- "select minmaxrange(daysSinceEpoch) from testTable", /*min max from
dictionary*/
- DictionaryBasedAggregationOperator.class, AggregationOperator.class
+ "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable",
DictionaryBasedAggregationOperator.class,
+ AggregationOperator.class
});
+ // MIN/MAX from dictionary with match all filter
entries.add(new Object[]{
- "select max(column17),min(column17) from testTable", /* minmax from
dictionary*/
+ "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable where
column1 > 10",
DictionaryBasedAggregationOperator.class, AggregationOperator.class
});
+ // MINMAXRANGE from dictionary
entries.add(new Object[]{
- "select minmaxrange(column17) from testTable", /*no minmax metadata,
go to dictionary*/
+ "select minmaxrange(daysSinceEpoch) from testTable",
DictionaryBasedAggregationOperator.class,
+ AggregationOperator.class
+ });
+ // MINMAXRANGE from dictionary with match all filter
+ entries.add(new Object[]{
+ "select minmaxrange(daysSinceEpoch) from testTable where column1 > 10",
DictionaryBasedAggregationOperator.class, AggregationOperator.class
});
+ // Aggregation
entries.add(new Object[]{
- "select sum(column1) from testTable", /*aggregation query*/
- AggregationOperator.class, AggregationOperator.class
+ "select sum(column1) from testTable", AggregationOperator.class,
AggregationOperator.class
});
+ // Aggregation group-by
entries.add(new Object[]{
- "select sum(column1) from testTable group by daysSinceEpoch",
/*aggregation with group by query*/
- AggregationGroupByOperator.class, AggregationGroupByOperator.class
+ "select sum(column1) from testTable group by daysSinceEpoch",
AggregationGroupByOperator.class,
+ AggregationGroupByOperator.class
});
+ // COUNT from metadata, MIN from dictionary
entries.add(new Object[]{
- "select count(*),min(column17) from testTable",
- /*multiple aggregations query, one from metadata, one from dictionary*/
- AggregationOperator.class, AggregationOperator.class
+ "select count(*),min(column17) from testTable",
AggregationOperator.class, AggregationOperator.class
});
+ // Aggregation group-by
entries.add(new Object[]{
"select count(*),min(daysSinceEpoch) from testTable group by
daysSinceEpoch",
- /*multiple aggregations with group by*/
AggregationGroupByOperator.class, AggregationGroupByOperator.class
});
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index 648934e..8cb2e7a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -31,7 +31,6 @@ import java.util.Random;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.core.common.BlockDocIdIterator;
import org.apache.pinot.core.plan.FilterPlanNode;
import org.apache.pinot.core.plan.PlanNode;
@@ -224,9 +223,11 @@ abstract class BaseStarTreeV2Test<R, A> {
List<String> groupByColumns = new ArrayList<>(groupByColumnSet);
// Filter
- FilterContext filter = queryContext.getFilter();
+ FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment,
queryContext);
+ filterPlanNode.run();
Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
- StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, filter);
+ StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment,
queryContext.getFilter(),
+ filterPlanNode.getPredicateEvaluatorMap());
assertNotNull(predicateEvaluatorsMap);
// Extract values with star-tree
@@ -234,8 +235,8 @@ abstract class BaseStarTreeV2Test<R, A> {
new StarTreeFilterPlanNode(_starTreeV2, predicateEvaluatorsMap,
groupByColumnSet, null);
List<ForwardIndexReader> starTreeAggregationColumnReaders = new
ArrayList<>(numAggregations);
for (AggregationFunctionColumnPair aggregationFunctionColumnPair :
aggregationFunctionColumnPairs) {
- starTreeAggregationColumnReaders
-
.add(_starTreeV2.getDataSource(aggregationFunctionColumnPair.toColumnName()).getForwardIndex());
+ starTreeAggregationColumnReaders.add(
+
_starTreeV2.getDataSource(aggregationFunctionColumnPair.toColumnName()).getForwardIndex());
}
List<ForwardIndexReader> starTreeGroupByColumnReaders = new
ArrayList<>(numGroupByColumns);
for (String groupByColumn : groupByColumns) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
index afa2181..3c045bc 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
@@ -96,7 +96,7 @@ public class DistinctCountQueriesTest extends BaseQueriesTest
{
@Override
protected String getFilter() {
- // NOTE: Use a match all filter to switch between
DictionaryBasedAggregationOperator and AggregationOperator
+ // NOTE: This is a match all filter
return " WHERE intColumn >= 0";
}
@@ -185,31 +185,23 @@ public class DistinctCountQueriesTest extends
BaseQueriesTest {
@Test
public void testAggregationOnly() {
+ // Dictionary based
String query =
"SELECT DISTINCTCOUNT(intColumn), DISTINCTCOUNT(longColumn),
DISTINCTCOUNT(floatColumn), DISTINCTCOUNT"
+ "(doubleColumn), DISTINCTCOUNT(stringColumn),
DISTINCTCOUNT(bytesColumn) FROM testTable";
// Inner segment
- Operator operator = getOperatorForPqlQuery(query);
- assertTrue(operator instanceof DictionaryBasedAggregationOperator);
- IntermediateResultsBlock resultsBlock =
((DictionaryBasedAggregationOperator) operator).nextBlock();
- QueriesTestUtils
-
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
NUM_RECORDS, 0, 0, NUM_RECORDS);
- List<Object> aggregationResult = resultsBlock.getAggregationResult();
-
- operator = getOperatorForPqlQueryWithFilter(query);
- assertTrue(operator instanceof AggregationOperator);
- IntermediateResultsBlock resultsBlockWithFilter = ((AggregationOperator)
operator).nextBlock();
- QueriesTestUtils
-
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
NUM_RECORDS, 0, 6 * NUM_RECORDS,
- NUM_RECORDS);
- List<Object> aggregationResultWithFilter =
resultsBlockWithFilter.getAggregationResult();
-
- assertNotNull(aggregationResult);
- assertNotNull(aggregationResultWithFilter);
- assertEquals(aggregationResult, aggregationResultWithFilter);
- for (int i = 0; i < 6; i++) {
- assertEquals(((Set) aggregationResult.get(i)).size(), _values.size());
+ for (Object operator : Arrays.asList(getOperatorForSqlQuery(query),
getOperatorForSqlQueryWithFilter(query))) {
+ assertTrue(operator instanceof DictionaryBasedAggregationOperator);
+ IntermediateResultsBlock resultsBlock =
((DictionaryBasedAggregationOperator) operator).nextBlock();
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator)
operator).getExecutionStatistics(), NUM_RECORDS,
+ 0, 0, NUM_RECORDS);
+ List<Object> aggregationResult = resultsBlock.getAggregationResult();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 6);
+ for (int i = 0; i < 6; i++) {
+ assertEquals(((Set) aggregationResult.get(i)).size(), _values.size());
+ }
}
// Inter segments
@@ -217,13 +209,39 @@ public class DistinctCountQueriesTest extends
BaseQueriesTest {
for (int i = 0; i < 6; i++) {
expectedResults[i] = Integer.toString(_values.size());
}
+ for (BrokerResponseNative brokerResponse :
Arrays.asList(getBrokerResponseForPqlQuery(query),
+ getBrokerResponseForPqlQueryWithFilter(query))) {
+ QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 4 *
NUM_RECORDS, 0, 0, 4 * NUM_RECORDS,
+ expectedResults);
+ }
+
+ // Regular aggregation
+ query = query + " WHERE intColumn >= 500";
+
+ // Inner segment
+ int expectedResult = 0;
+ for (Integer value : _values) {
+ if (value >= 500) {
+ expectedResult++;
+ }
+ }
+ Operator operator = getOperatorForSqlQuery(query);
+ assertTrue(operator instanceof AggregationOperator);
+ List<Object> aggregationResult = ((AggregationOperator)
operator).nextBlock().getAggregationResult();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 6);
+ for (int i = 0; i < 6; i++) {
+ assertEquals(((Set) aggregationResult.get(i)).size(), expectedResult);
+ }
+
+ // Inter segment
BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
- QueriesTestUtils
- .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0,
0, 4 * NUM_RECORDS, expectedResults);
- brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
- QueriesTestUtils
- .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0,
4 * 6 * NUM_RECORDS, 4 * NUM_RECORDS,
- expectedResults);
+ List<AggregationResult> aggregationResults =
brokerResponse.getAggregationResults();
+ assertNotNull(aggregationResults);
+ assertEquals(aggregationResults.size(), 6);
+ for (int i = 0; i < 6; i++) {
+ assertEquals(aggregationResults.get(i).getValue(),
Integer.toString(expectedResult));
+ }
}
@Test
@@ -237,9 +255,8 @@ public class DistinctCountQueriesTest extends
BaseQueriesTest {
Operator operator = getOperatorForPqlQuery(query);
assertTrue(operator instanceof AggregationGroupByOperator);
IntermediateResultsBlock resultsBlock = ((AggregationGroupByOperator)
operator).nextBlock();
- QueriesTestUtils
-
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
NUM_RECORDS, 0, 6 * NUM_RECORDS,
- NUM_RECORDS);
+
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
NUM_RECORDS, 0,
+ 6 * NUM_RECORDS, NUM_RECORDS);
AggregationGroupByResult aggregationGroupByResult =
resultsBlock.getAggregationGroupByResult();
assertNotNull(aggregationGroupByResult);
int numGroups = 0;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
index 3bc4148..0520086 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
@@ -92,13 +92,12 @@ public class SegmentPartitionedDistinctCountQueriesTest
extends BaseQueriesTest
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
private Set<Integer> _values;
- private long _expectedResult;
private IndexSegment _indexSegment;
private List<IndexSegment> _indexSegments;
@Override
protected String getFilter() {
- // NOTE: Use a match all filter to switch between
DictionaryBasedAggregationOperator and AggregationOperator
+ // NOTE: This is a match all filter
return " WHERE intColumn >= 0";
}
@@ -135,7 +134,6 @@ public class SegmentPartitionedDistinctCountQueriesTest
extends BaseQueriesTest
record.putValue(BYTES_COLUMN, bytesValue);
records.add(record);
}
- _expectedResult = _values.size();
SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
@@ -153,64 +151,79 @@ public class SegmentPartitionedDistinctCountQueriesTest
extends BaseQueriesTest
@Test
public void testAggregationOnly() {
- String query =
- "SELECT SEGMENTPARTITIONEDDISTINCTCOUNT(intColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(longColumn), "
- + "SEGMENTPARTITIONEDDISTINCTCOUNT(floatColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(doubleColumn), "
- + "SEGMENTPARTITIONEDDISTINCTCOUNT(stringColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(bytesColumn) FROM "
- + "testTable";
+ // Dictionary based
+ String query = "SELECT SEGMENTPARTITIONEDDISTINCTCOUNT(intColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(longColumn), "
+ + "SEGMENTPARTITIONEDDISTINCTCOUNT(floatColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(doubleColumn), "
+ + "SEGMENTPARTITIONEDDISTINCTCOUNT(stringColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(bytesColumn) FROM "
+ + "testTable";
// Inner segment
- Operator operator = getOperatorForPqlQuery(query);
- assertTrue(operator instanceof DictionaryBasedAggregationOperator);
- IntermediateResultsBlock resultsBlock =
((DictionaryBasedAggregationOperator) operator).nextBlock();
- QueriesTestUtils
-
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
NUM_RECORDS, 0, 0, NUM_RECORDS);
- List<Object> aggregationResult = resultsBlock.getAggregationResult();
+ for (Object operator : Arrays.asList(getOperatorForSqlQuery(query),
getOperatorForSqlQueryWithFilter(query))) {
+ assertTrue(operator instanceof DictionaryBasedAggregationOperator);
+ IntermediateResultsBlock resultsBlock =
((DictionaryBasedAggregationOperator) operator).nextBlock();
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator)
operator).getExecutionStatistics(), NUM_RECORDS,
+ 0, 0, NUM_RECORDS);
+ List<Object> aggregationResult = resultsBlock.getAggregationResult();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 6);
+ for (int i = 0; i < 6; i++) {
+ assertEquals(((Long) aggregationResult.get(i)).intValue(),
_values.size());
+ }
+ }
- operator = getOperatorForPqlQueryWithFilter(query);
- assertTrue(operator instanceof AggregationOperator);
- IntermediateResultsBlock resultsBlockWithFilter = ((AggregationOperator)
operator).nextBlock();
- QueriesTestUtils
-
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
NUM_RECORDS, 0, 6 * NUM_RECORDS,
- NUM_RECORDS);
- List<Object> aggregationResultWithFilter =
resultsBlockWithFilter.getAggregationResult();
+ // Inter segments (expect 4 * inner segment result)
+ String[] expectedResults = new String[6];
+ for (int i = 0; i < 6; i++) {
+ expectedResults[i] = Integer.toString(4 * _values.size());
+ }
+ for (BrokerResponseNative brokerResponse :
Arrays.asList(getBrokerResponseForPqlQuery(query),
+ getBrokerResponseForPqlQueryWithFilter(query))) {
+ QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 4 *
NUM_RECORDS, 0, 0, 4 * NUM_RECORDS,
+ expectedResults);
+ }
+ // Regular aggregation
+ query = query + " WHERE intColumn >= 500";
+
+ // Inner segment
+ int expectedResult = 0;
+ for (Integer value : _values) {
+ if (value >= 500) {
+ expectedResult++;
+ }
+ }
+ Operator operator = getOperatorForSqlQuery(query);
+ assertTrue(operator instanceof AggregationOperator);
+ List<Object> aggregationResult = ((AggregationOperator)
operator).nextBlock().getAggregationResult();
assertNotNull(aggregationResult);
- assertNotNull(aggregationResultWithFilter);
- assertEquals(aggregationResult, aggregationResultWithFilter);
+ assertEquals(aggregationResult.size(), 6);
for (int i = 0; i < 6; i++) {
- assertEquals((long) aggregationResult.get(i), _expectedResult);
+ assertEquals(((Long) aggregationResult.get(i)).intValue(),
expectedResult);
}
- // Inter segments (expect 4 * inner segment result)
- String[] expectedResults = new String[6];
+ // Inter segment (expect 4 * inner segment result)
+ BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+ List<AggregationResult> aggregationResults =
brokerResponse.getAggregationResults();
+ assertNotNull(aggregationResults);
+ assertEquals(aggregationResults.size(), 6);
for (int i = 0; i < 6; i++) {
- expectedResults[i] = Long.toString(4 * _expectedResult);
+ assertEquals(aggregationResults.get(i).getValue(), Integer.toString(4 *
expectedResult));
}
- BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
- QueriesTestUtils
- .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0,
0, 4 * NUM_RECORDS, expectedResults);
- brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
- QueriesTestUtils
- .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0,
4 * 6 * NUM_RECORDS, 4 * NUM_RECORDS,
- expectedResults);
}
@Test
public void testAggregationGroupBy() {
- String query =
- "SELECT SEGMENTPARTITIONEDDISTINCTCOUNT(intColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(longColumn), "
- + "SEGMENTPARTITIONEDDISTINCTCOUNT(floatColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(doubleColumn), "
- + "SEGMENTPARTITIONEDDISTINCTCOUNT(stringColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(bytesColumn) FROM "
- + "testTable GROUP BY intColumn";
+ String query = "SELECT SEGMENTPARTITIONEDDISTINCTCOUNT(intColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(longColumn), "
+ + "SEGMENTPARTITIONEDDISTINCTCOUNT(floatColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(doubleColumn), "
+ + "SEGMENTPARTITIONEDDISTINCTCOUNT(stringColumn),
SEGMENTPARTITIONEDDISTINCTCOUNT(bytesColumn) FROM "
+ + "testTable GROUP BY intColumn";
// Inner segment
Operator operator = getOperatorForPqlQuery(query);
assertTrue(operator instanceof AggregationGroupByOperator);
IntermediateResultsBlock resultsBlock = ((AggregationGroupByOperator)
operator).nextBlock();
- QueriesTestUtils
-
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
NUM_RECORDS, 0, 6 * NUM_RECORDS,
- NUM_RECORDS);
+
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
NUM_RECORDS, 0,
+ 6 * NUM_RECORDS, NUM_RECORDS);
AggregationGroupByResult aggregationGroupByResult =
resultsBlock.getAggregationGroupByResult();
assertNotNull(aggregationGroupByResult);
int numGroups = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]