This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 81ceb1d020 Simplify filtered aggregate transform operator creation (#10410)
81ceb1d020 is described below

commit 81ceb1d02046f7e45403227d41d06ccbb77336e3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Mar 13 14:40:15 2023 -0700

    Simplify filtered aggregate transform operator creation (#10410)
---
 .../operator/filter/CombinedFilterOperator.java    |   8 +-
 .../operator/query/FilteredGroupByOperator.java    |  34 ++---
 .../pinot/core/plan/AggregationPlanNode.java       |  16 +--
 .../apache/pinot/core/plan/GroupByPlanNode.java    |  34 ++---
 .../function/AggregationFunctionUtils.java         | 153 ++++++++++-----------
 ...erSegmentAggregationSingleValueQueriesTest.java |   6 +-
 ...terSegmentAggregationMultiValueQueriesTest.java |   2 +-
 ...SegmentAggregationMultiValueRawQueriesTest.java |   2 +-
 8 files changed, 112 insertions(+), 143 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
index 9b1b0e0672..4c4f2a689d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.core.common.BlockDocIdSet;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.FilterBlock;
 import org.apache.pinot.core.operator.docidsets.AndDocIdSet;
 import org.apache.pinot.spi.trace.Tracing;
@@ -41,13 +40,15 @@ public class CombinedFilterOperator extends BaseFilterOperator {
 
   public CombinedFilterOperator(BaseFilterOperator mainFilterOperator, 
BaseFilterOperator subFilterOperator,
       Map<String, String> queryOptions) {
+    assert !mainFilterOperator.isResultEmpty() && 
!mainFilterOperator.isResultMatchingAll()
+        && !subFilterOperator.isResultEmpty() && 
!subFilterOperator.isResultMatchingAll();
     _mainFilterOperator = mainFilterOperator;
     _subFilterOperator = subFilterOperator;
     _queryOptions = queryOptions;
   }
 
   @Override
-  public List<Operator> getChildOperators() {
+  public List<BaseFilterOperator> getChildOperators() {
     return Arrays.asList(_mainFilterOperator, _subFilterOperator);
   }
 
@@ -59,9 +60,6 @@ public class CombinedFilterOperator extends BaseFilterOperator {
   @Override
   protected FilterBlock getNextBlock() {
     Tracing.activeRecording().setNumChildren(2);
-    if (_mainFilterOperator instanceof MatchAllFilterOperator) {
-      return _subFilterOperator.nextBlock();
-    }
     BlockDocIdSet mainFilterDocIdSet = 
_mainFilterOperator.nextBlock().getNonScanFilterBLockDocIdSet();
     BlockDocIdSet subFilterDocIdSet = 
_subFilterOperator.nextBlock().getBlockDocIdSet();
     return new FilterBlock(new AndDocIdSet(Arrays.asList(mainFilterDocIdSet, 
subFilterDocIdSet), _queryOptions));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
index 872a999f54..7103f47f56 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
@@ -53,36 +53,37 @@ import org.apache.pinot.spi.trace.Tracing;
 public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> 
{
   private static final String EXPLAIN_NAME = "GROUP_BY_FILTERED";
 
+  private final QueryContext _queryContext;
   private final AggregationFunction[] _aggregationFunctions;
-  private final List<Pair<AggregationFunction[], TransformOperator>> 
_aggFunctionsWithTransformOperator;
   private final ExpressionContext[] _groupByExpressions;
+  private final List<Pair<AggregationFunction[], TransformOperator>> 
_aggFunctionsWithTransformOperator;
   private final long _numTotalDocs;
+  private final DataSchema _dataSchema;
+
   private long _numDocsScanned;
   private long _numEntriesScannedInFilter;
   private long _numEntriesScannedPostFilter;
-  private final DataSchema _dataSchema;
-  private final QueryContext _queryContext;
 
-  public FilteredGroupByOperator(AggregationFunction[] aggregationFunctions,
-      List<Pair<AggregationFunction, FilterContext>> 
filteredAggregationFunctions,
-      List<Pair<AggregationFunction[], TransformOperator>> 
aggFunctionsWithTransformOperator,
-      ExpressionContext[] groupByExpressions, long numTotalDocs, QueryContext 
queryContext) {
-    _aggregationFunctions = aggregationFunctions;
+  public FilteredGroupByOperator(QueryContext queryContext,
+      List<Pair<AggregationFunction[], TransformOperator>> 
aggFunctionsWithTransformOperator, long numTotalDocs) {
+    assert queryContext.getAggregationFunctions() != null && 
queryContext.getFilteredAggregationFunctions() != null
+        && queryContext.getGroupByExpressions() != null;
+    _queryContext = queryContext;
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    _groupByExpressions = queryContext.getGroupByExpressions().toArray(new 
ExpressionContext[0]);
     _aggFunctionsWithTransformOperator = aggFunctionsWithTransformOperator;
-    _groupByExpressions = groupByExpressions;
     _numTotalDocs = numTotalDocs;
-    _queryContext = queryContext;
 
     // NOTE: The indexedTable expects that the data schema will have group by 
columns before aggregation columns
-    int numGroupByExpressions = groupByExpressions.length;
-    int numAggregationFunctions = aggregationFunctions.length;
+    int numGroupByExpressions = _groupByExpressions.length;
+    int numAggregationFunctions = _aggregationFunctions.length;
     int numColumns = numGroupByExpressions + numAggregationFunctions;
     String[] columnNames = new String[numColumns];
     DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[numColumns];
 
     // Extract column names and data types for group-by columns
     for (int i = 0; i < numGroupByExpressions; i++) {
-      ExpressionContext groupByExpression = groupByExpressions[i];
+      ExpressionContext groupByExpression = _groupByExpressions[i];
       columnNames[i] = groupByExpression.toString();
       columnDataTypes[i] = DataSchema.ColumnDataType.fromDataTypeSV(
           
aggFunctionsWithTransformOperator.get(i).getRight().getResultMetadata(groupByExpression).getDataType());
@@ -91,10 +92,9 @@ public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> {
     // Extract column names and data types for aggregation functions
     for (int i = 0; i < numAggregationFunctions; i++) {
       int index = numGroupByExpressions + i;
-      Pair<AggregationFunction, FilterContext> filteredAggPair = 
filteredAggregationFunctions.get(i);
-      AggregationFunction aggregationFunction = filteredAggPair.getLeft();
-      String columnName =
-          AggregationFunctionUtils.getResultColumnName(aggregationFunction, 
filteredAggPair.getRight());
+      Pair<AggregationFunction, FilterContext> pair = 
queryContext.getFilteredAggregationFunctions().get(i);
+      AggregationFunction aggregationFunction = pair.getLeft();
+      String columnName = 
AggregationFunctionUtils.getResultColumnName(aggregationFunction, 
pair.getRight());
       columnNames[index] = columnName;
       columnDataTypes[index] = 
aggregationFunction.getIntermediateResultColumnType();
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 74e5951412..47f846abff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -80,18 +80,10 @@ public class AggregationPlanNode implements PlanNode {
    * Build the operator to be used for filtered aggregations
    */
   private FilteredAggregationOperator buildFilteredAggOperator() {
-    int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
-    // Build the operator chain for the main predicate
-    Pair<FilterPlanNode, BaseFilterOperator> filterOperatorPair =
-        AggregationFunctionUtils.buildFilterOperator(_indexSegment, 
_queryContext);
-    TransformOperator transformOperator =
-        
AggregationFunctionUtils.buildTransformOperatorForFilteredAggregates(_indexSegment,
 _queryContext,
-            filterOperatorPair.getRight(), null);
-
-    List<Pair<AggregationFunction[], TransformOperator>> aggToTransformOpList =
-        AggregationFunctionUtils.buildFilteredAggTransformPairs(_indexSegment, 
_queryContext,
-            filterOperatorPair.getRight(), transformOperator, null);
-    return new 
FilteredAggregationOperator(_queryContext.getAggregationFunctions(), 
aggToTransformOpList, numTotalDocs);
+    List<Pair<AggregationFunction[], TransformOperator>> transformOperators =
+        
AggregationFunctionUtils.buildFilteredAggregateTransformOperators(_indexSegment,
 _queryContext);
+    return new 
FilteredAggregationOperator(_queryContext.getAggregationFunctions(), 
transformOperators,
+        _indexSegment.getSegmentMetadata().getTotalDocs());
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java
index ccb51143e6..b2cef40628 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java
@@ -55,37 +55,23 @@ public class GroupByPlanNode implements PlanNode {
 
   @Override
   public Operator<GroupByResultsBlock> run() {
-    assert _queryContext.getAggregationFunctions() != null;
-    assert _queryContext.getGroupByExpressions() != null;
-
-    if (_queryContext.hasFilteredAggregations()) {
-      return buildFilteredGroupByPlan();
-    }
-    return buildNonFilteredGroupByPlan();
+    assert _queryContext.getAggregationFunctions() != null && 
_queryContext.getGroupByExpressions() != null;
+    return _queryContext.hasFilteredAggregations() ? 
buildFilteredGroupByPlan() : buildNonFilteredGroupByPlan();
   }
 
   private FilteredGroupByOperator buildFilteredGroupByPlan() {
-    int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
-    // Build the operator chain for the main predicate so the filter plan can 
be run only one time
-    Pair<FilterPlanNode, BaseFilterOperator> filterOperatorPair =
-        AggregationFunctionUtils.buildFilterOperator(_indexSegment, 
_queryContext);
-    ExpressionContext[] groupByExpressions = 
_queryContext.getGroupByExpressions().toArray(new ExpressionContext[0]);
-    TransformOperator transformOperator =
-        
AggregationFunctionUtils.buildTransformOperatorForFilteredAggregates(_indexSegment,
 _queryContext,
-            filterOperatorPair.getRight(), groupByExpressions);
-
-    List<Pair<AggregationFunction[], TransformOperator>> aggToTransformOpList =
-        AggregationFunctionUtils.buildFilteredAggTransformPairs(_indexSegment, 
_queryContext,
-            filterOperatorPair.getRight(), transformOperator, 
groupByExpressions);
-    return new FilteredGroupByOperator(_queryContext.getAggregationFunctions(),
-        _queryContext.getFilteredAggregationFunctions(), aggToTransformOpList,
-        _queryContext.getGroupByExpressions().toArray(new 
ExpressionContext[0]), numTotalDocs, _queryContext);
+    List<Pair<AggregationFunction[], TransformOperator>> transformOperators =
+        
AggregationFunctionUtils.buildFilteredAggregateTransformOperators(_indexSegment,
 _queryContext);
+    return new FilteredGroupByOperator(_queryContext, transformOperators,
+        _indexSegment.getSegmentMetadata().getTotalDocs());
   }
 
   private GroupByOperator buildNonFilteredGroupByPlan() {
     int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
     AggregationFunction[] aggregationFunctions = 
_queryContext.getAggregationFunctions();
-    ExpressionContext[] groupByExpressions = 
_queryContext.getGroupByExpressions().toArray(new ExpressionContext[0]);
+    List<ExpressionContext> groupByExpressionList = 
_queryContext.getGroupByExpressions();
+    assert aggregationFunctions != null && groupByExpressionList != null;
+    ExpressionContext[] groupByExpressions = groupByExpressionList.toArray(new 
ExpressionContext[0]);
 
     FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment, 
_queryContext);
     BaseFilterOperator filterOperator = filterPlanNode.run();
@@ -115,7 +101,7 @@ public class GroupByPlanNode implements PlanNode {
     }
 
     Set<ExpressionContext> expressionsToTransform =
-        
AggregationFunctionUtils.collectExpressionsToTransform(aggregationFunctions, 
groupByExpressions);
+        
AggregationFunctionUtils.collectExpressionsToTransform(aggregationFunctions, 
groupByExpressionList);
     TransformOperator transformPlanNode =
         new TransformPlanNode(_indexSegment, _queryContext, 
expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL,
             filterOperator).run();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 99393d30f1..f7d24f11c9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -84,13 +83,13 @@ public class AggregationFunctionUtils {
    *          or group-by expressions.
    */
   public static Set<ExpressionContext> 
collectExpressionsToTransform(AggregationFunction[] aggregationFunctions,
-      @Nullable ExpressionContext[] groupByExpressions) {
+      @Nullable List<ExpressionContext> groupByExpressions) {
     Set<ExpressionContext> expressions = new HashSet<>();
     for (AggregationFunction aggregationFunction : aggregationFunctions) {
       expressions.addAll(aggregationFunction.getInputExpressions());
     }
     if (groupByExpressions != null) {
-      expressions.addAll(Arrays.asList(groupByExpressions));
+      expressions.addAll(groupByExpressions);
     }
     return expressions;
   }
@@ -179,97 +178,91 @@ public class AggregationFunctionUtils {
   }
 
   /**
-   * Build a filter operator from the given FilterContext.
-   *
-   * It returns the FilterPlanNode to allow reusing plan level components such 
as predicate
-   * evaluator map
+   * Build pairs of filtered aggregation functions and corresponding transform 
operator.
    */
-  public static Pair<FilterPlanNode, BaseFilterOperator> 
buildFilterOperator(IndexSegment indexSegment,
-      QueryContext queryContext, FilterContext filterContext) {
-    FilterPlanNode filterPlanNode = new FilterPlanNode(indexSegment, 
queryContext, filterContext);
-    return Pair.of(filterPlanNode, filterPlanNode.run());
-  }
+  public static List<Pair<AggregationFunction[], TransformOperator>> 
buildFilteredAggregateTransformOperators(
+      IndexSegment indexSegment, QueryContext queryContext) {
+    assert queryContext.getAggregationFunctions() != null && 
queryContext.getFilteredAggregationFunctions() != null;
 
-  public static Pair<FilterPlanNode, BaseFilterOperator> 
buildFilterOperator(IndexSegment indexSegment,
-      QueryContext queryContext) {
-    return buildFilterOperator(indexSegment, queryContext, 
queryContext.getFilter());
-  }
+    BaseFilterOperator mainFilterOperator = new FilterPlanNode(indexSegment, 
queryContext).run();
 
-  public static TransformOperator 
buildTransformOperatorForFilteredAggregates(IndexSegment indexSegment,
-      QueryContext queryContext, BaseFilterOperator filterOperator, @Nullable 
ExpressionContext[] groupByExpressions) {
-    AggregationFunction[] aggregationFunctions = 
queryContext.getAggregationFunctions();
-    assert aggregationFunctions != null;
-    Set<ExpressionContext> expressionsToTransform =
-        collectExpressionsToTransform(aggregationFunctions, 
groupByExpressions);
-    return new TransformPlanNode(indexSegment, queryContext, 
expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL,
-        filterOperator).run();
-  }
-
-  /**
-   * Build pairs of filtered aggregation functions and corresponding transform 
operator
-   * @param mainPredicateFilterOperator Filter operator corresponding to the 
main predicate
-   * @param mainTransformOperator Transform operator corresponding to the main 
predicate
-   */
-  public static List<Pair<AggregationFunction[], TransformOperator>> 
buildFilteredAggTransformPairs(
-      IndexSegment indexSegment, QueryContext queryContext, BaseFilterOperator 
mainPredicateFilterOperator,
-      TransformOperator mainTransformOperator, @Nullable ExpressionContext[] 
groupByExpressions) {
-    Map<FilterContext, Pair<List<AggregationFunction>, TransformOperator>> 
filterContextToAggFuncsMap = new HashMap<>();
-    List<AggregationFunction> nonFilteredAggregationFunctions = new 
ArrayList<>();
-    List<Pair<AggregationFunction, FilterContext>> aggregationFunctions =
-        queryContext.getFilteredAggregationFunctions();
-    List<Pair<AggregationFunction[], TransformOperator>> aggToTransformOpList 
= new ArrayList<>();
+    // No need to process sub-filters when main filter has empty result
+    if (mainFilterOperator.isResultEmpty()) {
+      AggregationFunction[] aggregationFunctions = 
queryContext.getAggregationFunctions();
+      Set<ExpressionContext> expressions =
+          collectExpressionsToTransform(aggregationFunctions, 
queryContext.getGroupByExpressions());
+      TransformOperator transformOperator =
+          new TransformPlanNode(indexSegment, queryContext, expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL,
+              mainFilterOperator).run();
+      return Collections.singletonList(Pair.of(aggregationFunctions, 
transformOperator));
+    }
 
-    // For each aggregation function, check if the aggregation function is a 
filtered agg.
-    // If it is, populate the corresponding filter operator and corresponding 
transform operator
-    assert aggregationFunctions != null;
-    for (Pair<AggregationFunction, FilterContext> inputPair : 
aggregationFunctions) {
-      AggregationFunction aggFunc = inputPair.getLeft();
-      FilterContext currentFilterExpression = inputPair.getRight();
-      if (currentFilterExpression != null) {
-        if (filterContextToAggFuncsMap.get(currentFilterExpression) != null) {
-          
filterContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(aggFunc);
-          continue;
-        }
-        Pair<FilterPlanNode, BaseFilterOperator> filterPlanOpPair =
-            buildFilterOperator(indexSegment, queryContext, 
currentFilterExpression);
-        BaseFilterOperator wrappedFilterOperator =
-            new CombinedFilterOperator(mainPredicateFilterOperator, 
filterPlanOpPair.getRight(),
-                queryContext.getQueryOptions());
-        TransformOperator newTransformOperator =
-            buildTransformOperatorForFilteredAggregates(indexSegment, 
queryContext, wrappedFilterOperator,
-                groupByExpressions);
-        // For each transform operator, associate it with the underlying 
expression. This allows
-        // fetching the relevant TransformOperator when resolving blocks 
during aggregation
-        // execution
-        List<AggregationFunction> aggFunctionList = new ArrayList<>();
-        aggFunctionList.add(aggFunc);
-        filterContextToAggFuncsMap.put(currentFilterExpression, 
Pair.of(aggFunctionList, newTransformOperator));
+    // For each aggregation function, check if the aggregation function is a 
filtered aggregate. If so, populate the
+    // corresponding filter operator.
+    Map<FilterContext, Pair<BaseFilterOperator, List<AggregationFunction>>> 
filterOperators = new HashMap<>();
+    List<AggregationFunction> nonFilteredFunctions = new ArrayList<>();
+    for (Pair<AggregationFunction, FilterContext> functionFilterPair : 
queryContext.getFilteredAggregationFunctions()) {
+      AggregationFunction aggregationFunction = functionFilterPair.getLeft();
+      FilterContext filter = functionFilterPair.getRight();
+      if (filter != null) {
+        filterOperators.computeIfAbsent(filter, k -> {
+          BaseFilterOperator combinedFilterOperator;
+          BaseFilterOperator subFilterOperator = new 
FilterPlanNode(indexSegment, queryContext, filter).run();
+          if (mainFilterOperator.isResultMatchingAll()) {
+            combinedFilterOperator = subFilterOperator;
+          } else {
+            if (subFilterOperator.isResultEmpty()) {
+              combinedFilterOperator = subFilterOperator;
+            } else if (subFilterOperator.isResultMatchingAll()) {
+              combinedFilterOperator = mainFilterOperator;
+            } else {
+              combinedFilterOperator =
+                  new CombinedFilterOperator(mainFilterOperator, 
subFilterOperator, queryContext.getQueryOptions());
+            }
+          }
+          return Pair.of(combinedFilterOperator, new ArrayList<>());
+        }).getRight().add(aggregationFunction);
       } else {
-        nonFilteredAggregationFunctions.add(aggFunc);
+        nonFilteredFunctions.add(aggregationFunction);
       }
     }
-    // Convert to array since FilteredGroupByOperator expects it
-    for (Pair<List<AggregationFunction>, TransformOperator> pair : 
filterContextToAggFuncsMap.values()) {
-      List<AggregationFunction> aggregationFunctionList = pair.getLeft();
-      if (aggregationFunctionList == null) {
-        throw new IllegalStateException("Null aggregation list seen");
+
+    // Create the transform operators
+    List<Pair<AggregationFunction[], TransformOperator>> transformOperators = 
new ArrayList<>();
+    List<ExpressionContext> groupByExpressions = 
queryContext.getGroupByExpressions();
+    for (Pair<BaseFilterOperator, List<AggregationFunction>> 
filterOperatorFunctionsPair : filterOperators.values()) {
+      BaseFilterOperator filterOperator = 
filterOperatorFunctionsPair.getLeft();
+      if (filterOperator == mainFilterOperator) {
+        // This can happen when the sub filter matches all documents, and we 
can treat the function as non-filtered
+        nonFilteredFunctions.addAll(filterOperatorFunctionsPair.getRight());
+      } else {
+        AggregationFunction[] aggregationFunctions =
+            filterOperatorFunctionsPair.getRight().toArray(new 
AggregationFunction[0]);
+        Set<ExpressionContext> expressions = 
collectExpressionsToTransform(aggregationFunctions, groupByExpressions);
+        TransformOperator transformOperator =
+            new TransformPlanNode(indexSegment, queryContext, expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL,
+                filterOperator).run();
+        transformOperators.add(Pair.of(aggregationFunctions, 
transformOperator));
       }
-      aggToTransformOpList.add(Pair.of(aggregationFunctionList.toArray(new 
AggregationFunction[0]), pair.getRight()));
     }
 
-    if (!nonFilteredAggregationFunctions.isEmpty()) {
-      aggToTransformOpList.add(
-          Pair.of(nonFilteredAggregationFunctions.toArray(new 
AggregationFunction[0]), mainTransformOperator));
+    if (!nonFilteredFunctions.isEmpty()) {
+      AggregationFunction[] aggregationFunctions = 
nonFilteredFunctions.toArray(new AggregationFunction[0]);
+      Set<ExpressionContext> expressions = 
collectExpressionsToTransform(aggregationFunctions, groupByExpressions);
+      TransformOperator transformOperator =
+          new TransformPlanNode(indexSegment, queryContext, expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL,
+              mainFilterOperator).run();
+      transformOperators.add(Pair.of(aggregationFunctions, transformOperator));
     }
 
-    return aggToTransformOpList;
+    return transformOperators;
   }
 
   public static String getResultColumnName(AggregationFunction 
aggregationFunction, @Nullable FilterContext filter) {
-      String columnName = aggregationFunction.getResultColumnName();
-      if (filter != null) {
-        columnName += " FILTER(WHERE " + filter + ")";
-      }
-      return columnName;
+    String columnName = aggregationFunction.getResultColumnName();
+    if (filter != null) {
+      columnName += " FILTER(WHERE " + filter + ")";
+    }
+    return columnName;
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
index a59008be4c..dbf9565f67 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
@@ -67,7 +67,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
     FilteredAggregationOperator aggregationOperator = getOperator(query);
     AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
     
QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
 150000L, 0L,
-        450000L, 30000L);
+        120000L, 30000L);
     
QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 
22266008882250L, 30000, 2147419555,
         32289159189150L, 28175373944314L, 30000L);
 
@@ -77,7 +77,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
     aggregationOperator = getOperator(query);
     resultsBlock = aggregationOperator.nextBlock();
     
QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
 150000L, 0L,
-        450000L, 30000L);
+        120000L, 30000L);
     
QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 
22266008882250L, 30000, 2147419555,
         32289159189150L, 28175373944314L, 30000L);
 
@@ -87,7 +87,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
     aggregationOperator = getOperator(query);
     resultsBlock = aggregationOperator.nextBlock();
     
QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
 120000L, 0L,
-        360000L, 30000L);
+        90000L, 30000L);
     
QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 
22266008882250L, 30000, 2147419555,
         32289159189150L, 0L, 0L);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index c4abd67d4e..0a1cfec438 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -595,7 +595,7 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
     ResultTable expectedResultTable =
         new ResultTable(expectedDataSchema, Collections.singletonList(new 
Object[]{370236L}));
-    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 370236L, 0L, 0L, 
400000L, expectedResultTable);
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 370236L, 400000L, 
0L, 400000L, expectedResultTable);
   }
 
   @Test
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
index df2dbb4e4e..66e464b7b0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
@@ -534,7 +534,7 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
     ResultTable expectedResultTable =
         new ResultTable(expectedDataSchema, Collections.singletonList(new 
Object[]{370236L}));
-    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 370236L, 0L, 0L, 
400000L, expectedResultTable);
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 370236L, 400000L, 
0L, 400000L, expectedResultTable);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to