This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 55804f787 IMPALA-12959: Calcite planner: Implement count star 
optimization...
55804f787 is described below

commit 55804f78740bf5e05aaee2634dec249fefb431a0
Author: Steve Carlin <[email protected]>
AuthorDate: Thu Jan 30 12:58:05 2025 -0800

    IMPALA-12959: Calcite planner: Implement count star optimization...
    
    IMPALA-13779: Handle partition key scan optimization
    
    IMPALA-13780: Handle full acid selects
    
    The 3 commits referenced here are somewhat related in that they all
    involve changes for the HdfsScanRel column layout and have been
    combined.
    
    For the optimizations, some infrastructure code was added. Information
    from the Aggregation RelNode is needed by the TableScan RelNode and
    vice versa. The mechanism to send information to children RelNodes is
    by using the ParentPlanRelContext. The mechanism for sending information
    up to the parent is by using the NodeWithExprs object. If the conditions
    are met for the optimizations (equivalent to the conditions in the current
    Impala planner), the optimizations are applied.
    
    For count star optimization, the STAT_NUM_ROWS fake column is added to hold
    the information, and then the aggregate applies a sum_init_zero on this
    column.
    
    For partition key scan, if the conditions are met, the Impala HdfsScanNode
    is sent a flag in its constructor that handles the optimization.
    
    For acid selects, the SingleNodePlanner has code to handle the additional
    PlanNodes needed. Some code involving column number calculation was needed
    to deal with the extra columns that are present in a full acid table.
    
    One extra note: In HdfsScanNode, a Preconditions check was removed.
    This state check ensured that the countStarSlot only existed when the
    aggregation substitution map was set. This does not apply to Calcite which
    does not use the substitution map to handle the count star optimization.
    
    Change-Id: I975beefedd2cceb34dad0f93343a46d1b7094c13
    Reviewed-on: http://gerrit.cloudera.org:8080/22425
    Reviewed-by: Michael Smith <[email protected]>
    Reviewed-by: Joe McDonnell <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/planner/HdfsScanNode.java    |   1 -
 .../java/org/apache/impala/planner/ScanNode.java   |  21 ++-
 .../apache/impala/planner/SingleNodePlanner.java   |  13 +-
 .../impala/calcite/rel/node/ImpalaAggRel.java      |  65 +++++++-
 .../impala/calcite/rel/node/ImpalaAnalyticRel.java |   2 +-
 .../impala/calcite/rel/node/ImpalaHdfsScanRel.java | 166 ++++++++++++++++++---
 .../impala/calcite/rel/node/ImpalaPlanRel.java     |  18 +++
 .../impala/calcite/rel/node/ImpalaProjectRel.java  |   3 +-
 .../impala/calcite/rel/node/ImpalaSortRel.java     |   2 +-
 .../impala/calcite/rel/node/NodeCreationUtils.java |   2 +-
 .../impala/calcite/rel/node/NodeWithExprs.java     |  19 +--
 .../calcite/rel/node/ParentPlanRelContext.java     |  12 ++
 .../calcite/rel/phys/ImpalaHdfsScanNode.java       |   7 +-
 .../apache/impala/calcite/schema/CalciteTable.java |  93 +++++++++---
 14 files changed, 344 insertions(+), 80 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index eb026b717..80a59ec99 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -2037,7 +2037,6 @@ public class HdfsScanNode extends ScanNode {
       msg.hdfs_scan_node.setDeterministic_scanrange_assignment(
           deterministicScanRangeAssignment_);
     }
-    Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == 
null));
     if (countStarSlot_ != null) {
       
msg.hdfs_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset());
     }
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index c79a28127..efd6d149a 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -87,7 +87,7 @@ abstract public class ScanNode extends PlanNode {
   // then be applied to the MultiAggregateInfo in this query block. We do not 
apply the
   // smap in this class directly to avoid side effects and make it easier to 
reason about.
   protected MultiAggregateInfo aggInfo_ = null;
-  protected static final String STATS_NUM_ROWS = "stats: num_rows";
+  public static final String STATS_NUM_ROWS = "stats: num_rows";
 
   // Should be applied to the AggregateInfo from the same query block. We 
cannot use the
   // PlanNode.outputSmap_ for this purpose because we don't want the smap 
entries to be
@@ -168,11 +168,7 @@ abstract public class ScanNode extends PlanNode {
     countFn.analyzeNoThrow(analyzer);
 
     // Create the sum function.
-    SlotDescriptor sd = analyzer.addSlotDescriptor(getTupleDesc());
-    sd.setType(Type.BIGINT);
-    sd.setIsMaterialized(true);
-    sd.setIsNullable(false);
-    sd.setLabel(STATS_NUM_ROWS);
+    SlotDescriptor sd = createCountStarOptimizationDesc(getTupleDesc(), 
analyzer);
     List<Expr> args = new ArrayList<>();
     args.add(new SlotRef(sd));
     FunctionCallExpr sumFn = new FunctionCallExpr("sum_init_zero", args);
@@ -183,6 +179,19 @@ abstract public class ScanNode extends PlanNode {
     return sd;
   }
 
+  /**
+   * Creates special SlotDescriptor for the count star optimization.
+   */
+  public static SlotDescriptor createCountStarOptimizationDesc(TupleDescriptor 
tupleDesc,
+      Analyzer analyzer) {
+    SlotDescriptor sd = analyzer.addSlotDescriptor(tupleDesc);
+    sd.setType(Type.BIGINT);
+    sd.setIsMaterialized(true);
+    sd.setIsNullable(false);
+    sd.setLabel(ScanNode.STATS_NUM_ROWS);
+    return sd;
+  }
+
   /**
    * Returns true if the count(*) optimization can be applied to the query 
block
    * of this scan node.
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java 
b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 07333895e..016a791f4 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -1675,7 +1675,8 @@ public class SingleNodePlanner implements 
SingleNodePlannerIntf {
     } else if (addAcidSlotsIfNeeded(analyzer, hdfsTblRef, partitions)) {
       // We are scanning a full ACID table that has delete delta files. Let's 
create
       // a LEFT ANTI JOIN between the insert deltas and delete deltas.
-      return createAcidJoinNode(analyzer, hdfsTblRef, conjuncts, partitions, 
pair.second);
+      return createAcidJoinNode(analyzer, hdfsTblRef, conjuncts, partitions, 
pair.second,
+          ctx_);
     } else {
       HdfsScanNode scanNode =
           new HdfsScanNode(ctx_.getNextNodeId(), tupleDesc, conjuncts, 
partitions,
@@ -1760,9 +1761,9 @@ public class SingleNodePlanner implements 
SingleNodePlannerIntf {
    * adds a LEFT ANTI HASH JOIN with BROADCAST distribution mode. I.e. delete 
events will
    * be broadcasted to the nodes that scan the insert files.
    */
-  private PlanNode createAcidJoinNode(Analyzer analyzer, TableRef hdfsTblRef,
+  public static PlanNode createAcidJoinNode(Analyzer analyzer, TableRef 
hdfsTblRef,
       List<Expr> conjuncts, List<? extends FeFsPartition> partitions,
-      List<Expr> partConjuncts)
+      List<Expr> partConjuncts, PlannerContext ctx)
       throws ImpalaException {
     FeTable feTable = hdfsTblRef.getTable();
     Preconditions.checkState(AcidUtils.isFullAcidTable(
@@ -1781,11 +1782,11 @@ public class SingleNodePlanner implements 
SingleNodePlannerIntf {
     TableRef deleteDeltaRef = TableRef.newTableRef(analyzer, 
hdfsTblRef.getPath(),
         hdfsTblRef.getUniqueAlias() + "-delete-delta");
     addAcidSlots(analyzer, deleteDeltaRef);
-    HdfsScanNode deltaScanNode = new HdfsScanNode(ctx_.getNextNodeId(),
+    HdfsScanNode deltaScanNode = new HdfsScanNode(ctx.getNextNodeId(),
         hdfsTblRef.getDesc(), conjuncts, insertDeltaPartitions, hdfsTblRef,
         /*aggInfo=*/null, partConjuncts, /*isPartitionKeyScan=*/false);
     deltaScanNode.init(analyzer);
-    HdfsScanNode deleteDeltaScanNode = new HdfsScanNode(ctx_.getNextNodeId(),
+    HdfsScanNode deleteDeltaScanNode = new HdfsScanNode(ctx.getNextNodeId(),
         deleteDeltaRef.getDesc(), Collections.emptyList(), 
deleteDeltaPartitions,
         deleteDeltaRef, /*aggInfo=*/null, partConjuncts, 
/*isPartitionKeyScan=*/false);
     deleteDeltaScanNode.init(analyzer);
@@ -1799,7 +1800,7 @@ public class SingleNodePlanner implements 
SingleNodePlannerIntf {
     JoinNode acidJoin = new HashJoinNode(deltaScanNode, deleteDeltaScanNode,
         /*straight_join=*/true, DistributionMode.BROADCAST, 
JoinOperator.LEFT_ANTI_JOIN,
         acidJoinConjuncts, /*otherJoinConjuncts=*/Collections.emptyList());
-    acidJoin.setId(ctx_.getNextNodeId());
+    acidJoin.setId(ctx.getNextNodeId());
     acidJoin.init(analyzer);
     acidJoin.setIsDeleteRowsJoin();
     return acidJoin;
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAggRel.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAggRel.java
index 2805d0267..c610351c9 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAggRel.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAggRel.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
@@ -29,6 +30,7 @@ import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.fun.SqlSingleValueAggFunction;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.impala.calcite.functions.AnalyzedFunctionCallExpr;
@@ -45,6 +47,7 @@ import org.apache.impala.analysis.FunctionParams;
 import org.apache.impala.analysis.MultiAggregateInfo;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.calcite.util.SimplifiedAnalyzer;
+import org.apache.impala.catalog.AggregateFunction;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -103,7 +106,8 @@ public class ImpalaAggRel extends Aggregate
 
     List<Expr> groupingExprs = getGroupingExprs(inputWithExprs.outputExprs_);
     List<FunctionCallExpr> aggExprs = getAggregateExprs(context.ctx_,
-        inputWithExprs.outputExprs_, simplifiedAnalyzer);
+        inputWithExprs.outputExprs_, simplifiedAnalyzer,
+        inputWithExprs.countStarOptimization_);
     List<List<Expr>> groupingSets =
         getGroupingSets(simplifiedAnalyzer, inputWithExprs.outputExprs_);
 
@@ -143,7 +147,7 @@ public class ImpalaAggRel extends Aggregate
     aggNode.init(simplifiedAnalyzer);
     simplifiedAnalyzer.clearUnassignedConjuncts();
 
-    return new NodeWithExprs(aggNode, outputExprs, inputWithExprs);
+    return new NodeWithExprs(aggNode, outputExprs);
   }
 
   private NodeWithExprs getChildPlanNode(ParentPlanRelContext context
@@ -153,6 +157,8 @@ public class ImpalaAggRel extends Aggregate
         new ParentPlanRelContext.Builder(context, this);
     // filter condition handled by agg node, so no need to pass it to the 
child.
     builder.setFilterCondition(null);
+    builder.setParentAggregate(this);
+    builder.setInputRefs(ImmutableBitSet.of(RelOptUtil.getAllFields(this)));
     return relInput.getPlanNode(builder.build());
   }
 
@@ -263,17 +269,64 @@ public class ImpalaAggRel extends Aggregate
     return transposePhaseAgg;
   }
 
+  /**
+   * Returns true if the agg call is for distinct columns. This is used
+   * by the partition key scan optimization which can limit the number of
+   * rows scanned in the table scan if only distinct rows are needed by
+   * this agg.
+   */
+  public boolean hasDistinctOnly() throws ImpalaException {
+    for (AggregateCall aggCall : getAggCallList()) {
+      Function fn = getFunction(aggCall);
+      if (fn == null) {
+        return false;
+      }
+      Preconditions.checkState(fn instanceof AggregateFunction);
+      AggregateFunction aggFn = (AggregateFunction) fn;
+      if (!aggFn.ignoresDistinct() && !aggCall.isDistinct()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public boolean hasCountStarOnly() {
+    if (getAggCallList().size() == 0) {
+      return false;
+    }
+    for (AggregateCall aggCall : getAggCallList()) {
+      if (!aggCall.getAggregation().getKind().equals(SqlKind.COUNT)) {
+        return false;
+      }
+      if (aggCall.getArgList().size() > 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   private List<FunctionCallExpr> getAggregateExprs(PlannerContext ctx,
-      List<Expr> inputExprs, Analyzer analyzer) throws ImpalaException ,
+      List<Expr> inputExprs, Analyzer analyzer,
+      Expr countStarOptimization) throws ImpalaException,
       AnalysisException {
     List<FunctionCallExpr> exprs = Lists.newArrayList();
     ImpalaPlanRel input = (ImpalaPlanRel) getInput(0);
     for (AggregateCall aggCall : getAggCallList()) {
+      if (hasCountStarOnly()) {
+        if (countStarOptimization != null) {
+          List<Expr> args = new ArrayList<>();
+          args.add(countStarOptimization);
+          FunctionCallExpr sumFn = new FunctionCallExpr("sum_init_zero", args);
+          sumFn.analyzeNoThrow(analyzer);
+          exprs.add(sumFn);
+          continue;
+        }
+      }
       List<Expr> operands = aggCall.getArgList().stream()
           .map(t -> inputExprs.get(t))
           .collect(Collectors.toList());
 
-      Function fn = getFunction(ctx, aggCall);
+      Function fn = getFunction(aggCall);
       Preconditions.checkState(fn != null, "Could not find the Impala function 
for " +
           aggCall.getAggregation().getName());
 
@@ -289,7 +342,7 @@ public class ImpalaAggRel extends Aggregate
     return exprs;
   }
 
-  private Function getFunction(PlannerContext ctx, AggregateCall aggCall)
+  private Function getFunction(AggregateCall aggCall)
       throws ImpalaException {
     RelDataType retType = aggCall.getType();
     SqlAggFunction aggFunction = aggCall.getAggregation();
@@ -326,7 +379,7 @@ public class ImpalaAggRel extends Aggregate
 
     cardinalityCheckNode.init(ctx.getRootAnalyzer());
 
-    return new NodeWithExprs(cardinalityCheckNode, outputExprs, 
inputNodeWithExprs);
+    return new NodeWithExprs(cardinalityCheckNode, outputExprs);
   }
 
   public Aggregate copy(RelTraitSet relTraitSet, RelNode relNode,
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java
index 50e191009..f39b69ad6 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java
@@ -165,7 +165,7 @@ public class ImpalaAnalyticRel extends Project
     List<Expr> outputExprs =
         getOutputExprs(mapping, projects, context.ctx_.getRootAnalyzer());
 
-    NodeWithExprs retNode = new NodeWithExprs(planNode, outputExprs, 
inputNodeWithExprs);
+    NodeWithExprs retNode = new NodeWithExprs(planNode, outputExprs);
 
     // If there is a filter condition, a SelectNode will get added on top
     // of the retNode.
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java
index 4761b5ea4..e4ea19686 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java
@@ -18,10 +18,17 @@
 package org.apache.impala.calcite.rel.node;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BaseTableRef;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.Path;
+import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TupleDescriptor;
@@ -30,16 +37,24 @@ import 
org.apache.impala.calcite.rel.util.ExprConjunctsConverter;
 import org.apache.impala.calcite.rel.util.PrunedPartitionHelper;
 import org.apache.impala.calcite.schema.CalciteTable;
 import org.apache.impala.calcite.util.SimplifiedAnalyzer;
+import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.UnsupportedFeatureException;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.planner.PlanNodeId;
+import org.apache.impala.planner.SingleNodePlanner;
+import org.apache.impala.planner.ScanNode;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * ImpalaHdfsScanRel. Calcite RelNode which maps to an Impala TableScan node.
  */
@@ -58,34 +73,57 @@ public class ImpalaHdfsScanRel extends TableScan
     BaseTableRef baseTblRef =
         table.createBaseTableRef((SimplifiedAnalyzer) 
context.ctx_.getRootAnalyzer());
 
-    // Create the Tuple Descriptor which will contain only the relevant columns
-    // from the table needed for the query.
-    TupleDescriptor tupleDesc = table.createTupleAndSlotDesc(baseTblRef,
-        getInputRefFieldNames(context), context.ctx_.getRootAnalyzer());
+    produceSlotDescriptorsForTable(baseTblRef, context);
+
+    TupleDescriptor tupleDesc = baseTblRef.getDesc();
 
     // outputExprs will contain all the needed columns from the table
     List<Expr> outputExprs = createScanOutputExprs(tupleDesc.getSlots());
 
+    Analyzer analyzer = context.ctx_.getRootAnalyzer();
     // break up the filter condition (if given) to ones that can be used for
     // partition pruning and ones that cannot.
     ExprConjunctsConverter converter = new ExprConjunctsConverter(
         context.filterCondition_, outputExprs, getCluster().getRexBuilder(),
-        context.ctx_.getRootAnalyzer());
+        analyzer);
 
     PrunedPartitionHelper pph = new PrunedPartitionHelper(table, converter,
         tupleDesc, getCluster().getRexBuilder(), 
context.ctx_.getRootAnalyzer());
-
     List<? extends FeFsPartition> impalaPartitions = pph.getPrunedPartitions();
+
     List<Expr> partitionConjuncts = pph.getPartitionedConjuncts();
     List<Expr> filterConjuncts = pph.getNonPartitionedConjuncts();
 
     PlanNodeId nodeId = context.ctx_.getNextNodeId();
 
-    PlanNode physicalNode = new ImpalaHdfsScanNode(nodeId, tupleDesc, 
impalaPartitions,
-        baseTblRef, null, partitionConjuncts, filterConjuncts);
-    physicalNode.init(context.ctx_.getRootAnalyzer());
+    // Under special conditions, a count star optimization can be applied, 
which
+    // needs a special slot descriptor and slot ref.
+    SlotDescriptor countStarDesc =
+        canUseCountStarOptimization(table, context, filterConjuncts)
+            ? ScanNode.createCountStarOptimizationDesc(tupleDesc, analyzer)
+            : null;
+
+    Expr countStarOptimizationExpr = countStarDesc != null
+        ? new SlotRef(countStarDesc)
+        : null;
 
-    return new NodeWithExprs(physicalNode, outputExprs);
+    PlanNode physicalNode;
+    if (SingleNodePlanner.addAcidSlotsIfNeeded(analyzer, baseTblRef,
+        impalaPartitions)) {
+      // Add the Acid Join Node if needed, which places a join node on top of 
the scan
+      // node to handle the acid deltas.
+      physicalNode = SingleNodePlanner.createAcidJoinNode(analyzer,
+          baseTblRef, filterConjuncts, impalaPartitions, partitionConjuncts,
+          context.ctx_);
+    } else {
+      physicalNode = new ImpalaHdfsScanNode(nodeId, tupleDesc, 
impalaPartitions,
+          baseTblRef, null, partitionConjuncts, filterConjuncts, countStarDesc,
+          isPartitionScanOnly(context, table));
+    }
+    physicalNode.setOutputSmap(new ExprSubstitutionMap());
+    physicalNode.init(analyzer);
+
+    return new NodeWithExprs(physicalNode, outputExprs, 
countStarOptimizationExpr);
   }
 
   /**
@@ -96,27 +134,115 @@ public class ImpalaHdfsScanRel extends TableScan
    * If a column isn't projected out by the parent of the scan node, the array
    * location for the column will remain null.
    */
-  private List<Expr> createScanOutputExprs(List<SlotDescriptor> slotDescs) {
-    int totalCols = getRowType().getFieldNames().size();
+  private List<Expr> createScanOutputExprs(List<SlotDescriptor> slotDescs)
+      throws ImpalaException {
+    CalciteTable calciteTable = (CalciteTable) getTable();
+    HdfsTable table = calciteTable.getHdfsTable();
+    // IMPALA-12961: The output expressions are contained in a list which
+    // may have holes in it (if the table scan column is not in the output).
+    // The width of the list must include all columns, including the acid ones,
+    // even though the acid columns are not currently supported in the
+    // Calcite.getRowType()
+    int totalCols = calciteTable.getNumberColumnsIncludingAcid();
 
     // Initialize all fields to null.
-    // TODO: Should this be a map instead of a list?  See IMPALA-12961 for 
details.
     List<Expr> scanOutputExprs = new 
ArrayList<>(Collections.nCopies(totalCols, null));
 
-    HdfsTable table = ((CalciteTable) getTable()).getHdfsTable();
-    Preconditions.checkState(totalCols == table.getColumns().size());
-    int nonPartitionedCols = totalCols - table.getNumClusteringCols();
     for (SlotDescriptor slotDesc : slotDescs) {
+      Column impalaColumn = slotDesc.getColumn();
+      if (impalaColumn == null) {
+        // The slot descriptor list does not contain a column in the case of 
the
+        // STATS_NUM_ROWS label, which is only used for count star 
optimization.
+        
Preconditions.checkState(slotDesc.getLabel().equals(ScanNode.STATS_NUM_ROWS));
+        continue;
+      }
+
       // On a "select *" with partitioned columns, the partition columns occur 
after the
       // nonpartitioned columns.  But Impala displays the partition columns 
first. The
-      // modular arithmetic provides the correct position number for Impala.
-      int position =
-          (slotDesc.getColumn().getPosition() + nonPartitionedCols) % 
totalCols;
-      scanOutputExprs.set(position, new SlotRef(slotDesc));
+      // getCalcitePosition() method gets the correct column number.
+      Integer calcitePosition =
+          calciteTable.getCalcitePosition(impalaColumn.getPosition());
+      if (calcitePosition == null ) {
+        throw new UnsupportedFeatureException(
+            "Calcite does not support column: " + 
slotDesc.getColumn().getName());
+      }
+
+      scanOutputExprs.set(calcitePosition, new SlotRef(slotDesc));
     }
     return scanOutputExprs;
   }
 
+  /**
+   * Produce the slot descriptors for the given table.
+   *
+   * This code is a bit clunky, imo. The analyze method for SlotRef registers 
the
+   * SlotDescriptor for global use within the analyzer. It's not too apparent, 
you
+   * gotta dig underneath the covers to see this, but it's a necessary step to
+   * produce the SlotDescriptor objects needed for the query.
+   *
+   */
+  private void produceSlotDescriptorsForTable(BaseTableRef baseTblRef,
+      ParentPlanRelContext context) throws ImpalaException {
+    for (String fieldName : getInputRefFieldNames(context)) {
+      SlotRef slotref =
+          new SlotRef(Path.createRawPath(baseTblRef.getUniqueAlias(), 
fieldName));
+      slotref.analyze(context.ctx_.getRootAnalyzer());
+      SlotDescriptor slotDesc = slotref.getDesc();
+      if (slotDesc.getType().isComplexType()) {
+        throw new UnsupportedFeatureException(String.format(fieldName + " "
+            + "is a complex type (array/map) column. "
+            + "This is not currently supported."));
+      } else {
+        slotDesc.setIsMaterialized(true);
+      }
+    }
+  }
+
+  /**
+   * Retrieves the special count star descriptor if it exists, which is used
+   * for count star optimization.
+   */
+  private SlotDescriptor getCountStarDescriptor(List<SlotDescriptor> 
slotDescs) {
+    Optional<SlotDescriptor> returnVal = slotDescs.stream()
+        .filter(n -> n.getLabel().equals(ScanNode.STATS_NUM_ROWS)).findFirst();
+    return returnVal.isPresent() ? returnVal.get() : null;
+  }
+
+  /**
+   * Returns true if the partition scan optimization can be applied. The 
parentAggregate
+   * will be set if the aggregate is close to this scan rel node. The 
parentAggregate will
+   * be above the TableScan RelNode either directly, or above a Filter or 
Project.
+   * It can be applied if only clustered columns are being projected up into 
the Agg
+   * RelNode.
+   */
+  private boolean isPartitionScanOnly(ParentPlanRelContext context, 
CalciteTable table)
+      throws ImpalaException {
+    return context.parentAggregate_ != null &&
+        context.parentAggregate_.hasDistinctOnly() &&
+        table.isOnlyClusteredCols(getInputRefFieldNames(context));
+  }
+
+  /**
+   * Returns true if we can use the count star optimization. The re
+   */
+  private boolean canUseCountStarOptimization(CalciteTable table,
+      ParentPlanRelContext context, List<Expr> filterConjuncts) {
+    // The count(*) will exist in the parent aggregate if it can be used.
+    if (context.parentAggregate_ == null ||
+        !context.parentAggregate_.hasCountStarOnly()) {
+      return false;
+    }
+
+    if (!table.canApplyCountStarOptimization(getInputRefFieldNames(context))) {
+      return false;
+    }
+
+    // Can only use the optimization if there are no filters applied on this 
scan.
+    if (filterConjuncts.size() > 0) {
+      return false;
+    }
+    return true;
+  }
 
   private List<String> getInputRefFieldNames(ParentPlanRelContext context) {
     // If the parent context didn't pass in input refs, we will select all the
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java
index 6500f927f..3ba9e8087 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java
@@ -90,4 +90,22 @@ public interface ImpalaPlanRel {
     }
     throw new RuntimeException("Unknown RelNode: " +  relNode);
   }
+
+  /**
+   * Returns true for the nodes that should pass through the "parent aggregate"
+   * to its child. Some queries have optimizations which require the table scan
+   * to be aware of an aggregate done immediately after the table scan (e.g.
+   * partitiion key scanning, count star optimization). Only the RelNodes
+   * mentioned below can be in between the Aggregate RelNode and the Table Scan
+   * RelNode.
+   */
+  public static boolean canPassThroughParentAggregate(ImpalaPlanRel planRel) {
+    switch (getRelNodeType((RelNode) planRel)) {
+      case FILTER:
+      case PROJECT:
+        return true;
+      default:
+        return false;
+    }
+  }
 }
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java
index 8569bc633..3abc1101f 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java
@@ -91,7 +91,8 @@ public class ImpalaProjectRel extends Project
 
     // There is no Impala Plan Node mapped to Project, so we just return the 
child
     // PlanNode. However, the outputExprs change with the Project.
-    return new NodeWithExprs(inputWithExprs.planNode_, outputExprs, 
inputWithExprs);
+    return new NodeWithExprs(inputWithExprs.planNode_, outputExprs,
+        inputWithExprs.countStarOptimization_);
   }
 
   /**
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaSortRel.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaSortRel.java
index 17eee414d..ce6f0939e 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaSortRel.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaSortRel.java
@@ -143,7 +143,7 @@ public class ImpalaSortRel extends Sort
         inputNodeWithExprs.planNode_, sortInfo, limit_, offset_, limit_ != -1,
         context.ctx_.getRootAnalyzer());
 
-    NodeWithExprs retNode = new NodeWithExprs(sortNode, outputExprs, 
inputNodeWithExprs);
+    NodeWithExprs retNode = new NodeWithExprs(sortNode, outputExprs);
 
     // If there is a filter condition, a SelectNode will get added on top
     // of the retNode.
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeCreationUtils.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeCreationUtils.java
index 879a3b317..8d6ebaf26 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeCreationUtils.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeCreationUtils.java
@@ -87,7 +87,7 @@ public class NodeCreationUtils {
 
     unionNode.init(analyzer);
 
-    return new NodeWithExprs(unionNode, outputExprs, childrenPlanNodes);
+    return new NodeWithExprs(unionNode, outputExprs);
   }
 
   public static List<Expr> createOutputExprs(List<SlotDescriptor> slotDescs) {
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java
index 1c8ac67ac..5247e7be0 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java
@@ -31,27 +31,20 @@ import java.util.List;
 public class NodeWithExprs {
   public final PlanNode planNode_;
   public final List<Expr> outputExprs_;
+  public final Expr countStarOptimization_;
 
   public NodeWithExprs(PlanNode planNode, List<Expr> outputExprs) {
-    this.planNode_ = planNode;
-    this.outputExprs_ = outputExprs;
+    this(planNode, outputExprs, null);
   }
 
-  public NodeWithExprs(PlanNode planNode, List<Expr> outputExprs,
-      List<NodeWithExprs> childrenNodeWithExprs) {
-    this.planNode_ = planNode;
-    this.outputExprs_ = outputExprs;
+  public NodeWithExprs(PlanNode planNode, NodeWithExprs childNodeWithExprs) {
+    this(planNode, childNodeWithExprs.outputExprs_, null);
   }
 
   public NodeWithExprs(PlanNode planNode, List<Expr> outputExprs,
-      NodeWithExprs childNodeWithExprs) {
+      Expr countStarOptimization) {
     this.planNode_ = planNode;
     this.outputExprs_ = outputExprs;
-  }
-
-  public NodeWithExprs(PlanNode planNode, NodeWithExprs childNodeWithExprs) {
-
-    this.planNode_ = planNode;
-    this.outputExprs_ = childNodeWithExprs.outputExprs_;
+    this.countStarOptimization_ = countStarOptimization;
   }
 }
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ParentPlanRelContext.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ParentPlanRelContext.java
index aca145c8b..9bc4ce8b7 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ParentPlanRelContext.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ParentPlanRelContext.java
@@ -43,6 +43,8 @@ public class ParentPlanRelContext {
 
   public final RelDataType parentRowType_;
 
+  public ImpalaAggRel parentAggregate_;
+
   /**
    * Constructor meant for root node.
    */
@@ -52,6 +54,7 @@ public class ParentPlanRelContext {
     this.inputRefs_ = null;
     this.parentType_ = null;
     this.parentRowType_ = null;
+    this.parentAggregate_ = null;
   }
 
   private ParentPlanRelContext(Builder builder) {
@@ -60,6 +63,7 @@ public class ParentPlanRelContext {
     this.inputRefs_ = builder.inputRefs_;
     this.parentType_ = builder.parentType_;
     this.parentRowType_ = builder.parentRowType_;
+    this.parentAggregate_ = builder.parentAggregate_;
   }
 
   public static class Builder {
@@ -68,6 +72,7 @@ public class ParentPlanRelContext {
     private ImmutableBitSet inputRefs_;
     private ImpalaPlanRel.RelNodeType parentType_;
     private RelDataType  parentRowType_;
+    private ImpalaAggRel parentAggregate_;
 
     /**
      * Should only be called from root level.
@@ -81,6 +86,9 @@ public class ParentPlanRelContext {
       this.context_ = planRelContext.ctx_;
       this.filterCondition_ = planRelContext.filterCondition_;
       this.parentType_ = planRel.relNodeType();
+      this.parentAggregate_ = 
ImpalaPlanRel.canPassThroughParentAggregate(planRel)
+          ? planRelContext.parentAggregate_
+          : null;
     }
 
     public void setFilterCondition(RexNode filterCondition) {
@@ -99,6 +107,10 @@ public class ParentPlanRelContext {
       this.parentType_ = parentType;
     }
 
+    public void setParentAggregate(ImpalaAggRel parentAggregate) {
+      this.parentAggregate_ = parentAggregate;
+    }
+
     public ParentPlanRelContext build() {
       return new ParentPlanRelContext(this);
     }
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java
index c23041e51..49e28c4da 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java
@@ -20,6 +20,7 @@ package org.apache.impala.calcite.rel.phys;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.MultiAggregateInfo;
+import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.TableRef;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.catalog.FeFsPartition;
@@ -39,10 +40,12 @@ public class ImpalaHdfsScanNode extends HdfsScanNode {
   public ImpalaHdfsScanNode(PlanNodeId id, TupleDescriptor tupleDesc,
       List<? extends FeFsPartition> partitions,
       TableRef hdfsTblRef, MultiAggregateInfo aggInfo, List<Expr> 
partConjuncts,
-      List<Expr> assignedConjuncts) {
+      List<Expr> assignedConjuncts, SlotDescriptor countStarDescriptor,
+      boolean isPartitionScanOnly) {
     super(id, tupleDesc, assignedConjuncts, partitions, hdfsTblRef, aggInfo,
-        partConjuncts, false);
+        partConjuncts, isPartitionScanOnly);
     this.assignedConjuncts_ = assignedConjuncts;
+    this.countStarSlot_ = countStarDescriptor;
   }
 
   @Override
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java
index 53f4fd243..bc3697088 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java
@@ -51,6 +51,7 @@ import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
+import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.calcite.rel.util.ImpalaBaseTableRef;
@@ -62,11 +63,15 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.UnsupportedFeatureException;
+import org.apache.impala.util.AcidUtils;
 
 import com.google.common.collect.ImmutableList;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 
 public class CalciteTable extends RelOptAbstractTable
@@ -74,6 +79,7 @@ public class CalciteTable extends RelOptAbstractTable
 
   private final HdfsTable table_;
 
+  private final Map<Integer, Integer> impalaPositionMap_;
 
   private final List<String> qualifiedTableName_;
 
@@ -82,6 +88,7 @@ public class CalciteTable extends RelOptAbstractTable
     super(reader, table.getName(), buildColumnsForRelDataType(table));
     this.table_ = (HdfsTable) table;
     this.qualifiedTableName_ = table.getTableName().toPath();
+    this.impalaPositionMap_ = buildPositionMap();
 
     checkIfTableIsSupported(table);
   }
@@ -91,6 +98,7 @@ public class CalciteTable extends RelOptAbstractTable
     RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(new 
ImpalaTypeSystemImpl());
 
     RelDataTypeFactory.Builder builder = new 
RelDataTypeFactory.Builder(typeFactory);
+
     // skip clustering columns, save them for the end
     for (Column column : table.getColumnsInHiveOrder()) {
       if (column.getType().isComplexType()) {
@@ -128,28 +136,6 @@ public class CalciteTable extends RelOptAbstractTable
     return baseTblRef;
   }
 
-  // Create tuple and slot descriptors for this base table
-  public TupleDescriptor createTupleAndSlotDesc(BaseTableRef baseTblRef,
-      List<String> fieldNames, Analyzer analyzer) throws ImpalaException {
-    // create the slot descriptors corresponding to this tuple descriptor
-    // by supplying the field names from Calcite's output schema for this node
-    for (int i = 0; i < fieldNames.size(); i++) {
-      String fieldName = fieldNames.get(i);
-      SlotRef slotref =
-          new SlotRef(Path.createRawPath(baseTblRef.getUniqueAlias(), 
fieldName));
-      slotref.analyze(analyzer);
-      SlotDescriptor slotDesc = slotref.getDesc();
-      if (slotDesc.getType().isCollectionType()) {
-        throw new AnalysisException(String.format(fieldName + " "
-            + "is a complex type (array/map/struct) column. "
-            + "This is not currently supported."));
-      }
-      slotDesc.setIsMaterialized(true);
-    }
-    TupleDescriptor tupleDesc = baseTblRef.getDesc();
-    return tupleDesc;
-  }
-
   /**
    * Return the pruned partitions
    * TODO: Currently all partitions are returned since filters aren't yet 
supported.
@@ -231,4 +217,67 @@ public class CalciteTable extends RelOptAbstractTable
   public SqlMonotonicity getMonotonicity(String columnName) {
     return SqlMonotonicity.NOT_MONOTONIC;
   }
+
+  @Override
+  public double getRowCount() {
+    return (double) table_.getNumRows();
+  }
+
+  /**
+   * Returns a position map from Impala column numbers to Calcite
+   * column numbers. Calcite places the columns in "Hive Order"
+   * so that a 'select *' will return the column list in the
+   * right order, but this map is needed because sometimes we
+   * only have the "Column.getPosition()" column number which
+   * is the Impala column number.
+   */
+  private Map<Integer, Integer> buildPositionMap() {
+
+    Map<Integer, Integer> impalaPositionMap = new HashMap<>();
+    // skip clustering columns, save them for the end
+    int i = 0;
+    for (Column column : table_.getColumnsInHiveOrder()) {
+      impalaPositionMap.put(column.getPosition(), i);
+      i++;
+    }
+
+    return impalaPositionMap;
+  }
+
+  public int getCalcitePosition(int impalaPosition) {
+    return impalaPositionMap_.get(impalaPosition);
+  }
+
+  public int getNumberColumnsIncludingAcid() {
+    return impalaPositionMap_.keySet().size();
+  }
+
+  /**
+   * Returns true if the conditions on the table meet the requirements
+   * needed to apply the count star optimization.
+   */
+  public boolean canApplyCountStarOptimization(List<String> fieldNames) {
+    Set<HdfsFileFormat> fileFormats = table_.getFileFormats();
+    if (fileFormats.size() != 1) {
+      return false;
+    }
+    if (!fileFormats.contains(HdfsFileFormat.ORC) &&
+        !fileFormats.contains(HdfsFileFormat.PARQUET) &&
+        !fileFormats.contains(HdfsFileFormat.HUDI_PARQUET)) {
+      return false;
+    }
+    if (AcidUtils.isFullAcidTable(table_.getMetaStoreTable().getParameters())) 
{
+      return false;
+    }
+    return isOnlyClusteredCols(fieldNames);
+  }
+
+  public boolean isOnlyClusteredCols(List<String> fieldNames) {
+    for (int i = 0; i < fieldNames.size(); i++) {
+      if (!table_.isClusteringColumn(table_.getColumn(fieldNames.get(i)))) {
+        return false;
+      }
+    }
+    return true;
+  }
 }


Reply via email to