http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/main/java/org/apache/impala/planner/AggregationNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java index 03010b8..c18d7de 100644 --- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java @@ -17,31 +17,34 @@ package org.apache.impala.planner; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.impala.analysis.AggregateInfo; import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.CaseExpr; +import org.apache.impala.analysis.CaseWhenClause; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.FunctionCallExpr; -import org.apache.impala.analysis.SlotId; +import org.apache.impala.analysis.MultiAggregateInfo; +import org.apache.impala.analysis.MultiAggregateInfo.AggPhase; +import org.apache.impala.analysis.NumericLiteral; +import org.apache.impala.analysis.TupleId; +import org.apache.impala.analysis.ValidTupleIdExpr; import org.apache.impala.common.InternalException; import org.apache.impala.thrift.TAggregationNode; +import org.apache.impala.thrift.TAggregator; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TExpr; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.util.BitUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; /** * Aggregation computation. 
@@ -57,23 +60,33 @@ public class AggregationNode extends PlanNode { // Conservative minimum size of hash table for low-cardinality aggregations. private final static long MIN_HASH_TBL_MEM = 10L * 1024L * 1024L; - private final AggregateInfo aggInfo_; + private final MultiAggregateInfo multiAggInfo_; + private final AggPhase aggPhase_; - // Set to true if this aggregation node needs to run the Finalize step. This - // node is the root node of a distributed aggregation. - private boolean needsFinalize_; + // Aggregation-class infos derived from 'multiAggInfo_' and 'aggPhase_' in c'tor. + private final List<AggregateInfo> aggInfos_; - // If true, use streaming preaggregation algorithm. Not valid if this is a merge agg. - private boolean useStreamingPreagg_; + // If true, this node produces intermediate aggregation tuples. + private boolean useIntermediateTuple_ = false; - /** - * Create an agg node from aggInfo. - */ - public AggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo) { - super(id, aggInfo.getOutputTupleId().asList(), "AGGREGATE"); - aggInfo_ = aggInfo; + // If true, this node performs the finalize step. + private boolean needsFinalize_ = false; + + // If true, this node uses streaming preaggregation. Invalid if this is a merge agg. + private boolean useStreamingPreagg_ = false; + + // Resource profiles for each aggregation class. 
+ private List<ResourceProfile> resourceProfiles_; + + public AggregationNode( + PlanNodeId id, PlanNode input, MultiAggregateInfo multiAggInfo, AggPhase aggPhase) { + super(id, "AGGREGATE"); children_.add(input); + multiAggInfo_ = multiAggInfo; + aggInfos_ = multiAggInfo_.getMaterializedAggInfos(aggPhase); + aggPhase_ = aggPhase; needsFinalize_ = true; + computeTupleIds(); } /** @@ -81,11 +94,48 @@ public class AggregationNode extends PlanNode { */ private AggregationNode(PlanNodeId id, AggregationNode src) { super(id, src, "AGGREGATE"); - aggInfo_ = src.aggInfo_; + multiAggInfo_ = src.multiAggInfo_; + aggPhase_ = src.aggPhase_; + aggInfos_ = src.aggInfos_; needsFinalize_ = src.needsFinalize_; + useIntermediateTuple_ = src.useIntermediateTuple_; } - public AggregateInfo getAggInfo() { return aggInfo_; } + @Override + public void computeTupleIds() { + clearTupleIds(); + for (AggregateInfo aggInfo : aggInfos_) { + TupleId aggClassTupleId = null; + if (useIntermediateTuple_) { + aggClassTupleId = aggInfo.getIntermediateTupleId(); + } else { + aggClassTupleId = aggInfo.getOutputTupleId(); + } + tupleIds_.add(aggClassTupleId); + tblRefIds_.add(aggClassTupleId); + // Nullable tuples are only required to distinguish between multiple + // aggregation classes. + if (aggInfos_.size() > 1) { + nullableTupleIds_.add(aggClassTupleId); + } + } + } + + /** + * Sets this node as a preaggregation. + */ + public void setIsPreagg(PlannerContext ctx) { + if (ctx.getQueryOptions().disable_streaming_preaggregations) { + useStreamingPreagg_ = false; + return; + } + for (AggregateInfo aggInfo : aggInfos_) { + if (aggInfo.getGroupingExprs().size() > 0) { + useStreamingPreagg_ = true; + return; + } + } + } /** * Unsets this node as requiring finalize. Only valid to call this if it is @@ -96,25 +146,21 @@ public class AggregationNode extends PlanNode { needsFinalize_ = false; } - /** - * Sets this node as a preaggregation. 
Only valid to call this if it is not marked - * as a preaggregation - */ - public void setIsPreagg(PlannerContext ctx_) { - TQueryOptions query_options = ctx_.getQueryOptions(); - useStreamingPreagg_ = !query_options.disable_streaming_preaggregations && - aggInfo_.getGroupingExprs().size() > 0; + public void setIntermediateTuple() { + useIntermediateTuple_ = true; + computeTupleIds(); } - /** - * Have this node materialize the aggregation's intermediate tuple instead of - * the output tuple. - */ - public void setIntermediateTuple() { - Preconditions.checkState(!tupleIds_.isEmpty()); - Preconditions.checkState(tupleIds_.get(0).equals(aggInfo_.getOutputTupleId())); - tupleIds_.clear(); - tupleIds_.add(aggInfo_.getIntermediateTupleId()); + public MultiAggregateInfo getMultiAggInfo() { return multiAggInfo_; } + public AggPhase getAggPhase() { return aggPhase_; } + public boolean hasGrouping() { return multiAggInfo_.hasGrouping(); } + public boolean isSingleClassAgg() { return aggInfos_.size() == 1; } + + public boolean isDistinctAgg() { + for (AggregateInfo aggInfo : aggInfos_) { + if (aggInfo.isDistinctAgg()) return true; + } + return false; } @Override @@ -122,47 +168,40 @@ public class AggregationNode extends PlanNode { @Override public void init(Analyzer analyzer) throws InternalException { - // Assign predicates to the top-most agg in the single-node plan that can evaluate - // them, as follows: For non-distinct aggs place them in the 1st phase agg node. For - // distinct aggs place them in the 2nd phase agg node. The conjuncts are - // transferred to the proper place in the multi-node plan via transferConjuncts(). - if (tupleIds_.get(0).equals(aggInfo_.getResultTupleId()) && !aggInfo_.isMerge()) { - // Ignore predicates bound by a grouping slot produced by a SlotRef grouping expr. 
- // Those predicates are already evaluated below this agg node (e.g., in a scan), - // because the grouping slot must be in the same equivalence class as another slot - // below this agg node. We must not ignore other grouping slots in order to retain - // conjuncts bound by those grouping slots in createEquivConjuncts() (IMPALA-2089). - // Those conjuncts cannot be redundant because our equivalence classes do not - // capture dependencies with non-SlotRef exprs. - Set<SlotId> groupBySlots = Sets.newHashSet(); - for (int i = 0; i < aggInfo_.getGroupingExprs().size(); ++i) { - if (aggInfo_.getGroupingExprs().get(i).unwrapSlotRef(true) == null) continue; - groupBySlots.add(aggInfo_.getOutputTupleDesc().getSlots().get(i).getId()); - } - ArrayList<Expr> bindingPredicates = - analyzer.getBoundPredicates(tupleIds_.get(0), groupBySlots, true); - conjuncts_.addAll(bindingPredicates); - - // also add remaining unassigned conjuncts_ - assignConjuncts(analyzer); - - analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_, groupBySlots); + Preconditions.checkState(tupleIds_.size() == aggInfos_.size()); + // Assign conjuncts to the top-most agg in the single-node plan. They are transferred + // to the proper place in the distributed plan via transferConjuncts(). + if (aggPhase_ == multiAggInfo_.getConjunctAssignmentPhase()) { + conjuncts_.clear(); + // TODO: If this is the transposition phase, then we can push conjuncts that + // reference a single aggregation class down into the aggregators of the + // previous phase. + conjuncts_.addAll(multiAggInfo_.collectConjuncts(analyzer, true)); + conjuncts_ = orderConjunctsByCost(conjuncts_); } - conjuncts_ = orderConjunctsByCost(conjuncts_); + // Compute the mem layout for both tuples here for simplicity. 
- aggInfo_.getOutputTupleDesc().computeMemLayout(); - aggInfo_.getIntermediateTupleDesc().computeMemLayout(); + for (AggregateInfo aggInfo : aggInfos_) { + aggInfo.getOutputTupleDesc().computeMemLayout(); + aggInfo.getIntermediateTupleDesc().computeMemLayout(); + } - // do this at the end so it can take all conjuncts into account + // Do this at the end so it can take all conjuncts into account computeStats(analyzer); // don't call createDefaultSMap(), it would point our conjuncts (= Having clause) // to our input; our conjuncts don't get substituted because they already // refer to our output outputSmap_ = getCombinedChildSmap(); - aggInfo_.substitute(outputSmap_, analyzer); - // assert consistent aggregate expr and slot materialization - aggInfo_.checkConsistency(); + + // Substitute exprs and check consistency. + // All of the AggregationNodes corresponding to a MultiAggregateInfo will have the + // same outputSmap_, so just substitute it once. + if (aggPhase_ == AggPhase.FIRST) multiAggInfo_.substitute(outputSmap_, analyzer); + for (AggregateInfo aggInfo : aggInfos_) { + aggInfo.substitute(outputSmap_, analyzer); + aggInfo.checkConsistency(); + } } @Override @@ -179,21 +218,27 @@ public class AggregationNode extends PlanNode { // some others, the estimate doesn't overshoot dramatically) // cardinality: product of # of distinct values produced by grouping exprs - // Any non-grouping aggregation has at least one distinct value - cardinality_ = aggInfo_.getGroupingExprs().isEmpty() ? 1 : - Expr.getNumDistinctValues(aggInfo_.getGroupingExprs()); - // take HAVING predicate into account - if (LOG.isTraceEnabled()) { - LOG.trace("Agg: cardinality=" + Long.toString(cardinality_)); + cardinality_ = 0; + for (AggregateInfo aggInfo : aggInfos_) { + List<Expr> groupingExprs = aggInfo.getGroupingExprs(); + if (groupingExprs.isEmpty()) { + // Non-grouping aggregation class.
+ cardinality_ += 1; + } else { + long ndvs = Expr.getNumDistinctValues(groupingExprs); + // if we ended up with an overflow, the estimate is certain to be wrong + if (ndvs < 0) { + cardinality_ = -1; + break; + } + // Assign rather than '+=': the running total is already an operand of checkedAdd(). + cardinality_ = checkedAdd(cardinality_, ndvs); + } } + + // Take conjuncts into account. if (cardinality_ > 0) { - cardinality_ = Math.round((double) cardinality_ * computeSelectivity()); - if (LOG.isTraceEnabled()) { - LOG.trace("sel=" + Double.toString(computeSelectivity())); - } + cardinality_ = (long) Math.round((double) cardinality_ * computeSelectivity()); } - // if we ended up with an overflow, the estimate is certain to be wrong - if (cardinality_ < 0) cardinality_ = -1; // Sanity check the cardinality_ based on the input cardinality_. if (getChild(0).getCardinality() != -1) { if (cardinality_ == -1) { @@ -205,38 +250,141 @@ public class AggregationNode extends PlanNode { } } cardinality_ = capAtLimit(cardinality_); - if (LOG.isTraceEnabled()) { - LOG.trace("stats Agg: cardinality=" + Long.toString(cardinality_)); - } } - @Override - protected String debugString() { - return Objects.toStringHelper(this) - .add("aggInfo", aggInfo_.debugString()) - .addValue(super.debugString()) - .toString(); + /** + * Returns a list of exprs suitable for hash partitioning the output of this node + * before the merge aggregation step. Only valid to call if this node is not a merge + * or transposing aggregation. The returned exprs are bound by the intermediate tuples. + * Takes the SHUFFLE_DISTINCT_EXPRS query option into account. + * + * For single-class aggregations the returned exprs are typically the grouping exprs. + * With SHUFFLE_DISTINCT_EXPRS=true the distinct exprs are also included if this is the + * non-merge first-phase aggregation of a distinct aggregation. + * + * For multi-class aggregations the returned exprs are a list of CASE exprs which you + * can think of as a "union" of the merge partition exprs of each class.
Each CASE + * switches on the valid tuple id of an input row to determine the aggregation class, + * and selects the corresponding partition expr. + * The main challenges with crafting these exprs are: + * 1. Different aggregation classes can have a different number of distinct exprs + * Solution: The returned list is maximally wide to accommodate the widest + * aggregation class. For classes that have fewer than the max distinct exprs we add + * constant dummy exprs in the corresponding branch of the CASE. + * 2. A CASE expr must return a single output type, but different aggregation classes + * may have incompatible distinct exprs, so selecting the distinct exprs directly + * in the CASE branches would not always work (unless we cast everything to STRING, + * which we try to avoid). Instead, we call MURMUR_HASH() on the exprs to produce + * a hash value. That way, all branches of a CASE return the same type. + * Considering that the backend will do another round of hashing, there's an + * unnecessary double hashing here that we deemed acceptable for now and has + * potential for cleanup (maybe the FE should always add a MURMUR_HASH()). + * The handling of SHUFFLE_DISTINCT_EXPRS is analogous to the single-class case. + * + * Example: + * SELECT COUNT(DISTINCT a,b), COUNT(DISTINCT c) FROM t GROUP BY d + * Suppose the two aggregation classes have intermediate tuple ids 0 and 1. + * + * Challenges explained on this example: + * 1. First class has distinct exprs a,b and second class has c. We need to accommodate + * the widest class (a,b) and also hash on the grouping expr (d), so there will be + * three cases. + * 2. 
The types of a and c might be incompatible + * + * The first-phase partition exprs are a list of the following 3 exprs: + * CASE valid_tid() + * WHEN 0 THEN murmur_hash(d) <-- d SlotRef into tuple 0 + * WHEN 1 THEN murmur_hash(d) <-- d SlotRef into tuple 1 + * END, + * CASE valid_tid() + * WHEN 0 THEN murmur_hash(a) + * WHEN 1 THEN murmur_hash(c) + * END, + * CASE valid_tid() + * WHEN 0 THEN murmur_hash(b) + * WHEN 1 THEN 0 <-- dummy constant integer + * END + */ + public List<Expr> getMergePartitionExprs(Analyzer analyzer) { + Preconditions.checkState(!tupleIds_.isEmpty()); + Preconditions.checkState(!aggPhase_.isMerge() && !aggPhase_.isTranspose()); + + boolean shuffleDistinctExprs = analyzer.getQueryOptions().shuffle_distinct_exprs; + if (aggInfos_.size() == 1) { + AggregateInfo aggInfo = aggInfos_.get(0); + List<Expr> groupingExprs = null; + if (aggPhase_.isFirstPhase() && hasGrouping() && !shuffleDistinctExprs) { + groupingExprs = multiAggInfo_.getSubstGroupingExprs(); + } else { + groupingExprs = aggInfo.getPartitionExprs(); + if (groupingExprs == null) groupingExprs = aggInfo.getGroupingExprs(); + } + return Expr.substituteList( + groupingExprs, aggInfo.getIntermediateSmap(), analyzer, false); + } + + int maxNumExprs = 0; + for (AggregateInfo aggInfo : aggInfos_) { + if (aggInfo.getGroupingExprs() == null) continue; + maxNumExprs = Math.max(maxNumExprs, aggInfo.getGroupingExprs().size()); + } + if (maxNumExprs == 0) return Collections.emptyList(); + + List<Expr> result = Lists.newArrayList(); + for (int i = 0; i < maxNumExprs; ++i) { + List<CaseWhenClause> caseWhenClauses = Lists.newArrayList(); + for (AggregateInfo aggInfo : aggInfos_) { + TupleId tid; + if (aggInfo.isDistinctAgg()) { + tid = aggInfo.getOutputTupleId(); + } else { + tid = aggInfo.getIntermediateTupleId(); + } + List<Expr> groupingExprs = aggInfo.getGroupingExprs(); + if (aggPhase_.isFirstPhase() && hasGrouping() && !shuffleDistinctExprs) { + groupingExprs = 
multiAggInfo_.getSubstGroupingExprs(); + } + Expr whenExpr = NumericLiteral.create(tid.asInt()); + Expr thenExpr; + if (groupingExprs == null || i >= groupingExprs.size()) { + thenExpr = NumericLiteral.create(0); + } else { + thenExpr = new FunctionCallExpr( + "murmur_hash", Lists.newArrayList(groupingExprs.get(i).clone())); + thenExpr.analyzeNoThrow(analyzer); + thenExpr = thenExpr.substitute(aggInfo.getIntermediateSmap(), analyzer, true); + } + caseWhenClauses.add(new CaseWhenClause(whenExpr, thenExpr)); + } + CaseExpr caseExpr = + new CaseExpr(new ValidTupleIdExpr(tupleIds_), caseWhenClauses, null); + caseExpr.analyzeNoThrow(analyzer); + result.add(caseExpr); + } + return result; } @Override protected void toThrift(TPlanNode msg) { + msg.agg_node = new TAggregationNode(); msg.node_type = TPlanNodeType.AGGREGATION_NODE; - - List<TExpr> aggregateFunctions = Lists.newArrayList(); - // only serialize agg exprs that are being materialized - for (FunctionCallExpr e: aggInfo_.getMaterializedAggregateExprs()) { - aggregateFunctions.add(e.treeToThrift()); - } - aggInfo_.checkConsistency(); - msg.agg_node = new TAggregationNode( - aggregateFunctions, - aggInfo_.getIntermediateTupleId().asInt(), - aggInfo_.getOutputTupleId().asInt(), needsFinalize_, - useStreamingPreagg_, - getChild(0).getCardinality()); - List<Expr> groupingExprs = aggInfo_.getGroupingExprs(); - if (groupingExprs != null) { - msg.agg_node.setGrouping_exprs(Expr.treesToThrift(groupingExprs)); + boolean replicateInput = aggPhase_ == AggPhase.FIRST && aggInfos_.size() > 1; + msg.agg_node.setReplicate_input(replicateInput); + msg.agg_node.setEstimated_input_cardinality(getChild(0).getCardinality()); + for (int i = 0; i < aggInfos_.size(); ++i) { + AggregateInfo aggInfo = aggInfos_.get(i); + List<TExpr> aggregateFunctions = Lists.newArrayList(); + for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) { + aggregateFunctions.add(e.treeToThrift()); + } + TAggregator taggregator = new 
TAggregator(aggregateFunctions, + aggInfo.getIntermediateTupleId().asInt(), aggInfo.getOutputTupleId().asInt(), + needsFinalize_, useStreamingPreagg_, resourceProfiles_.get(i).toThrift()); + List<Expr> groupingExprs = aggInfo.getGroupingExprs(); + if (!groupingExprs.isEmpty()) { + taggregator.setGrouping_exprs(Expr.treesToThrift(groupingExprs)); + } + msg.agg_node.addToAggregators(taggregator); } } @@ -257,34 +405,67 @@ public class AggregationNode extends PlanNode { output.append("\n"); if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { - ArrayList<FunctionCallExpr> aggExprs = aggInfo_.getMaterializedAggregateExprs(); - if (!aggExprs.isEmpty()) { - output.append(detailPrefix + "output: ") - .append(getExplainString(aggExprs) + "\n"); - } - // TODO: is this the best way to display this. It currently would - // have DISTINCT_PC(DISTINCT_PC(col)) for the merge phase but not - // very obvious what that means if you don't already know. - - // TODO: group by can be very long. Break it into multiple lines - if (!aggInfo_.getGroupingExprs().isEmpty()) { - output.append(detailPrefix + "group by: ") - .append(getExplainString(aggInfo_.getGroupingExprs()) + "\n"); + if (aggInfos_.size() == 1) { + output.append(getAggInfoExplainString(detailPrefix, aggInfos_.get(0))); + } else { + for (int i = 0; i < aggInfos_.size(); ++i) { + AggregateInfo aggInfo = aggInfos_.get(i); + output.append(String.format("%sClass %d\n", detailPrefix, i)); + output.append(getAggInfoExplainString(detailPrefix + " ", aggInfo)); + } } if (!conjuncts_.isEmpty()) { - output.append(detailPrefix + "having: ") - .append(getExplainString(conjuncts_) + "\n"); + output.append(detailPrefix) + .append("having: ") + .append(getExplainString(conjuncts_)) + .append("\n"); } } return output.toString(); } + private StringBuilder getAggInfoExplainString(String prefix, AggregateInfo aggInfo) { + StringBuilder output = new StringBuilder(); + List<FunctionCallExpr> aggExprs = 
aggInfo.getMaterializedAggregateExprs(); + List<Expr> groupingExprs = aggInfo.getGroupingExprs(); + if (!aggExprs.isEmpty()) { + output.append(prefix) + .append("output: ") + .append(getExplainString(aggExprs)) + .append("\n"); + } + if (!groupingExprs.isEmpty()) { + output.append(prefix) + .append("group by: ") + .append(getExplainString(groupingExprs)) + .append("\n"); + } + return output; + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { + resourceProfiles_ = Lists.newArrayListWithCapacity(aggInfos_.size()); + // One profile per aggregation class; the node profile is that profile or their sum. + for (AggregateInfo aggInfo : aggInfos_) { + resourceProfiles_.add(computeAggClassResourceProfile(queryOptions, aggInfo)); + } + if (aggInfos_.size() == 1) { + nodeResourceProfile_ = resourceProfiles_.get(0); + } else { + nodeResourceProfile_ = ResourceProfile.noReservation(0); + for (ResourceProfile aggProfile : resourceProfiles_) { + nodeResourceProfile_ = nodeResourceProfile_.sum(aggProfile); + } + } + } + + private ResourceProfile computeAggClassResourceProfile( + TQueryOptions queryOptions, AggregateInfo aggInfo) { Preconditions.checkNotNull( fragment_, "PlanNode must be placed into a fragment before calling this method."); - long perInstanceCardinality = fragment_.getPerInstanceNdv( - queryOptions.getMt_dop(), aggInfo_.getGroupingExprs()); + long perInstanceCardinality = + fragment_.getPerInstanceNdv(queryOptions.getMt_dop(), aggInfo.getGroupingExprs()); long perInstanceMemEstimate; long perInstanceDataBytes = -1; if (perInstanceCardinality == -1) { @@ -299,12 +480,12 @@ public class AggregationNode extends PlanNode { PlannerContext.HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM); } - // Must be kept in sync with PartitionedAggregationNode::MinReservation() in be. + // Must be kept in sync with GroupingAggregator::MinReservation() in backend.
long perInstanceMinMemReservation; long bufferSize = queryOptions.getDefault_spillable_buffer_size(); long maxRowBufferSize = computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size()); - if (aggInfo_.getGroupingExprs().isEmpty()) { + if (aggInfo.getGroupingExprs().isEmpty()) { perInstanceMinMemReservation = 0; } else { // This is a grouping pre-aggregation or merge aggregation. @@ -330,17 +511,18 @@ public class AggregationNode extends PlanNode { perInstanceMinMemReservation = bufferSize * PARTITION_FANOUT + Math.max(64 * 1024 * PARTITION_FANOUT, bufferSize); } else { - long minBuffers = PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() ? 1 : 0); + long minBuffers = PARTITION_FANOUT + 1 + (aggInfo.needsSerialize() ? 1 : 0); // Two of the buffers need to be buffers large enough to hold the maximum-sized // row to serve as input and output buffers while repartitioning. perInstanceMinMemReservation = bufferSize * (minBuffers - 2) + maxRowBufferSize * 2; } } - nodeResourceProfile_ = new ResourceProfileBuilder() + return new ResourceProfileBuilder() .setMemEstimateBytes(perInstanceMemEstimate) .setMinMemReservationBytes(perInstanceMinMemReservation) .setSpillableBufferBytes(bufferSize) - .setMaxRowBufferBytes(maxRowBufferSize).build(); + .setMaxRowBufferBytes(maxRowBufferSize) + .build(); } }
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 4302244..ac3115b 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -20,19 +20,17 @@ package org.apache.impala.planner; import java.util.ArrayList; import java.util.List; -import org.apache.impala.analysis.AggregateInfo; import org.apache.impala.analysis.AnalysisContext; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.InsertStmt; import org.apache.impala.analysis.JoinOperator; -import static org.apache.impala.analysis.JoinOperator.*; +import org.apache.impala.analysis.MultiAggregateInfo.AggPhase; import org.apache.impala.analysis.QueryStmt; import org.apache.impala.catalog.FeKuduTable; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; import org.apache.impala.planner.JoinNode.DistributionMode; -import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter; import org.apache.impala.util.KuduUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -789,21 +787,21 @@ public class DistributedPlanner { * Returns a fragment that materializes the aggregation result of 'node'. * If the child fragment is partitioned, the result fragment will be partitioned on * the grouping exprs of 'node'. - * If 'node' is phase 1 of a 2-phase DISTINCT aggregation, this will simply - * add 'node' to the child fragment and return the child fragment; the new - * fragment will be created by the subsequent call of createAggregationFragment() - * for the phase 2 AggregationNode. 
+ * If 'node' is phase 1 of a 2-phase DISTINCT aggregation or the 'transpose' phase of a + * multiple-distinct aggregation, this will simply add 'node' to the child fragment and + * return the child fragment; the new fragment will be created by the call of + * createAggregationFragment() for the phase 2 AggregationNode. */ private PlanFragment createAggregationFragment(AggregationNode node, PlanFragment childFragment, ArrayList<PlanFragment> fragments) throws ImpalaException { - if (!childFragment.isPartitioned()) { + if (!childFragment.isPartitioned() || node.getAggPhase() == AggPhase.TRANSPOSE) { // nothing to distribute; do full aggregation directly within childFragment childFragment.addPlanRoot(node); return childFragment; } - if (node.getAggInfo().isDistinctAgg()) { + if (node.isDistinctAgg()) { // 'node' is phase 1 of a DISTINCT aggregation; the actual agg fragment // will get created in the next createAggregationFragment() call // for the parent AggregationNode @@ -813,7 +811,7 @@ public class DistributedPlanner { // Check if 'node' is phase 2 of a DISTINCT aggregation. boolean isDistinct = node.getChild(0) instanceof AggregationNode - && ((AggregationNode)(node.getChild(0))).getAggInfo().isDistinctAgg(); + && ((AggregationNode) (node.getChild(0))).isDistinctAgg(); if (isDistinct) { return createPhase2DistinctAggregationFragment(node, childFragment, fragments); } else { @@ -827,18 +825,15 @@ public class DistributedPlanner { * aggregation. 
*/ private PlanFragment createMergeAggregationFragment( - AggregationNode node, PlanFragment childFragment) - throws ImpalaException { + AggregationNode node, PlanFragment childFragment) throws ImpalaException { Preconditions.checkArgument(childFragment.isPartitioned()); - ArrayList<Expr> groupingExprs = node.getAggInfo().getGroupingExprs(); - boolean hasGrouping = !groupingExprs.isEmpty(); - + List<Expr> partitionExprs = node.getMergePartitionExprs(ctx_.getRootAnalyzer()); + boolean hasGrouping = !partitionExprs.isEmpty(); DataPartition parentPartition = null; if (hasGrouping) { - List<Expr> partitionExprs = node.getAggInfo().getPartitionExprs(); - if (partitionExprs == null) partitionExprs = groupingExprs; - boolean childHasCompatPartition = ctx_.getRootAnalyzer().setsHaveValueTransfer( - partitionExprs, childFragment.getDataPartition().getPartitionExprs(), true); + boolean childHasCompatPartition = node.isSingleClassAgg() + && ctx_.getRootAnalyzer().setsHaveValueTransfer(partitionExprs, + childFragment.getDataPartition().getPartitionExprs(), true); if (childHasCompatPartition) { // The data is already partitioned on the required expressions. We can do the // aggregation in the child fragment without an extra merge step. 
@@ -848,13 +843,8 @@ public class DistributedPlanner { childFragment.addPlanRoot(node); return childFragment; } - // the parent fragment is partitioned on the grouping exprs; - // substitute grouping exprs to reference the *output* of the agg, not the input - partitionExprs = Expr.substituteList(partitionExprs, - node.getAggInfo().getIntermediateSmap(), ctx_.getRootAnalyzer(), false); parentPartition = DataPartition.hashPartitioned(partitionExprs); } else { - // the parent fragment is unpartitioned parentPartition = DataPartition.UNPARTITIONED; } @@ -874,7 +864,7 @@ public class DistributedPlanner { // place a merge aggregation step in a new fragment PlanFragment mergeFragment = createParentFragment(childFragment, parentPartition); AggregationNode mergeAggNode = new AggregationNode(ctx_.getNextNodeId(), - mergeFragment.getPlanRoot(), node.getAggInfo().getMergeAggInfo()); + mergeFragment.getPlanRoot(), node.getMultiAggInfo(), AggPhase.FIRST_MERGE); mergeAggNode.init(ctx_.getRootAnalyzer()); mergeAggNode.setLimit(limit); // Merge of non-grouping agg only processes one tuple per Impala daemon - codegen @@ -904,65 +894,44 @@ public class DistributedPlanner { private PlanFragment createPhase2DistinctAggregationFragment( AggregationNode phase2AggNode, PlanFragment childFragment, ArrayList<PlanFragment> fragments) throws ImpalaException { - // When a query has both grouping and distinct exprs, impala can optionally include - // the distinct exprs in the hash exchange of the first aggregation phase to spread - // the data among more nodes. However, this plan requires another hash exchange on the - // grouping exprs in the second phase which is not required when omitting the distinct - // exprs in the first phase. Shuffling by both is better if the grouping exprs have - // low NDVs. 
- boolean shuffleDistinctExprs = ctx_.getQueryOptions().shuffle_distinct_exprs || - phase2AggNode.getAggInfo().getGroupingExprs().isEmpty(); // The phase-1 aggregation node is already in the child fragment. Preconditions.checkState(phase2AggNode.getChild(0) == childFragment.getPlanRoot()); - - AggregateInfo phase1AggInfo = ((AggregationNode) phase2AggNode.getChild(0)) - .getAggInfo(); - ArrayList<Expr> partitionExprs; + // When a query has both grouping and distinct exprs, Impala can optionally include + // the distinct exprs in the hash exchange of the first aggregation phase to spread + // the data among more nodes. However, this plan requires another hash exchange on + // the grouping exprs in the second phase which is not required when omitting the + // distinct exprs in the first phase. Shuffling by both is better if the grouping + // exprs have low NDVs. + boolean shuffleDistinctExprs = ctx_.getQueryOptions().shuffle_distinct_exprs; + boolean hasGrouping = phase2AggNode.hasGrouping(); + + AggregationNode phase1AggNode = ((AggregationNode) phase2AggNode.getChild(0)); // With grouping, the output partition exprs of the child are the (input) grouping // exprs of the parent. The grouping exprs reference the output tuple of phase-1 // but the partitioning happens on the intermediate tuple of the phase-1. 
- if (shuffleDistinctExprs) { - // We need to do - // - child fragment: - // * phase-1 aggregation - // - first merge fragment, hash-partitioned on grouping and distinct exprs: - // * merge agg of phase-1 - // * phase-2 agg - // - second merge fragment, partitioned on grouping exprs or unpartitioned - // without grouping exprs - // * merge agg of phase-2 - partitionExprs = Expr.substituteList( - phase1AggInfo.getGroupingExprs(), phase1AggInfo.getIntermediateSmap(), - ctx_.getRootAnalyzer(), false); - } else { - // We need to do - // - child fragment: - // * phase-1 aggregation - // - merge fragment, hash-partitioned on grouping exprs: - // * merge agg of phase-1 - // * phase-2 agg - partitionExprs = Expr.substituteList(phase2AggNode.getAggInfo().getGroupingExprs(), - phase1AggInfo.getOutputToIntermediateSmap(), ctx_.getRootAnalyzer(), false); - } + List<Expr> phase1PartitionExprs = + phase1AggNode.getMergePartitionExprs(ctx_.getRootAnalyzer()); + PlanFragment firstMergeFragment; - boolean childHasCompatPartition = ctx_.getRootAnalyzer().setsHaveValueTransfer( - partitionExprs, childFragment.getDataPartition().getPartitionExprs(), true); + boolean childHasCompatPartition = phase1AggNode.isSingleClassAgg() + && ctx_.getRootAnalyzer().setsHaveValueTransfer(phase1PartitionExprs, + childFragment.getDataPartition().getPartitionExprs(), true); if (childHasCompatPartition) { // The data is already partitioned on the required expressions, we can skip the // phase-1 merge step. childFragment.addPlanRoot(phase2AggNode); firstMergeFragment = childFragment; } else { - DataPartition mergePartition = DataPartition.hashPartitioned(partitionExprs); - // Convert the existing node to a preaggregation. 
- AggregationNode preaggNode = (AggregationNode)phase2AggNode.getChild(0); - preaggNode.setIsPreagg(ctx_); + phase1AggNode.setIntermediateTuple(); + phase1AggNode.setIsPreagg(ctx_); + + DataPartition phase1MergePartition = + DataPartition.hashPartitioned(phase1PartitionExprs); // place phase-1 merge aggregation step in a new fragment - firstMergeFragment = createParentFragment(childFragment, mergePartition); - AggregateInfo phase1MergeAggInfo = phase1AggInfo.getMergeAggInfo(); - AggregationNode phase1MergeAggNode = - new AggregationNode(ctx_.getNextNodeId(), preaggNode, phase1MergeAggInfo); + firstMergeFragment = createParentFragment(childFragment, phase1MergePartition); + AggregationNode phase1MergeAggNode = new AggregationNode(ctx_.getNextNodeId(), + phase1AggNode, phase1AggNode.getMultiAggInfo(), AggPhase.FIRST_MERGE); phase1MergeAggNode.init(ctx_.getRootAnalyzer()); phase1MergeAggNode.unsetNeedsFinalize(); phase1MergeAggNode.setIntermediateTuple(); @@ -972,27 +941,30 @@ public class DistributedPlanner { // if there is a limit, it had already been placed with the phase-2 aggregation // step (which is where it should be) firstMergeFragment.addPlanRoot(phase2AggNode); - if (shuffleDistinctExprs) fragments.add(firstMergeFragment); + if (shuffleDistinctExprs || !hasGrouping) fragments.add(firstMergeFragment); } - if (!shuffleDistinctExprs) return firstMergeFragment; + if (!shuffleDistinctExprs && hasGrouping) return firstMergeFragment; + phase2AggNode.unsetNeedsFinalize(); phase2AggNode.setIntermediateTuple(); // Limit should be applied at the final merge aggregation node long limit = phase2AggNode.getLimit(); phase2AggNode.unsetLimit(); - DataPartition mergePartition; - if (phase2AggNode.getAggInfo().getGroupingExprs().isEmpty()) { - mergePartition = DataPartition.UNPARTITIONED; + DataPartition phase2MergePartition; + List<Expr> phase2PartitionExprs = + phase2AggNode.getMergePartitionExprs(ctx_.getRootAnalyzer()); + if (phase2PartitionExprs.isEmpty()) { + 
phase2MergePartition = DataPartition.UNPARTITIONED; } else { phase2AggNode.setIsPreagg(ctx_); - mergePartition = DataPartition.hashPartitioned( - phase2AggNode.getAggInfo().getMergeAggInfo().getGroupingExprs()); + phase2MergePartition = DataPartition.hashPartitioned(phase2PartitionExprs); } PlanFragment secondMergeFragment = - createParentFragment(firstMergeFragment, mergePartition); + createParentFragment(firstMergeFragment, phase2MergePartition); + AggregationNode phase2MergeAggNode = new AggregationNode(ctx_.getNextNodeId(), - phase2AggNode, phase2AggNode.getAggInfo().getMergeAggInfo()); + phase2AggNode, phase2AggNode.getMultiAggInfo(), AggPhase.SECOND_MERGE); phase2MergeAggNode.init(ctx_.getRootAnalyzer()); phase2MergeAggNode.setLimit(limit); // Transfer having predicates to final merge agg node http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 6b3f032..313597e 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -39,6 +39,8 @@ import org.apache.impala.analysis.ExprId; import org.apache.impala.analysis.ExprSubstitutionMap; import org.apache.impala.analysis.InlineViewRef; import org.apache.impala.analysis.JoinOperator; +import org.apache.impala.analysis.MultiAggregateInfo; +import org.apache.impala.analysis.MultiAggregateInfo.AggPhase; import org.apache.impala.analysis.NullLiteral; import org.apache.impala.analysis.QueryStmt; import org.apache.impala.analysis.SelectStmt; @@ -261,13 +263,17 @@ public class SingleNodePlanner { AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo(); AnalyticPlanner analyticPlanner = new AnalyticPlanner(analyticInfo, analyzer, ctx_); + 
MultiAggregateInfo multiAggInfo = selectStmt.getMultiAggInfo(); + List<Expr> groupingExprs = multiAggInfo != null ? + multiAggInfo.getGroupingExprs() : + Collections.<Expr>emptyList(); List<Expr> inputPartitionExprs = Lists.newArrayList(); - AggregateInfo aggInfo = selectStmt.getAggInfo(); - root = analyticPlanner.createSingleNodePlan(root, - aggInfo != null ? aggInfo.getGroupingExprs() : null, inputPartitionExprs); - if (aggInfo != null && !inputPartitionExprs.isEmpty()) { + root = analyticPlanner.createSingleNodePlan( + root, groupingExprs, inputPartitionExprs); + if (multiAggInfo != null && !inputPartitionExprs.isEmpty() + && multiAggInfo.getMaterializedAggClasses().size() == 1) { // analytic computation will benefit from a partition on inputPartitionExprs - aggInfo.setPartitionExprs(inputPartitionExprs); + multiAggInfo.getMaterializedAggClass(0).setPartitionExprs(inputPartitionExprs); } } } else { @@ -610,13 +616,20 @@ public class SingleNodePlanner { List<SubplanRef> subplanRefs = Lists.newArrayList(); computeParentAndSubplanRefs( selectStmt.getTableRefs(), analyzer.isStraightJoin(), parentRefs, subplanRefs); - AggregateInfo aggInfo = selectStmt.getAggInfo(); - PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, aggInfo, analyzer); - // add aggregation, if any - if (aggInfo != null) { - if (root instanceof HdfsScanNode) { - aggInfo.substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer); - aggInfo.getMergeAggInfo().substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer); + MultiAggregateInfo multiAggInfo = selectStmt.getMultiAggInfo(); + // Only optimize scan/agg plan if there is a single aggregation class. + AggregateInfo scanAggInfo = null; + if (multiAggInfo != null && multiAggInfo.getMaterializedAggClasses().size() == 1) { + scanAggInfo = multiAggInfo.getMaterializedAggClass(0); + } + PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, scanAggInfo, analyzer); + // Add aggregation, if any. 
+ if (multiAggInfo != null) { + // Apply substitution for optimized scan/agg plan, + if (scanAggInfo != null && root instanceof HdfsScanNode) { + scanAggInfo.substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer); + scanAggInfo.getMergeAggInfo().substitute( + ((HdfsScanNode) root).getOptimizedAggSmap(), analyzer); } root = createAggregationPlan(selectStmt, analyzer, root); } @@ -886,28 +899,56 @@ public class SingleNodePlanner { * Returns a new AggregationNode that materializes the aggregation of the given stmt. * Assigns conjuncts from the Having clause to the returned node. */ - private PlanNode createAggregationPlan(SelectStmt selectStmt, Analyzer analyzer, - PlanNode root) throws ImpalaException { - Preconditions.checkState(selectStmt.getAggInfo() != null); - // add aggregation, if required - AggregateInfo aggInfo = selectStmt.getAggInfo(); - root = new AggregationNode(ctx_.getNextNodeId(), root, aggInfo); - root.init(analyzer); - Preconditions.checkState(root.hasValidStats()); - // if we're computing DISTINCT agg fns, the analyzer already created the - // 2nd phase agginfo - if (aggInfo.isDistinctAgg()) { - ((AggregationNode)root).unsetNeedsFinalize(); - // The output of the 1st phase agg is the 1st phase intermediate. 
- ((AggregationNode)root).setIntermediateTuple(); - root = new AggregationNode(ctx_.getNextNodeId(), root, - aggInfo.getSecondPhaseDistinctAggInfo()); - root.init(analyzer); - Preconditions.checkState(root.hasValidStats()); + private AggregationNode createAggregationPlan( + SelectStmt selectStmt, Analyzer analyzer, PlanNode root) throws ImpalaException { + MultiAggregateInfo multiAggInfo = + Preconditions.checkNotNull(selectStmt.getMultiAggInfo()); + AggregationNode result = createAggregationPlan(root, multiAggInfo, analyzer); + ExprSubstitutionMap simplifiedAggSmap = multiAggInfo.getSimplifiedAggSmap(); + if (simplifiedAggSmap == null) return result; + + // Fix up aggregations that simplified to a single class after + // materializeRequiredSlots(). + + // Collect conjuncts and then re-assign them to the top-most aggregation node + // of the simplified plan. + AggregationNode dummyAgg = new AggregationNode( + ctx_.getNextNodeId(), result, multiAggInfo, AggPhase.TRANSPOSE); + dummyAgg.init(analyzer); + List<Expr> conjuncts = + Expr.substituteList(dummyAgg.getConjuncts(), simplifiedAggSmap, analyzer, true); + // Validate conjuncts after substitution. + for (Expr c : conjuncts) { + Preconditions.checkState(c.isBound(result.getTupleIds().get(0))); } - // add Having clause - root.assignConjuncts(analyzer); - return root; + result.getConjuncts().addAll(conjuncts); + + // Apply simplification substitution in ancestors. 
+ result.setOutputSmap( + ExprSubstitutionMap.compose(result.getOutputSmap(), simplifiedAggSmap, analyzer)); + return result; + } + + private AggregationNode createAggregationPlan(PlanNode root, + MultiAggregateInfo multiAggInfo, Analyzer analyzer) throws InternalException { + Preconditions.checkNotNull(multiAggInfo); + AggregationNode firstPhaseAgg = + new AggregationNode(ctx_.getNextNodeId(), root, multiAggInfo, AggPhase.FIRST); + firstPhaseAgg.init(analyzer); + if (!multiAggInfo.hasSecondPhase()) return firstPhaseAgg; + + firstPhaseAgg.unsetNeedsFinalize(); + firstPhaseAgg.setIntermediateTuple(); + + AggregationNode secondPhaseAgg = new AggregationNode( + ctx_.getNextNodeId(), firstPhaseAgg, multiAggInfo, AggPhase.SECOND); + secondPhaseAgg.init(analyzer); + if (!multiAggInfo.hasTransposePhase()) return secondPhaseAgg; + + AggregationNode transposePhaseAgg = new AggregationNode( + ctx_.getNextNodeId(), secondPhaseAgg, multiAggInfo, AggPhase.TRANSPOSE); + transposePhaseAgg.init(analyzer); + return transposePhaseAgg; } /** @@ -1621,7 +1662,7 @@ public class SingleNodePlanner { result = createUnionPlan( analyzer, unionStmt, unionStmt.getDistinctOperands(), null); result = new AggregationNode( - ctx_.getNextNodeId(), result, unionStmt.getDistinctAggInfo()); + ctx_.getNextNodeId(), result, unionStmt.getDistinctAggInfo(), AggPhase.FIRST); result.init(analyzer); } // create ALL tree http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java index a0dec83..905fae6 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java @@ -18,6 +18,8 @@ package org.apache.impala.analysis; import static 
org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -30,7 +32,6 @@ import org.apache.impala.analysis.TimestampArithmeticExpr.TimeUnit; import org.apache.impala.catalog.AggregateFunction; import org.apache.impala.catalog.BuiltinsDb; import org.apache.impala.catalog.Catalog; -import org.apache.impala.catalog.CatalogException; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.Db; import org.apache.impala.catalog.Function; @@ -2204,13 +2205,15 @@ public class AnalyzeExprsTest extends AnalyzerTest { TupleDescriptor tblRefDesc = stmt.fromClause_.get(0).getDesc(); tblRefDesc.materializeSlots(); tblRefDesc.computeMemLayout(); - if (stmt.hasAggInfo()) { - TupleDescriptor intDesc = stmt.getAggInfo().intermediateTupleDesc_; + if (stmt.hasMultiAggInfo()) { + Preconditions.checkState(stmt.getMultiAggInfo().getAggClasses().size() == 1); + AggregateInfo aggInfo = stmt.getMultiAggInfo().getAggClass(0); + TupleDescriptor intDesc = aggInfo.intermediateTupleDesc_; intDesc.materializeSlots(); intDesc.computeMemLayout(); - checkSerializedMTime(stmt.getAggInfo().getAggregateExprs().get(0), expectMTime); + checkSerializedMTime(aggInfo.getAggregateExprs().get(0), expectMTime); checkSerializedMTime( - stmt.getAggInfo().getMergeAggInfo().getAggregateExprs().get(0), expectMTime); + aggInfo.getMergeAggInfo().getAggregateExprs().get(0), expectMTime); } else { checkSerializedMTime(stmt.getSelectList().getItems().get(0).getExpr(), expectMTime); } @@ -2727,7 +2730,7 @@ public class AnalyzeExprsTest extends AnalyzerTest { } @Test - public void TestAppxCountDistinctOption() throws AnalysisException, CatalogException { + public void TestAppxCountDistinctOption() throws AnalysisException { TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setAppx_count_distinct(true); @@ -2736,20 +2739,24 @@ public 
class AnalyzeExprsTest extends AnalyzerTest { // Accumulates count(distinct) for all columns of both alltypesTbl and decimalTbl. List<String> allCountDistinctFns = Lists.newArrayList(); - Table alltypesTbl = catalog_.getTable("functional", "alltypes"); + Table alltypesTbl = catalog_.getOrLoadTable("functional", "alltypes"); for (Column col: alltypesTbl.getColumns()) { String colName = col.getName(); // Test a single count(distinct) with some other aggs. - AnalyzesOk(String.format( - "select count(distinct %s), sum(distinct smallint_col), " + + String countDistinctFn = String.format("count(distinct %s)", colName); + SelectStmt stmt = (SelectStmt) AnalyzesOk(String.format( + "select %s, sum(distinct smallint_col), " + "avg(float_col), min(%s) " + "from functional.alltypes", - colName, colName), createAnalysisCtx(queryOptions)); - countDistinctFns.add(String.format("count(distinct %s)", colName)); + countDistinctFn, colName), createAnalysisCtx(queryOptions)); + validateSingleColAppxCountDistinctRewrite(stmt, colName); + countDistinctFns.add(countDistinctFn); } // Test a single query with a count(distinct) on all columns of alltypesTbl. - AnalyzesOk(String.format("select %s from functional.alltypes", + SelectStmt alltypesStmt = (SelectStmt) AnalyzesOk(String.format( + "select %s from functional.alltypes", Joiner.on(",").join(countDistinctFns)), createAnalysisCtx(queryOptions)); + assertAllNdvAggExprs(alltypesStmt, alltypesTbl.getColumns().size()); allCountDistinctFns.addAll(countDistinctFns); countDistinctFns.clear(); @@ -2757,38 +2764,85 @@ public class AnalyzeExprsTest extends AnalyzerTest { for (Column col: decimalTbl.getColumns()) { String colName = col.getName(); // Test a single count(distinct) with some other aggs. 
- AnalyzesOk(String.format( + SelectStmt stmt = (SelectStmt) AnalyzesOk(String.format( "select count(distinct %s), sum(distinct d1), " + "avg(d2), min(%s) " + "from functional.decimal_tbl", colName, colName), createAnalysisCtx(queryOptions)); countDistinctFns.add(String.format("count(distinct %s)", colName)); + validateSingleColAppxCountDistinctRewrite(stmt, colName); } // Test a single query with a count(distinct) on all columns of decimalTbl. - AnalyzesOk(String.format("select %s from functional.decimal_tbl", + SelectStmt decimalTblStmt = (SelectStmt) AnalyzesOk(String.format( + "select %s from functional.decimal_tbl", Joiner.on(",").join(countDistinctFns)), createAnalysisCtx(queryOptions)); + assertAllNdvAggExprs(decimalTblStmt, decimalTbl.getColumns().size()); allCountDistinctFns.addAll(countDistinctFns); // Test a single query with a count(distinct) on all columns of both // alltypes/decimalTbl. - AnalyzesOk(String.format( + SelectStmt comboStmt = (SelectStmt) AnalyzesOk(String.format( "select %s from functional.alltypes cross join functional.decimal_tbl", - Joiner.on(",").join(countDistinctFns)), createAnalysisCtx(queryOptions)); + Joiner.on(",").join(allCountDistinctFns)), createAnalysisCtx(queryOptions)); + assertAllNdvAggExprs(comboStmt, alltypesTbl.getColumns().size() + + decimalTbl.getColumns().size()); // The rewrite does not work for multiple count() arguments. 
- AnalysisError("select count(distinct int_col, bigint_col), " + - "count(distinct string_col, float_col) from functional.alltypes", - createAnalysisCtx(queryOptions), - "all DISTINCT aggregate functions need to have the same set of parameters as " + - "count(DISTINCT int_col, bigint_col); deviating function: " + - "count(DISTINCT string_col, float_col)"); + SelectStmt noRewriteStmt = (SelectStmt) AnalyzesOk( + "select count(distinct int_col, bigint_col), " + + "count(distinct string_col, float_col) from functional.alltypes"); + assertNoNdvAggExprs(noRewriteStmt, 2); + // The rewrite only applies to the count() function. - AnalysisError( + noRewriteStmt = (SelectStmt) AnalyzesOk( "select avg(distinct int_col), sum(distinct float_col) from functional.alltypes", - createAnalysisCtx(queryOptions), - "all DISTINCT aggregate functions need to have the same set of parameters as " + - "avg(DISTINCT int_col); deviating function: sum(DISTINCT"); + createAnalysisCtx(queryOptions)); + assertNoNdvAggExprs(noRewriteStmt, 2); + } + + // Checks that 'stmt' has two aggregate exprs - DISTINCT 'sum' and non-DISTINCT 'ndv'. + private void validateSingleColAppxCountDistinctRewrite( + SelectStmt stmt, String rewrittenColName) { + MultiAggregateInfo multiAggInfo = stmt.getMultiAggInfo(); + List<FunctionCallExpr> aggExprs = multiAggInfo.getAggExprs(); + assertEquals(4, aggExprs.size()); + int numDistinctExprs = 0; + int numNdvExprs = 0; + for (FunctionCallExpr aggExpr : aggExprs) { + if (aggExpr.isDistinct()) { + assertEquals("sum", aggExpr.getFnName().toString()); + ++numDistinctExprs; + } + if (aggExpr.getFnName().toString().equals("ndv")) { + assertEquals(rewrittenColName, aggExpr.getChild(0).toSql()); + ++numNdvExprs; + } + } + assertEquals(1, numDistinctExprs); + assertEquals(1, numNdvExprs); + } + + // Checks that all aggregate exprs in 'stmt' are non-DISTINCT 'ndv'. 
+ private void assertAllNdvAggExprs(SelectStmt stmt, int expectedNumAggExprs) { + MultiAggregateInfo multiAggInfo = stmt.getMultiAggInfo(); + List<FunctionCallExpr> aggExprs = multiAggInfo.getAggExprs(); + assertEquals(expectedNumAggExprs, aggExprs.size()); + for (FunctionCallExpr aggExpr : aggExprs) { + assertFalse(aggExpr.isDistinct()); + assertEquals("ndv", aggExpr.getFnName().toString()); + } + } + + // Checks that all aggregate exprs in 'stmt' are DISTINCT and not 'ndv'. + private void assertNoNdvAggExprs(SelectStmt stmt, int expectedNumAggExprs) { + MultiAggregateInfo multiAggInfo = stmt.getMultiAggInfo(); + List<FunctionCallExpr> aggExprs = multiAggInfo.getAggExprs(); + assertEquals(expectedNumAggExprs, aggExprs.size()); + for (FunctionCallExpr aggExpr : aggExprs) { + assertTrue(aggExpr.isDistinct()); + assertNotEquals("ndv", aggExpr.getFnName().toString()); + } } @Test http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java index fe7e352..c508409 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java @@ -24,8 +24,8 @@ import static org.junit.Assert.fail; import java.lang.reflect.Field; import java.util.List; +import java.util.Set; -import org.apache.impala.analysis.AnalysisContext.AnalysisResult; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.PrimitiveType; import org.apache.impala.catalog.ScalarType; @@ -41,6 +41,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public class 
AnalyzeStmtsTest extends AnalyzerTest { @@ -2114,9 +2115,6 @@ public class AnalyzeStmtsTest extends AnalyzerTest { AnalysisError("select distinct id from functional.testtbl having max(id) > 0", "cannot combine SELECT DISTINCT with aggregate functions or GROUP BY"); AnalyzesOk("select count(distinct id, zip) from functional.testtbl"); - AnalysisError("select count(distinct id, zip), count(distinct zip) " + - "from functional.testtbl", - "all DISTINCT aggregate functions need to have the same set of parameters"); AnalyzesOk("select tinyint_col, count(distinct int_col, bigint_col) " + "from functional.alltypesagg group by 1"); AnalyzesOk("select tinyint_col, count(distinct int_col)," @@ -2127,14 +2125,48 @@ public class AnalyzeStmtsTest extends AnalyzerTest { AnalyzesOk("select sum(distinct t1.bigint_col), avg(distinct t1.bigint_col) " + "from functional.alltypes t1 group by t1.int_col, t1.int_col"); - AnalysisError("select tinyint_col, count(distinct int_col)," - + "sum(distinct bigint_col) from functional.alltypesagg group by 1", - "all DISTINCT aggregate functions need to have the same set of parameters"); // min and max are ignored in terms of DISTINCT AnalyzesOk("select tinyint_col, count(distinct int_col)," + "min(distinct smallint_col), max(distinct string_col) " + "from functional.alltypesagg group by 1"); + // Test multiple distinct aggregations. + Table alltypesTbl = catalog_.getOrLoadTable("functional", "alltypes"); + List<String> distinctFns = Lists.newArrayList(); + for (Column col : alltypesTbl.getColumns()) { + distinctFns.add(String.format("count(distinct %s)", col.getName())); + } + // Test a single query with a count(distinct) on all columns of alltypesTbl. + AnalyzesOk(String.format( + "select %s from functional.alltypes", Joiner.on(",").join(distinctFns))); + + // Test various mixes of distinct and non-distinct, including multiple distinct. 
+ Set<String> testAggExprs = Sets.newHashSet( + "count(tinyint_col)", + "count(string_col)", + "avg(float_col)", + "max(string_col)", + "count(distinct tinyint_col)", + "count(distinct tinyint_col, smallint_col, int_col)", + "avg(distinct double_col)", + "count(distinct string_col)", + "sum(distinct smallint_col)" + ); + List<String> testGroupByExprs = Lists.newArrayList( + "int_col", "bigint_col", "string_col", + "string_col, date_string_col", + "id, double_col, date_string_col" + ); + for (Set<String> aggExprs : Sets.powerSet(testAggExprs)) { + if (aggExprs.isEmpty()) continue; + String selectList = Joiner.on(",").join(aggExprs); + AnalyzesOk(String.format("select %s from functional.alltypes", selectList)); + for (String groupByExprs : testGroupByExprs) { + AnalyzesOk(String.format( + "select %s from functional.alltypes group by %s", selectList, groupByExprs)); + } + } + // IMPALA-6114: Test that numeric literals having the same value, but different types are // considered distinct. AnalyzesOk("select distinct cast(0 as decimal(14)), 0 from functional.alltypes"); @@ -2155,7 +2187,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest { String stmtSql = String.format("select %s from %s", aggFnCall, tblName); SelectStmt stmt = (SelectStmt) AnalyzesOk(stmtSql); // Verify that the resolved function signature matches as expected. 
- Type[] args = stmt.getAggInfo().getAggregateExprs().get(0).getFn().getArgs(); + AggregateInfo aggInfo = stmt.getMultiAggInfo().getAggClass(0); + Type[] args = aggInfo.getAggregateExprs().get(0).getFn().getArgs(); assertEquals(args.length, 2); assertTrue(col.getType().matchesType(args[0]) || col.getType().isStringType() && args[0].equals(Type.STRING)); @@ -2256,6 +2289,23 @@ public class AnalyzeStmtsTest extends AnalyzerTest { AnalyzesOk("select tinyint_col, count(distinct int_col)," + "sum(distinct int_col) from " + "(select * from functional.alltypesagg) x group by 1"); + AnalyzesOk("select * from " + + "(select count(distinct id, zip), count(distinct zip) " + + "from functional.testtbl) x"); + AnalyzesOk("select * from " + "(select tinyint_col, count(distinct int_col)," + + "sum(distinct bigint_col) from functional.alltypesagg group by 1) x"); + AnalyzesOk("select count(distinct id, zip) " + + "from (select * from functional.testtbl) x"); + AnalyzesOk("select tinyint_col, count(distinct int_col, bigint_col) " + + "from (select * from functional.alltypesagg) x group by 1"); + AnalyzesOk("select tinyint_col, count(distinct int_col)," + + "sum(distinct int_col) from " + + "(select * from functional.alltypesagg) x group by 1"); + AnalyzesOk("select count(distinct id, zip), count(distinct zip) " + + " from (select * from functional.testtbl) x"); + AnalyzesOk("select tinyint_col, count(distinct int_col)," + + "sum(distinct bigint_col) from " + + "(select * from functional.alltypesagg) x group by 1"); // Error case when distinct is inside an inline view AnalysisError("select * from " + @@ -2267,13 +2317,6 @@ public class AnalyzeStmtsTest extends AnalyzerTest { AnalysisError("select * from " + "(select distinct id, zip, count(*) from functional.testtbl group by 1, 2) x", "cannot combine SELECT DISTINCT with aggregate functions or GROUP BY"); - AnalysisError("select * from " + - "(select count(distinct id, zip), count(distinct zip) " + - "from functional.testtbl) x", - 
"all DISTINCT aggregate functions need to have the same set of parameters"); - AnalysisError("select * from " + "(select tinyint_col, count(distinct int_col)," - + "sum(distinct bigint_col) from functional.alltypesagg group by 1) x", - "all DISTINCT aggregate functions need to have the same set of parameters"); // Error case when inline view is in the from clause AnalysisError("select distinct count(*) from (select * from functional.testtbl) x", @@ -2284,20 +2327,6 @@ public class AnalyzeStmtsTest extends AnalyzerTest { AnalysisError("select distinct id, zip, count(*) from " + "(select * from functional.testtbl) x group by 1, 2", "cannot combine SELECT DISTINCT with aggregate functions or GROUP BY"); - AnalyzesOk("select count(distinct id, zip) " + - "from (select * from functional.testtbl) x"); - AnalysisError("select count(distinct id, zip), count(distinct zip) " + - " from (select * from functional.testtbl) x", - "all DISTINCT aggregate functions need to have the same set of parameters"); - AnalyzesOk("select tinyint_col, count(distinct int_col, bigint_col) " - + "from (select * from functional.alltypesagg) x group by 1"); - AnalyzesOk("select tinyint_col, count(distinct int_col)," - + "sum(distinct int_col) from " + - "(select * from functional.alltypesagg) x group by 1"); - AnalysisError("select tinyint_col, count(distinct int_col)," - + "sum(distinct bigint_col) from " + - "(select * from functional.alltypesagg) x group by 1", - "all DISTINCT aggregate functions need to have the same set of parameters"); } @Test http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java index 7fc8768..643f85e 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java +++ 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java @@ -19,7 +19,6 @@ package org.apache.impala.analysis; import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.impala.catalog.Function; @@ -27,7 +26,6 @@ import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.FrontendTestBase; -import org.apache.impala.thrift.TExpr; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -58,47 +56,6 @@ public class AnalyzerTest extends FrontendTestBase { } /** - * Check whether SelectStmt components can be converted to thrift. - */ - protected void checkSelectToThrift(SelectStmt node) { - // convert select list exprs and where clause to thrift - List<Expr> selectListExprs = node.getResultExprs(); - List<TExpr> thriftExprs = Expr.treesToThrift(selectListExprs); - LOG.info("select list:\n"); - for (TExpr expr: thriftExprs) { - LOG.info(expr.toString() + "\n"); - } - for (Expr expr: selectListExprs) { - checkBinaryExprs(expr); - } - if (node.getWhereClause() != null) { - TExpr thriftWhere = node.getWhereClause().treeToThrift(); - LOG.info("WHERE pred: " + thriftWhere.toString() + "\n"); - checkBinaryExprs(node.getWhereClause()); - } - AggregateInfo aggInfo = node.getAggInfo(); - if (aggInfo != null) { - if (aggInfo.getGroupingExprs() != null) { - LOG.info("grouping exprs:\n"); - for (Expr expr: aggInfo.getGroupingExprs()) { - LOG.info(expr.treeToThrift().toString() + "\n"); - checkBinaryExprs(expr); - } - } - LOG.info("aggregate exprs:\n"); - for (Expr expr: aggInfo.getAggregateExprs()) { - LOG.info(expr.treeToThrift().toString() + "\n"); - checkBinaryExprs(expr); - } - if (node.getHavingPred() != null) { - TExpr thriftHaving = node.getHavingPred().treeToThrift(); - LOG.info("HAVING pred: " + thriftHaving.toString() + "\n"); - checkBinaryExprs(node.getHavingPred()); - } - } - } - - 
/** * Generates and analyzes two variants of the given query by replacing all occurrences * of "$TBL" in the query string with the unqualified and fully-qualified version of * the given table name. The unqualified variant is analyzed using an analyzer that has @@ -133,23 +90,6 @@ public class AnalyzerTest extends FrontendTestBase { AnalysisError(fqQuery, expectedError); } - /** - * Makes sure that operands to binary exprs having same type. - */ - private void checkBinaryExprs(Expr expr) { - if (expr instanceof BinaryPredicate - || (expr instanceof ArithmeticExpr - && ((ArithmeticExpr) expr).getOp() != ArithmeticExpr.Operator.BITNOT)) { - Assert.assertEquals(expr.getChildren().size(), 2); - // The types must be equal or one of them is NULL_TYPE. - Assert.assertTrue(expr.getChild(0).getType() == expr.getChild(1).getType() - || expr.getChild(0).getType().isNull() || expr.getChild(1).getType().isNull()); - } - for (Expr child: expr.getChildren()) { - checkBinaryExprs(child); - } - } - @Test public void TestCompressedText() throws AnalysisException { AnalyzesOk("SELECT count(*) FROM functional_text_lzo.tinyinttable"); http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/test/java/org/apache/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 87cd518..0cc2e48 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -75,6 +75,31 @@ public class PlannerTest extends PlannerTestBase { } @Test + public void testMultipleDistinct() { + // TODO: Multiple distinct with count(distinct a,b,c) variants. + // TODO: Multiple distinct in subplan. + // TODO: Multiple distinct in subqueries. + // TODO: Multiple distinct lineage tests. 
+ // TODO: Multiple distinct and SHUFFLE_DISTINCT_EXPRS tests + runPlannerTestFile("multiple-distinct"); + } + + @Test + public void testMultipleDistinctMaterialization() { + runPlannerTestFile("multiple-distinct-materialization"); + } + + @Test + public void testMultipleDistinctPredicates() { + runPlannerTestFile("multiple-distinct-predicates"); + } + + @Test + public void testMultipleDistinctLimit() { + runPlannerTestFile("multiple-distinct-limit"); + } + + @Test public void testShuffleByDistinctExprs() { runPlannerTestFile("shuffle-by-distinct-exprs"); } http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test index 1e6b3ba..6e66e30 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test @@ -579,3 +579,48 @@ PLAN-ROOT SINK 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB ==== +# Query block with a single distinct and multiple non-distinct aggs simplifies to a +# non-grouping aggregation plan. 
+select a, c from + (select min(string_col) a, count(distinct smallint_col) b, + max(string_col) c + from functional.alltypes + having min(string_col) < '9' and min(string_col) < max(string_col)) v +---- PLAN +PLAN-ROOT SINK +| +02:AGGREGATE [FINALIZE] +| output: min:merge(string_col), max:merge(string_col) +| having: min(string_col) < '9', min(string_col) < max(string_col) +| +01:AGGREGATE +| output: min(string_col), max(string_col) +| group by: smallint_col +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +06:AGGREGATE [FINALIZE] +| output: min:merge(string_col), max:merge(string_col) +| having: min(string_col) < '9', min(string_col) < max(string_col) +| +05:EXCHANGE [UNPARTITIONED] +| +02:AGGREGATE +| output: min:merge(string_col), max:merge(string_col) +| +04:AGGREGATE +| output: min:merge(string_col), max:merge(string_col) +| group by: smallint_col +| +03:EXCHANGE [HASH(smallint_col)] +| +01:AGGREGATE [STREAMING] +| output: min(string_col), max(string_col) +| group by: smallint_col +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-limit.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-limit.test b/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-limit.test new file mode 100644 index 0000000..d3c5553 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-limit.test @@ -0,0 +1,181 @@ +# Test correct placement of limit. 
+select count(distinct tinyint_col) a, min(timestamp_col) b, + count(distinct smallint_col) c, max(timestamp_col) d +from functional.alltypes +limit 10 +---- PLAN +PLAN-ROOT SINK +| +03:AGGREGATE [FINALIZE] +| output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, count(smallint_col)), aggif(valid_tid() = 5, min(timestamp_col)), aggif(valid_tid() = 5, max(timestamp_col)) +| limit: 10 +| +02:AGGREGATE [FINALIZE] +| Class 0 +| output: count(tinyint_col) +| Class 1 +| output: count(smallint_col) +| Class 2 +| output: min:merge(timestamp_col), max:merge(timestamp_col) +| +01:AGGREGATE +| Class 0 +| group by: tinyint_col +| Class 1 +| group by: smallint_col +| Class 2 +| output: min(timestamp_col), max(timestamp_col) +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +03:AGGREGATE [FINALIZE] +| output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, count(smallint_col)), aggif(valid_tid() = 5, min(timestamp_col)), aggif(valid_tid() = 5, max(timestamp_col)) +| limit: 10 +| +07:AGGREGATE [FINALIZE] +| Class 0 +| output: count:merge(tinyint_col) +| Class 1 +| output: count:merge(smallint_col) +| Class 2 +| output: min:merge(timestamp_col), max:merge(timestamp_col) +| +06:EXCHANGE [UNPARTITIONED] +| +02:AGGREGATE +| Class 0 +| output: count(tinyint_col) +| Class 1 +| output: count(smallint_col) +| Class 2 +| output: min:merge(timestamp_col), max:merge(timestamp_col) +| +05:AGGREGATE +| Class 0 +| group by: tinyint_col +| Class 1 +| group by: smallint_col +| Class 2 +| output: min:merge(timestamp_col), max:merge(timestamp_col) +| +04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 3 THEN murmur_hash(smallint_col) WHEN 5 THEN 0 END)] +| +01:AGGREGATE [STREAMING] +| Class 0 +| group by: tinyint_col +| Class 1 +| group by: smallint_col +| Class 2 +| output: min(timestamp_col), max(timestamp_col) +| +00:SCAN HDFS [functional.alltypes] + 
partitions=24/24 files=24 size=478.45KB +==== +# Test correct placement of limit. Simplifies to a single class with one distinct agg. +select b from ( + select count(distinct tinyint_col) a, min(timestamp_col) b, + count(distinct smallint_col) c, max(timestamp_col) d + from functional.alltypes limit 20) v +limit 10 +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: min(timestamp_col) +| limit: 10 +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +04:AGGREGATE [FINALIZE] +| output: min:merge(timestamp_col) +| limit: 10 +| +03:EXCHANGE [UNPARTITIONED] +| +01:AGGREGATE +| output: min(timestamp_col) +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# Test correct placement of limit. Simplifies to a single class with a non-distinct agg. +select d from ( + select count(distinct tinyint_col) a, min(timestamp_col) b, + count(distinct smallint_col) c, max(timestamp_col) d + from functional.alltypes limit 20) v +limit 10 +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: max(timestamp_col) +| limit: 10 +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +04:AGGREGATE [FINALIZE] +| output: max:merge(timestamp_col) +| limit: 10 +| +03:EXCHANGE [UNPARTITIONED] +| +01:AGGREGATE +| output: max(timestamp_col) +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# Test correct placement of limit. Simplifies to a single class with distinct +# and non-distinct aggs.
+select d, c, d from ( + select count(distinct tinyint_col) a, min(timestamp_col) b, + count(distinct smallint_col) c, max(timestamp_col) d + from functional.alltypes limit 20) v +limit 10 +---- PLAN +PLAN-ROOT SINK +| +02:AGGREGATE [FINALIZE] +| output: count(smallint_col), max:merge(timestamp_col) +| limit: 10 +| +01:AGGREGATE +| output: max(timestamp_col) +| group by: smallint_col +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +07:AGGREGATE [FINALIZE] +| output: count:merge(smallint_col), max:merge(timestamp_col) +| limit: 10 +| +06:EXCHANGE [UNPARTITIONED] +| +02:AGGREGATE +| output: count(smallint_col), max:merge(timestamp_col) +| +05:AGGREGATE +| output: max:merge(timestamp_col) +| group by: smallint_col +| +04:EXCHANGE [HASH(smallint_col)] +| +01:AGGREGATE [STREAMING] +| output: max(timestamp_col) +| group by: smallint_col +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +====