This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 40d0375  IMPALA-9898: Plan generation and execution for grouping sets
40d0375 is described below

commit 40d037549dc69ac85c0096c4e9db620d8523611b
Author: Aman Sinha <amsi...@cloudera.com>
AuthorDate: Thu May 7 17:51:20 2020 -0700

    IMPALA-9898: Plan generation and execution for grouping sets
    
    This patch enhances the MultiAggregateInfo to handle grouping sets
    and rollup (which is converted to grouping sets). This patch does
    not itself do parsing/validation of grouping sets syntax but rather
    provides the following supporting functionality:
      - A separate analyze method that accepts aggregation classes and
        aggregation info that have been created separately.
      - A modified Transpose phase that uses combination of aggif(),
        valid_tid() functions and CASE exprs to choose exactly which
        slots from the underlying aggregate classes need to be output
        based on the tuple id.
      - Modified materialization step where all aggregate slots and
        grouping slots are materialized in case of grouping sets.
      - Creates grouping_id value for grouping sets. The grouping_id
        function in SQL describes which expression is grouped-by in a
        particular row of a query with grouping sets.
    
    Testing:
      This patch is not individually testable but will be tested
      as part of the overall grouping set support.
    
    Change-Id: Id474c5373860b0d8014ee9c844a3fb90092be968
    Reviewed-on: http://gerrit.cloudera.org:8080/16115
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../apache/impala/analysis/MultiAggregateInfo.java | 261 +++++++++++++++++++++
 1 file changed, 261 insertions(+)

diff --git 
a/fe/src/main/java/org/apache/impala/analysis/MultiAggregateInfo.java 
b/fe/src/main/java/org/apache/impala/analysis/MultiAggregateInfo.java
index 985fcbb..0483df2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/MultiAggregateInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/MultiAggregateInfo.java
@@ -155,6 +155,11 @@ public class MultiAggregateInfo {
   // result tuple of the simplified aggregation.
   private ExprSubstitutionMap simplifiedAggSmap_;
 
+  // Indicates if this MultiAggregateInfo is associated with grouping sets.
+  private boolean isGroupingSet_;
+  // Indicates whether to generate the grouping_id column for grouping sets
+  private boolean generateGroupingId_;
+
   public MultiAggregateInfo(List<Expr> groupingExprs, List<FunctionCallExpr> 
aggExprs) {
     groupingExprs_ = Expr.cloneList(Preconditions.checkNotNull(groupingExprs));
     aggExprs_ = Expr.cloneList(Preconditions.checkNotNull(aggExprs));
@@ -189,6 +194,7 @@ public class MultiAggregateInfo {
     if (other.outputSmap_ != null) {
       outputSmap_ = other.outputSmap_.clone();
     }
+    isGroupingSet_ = other.isGroupingSet_;
   }
 
   /**
@@ -300,6 +306,56 @@ public class MultiAggregateInfo {
   }
 
   /**
+   * Version of analyze method that accepts list of aggregation class and 
aggregation
+   * info that may be created by an external planner. This is needed for 
supporting
+   * grouping sets/rollup functionality. This is unlike the default analyze 
method which
+   * internally generates these lists.
+   */
+  public void analyzeCustomClasses(Analyzer analyzer,
+      List<List<FunctionCallExpr>> aggClasses,
+      List<AggregateInfo> aggInfos) throws AnalysisException {
+    if (isAnalyzed_) return;
+    isAnalyzed_ = true;
+
+    if (aggClasses.size() == 0) return;
+    Preconditions.checkState(isGroupingSet_);
+    Preconditions.checkState(aggClasses.size() == aggInfos.size());
+
+    aggClasses_ = new ArrayList<>();
+    aggInfos_ = new ArrayList<>();
+
+    aggClasses_.addAll(aggClasses);
+    aggInfos_.addAll(aggInfos);
+
+    if (aggInfos_.size() == 1) {
+      // Only a single aggregation class, no transposition step is needed.
+      outputSmap_ = aggInfos_.get(0).getResultSmap();
+      return;
+    }
+
+    // Create agg info for the final transposition step.
+    transposeAggInfo_ = createTransposeAggInfoForGroupingSet(groupingExprs_,
+        aggExprs_, aggInfos_, analyzer);
+    List<SlotDescriptor> outputSlots = 
transposeAggInfo_.getResultTupleDesc().getSlots();
+    Preconditions.checkState(groupingExprs_.size() <= outputSlots.size());
+
+    // Maps from the original grouping and aggregate exprs in all aggregation 
classes to
+    // the final output produced by the transposing aggregation.
+    outputSmap_ = new ExprSubstitutionMap();
+    // Add mappings for grouping exprs.
+    for (int i = 0; i < groupingExprs_.size(); ++i) {
+      outputSmap_.put(groupingExprs_.get(i).clone(), new 
SlotRef(outputSlots.get(i)));
+    }
+    // Add mappings for aggregate functions.
+    int outputSlotIdx = groupingExprs_.size() + 1; // add 1 to account for the 
tid column
+    for (int i = 0; i < aggExprs_.size(); i++) {
+      outputSmap_.put(aggExprs_.get(i).clone(),
+        new SlotRef(outputSlots.get(outputSlotIdx)));
+      outputSlotIdx++;
+    }
+  }
+
+  /**
    * Returns a new AggregateInfo for transposing the union of results of the 
given input
    * AggregateInfos, with the given grouping. The AggregateInfos must have 
been created
    * based on the same grouping exprs.
@@ -370,6 +426,189 @@ public class MultiAggregateInfo {
   }
 
   /**
+   * Returns a new AggregateInfo for transposing the union of results of the 
given input
+   * AggregateInfos, each of which may belong to a different grouping set.
+   *
+   * Example:
+   *  SELECT a1, b1, SUM(c1), MIN(d1) FROM t1 GROUP BY ROLLUP(a1, b1)
+   *
+   * Currently, Impala does not support ROLLUP directly but suppose an 
external planner
+   * converts the above to the following 3 Grouping Sets: {(a1, b1), (a1), ()}
+   * We will map these to the following aggregation classes:
+   * Class 1:
+   *  Aggregate output exprs: SUM(c1), MIN(d1)
+   *  Grouping exprs: a1, b1
+   * Class 2:
+   *  Aggregate output exprs: SUM(c1), MIN(d1)
+   *  Grouping exprs: a1, NULL
+   * Class 3:
+   *  Aggregate output exprs: SUM(c1), MIN(d1)
+   *  Grouping exprs: NULL, NULL
+   *
+   * Note that all classes have the same aggregate output exprs but different
+   * grouping exprs.
+   * This is different from the multiple count distinct case. Also, the 
transpose phase is
+   * different as described below.
+   * The Transpose phase chooses the appropriate aggregate slots and grouping 
slots from
+   * each aggregation class based on the valid tuple id:
+   *   Aggregate output exprs:
+   *     AGGIF(valid_tid(1, 2, 3) IN (1, 2, 3),
+   *            CASE valid_tid(1, 2, 3)
+   *              WHEN 1 THEN SUM(c1)        // these agg exprs are same but 
point to
+   *              WHEN 2 THEN SUM(c1)        // different materialized slots
+   *              WHEN 3 THEN SUM(c1)
+   *            END),
+   *     AGGIF(valid_tid(1, 2, 3) IN (1, 2, 3),
+   *                CASE valid_tid(1, 2, 3)
+   *                  WHEN 1 THEN MIN(d1)
+   *                  WHEN 2 THEN MIN(d1)
+   *                  WHEN 3 THEN MIN(d1)
+   *                END)
+   *  Grouping output exprs:
+   *    CASE valid_tid(1, 2, 3)
+   *      WHEN 1 THEN a1                      // these grouping exprs may be 
same but
+   *      WHEN 2 THEN a1                      // point to different 
materialized slots
+   *      WHEN 3 THEN NULL
+   *    END,
+   *    CASE valid_tid(1, 2, 3)
+   *      WHEN 1 THEN b1
+   *      WHEN 2 THEN NULL
+   *      WHEN 3 THEN NULL
+   *    END
+   *
+   *    Notes:
+   *      - This method optionally projects a grouping_id field for the 
grouping sets
+   *      - The max number of grouping exprs allowed per grouping set is 64
+   *      - For the transpose phase grouping exprs we add the TupleId also - 
this
+   *        ensures that NULL values in the data are differentiated from NULLs 
in the
+   *        grouping set.
+   */
+  private AggregateInfo createTransposeAggInfoForGroupingSet(
+      List<Expr> groupingExprs, List<FunctionCallExpr> aggExprs,
+      List<AggregateInfo> aggInfos, Analyzer analyzer) throws 
AnalysisException {
+    List<TupleId> aggTids = new ArrayList<>();
+    for (AggregateInfo aggInfo : aggInfos) {
+      // we use a bigint type for the grouping_id later, so we allow up to
+      // max 64 grouping exprs in a grouping set
+      Preconditions.checkState(aggInfo.getGroupingExprs().size() <= 64,
+          "Exceeded the limit of 64 grouping exprs in a grouping set");
+      aggTids.add(aggInfo.getResultTupleId());
+    }
+    List<FunctionCallExpr> transAggExprs = new ArrayList<>();
+
+    int numGroupingExprs = groupingExprs.size();
+    int numSlots = numGroupingExprs + aggExprs.size();
+    List<Expr> inList = new ArrayList<>();
+    for (TupleId tid : aggTids) {
+      inList.add(NumericLiteral.create(tid.asInt()));
+    }
+    InPredicate tidInPred =
+      new InPredicate(new ValidTupleIdExpr(aggTids), inList, false);
+
+    List<CaseWhenClause> caseWhenClauses = new ArrayList<>();
+
+    // Create aggregate exprs for the transposing agg.
+    for (int aggIndex = numGroupingExprs; aggIndex < numSlots; aggIndex++) {
+      caseWhenClauses.clear();
+      for (AggregateInfo aggInfo : aggInfos) {
+        TupleDescriptor aggTuple = aggInfo.getResultTupleDesc();
+        Preconditions.checkState(aggIndex < aggTuple.getSlots().size());
+        Expr whenExpr = NumericLiteral.create(aggTuple.getId().asInt());
+        Expr thenExpr = new SlotRef(aggTuple.getSlots().get(aggIndex));
+        caseWhenClauses.add(new CaseWhenClause(whenExpr, thenExpr));
+      }
+      CaseExpr caseExpr =
+        new CaseExpr(new ValidTupleIdExpr(aggTids), caseWhenClauses, null);
+      caseExpr.analyzeNoThrow(analyzer);
+      List<Expr> aggFnParams = Lists.<Expr>newArrayList(tidInPred, caseExpr);
+      FunctionCallExpr aggExpr = new FunctionCallExpr("aggif", aggFnParams);
+      aggExpr.analyzeNoThrow(analyzer);
+      transAggExprs.add(aggExpr);
+    }
+
+    // Create grouping exprs for the transposing agg.
+    List<Expr> transGroupingExprs = new ArrayList<>();
+    for (int gbIndex = 0; gbIndex < groupingExprs.size(); ++gbIndex) {
+      caseWhenClauses.clear();
+      for (AggregateInfo aggInfo : aggInfos) {
+        // for a particular aggInfo, we only need to consider the group-by
+        // exprs relevant to that aggInfo
+        TupleDescriptor aggTuple = aggInfo.getResultTupleDesc();
+        Preconditions.checkState(gbIndex < aggTuple.getSlots().size());
+        Expr whenExpr = NumericLiteral.create(aggTuple.getId().asInt());
+        if (aggInfo.getGroupingExprs().size() == 0) {
+          Type nullType = groupingExprs.get(gbIndex).getType();
+          NullLiteral nullLiteral = new NullLiteral();
+          Expr thenExpr = nullLiteral.uncheckedCastTo(nullType);
+          caseWhenClauses.add(new CaseWhenClause(whenExpr, thenExpr));
+        } else {
+          Expr thenExpr = new SlotRef(aggTuple.getSlots().get(gbIndex));
+          caseWhenClauses.add(new CaseWhenClause(whenExpr, thenExpr));
+        }
+      }
+      CaseExpr caseExpr =
+        new CaseExpr(new ValidTupleIdExpr(aggTids), caseWhenClauses, null);
+      caseExpr.analyzeNoThrow(analyzer);
+      transGroupingExprs.add(caseExpr);
+    }
+
+    // add the tuple id of the aggregate class to the grouping exprs
+    caseWhenClauses.clear();
+    for (AggregateInfo aggInfo : aggInfos) {
+      TupleDescriptor aggTuple = aggInfo.getResultTupleDesc();
+      Expr whenExpr = NumericLiteral.create(aggTuple.getId().asInt(), 
Type.INT);
+      Expr thenExpr = whenExpr;
+      caseWhenClauses.add(new CaseWhenClause(whenExpr, thenExpr));
+    }
+    CaseExpr caseExprForTids =
+      new CaseExpr(new ValidTupleIdExpr(aggTids), caseWhenClauses, null);
+    caseExprForTids.analyzeNoThrow(analyzer);
+    transGroupingExprs.add(caseExprForTids);
+
+    // Create the grouping_id column if requested.
+    // consider grouping sets: ((a1, b1), (a1, NULL), (NULL,NULL))
+    // By SQL standards, in the above NULL implies '1', non-null implies '0'.
+    // We multiply this by the place value of that column which is power-of-2
+    // This gives a grouping_id for each grouping set which is encapsulated
+    // within an aggif() function as below:
+    // AGGIF(valid_tid(1, 2, 3) IN (1, 2, 3),
+    //        CASE valid_tid(1, 2, 3) WHEN 1 then <groupingId_for_gs1>
+    //                                WHEN 2 then <groupingId_for_gs2>
+    //                                WHEN 3 then <groupingId_for_gs3>
+    //                               END)
+    if (generateGroupingId_) {
+      caseWhenClauses.clear();
+      for (AggregateInfo aggInfo : aggInfos) {
+        long groupingId = 0;
+        TupleDescriptor aggTuple = aggInfo.getResultTupleDesc();
+        Expr whenExpr = NumericLiteral.create(aggTuple.getId().asInt());
+        int numGbExprs = aggInfo.getGroupingExprs().size();
+        for (int i = numGbExprs - 1, j = 0; i >= 0; i--, j++) {
+          Expr aggInfoGbExpr = aggInfo.getGroupingExprs().get(i);
+          long placeValue = 1L << j;
+          int factor = aggInfoGbExpr instanceof NullLiteral ? 1 : 0;
+          groupingId += factor * placeValue;
+        }
+        Expr thenExpr = NumericLiteral.create(groupingId, Type.BIGINT);
+        caseWhenClauses.add(new CaseWhenClause(whenExpr, thenExpr));
+      }
+      CaseExpr caseExpr =
+        new CaseExpr(new ValidTupleIdExpr(aggTids), caseWhenClauses, null);
+
+      // wrap this in an aggif expr because an AggregateInfo only allows either
+      // aggregate exprs or grouping exprs
+      List<Expr> aggFnParams = Lists.<Expr>newArrayList(tidInPred, caseExpr);
+      FunctionCallExpr aggExpr = new FunctionCallExpr("aggif", aggFnParams);
+      aggExpr.analyze(analyzer);
+      transAggExprs.add(aggExpr);
+    }
+
+    AggregateInfo result =
+        AggregateInfo.create(transGroupingExprs, transAggExprs, analyzer);
+    return result;
+  }
+
+  /**
    * Wraps the given groupingExprs into a CASE that switches on the valid 
tuple id.
    */
   private List<Expr> getTransposeGroupingExprs(
@@ -393,6 +632,14 @@ public class MultiAggregateInfo {
     return result;
   }
 
+  public void setIsGroupingSet(boolean isGroupingSet) { isGroupingSet_ = 
isGroupingSet; }
+
+  public boolean getIsGroupingSet() { return isGroupingSet_; }
+
+  public void setGenerateGroupingId(boolean generateGroupingId) {
+    generateGroupingId_ = generateGroupingId;
+  }
+
   /**
    * Determines which aggregate exprs are required to produce the final query 
result.
    * Eliminates irrelevant aggregation classes and simplifies the aggregation 
plan,
@@ -414,6 +661,20 @@ public class MultiAggregateInfo {
     Preconditions.checkNotNull(transposeAggInfo_);
     transposeAggInfo_.materializeRequiredSlots(analyzer, smap);
 
+    // For grouping sets, materialize the slots from each aggregate info.
+    // The rationale is that the final transposition phase needs the grouping
+    // and agg expr slots from each grouping set. More investigation is needed
+    // to see if we can optimize this by reducing the number of 
materializations
+    // (similar to the multiple count(distinct) case).
+    if (isGroupingSet_) {
+      materializedAggInfos_ = new ArrayList<>();
+      for (AggregateInfo aggInfo : aggInfos_) {
+        aggInfo.materializeRequiredSlots(analyzer, smap);
+      }
+      materializedAggInfos_.addAll(aggInfos_);
+      return;
+    }
+
     // Determine which aggregation classes are required based on which slots
     // are materialized in the transposition aggregation.
     List<Integer> classIdxByAggExprIdx =

Reply via email to