http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java b/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java deleted file mode 100644 index 48a71dc..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java +++ /dev/null @@ -1,388 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.JoinOperator; -import com.cloudera.impala.analysis.SlotRef; -import com.cloudera.impala.catalog.HdfsFileFormat; -import com.cloudera.impala.catalog.HdfsTable; -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.common.NotImplementedException; -import com.cloudera.impala.common.TreeNode; -import com.cloudera.impala.planner.JoinNode.DistributionMode; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TPartitionType; -import com.cloudera.impala.thrift.TPlan; -import com.cloudera.impala.thrift.TPlanFragment; -import com.cloudera.impala.thrift.TPlanFragmentTree; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; -import com.google.common.collect.Lists; - -/** - * PlanFragments form a tree structure via their ExchangeNodes. A tree of fragments - * connected in that way forms a plan. The output of a plan is produced by the root - * fragment and is either the result of the query or an intermediate result - * needed by a different plan (such as a hash table). - * - * Plans are grouped into cohorts based on the consumer of their output: all - * plans that materialize intermediate results for a particular consumer plan - * are grouped into a single cohort. - * - * A PlanFragment encapsulates the specific tree of execution nodes that - * are used to produce the output of the plan fragment, as well as output exprs, - * destination node, etc. If there are no output exprs, the full row that is - * produced by the plan root is marked as materialized. - * - * A plan fragment can have one or many instances, each of which in turn is executed by - * an individual node and the output sent to a specific instance of the destination - * fragment (or, in the case of the root fragment, is materialized in some form).
- * - * A hash-partitioned plan fragment is the result of one or more hash-partitioning data - * streams being received by plan nodes in this fragment. In the future, a fragment's - * data partition could also be hash partitioned based on a scan node that is reading - * from a physically hash-partitioned table. - * - * The sequence of calls is: - * - c'tor - * - assemble with getters, etc. - * - finalize() - * - toThrift() - * - * TODO: the tree of PlanNodes is connected across fragment boundaries, which makes - * it impossible to search for things within a fragment (using TreeNode functions); - * fix that - */ -public class PlanFragment extends TreeNode<PlanFragment> { - private final static Logger LOG = LoggerFactory.getLogger(PlanFragment.class); - - private final PlanFragmentId fragmentId_; - private PlanId planId_; - private CohortId cohortId_; - - // root of plan tree executed by this fragment - private PlanNode planRoot_; - - // exchange node to which this fragment sends its output - private ExchangeNode destNode_; - - // if null, outputs the entire row produced by planRoot_ - private List<Expr> outputExprs_; - - // created in finalize() or set in setSink() - private DataSink sink_; - - // specification of the partition of the input of this fragment; - // an UNPARTITIONED fragment is executed on only a single node - // TODO: improve this comment, "input" is a bit misleading - private DataPartition dataPartition_; - - // specification of how the output of this fragment is partitioned (i.e., how - // it's sent to its destination); - // if the output is UNPARTITIONED, it is being broadcast - private DataPartition outputPartition_; - - /** - * C'tor for fragment with specific partition; the output is by default broadcast. - */ - public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) { - fragmentId_ = id; - planRoot_ = root; - dataPartition_ = partition; - outputPartition_ = DataPartition.UNPARTITIONED; - setFragmentInPlanTree(planRoot_); - } - - /** - * Assigns 'this' as fragment of all PlanNodes in the plan tree rooted at node. - * Does not traverse the children of ExchangeNodes because those must belong to a - * different fragment. - */ - public void setFragmentInPlanTree(PlanNode node) { - if (node == null) return; - node.setFragment(this); - if (node instanceof ExchangeNode) return; - for (PlanNode child : node.getChildren()) setFragmentInPlanTree(child); - } - - /** - * Collect all PlanNodes that belong to the exec tree of this fragment. - */ - public void collectPlanNodes(List<PlanNode> nodes) { - Preconditions.checkNotNull(nodes); - collectPlanNodesHelper(planRoot_, nodes); - } - - private void collectPlanNodesHelper(PlanNode root, List<PlanNode> nodes) { - if (root == null) return; - nodes.add(root); - if (root instanceof ExchangeNode) return; - for (PlanNode child: root.getChildren()) collectPlanNodesHelper(child, nodes); - } - - public void setOutputExprs(List<Expr> outputExprs) { - outputExprs_ = Expr.cloneList(outputExprs); - } - public List<Expr> getOutputExprs() { return outputExprs_; } - - /** - * Finalize plan tree and create stream sink, if needed. - * If this fragment is hash partitioned, ensures that the corresponding partition - * exprs of all hash-partitioning senders are cast to identical types. - * Otherwise, the hashes generated for identical partition values may differ - * among senders if the partition-expr types are not identical.
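The cast requirement described in the finalize() comment above is easy to demonstrate outside Impala: the same logical value, encoded at two different widths, generally hashes differently, so two senders using mismatched partition-expr types would route identical keys to different receiving instances. A minimal JDK-only sketch of the effect (Arrays.hashCode stands in for the backend's partitioning hash, which this sketch does not claim to reproduce):

  import java.nio.ByteBuffer;
  import java.util.Arrays;

  public class PartitionHashSketch {
    public static void main(String[] args) {
      long value = 7; // the same logical partition value on two senders
      // Sender A evaluates its partition expr as a 4-byte INT,
      // sender B as an 8-byte BIGINT.
      byte[] asInt = ByteBuffer.allocate(4).putInt((int) value).array();
      byte[] asBigint = ByteBuffer.allocate(8).putLong(value).array();
      // Different byte representations yield different hashes, so the same
      // key would land on different receivers unless both senders first
      // cast their partition exprs to one common type.
      System.out.println("INT hash:    " + Arrays.hashCode(asInt));
      System.out.println("BIGINT hash: " + Arrays.hashCode(asBigint));
    }
  }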
- */ - public void finalize(Analyzer analyzer) - throws InternalException, NotImplementedException { - if (destNode_ != null) { - Preconditions.checkState(sink_ == null); - // we're streaming to an exchange node - DataStreamSink streamSink = new DataStreamSink(destNode_, outputPartition_); - streamSink.setFragment(this); - sink_ = streamSink; - } - - if (!dataPartition_.isHashPartitioned()) return; - - // This fragment is hash partitioned. Gather all exchange nodes and ensure - // that all hash-partitioning senders hash on exprs-values of the same type. - List<ExchangeNode> exchNodes = Lists.newArrayList(); - planRoot_.collect(Predicates.instanceOf(ExchangeNode.class), exchNodes); - - // Contains partition-expr lists of all hash-partitioning sender fragments. - List<List<Expr>> senderPartitionExprs = Lists.newArrayList(); - for (ExchangeNode exchNode: exchNodes) { - Preconditions.checkState(!exchNode.getChildren().isEmpty()); - PlanFragment senderFragment = exchNode.getChild(0).getFragment(); - Preconditions.checkNotNull(senderFragment); - if (!senderFragment.getOutputPartition().isHashPartitioned()) continue; - List<Expr> partExprs = senderFragment.getOutputPartition().getPartitionExprs(); - // All hash-partitioning senders must have compatible partition exprs, otherwise - // this fragment's data partition must not be hash partitioned. - Preconditions.checkState( - partExprs.size() == dataPartition_.getPartitionExprs().size()); - senderPartitionExprs.add(partExprs); - } - - // Cast all corresponding hash partition exprs of all hash-partitioning senders - // to their compatible types. Also cast the data partition's exprs for consistency, - // although not strictly necessary. They should already be type identical to the - // exprs of one of the senders and they are not directly used for hashing in the BE. - senderPartitionExprs.add(dataPartition_.getPartitionExprs()); - try { - analyzer.castToUnionCompatibleTypes(senderPartitionExprs); - } catch (AnalysisException e) { - // Should never happen. Analysis should have ensured type compatibility already. - throw new IllegalStateException(e); - } - } - - /** - * Return the number of nodes on which the plan fragment will execute. - * invalid: -1 - */ - public int getNumNodes() { - return dataPartition_ == DataPartition.UNPARTITIONED ? 1 : planRoot_.getNumNodes(); - } - - /** - * Estimates the per-node number of distinct values of exprs based on the data - * partition of this fragment and its number of nodes. Returns -1 for an invalid - * estimate, e.g., because getNumDistinctValues() failed on one of the exprs. - */ - public long getNumDistinctValues(List<Expr> exprs) { - Preconditions.checkNotNull(dataPartition_); - long result = 1; - int numNodes = getNumNodes(); - Preconditions.checkState(numNodes >= 0); - // The number of nodes is zero for empty tables. 
- if (numNodes == 0) return 0; - for (Expr expr: exprs) { - long numDistinct = expr.getNumDistinctValues(); - if (numDistinct == -1) { - result = -1; - break; - } - if (dataPartition_.getPartitionExprs().contains(expr)) { - numDistinct = (long)Math.max((double) numDistinct / (double) numNodes, 1L); - } - result = PlanNode.multiplyCardinalities(result, numDistinct); - } - return result; - } - - public TPlanFragment toThrift() { - TPlanFragment result = new TPlanFragment(); - result.setDisplay_name(fragmentId_.toString()); - if (planRoot_ != null) result.setPlan(planRoot_.treeToThrift()); - if (outputExprs_ != null) { - result.setOutput_exprs(Expr.treesToThrift(outputExprs_)); - } - if (sink_ != null) result.setOutput_sink(sink_.toThrift()); - result.setPartition(dataPartition_.toThrift()); - return result; - } - - public TPlanFragmentTree treeToThrift() { - TPlanFragmentTree result = new TPlanFragmentTree(); - treeToThriftHelper(result); - return result; - } - - private void treeToThriftHelper(TPlanFragmentTree plan) { - plan.addToFragments(toThrift()); - for (PlanFragment child: children_) { - child.treeToThriftHelper(plan); - } - } - - public String getExplainString(TExplainLevel detailLevel) { - return getExplainString("", "", detailLevel); - } - - /** - * The root of the output tree will be prefixed by rootPrefix and the remaining plan - * output will be prefixed by prefix. - */ - protected final String getExplainString(String rootPrefix, String prefix, - TExplainLevel detailLevel) { - StringBuilder str = new StringBuilder(); - Preconditions.checkState(dataPartition_ != null); - String detailPrefix = prefix + "| "; // sink detail - if (detailLevel == TExplainLevel.VERBOSE) { - // we're printing a new tree, start over with the indentation - prefix = " "; - rootPrefix = " "; - detailPrefix = prefix + "| "; - str.append(String.format("%s:PLAN FRAGMENT [%s]\n", fragmentId_.toString(), - dataPartition_.getExplainString())); - if (sink_ != null && sink_ instanceof DataStreamSink) { - str.append(sink_.getExplainString(rootPrefix, prefix, detailLevel) + "\n"); - } - } - - String planRootPrefix = rootPrefix; - // Always print sinks other than DataStreamSinks. - if (sink_ != null && !(sink_ instanceof DataStreamSink)) { - str.append(sink_.getExplainString(rootPrefix, detailPrefix, detailLevel)); - if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { - str.append(prefix + "|\n"); - } - // we already used the root prefix for the sink - planRootPrefix = prefix; - } - if (planRoot_ != null) { - str.append(planRoot_.getExplainString(planRootPrefix, prefix, detailLevel)); - } - return str.toString(); - } - - /** Returns true if this fragment is partitioned. 
*/ - public boolean isPartitioned() { - return (dataPartition_.getType() != TPartitionType.UNPARTITIONED); - } - - public PlanFragmentId getId() { return fragmentId_; } - public PlanId getPlanId() { return planId_; } - public void setPlanId(PlanId id) { planId_ = id; } - public CohortId getCohortId() { return cohortId_; } - public void setCohortId(CohortId id) { cohortId_ = id; } - public PlanFragment getDestFragment() { - if (destNode_ == null) return null; - return destNode_.getFragment(); - } - public ExchangeNode getDestNode() { return destNode_; } - public DataPartition getDataPartition() { return dataPartition_; } - public void setDataPartition(DataPartition dataPartition) { - this.dataPartition_ = dataPartition; - } - public DataPartition getOutputPartition() { return outputPartition_; } - public void setOutputPartition(DataPartition outputPartition) { - this.outputPartition_ = outputPartition; - } - public PlanNode getPlanRoot() { return planRoot_; } - public void setPlanRoot(PlanNode root) { - planRoot_ = root; - setFragmentInPlanTree(planRoot_); - } - - public void setDestination(ExchangeNode destNode) { - destNode_ = destNode; - PlanFragment dest = getDestFragment(); - Preconditions.checkNotNull(dest); - dest.addChild(this); - } - - public boolean hasSink() { return sink_ != null; } - public DataSink getSink() { return sink_; } - public void setSink(DataSink sink) { - Preconditions.checkState(this.sink_ == null); - Preconditions.checkNotNull(sink); - sink.setFragment(this); - this.sink_ = sink; - } - - /** - * Adds a node as the new root to the plan tree. Connects the existing - * root as the child of newRoot. - */ - public void addPlanRoot(PlanNode newRoot) { - Preconditions.checkState(newRoot.getChildren().size() == 1); - newRoot.setChild(0, planRoot_); - planRoot_ = newRoot; - planRoot_.setFragment(this); - } - - /** - * Verify that the tree of PlanFragments and their contained tree of - * PlanNodes is constructed correctly. - */ - public void verifyTree() { - // PlanNode.fragment_ is set correctly - List<PlanNode> nodes = Lists.newArrayList(); - collectPlanNodes(nodes); - List<PlanNode> exchNodes = Lists.newArrayList(); - for (PlanNode node: nodes) { - if (node instanceof ExchangeNode) exchNodes.add(node); - Preconditions.checkState(node.getFragment() == this); - } - - // all ExchangeNodes have registered input fragments - Preconditions.checkState(exchNodes.size() == getChildren().size()); - List<PlanFragment> childFragments = Lists.newArrayList(); - for (PlanNode exchNode: exchNodes) { - PlanFragment childFragment = exchNode.getChild(0).getFragment(); - Preconditions.checkState(!childFragments.contains(childFragment)); - childFragments.add(childFragment); - Preconditions.checkState(childFragment.getDestNode() == exchNode); - } - // all registered children are accounted for - Preconditions.checkState(getChildren().containsAll(childFragments)); - - for (PlanFragment child: getChildren()) child.verifyTree(); - } -}
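The per-node NDV arithmetic in getNumDistinctValues() above is worth a worked example: an expr that is also one of the fragment's hash-partition exprs has its distinct values spread across nodes, so its NDV is divided by the node count (floored at one), and the NDVs of all exprs are then multiplied with overflow clamping. A simplified, self-contained sketch of just that math, with hypothetical numbers (Math.multiplyExact stands in for Guava's LongMath.checkedMultiply used by PlanNode.multiplyCardinalities):

  public class PerNodeNdvSketch {
    // Multiply two cardinalities, clamping to Long.MAX_VALUE on overflow.
    static long multiplyCardinalities(long a, long b) {
      try {
        return Math.multiplyExact(a, b);
      } catch (ArithmeticException e) {
        return Long.MAX_VALUE;
      }
    }

    public static void main(String[] args) {
      int numNodes = 10;
      long ndvPartitionExpr = 1000; // expr appears in the fragment's data partition
      long ndvOtherExpr = 50;       // unrelated expr: all values may occur on a node
      // Partition-expr values are spread across nodes, but never below 1 per node.
      long perNode = (long) Math.max((double) ndvPartitionExpr / numNodes, 1L);
      // 100 * 50 = 5000 estimated distinct combinations per node.
      System.out.println(multiplyCardinalities(perNode, ndvOtherExpr));
    }
  }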
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlanFragmentId.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanFragmentId.java b/fe/src/main/java/com/cloudera/impala/planner/PlanFragmentId.java deleted file mode 100644 index 98b08fe..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/PlanFragmentId.java +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import com.cloudera.impala.common.Id; -import com.cloudera.impala.common.IdGenerator; - -public class PlanFragmentId extends Id<PlanFragmentId> { - // Construction only allowed via an IdGenerator. - protected PlanFragmentId(int id) { - super(id); - } - - public static IdGenerator<PlanFragmentId> createGenerator() { - return new IdGenerator<PlanFragmentId>() { - @Override - public PlanFragmentId getNextId() { return new PlanFragmentId(nextId_++); } - @Override - public PlanFragmentId getMaxId() { return new PlanFragmentId(nextId_ - 1); } - }; - } - - @Override - public String toString() { - return String.format("F%02d", id_); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlanId.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanId.java b/fe/src/main/java/com/cloudera/impala/planner/PlanId.java deleted file mode 100644 index 2cecbd8..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/PlanId.java +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import com.cloudera.impala.common.Id; -import com.cloudera.impala.common.IdGenerator; - -public class PlanId extends Id<PlanId> { - // Construction only allowed via an IdGenerator. 
- protected PlanId(int id) { - super(id); - } - - public static IdGenerator<PlanId> createGenerator() { - return new IdGenerator<PlanId>() { - @Override - public PlanId getNextId() { return new PlanId(nextId_++); } - @Override - public PlanId getMaxId() { return new PlanId(nextId_ - 1); } - }; - } - - @Override - public String toString() { - return String.format("%02d", id_); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java deleted file mode 100644 index d38f10a..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java +++ /dev/null @@ -1,715 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.ExprId; -import com.cloudera.impala.analysis.ExprSubstitutionMap; -import com.cloudera.impala.analysis.TupleDescriptor; -import com.cloudera.impala.analysis.TupleId; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.PrintUtils; -import com.cloudera.impala.common.TreeNode; -import com.cloudera.impala.planner.RuntimeFilterGenerator.RuntimeFilter; -import com.cloudera.impala.thrift.TExecStats; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TPlan; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TQueryOptions; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.math.LongMath; - -/** - * Each PlanNode represents a single relational operator - * and encapsulates the information needed by the planner to - * make optimization decisions. - * - * finalize(): Computes internal state, such as keys for scan nodes; gets called once on - * the root of the plan tree before the call to toThrift(). Also finalizes the set - * of conjuncts, such that each remaining one requires all of its referenced slots to - * be materialized (ie, can be evaluated by calling GetValue(), rather than being - * implicitly evaluated as part of a scan key). 
- * - * conjuncts_: Each node has a list of conjuncts that can be executed in the context of - * this node, ie, they only reference tuples materialized by this node or one of - * its children (= are bound by tupleIds_). - */ -abstract public class PlanNode extends TreeNode<PlanNode> { - private final static Logger LOG = LoggerFactory.getLogger(PlanNode.class); - - // TODO: Retrieve from the query options instead of using a default. - protected final static int DEFAULT_BATCH_SIZE = 1024; - - // String used for this node in getExplainString(). - protected String displayName_; - - // unique w/in plan tree; assigned by planner, and not necessarily in c'tor - protected PlanNodeId id_; - - protected long limit_; // max. # of rows to be returned; 0: no limit_ - - // ids materialized by the tree rooted at this node - protected ArrayList<TupleId> tupleIds_; - - // ids of the TblRefs "materialized" by this node; identical with tupleIds_ - // if the tree rooted at this node only materializes BaseTblRefs; - // useful during plan generation - protected ArrayList<TupleId> tblRefIds_; - - // A set of nullable TupleId produced by this node. It is a subset of tupleIds_. - // A tuple is nullable within a particular plan tree if it's the "nullable" side of - // an outer join, which has nothing to do with the schema. - protected Set<TupleId> nullableTupleIds_ = Sets.newHashSet(); - - protected List<Expr> conjuncts_ = Lists.newArrayList(); - - // Fragment that this PlanNode is executed in. Valid only after this PlanNode has been - // assigned to a fragment. Set and maintained by enclosing PlanFragment. - protected PlanFragment fragment_; - - // if set, needs to be applied by parent node to reference this node's output - protected ExprSubstitutionMap outputSmap_; - - // global state of planning wrt conjunct assignment; used by planner as a shortcut - // to avoid having to pass assigned conjuncts back and forth - // (the planner uses this to save and reset the global state in between join tree - // alternatives) - // TODO for 2.3: Save this state in the PlannerContext instead. - protected Set<ExprId> assignedConjuncts_; - - // estimate of the output cardinality of this node; set in computeStats(); - // invalid: -1 - protected long cardinality_; - - // number of nodes on which the plan tree rooted at this node would execute; - // set in computeStats(); invalid: -1 - protected int numNodes_; - - // sum of tupleIds_' avgSerializedSizes; set in computeStats() - protected float avgRowSize_; - - // estimated per-host memory requirement for this node; - // set in computeCosts(); invalid: -1 - protected long perHostMemCost_ = -1; - - // Runtime filters assigned to this node. - protected List<RuntimeFilter> runtimeFilters_ = Lists.newArrayList(); - - protected PlanNode(PlanNodeId id, List<TupleId> tupleIds, String displayName) { - this(id, displayName); - tupleIds_.addAll(tupleIds); - tblRefIds_.addAll(tupleIds); - } - - /** - * Deferred id_ assignment. - */ - protected PlanNode(String displayName) { - this(null, displayName); - } - - protected PlanNode(PlanNodeId id, String displayName) { - id_ = id; - limit_ = -1; - tupleIds_ = Lists.newArrayList(); - tblRefIds_ = Lists.newArrayList(); - cardinality_ = -1; - numNodes_ = -1; - displayName_ = displayName; - } - - /** - * Copy c'tor. Also passes in new id_. 
- */ - protected PlanNode(PlanNodeId id, PlanNode node, String displayName) { - id_ = id; - limit_ = node.limit_; - tupleIds_ = Lists.newArrayList(node.tupleIds_); - tblRefIds_ = Lists.newArrayList(node.tblRefIds_); - nullableTupleIds_ = Sets.newHashSet(node.nullableTupleIds_); - conjuncts_ = Expr.cloneList(node.conjuncts_); - cardinality_ = -1; - numNodes_ = -1; - displayName_ = displayName; - } - - /** - * Sets tblRefIds_, tupleIds_, and nullableTupleIds_. - * The default implementation is a no-op. - */ - public void computeTupleIds() { - Preconditions.checkState(children_.isEmpty() || !tupleIds_.isEmpty()); - } - - /** - * Clears tblRefIds_, tupleIds_, and nullableTupleIds_. - */ - protected void clearTupleIds() { - tblRefIds_.clear(); - tupleIds_.clear(); - nullableTupleIds_.clear(); - } - - public PlanNodeId getId() { return id_; } - public void setId(PlanNodeId id) { - Preconditions.checkState(id_ == null); - id_ = id; - } - public long getLimit() { return limit_; } - public boolean hasLimit() { return limit_ > -1; } - public long getPerHostMemCost() { return perHostMemCost_; } - public long getCardinality() { return cardinality_; } - public int getNumNodes() { return numNodes_; } - public float getAvgRowSize() { return avgRowSize_; } - public void setFragment(PlanFragment fragment) { fragment_ = fragment; } - public PlanFragment getFragment() { return fragment_; } - public List<Expr> getConjuncts() { return conjuncts_; } - public ExprSubstitutionMap getOutputSmap() { return outputSmap_; } - public void setOutputSmap(ExprSubstitutionMap smap) { outputSmap_ = smap; } - public Set<ExprId> getAssignedConjuncts() { return assignedConjuncts_; } - public void setAssignedConjuncts(Set<ExprId> conjuncts) { - assignedConjuncts_ = conjuncts; - } - - /** - * Set the limit_ to the given limit_ only if the limit_ hasn't been set, or the new limit_ - * is lower. - * @param limit_ - */ - public void setLimit(long limit) { - if (limit_ == -1 || (limit != -1 && limit_ > limit)) limit_ = limit; - } - - public void unsetLimit() { limit_ = -1; } - - public ArrayList<TupleId> getTupleIds() { - Preconditions.checkState(tupleIds_ != null); - return tupleIds_; - } - - public ArrayList<TupleId> getTblRefIds() { return tblRefIds_; } - public void setTblRefIds(ArrayList<TupleId> ids) { tblRefIds_ = ids; } - - public Set<TupleId> getNullableTupleIds() { - Preconditions.checkState(nullableTupleIds_ != null); - return nullableTupleIds_; - } - - public void addConjuncts(List<Expr> conjuncts) { - if (conjuncts == null) return; - conjuncts_.addAll(conjuncts); - } - - public void transferConjuncts(PlanNode recipient) { - recipient.conjuncts_.addAll(conjuncts_); - conjuncts_.clear(); - } - - public String getExplainString() { - return getExplainString("", "", TExplainLevel.VERBOSE); - } - - protected void setDisplayName(String s) { displayName_ = s; } - - final protected String getDisplayLabel() { - return String.format("%s:%s", id_.toString(), displayName_); - } - - /** - * Subclasses can override to provide a node specific detail string that - * is displayed to the user. - * e.g. scan can return the table name. - */ - protected String getDisplayLabelDetail() { return ""; } - - /** - * Generate the explain plan tree. The plan will be in the form of: - * - * root - * | - * |----child 3 - * | limit:1 - * | - * |----child 2 - * | limit:2 - * | - * child 1 - * - * The root node header line will be prefixed by rootPrefix and the remaining plan - * output will be prefixed by prefix. 
- */ - protected final String getExplainString(String rootPrefix, String prefix, - TExplainLevel detailLevel) { - StringBuilder expBuilder = new StringBuilder(); - String detailPrefix = prefix; - String filler; - boolean printFiller = (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()); - - // Do not traverse into the children of an Exchange node to avoid crossing - // fragment boundaries. - boolean traverseChildren = !children_.isEmpty() && - !(this instanceof ExchangeNode && detailLevel == TExplainLevel.VERBOSE); - - if (traverseChildren) { - detailPrefix += "| "; - filler = prefix + "|"; - } else { - detailPrefix += " "; - filler = prefix; - } - - // Print the current node - // The plan node header line will be prefixed by rootPrefix and the remaining details - // will be prefixed by detailPrefix. - expBuilder.append(getNodeExplainString(rootPrefix, detailPrefix, detailLevel)); - - if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal() && - !(this instanceof SortNode)) { - if (limit_ != -1) expBuilder.append(detailPrefix + "limit: " + limit_ + "\n"); - expBuilder.append(getOffsetExplainString(detailPrefix)); - } - - // Output cardinality, cost estimates and tuple Ids only when explain plan level - // is extended or above. - if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { - // Print estimated output cardinality and memory cost. - expBuilder.append(PrintUtils.printHosts(detailPrefix, numNodes_)); - expBuilder.append(PrintUtils.printMemCost(" ", perHostMemCost_) + "\n"); - - // Print tuple ids and row size. - expBuilder.append(detailPrefix + "tuple-ids="); - for (int i = 0; i < tupleIds_.size(); ++i) { - TupleId tupleId = tupleIds_.get(i); - String nullIndicator = nullableTupleIds_.contains(tupleId) ? "N" : ""; - expBuilder.append(tupleId.asInt() + nullIndicator); - if (i + 1 != tupleIds_.size()) expBuilder.append(","); - } - expBuilder.append(" row-size=" + PrintUtils.printBytes(Math.round(avgRowSize_))); - expBuilder.append(PrintUtils.printCardinality(" ", cardinality_)); - expBuilder.append("\n"); - } - - // Print the children. Do not traverse into the children of an Exchange node to - // avoid crossing fragment boundaries. - if (traverseChildren) { - if (printFiller) expBuilder.append(filler + "\n"); - String childHeadlinePrefix = prefix + "|--"; - String childDetailPrefix = prefix + "| "; - for (int i = children_.size() - 1; i >= 1; --i) { - PlanNode child = getChild(i); - if (fragment_ != child.fragment_) { - // we're crossing a fragment boundary - expBuilder.append( - child.fragment_.getExplainString( - childHeadlinePrefix, childDetailPrefix, detailLevel)); - } else { - expBuilder.append( - child.getExplainString(childHeadlinePrefix, childDetailPrefix, - detailLevel)); - } - if (printFiller) expBuilder.append(filler + "\n"); - } - expBuilder.append(children_.get(0).getExplainString(prefix, prefix, detailLevel)); - } - return expBuilder.toString(); - } - - /** - * Return the node-specific details. - * Subclass should override this function. - * Each line should be prefixed by detailPrefix. - */ - protected String getNodeExplainString(String rootPrefix, String detailPrefix, - TExplainLevel detailLevel) { - return ""; - } - - /** - * Return the offset_ details, if applicable. This is available separately from - * 'getNodeExplainString' because we want to output 'limit: ...' (which can be printed - * from PlanNode) before 'offset: ...', which is only printed from SortNodes right - * now. 
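The prefix bookkeeping in getExplainString() above follows one pattern: the root's headline uses rootPrefix, children other than the first are printed with a "|--" headline and a "| "-style detail prefix, and the first child continues straight down with the parent's own prefix. A reduced sketch with plain strings (no Impala types, no detail levels) that reproduces the tree shape shown in the earlier comment:

  import java.util.Arrays;
  import java.util.List;

  public class ExplainTreeSketch {
    final String name;
    final List<ExplainTreeSketch> children;

    ExplainTreeSketch(String name, ExplainTreeSketch... children) {
      this.name = name;
      this.children = Arrays.asList(children);
    }

    String explain(String rootPrefix, String prefix) {
      StringBuilder sb = new StringBuilder(rootPrefix + name + "\n");
      // Children after the first are indented one level, last-declared first.
      for (int i = children.size() - 1; i >= 1; --i) {
        sb.append(prefix + "|\n");
        sb.append(children.get(i).explain(prefix + "|--", prefix + "|  "));
      }
      // The first child continues at the parent's own indentation.
      if (!children.isEmpty()) {
        sb.append(prefix + "|\n");
        sb.append(children.get(0).explain(prefix, prefix));
      }
      return sb.toString();
    }

    public static void main(String[] args) {
      System.out.print(new ExplainTreeSketch("root",
          new ExplainTreeSketch("child 1"),
          new ExplainTreeSketch("child 2"),
          new ExplainTreeSketch("child 3")).explain("", ""));
    }
  }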
- */ - protected String getOffsetExplainString(String prefix) { - return ""; - } - - // Convert this plan node, including all children, to its Thrift representation. - public TPlan treeToThrift() { - TPlan result = new TPlan(); - treeToThriftHelper(result); - return result; - } - - // Append a flattened version of this plan node, including all children, to 'container'. - private void treeToThriftHelper(TPlan container) { - TPlanNode msg = new TPlanNode(); - msg.node_id = id_.asInt(); - msg.limit = limit_; - - TExecStats estimatedStats = new TExecStats(); - estimatedStats.setCardinality(cardinality_); - estimatedStats.setMemory_used(perHostMemCost_); - msg.setLabel(getDisplayLabel()); - msg.setLabel_detail(getDisplayLabelDetail()); - msg.setEstimated_stats(estimatedStats); - - Preconditions.checkState(tupleIds_.size() > 0); - msg.setRow_tuples(Lists.<Integer>newArrayListWithCapacity(tupleIds_.size())); - msg.setNullable_tuples(Lists.<Boolean>newArrayListWithCapacity(tupleIds_.size())); - for (TupleId tid: tupleIds_) { - msg.addToRow_tuples(tid.asInt()); - msg.addToNullable_tuples(nullableTupleIds_.contains(tid)); - } - for (Expr e: conjuncts_) { - msg.addToConjuncts(e.treeToThrift()); - } - // Serialize any runtime filters - for (RuntimeFilter filter: runtimeFilters_) { - msg.addToRuntime_filters(filter.toThrift()); - } - toThrift(msg); - container.addToNodes(msg); - // For the purpose of the BE consider ExchangeNodes to have no children. - if (this instanceof ExchangeNode) { - msg.num_children = 0; - return; - } else { - msg.num_children = children_.size(); - for (PlanNode child: children_) { - child.treeToThriftHelper(container); - } - } - } - - /** - * Computes the full internal state, including smap and planner-relevant statistics - * (calls computeStats()), marks all slots referenced by this node as materialized - * and computes the mem layout of all materialized tuples (with the assumption that - * slots that are needed by ancestor PlanNodes have already been marked). - * Also performs final expr substitution with childrens' smaps and computes internal - * state required for toThrift(). This is called directly after construction. - * Throws if an expr substitution or evaluation fails. - */ - public void init(Analyzer analyzer) throws ImpalaException { - assignConjuncts(analyzer); - computeStats(analyzer); - createDefaultSmap(analyzer); - } - - /** - * Assign remaining unassigned conjuncts. - */ - protected void assignConjuncts(Analyzer analyzer) { - List<Expr> unassigned = analyzer.getUnassignedConjuncts(this); - conjuncts_.addAll(unassigned); - analyzer.markConjunctsAssigned(unassigned); - } - - /** - * Returns an smap that combines the childrens' smaps. - */ - protected ExprSubstitutionMap getCombinedChildSmap() { - if (getChildren().size() == 0) return new ExprSubstitutionMap(); - if (getChildren().size() == 1) return getChild(0).getOutputSmap(); - ExprSubstitutionMap result = ExprSubstitutionMap.combine( - getChild(0).getOutputSmap(), getChild(1).getOutputSmap()); - for (int i = 2; i < getChildren().size(); ++i) { - result = ExprSubstitutionMap.combine(result, getChild(i).getOutputSmap()); - } - return result; - } - - /** - * Sets outputSmap_ to compose(existing smap, combined child smap). Also - * substitutes conjuncts_ using the combined child smap. 
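The composition in createDefaultSmap() can be pictured with exprs reduced to strings: composing the node's existing smap with the combined child smap yields a map that first applies the existing substitution and then rewrites the result through the child map, while keeping child entries the existing map does not produce. A conceptual JDK-map sketch only — not the real ExprSubstitutionMap API, whose entries are Exprs and which re-analyzes substituted exprs:

  import java.util.LinkedHashMap;
  import java.util.Map;

  public class SmapComposeSketch {
    static Map<String, String> compose(Map<String, String> f, Map<String, String> g) {
      Map<String, String> result = new LinkedHashMap<>();
      // Rewrite f's outputs through g (identity where g has no entry).
      for (Map.Entry<String, String> e : f.entrySet()) {
        result.put(e.getKey(), g.getOrDefault(e.getValue(), e.getValue()));
      }
      // Keep g's entries that f does not shadow.
      for (Map.Entry<String, String> e : g.entrySet()) {
        result.putIfAbsent(e.getKey(), e.getValue());
      }
      return result;
    }

    public static void main(String[] args) {
      Map<String, String> existing = Map.of("t1.a", "s1");  // node exposes t1.a as s1
      Map<String, String> childSmap = Map.of("s1", "c7");   // child materializes s1 as c7
      System.out.println(compose(existing, childSmap));     // {t1.a=c7, s1=c7}
    }
  }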
- */ - protected void createDefaultSmap(Analyzer analyzer) { - ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap(); - outputSmap_ = - ExprSubstitutionMap.compose(outputSmap_, combinedChildSmap, analyzer); - conjuncts_ = Expr.substituteList(conjuncts_, outputSmap_, analyzer, false); - } - - /** - * Computes planner statistics: avgRowSize_, numNodes_, cardinality_. - * Subclasses need to override this. - * Assumes that it has already been called on all children. - * and that DescriptorTable.computePhysMemLayout() has been called. - * This is broken out of init() so that it can be called separately - * from init() (to facilitate inserting additional nodes during plan - * partitioning w/o the need to call init() recursively on the whole tree again). - */ - protected void computeStats(Analyzer analyzer) { - avgRowSize_ = 0.0F; - for (TupleId tid: tupleIds_) { - TupleDescriptor desc = analyzer.getTupleDesc(tid); - avgRowSize_ += desc.getAvgSerializedSize(); - } - if (!children_.isEmpty()) numNodes_ = getChild(0).numNodes_; - } - - protected long capAtLimit(long cardinality) { - if (hasLimit()) { - if (cardinality == -1) { - return limit_; - } else { - return Math.min(cardinality, limit_); - } - } - return cardinality; - } - - /** - * Call computeMemLayout() for all materialized tuples. - */ - protected void computeMemLayout(Analyzer analyzer) { - for (TupleId id: tupleIds_) { - analyzer.getDescTbl().getTupleDesc(id).computeMemLayout(); - } - } - - /** - * Returns the estimated combined selectivity of all conjuncts. Uses heuristics to - * address the following estimation challenges: - * 1. The individual selectivities of conjuncts may be unknown. - * 2. Two selectivities, whether known or unknown, could be correlated. Assuming - * independence can lead to significant underestimation. - * - * The first issue is addressed by using a single default selectivity that is - * representative of all conjuncts with unknown selectivities. - * The second issue is addressed by an exponential backoff when multiplying each - * additional selectivity into the final result. - */ - static protected double computeCombinedSelectivity(List<Expr> conjuncts) { - // Collect all estimated selectivities. - List<Double> selectivities = Lists.newArrayList(); - for (Expr e: conjuncts) { - if (e.hasSelectivity()) selectivities.add(e.getSelectivity()); - } - if (selectivities.size() != conjuncts.size()) { - // Some conjuncts have no estimated selectivity. Use a single default - // representative selectivity for all those conjuncts. - selectivities.add(Expr.DEFAULT_SELECTIVITY); - } - // Sort the selectivities to get a consistent estimate, regardless of the original - // conjunct order. Sort in ascending order such that the most selective conjunct - // is fully applied. - Collections.sort(selectivities); - double result = 1.0; - for (int i = 0; i < selectivities.size(); ++i) { - // Exponential backoff for each selectivity multiplied into the final result. - result *= Math.pow(selectivities.get(i), 1.0 / (double) (i + 1)); - } - // Bound result in [0, 1] - return Math.max(0.0, Math.min(1.0, result)); - } - - protected double computeSelectivity() { - return computeCombinedSelectivity(conjuncts_); - } - - // Convert this plan node into msg (excluding children), which requires setting - // the node type and the node-specific field. 
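The backoff in computeCombinedSelectivity() above rewards a worked example: with selectivities 0.1 and 0.5, assuming independence would give 0.1 * 0.5 = 0.05, while the sorted-and-backed-off estimate is 0.1 * 0.5^(1/2) ≈ 0.0707, deliberately less aggressive because the conjuncts may be correlated. A standalone sketch of the same loop:

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;

  public class SelectivityBackoffSketch {
    static double combine(List<Double> selectivities) {
      List<Double> sels = new ArrayList<>(selectivities);
      // Sort ascending so the most selective conjunct is applied in full.
      Collections.sort(sels);
      double result = 1.0;
      for (int i = 0; i < sels.size(); ++i) {
        // Each additional selectivity is dampened by an exponent of 1/(i+1).
        result *= Math.pow(sels.get(i), 1.0 / (i + 1));
      }
      return Math.max(0.0, Math.min(1.0, result)); // bound the result in [0, 1]
    }

    public static void main(String[] args) {
      System.out.println(combine(List.of(0.5, 0.1))); // ~0.0707, not 0.05
    }
  }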
- protected abstract void toThrift(TPlanNode msg); - - protected String debugString() { - // not using Objects.toStrHelper because - // PlanNode.debugString() is embedded by debug strings of the subclasses - StringBuilder output = new StringBuilder(); - output.append("preds=" + Expr.debugString(conjuncts_)); - output.append(" limit=" + Long.toString(limit_)); - return output.toString(); - } - - protected String getExplainString(List<? extends Expr> exprs) { - if (exprs == null) return ""; - StringBuilder output = new StringBuilder(); - for (int i = 0; i < exprs.size(); ++i) { - if (i > 0) output.append(", "); - output.append(exprs.get(i).toSql()); - } - return output.toString(); - } - - /** - * Returns true if stats-related variables are valid. - */ - protected boolean hasValidStats() { - return (numNodes_ == -1 || numNodes_ >= 0) && - (cardinality_ == -1 || cardinality_ >= 0); - } - - /** - * Computes and returns the sum of two cardinalities. If an overflow occurs, - * the maximum Long value is returned (Long.MAX_VALUE). - */ - public static long addCardinalities(long a, long b) { - try { - return LongMath.checkedAdd(a, b); - } catch (ArithmeticException e) { - LOG.warn("overflow when adding cardinalities: " + a + ", " + b); - return Long.MAX_VALUE; - } - } - - /** - * Computes and returns the product of two cardinalities. If an overflow - * occurs, the maximum Long value is returned (Long.MAX_VALUE). - */ - public static long multiplyCardinalities(long a, long b) { - try { - return LongMath.checkedMultiply(a, b); - } catch (ArithmeticException e) { - LOG.warn("overflow when multiplying cardinalities: " + a + ", " + b); - return Long.MAX_VALUE; - } - } - - /** - * Returns true if this plan node can output its first row only after consuming - * all rows of all its children. This method is used to group plan nodes - * into pipelined units for resource estimation. - */ - public boolean isBlockingNode() { return false; } - - /** - * Estimates the cost of executing this PlanNode. Currently only sets perHostMemCost_. - * May only be called after this PlanNode has been placed in a PlanFragment because - * the cost computation is dependent on the enclosing fragment's data partition. - */ - public void computeCosts(TQueryOptions queryOptions) { - perHostMemCost_ = 0; - } - - /** - * The input cardinality is the sum of output cardinalities of its children. - * For scan nodes the input cardinality is the expected number of rows scanned. - */ - public long getInputCardinality() { - long sum = 0; - for(PlanNode p : children_) { - long tmp = p.getCardinality(); - if (tmp == -1) return -1; - sum = addCardinalities(sum, tmp); - } - return sum; - } - - protected void addRuntimeFilter(RuntimeFilter filter) { runtimeFilters_.add(filter); } - - protected Collection<RuntimeFilter> getRuntimeFilters() { return runtimeFilters_; } - - protected String getRuntimeFilterExplainString(boolean isBuildNode) { - if (runtimeFilters_.isEmpty()) return ""; - final String applyNodeFilterFormat = "%s -> %s"; - final String buildNodeFilterFormat = "%s <- %s"; - String format = isBuildNode ? 
buildNodeFilterFormat : applyNodeFilterFormat; - StringBuilder output = new StringBuilder(); - List<String> filtersStr = Lists.newArrayList(); - for (RuntimeFilter filter: runtimeFilters_) { - Expr expr = null; - if (isBuildNode) { - expr = filter.getSrcExpr(); - } else { - expr = filter.getTargetExpr(getId()); - } - Preconditions.checkNotNull(expr); - filtersStr.add(String.format(format, filter.getFilterId(), expr.toSql())); - } - output.append(Joiner.on(", ").join(filtersStr) + "\n"); - return output.toString(); - } - - /** - * Sort a list of conjuncts into an estimated cheapest order to evaluate them in, based - * on estimates of the cost to evaluate and selectivity of the expressions. Should be - * called during PlanNode.init for any PlanNode that could have a conjunct list. - * - * The conjuncts are sorted by repeatedly iterating over them and choosing the conjunct - * that would result in the least total estimated work were it to be applied before the - * remaining conjuncts. - * - * As in computeCombinedSelectivity, the selectivities are exponentially backed off over - * the iterations, to reflect the possibility that the conjuncts may be correlated, and - * Exprs without selectivity estimates are given a reasonable default. - */ - public static <T extends Expr> List<T> orderConjunctsByCost(List<T> conjuncts) { - if (conjuncts.size() <= 1) return conjuncts; - - float totalCost = 0; - int numWithoutSel = 0; - List<T> remaining = Lists.newArrayListWithCapacity(conjuncts.size()); - for (T e : conjuncts) { - Preconditions.checkState(e.hasCost()); - totalCost += e.getCost(); - remaining.add(e); - if (!e.hasSelectivity()) { - ++numWithoutSel; - } - } - - // We distribute the DEFAULT_SELECTIVITY over the conjuncts without a selectivity - // estimate so that their combined selectivities equal DEFAULT_SELECTIVITY, i.e. - // Math.pow(defaultSel, numWithoutSel) = Expr.DEFAULT_SELECTIVITY - double defaultSel = Expr.DEFAULT_SELECTIVITY; - if (numWithoutSel != 0) { - defaultSel = Math.pow(Math.E, Math.log(Expr.DEFAULT_SELECTIVITY) / numWithoutSel); - } - - List<T> sortedConjuncts = Lists.newArrayListWithCapacity(conjuncts.size()); - while (!remaining.isEmpty()) { - double smallestCost = Float.MAX_VALUE; - T bestConjunct = null; - double backoffExp = 1.0 / (double) (sortedConjuncts.size() + 1); - for (T e : remaining) { - double sel = Math.pow(e.hasSelectivity() ? e.getSelectivity() : defaultSel, - backoffExp); - - // The cost of evaluating this conjunct first is estimated as the cost of - // applying this conjunct to all rows plus the cost of applying all the - // remaining conjuncts to the number of rows we expect to remain given - // this conjunct's selectivity, exponentially backed off.
- double cost = e.getCost() + (totalCost - e.getCost()) * sel; - if (cost < smallestCost) { - smallestCost = cost; - bestConjunct = e; - } - } - - sortedConjuncts.add(bestConjunct); - remaining.remove(bestConjunct); - totalCost -= bestConjunct.getCost(); - } - - return sortedConjuncts; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlanNodeId.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanNodeId.java b/fe/src/main/java/com/cloudera/impala/planner/PlanNodeId.java deleted file mode 100644 index d161e2b..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/PlanNodeId.java +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import com.cloudera.impala.common.Id; -import com.cloudera.impala.common.IdGenerator; - -public class PlanNodeId extends Id<PlanNodeId> { - // Construction only allowed via an IdGenerator. - protected PlanNodeId(int id) { - super(id); - } - - public static IdGenerator<PlanNodeId> createGenerator() { - return new IdGenerator<PlanNodeId>() { - @Override - public PlanNodeId getNextId() { return new PlanNodeId(nextId_++); } - @Override - public PlanNodeId getMaxId() { return new PlanNodeId(nextId_ - 1); } - }; - } - - @Override - public String toString() { - return String.format("%02d", id_); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/Planner.java b/fe/src/main/java/com/cloudera/impala/planner/Planner.java deleted file mode 100644 index df90df3..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/Planner.java +++ /dev/null @@ -1,456 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
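PlanFragmentId, PlanId, and PlanNodeId above all instantiate the same pattern: the constructor is protected so ids can only come from an IdGenerator, which hands out dense, monotonically increasing values, while toString() controls how the id renders in explain output ("F%02d" for fragments, "%02d" otherwise). A reduced self-contained version of the pattern, without the shared Id/IdGenerator base classes from com.cloudera.impala.common:

  public class IdGeneratorSketch {
    static final class FragmentId {
      private final int id;
      private FragmentId(int id) { this.id = id; } // construction via generator only
      @Override public String toString() { return String.format("F%02d", id); }
    }

    static final class FragmentIdGenerator {
      private int nextId = 0;
      FragmentId getNextId() { return new FragmentId(nextId++); }
      FragmentId getMaxId() { return new FragmentId(nextId - 1); }
    }

    public static void main(String[] args) {
      FragmentIdGenerator gen = new FragmentIdGenerator();
      System.out.println(gen.getNextId()); // F00
      System.out.println(gen.getNextId()); // F01
      System.out.println(gen.getMaxId());  // F01: highest id handed out so far
    }
  }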
- -package com.cloudera.impala.planner; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.AnalysisContext; -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.ColumnLineageGraph; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.ExprSubstitutionMap; -import com.cloudera.impala.analysis.InsertStmt; -import com.cloudera.impala.analysis.JoinOperator; -import com.cloudera.impala.analysis.QueryStmt; -import com.cloudera.impala.catalog.HBaseTable; -import com.cloudera.impala.catalog.Table; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.PrintUtils; -import com.cloudera.impala.common.RuntimeEnv; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TQueryCtx; -import com.cloudera.impala.thrift.TQueryExecRequest; -import com.cloudera.impala.thrift.TRuntimeFilterMode; -import com.cloudera.impala.thrift.TTableName; -import com.cloudera.impala.util.MaxRowsProcessedVisitor; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Creates an executable plan from an analyzed parse tree and query options. - */ -public class Planner { - private final static Logger LOG = LoggerFactory.getLogger(Planner.class); - - private final PlannerContext ctx_; - - public Planner(AnalysisContext.AnalysisResult analysisResult, TQueryCtx queryCtx) { - ctx_ = new PlannerContext(analysisResult, queryCtx); - } - - /** - * Returns a list of plan fragments for executing an analyzed parse tree. - * May return a single-node or distributed executable plan. If enabled (through a - * query option), computes runtime filters for dynamic partition pruning. - * - * Plan generation may fail and throw for the following reasons: - * 1. Expr evaluation failed, e.g., during partition pruning. - * 2. A certain feature is not yet implemented, e.g., physical join implementation for - * outer/semi joins without equi conjuncts. - * 3. Expr substitution failed, e.g., because an expr was substituted with a type that - * renders the containing expr semantically invalid. Analysis should have ensured - * that such an expr substitution during plan generation never fails. If it does, - * that typically means there is a bug in analysis, or a broken/missing smap. - */ - public ArrayList<PlanFragment> createPlan() throws ImpalaException { - SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_); - DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_); - PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan(); - ctx_.getRootAnalyzer().getTimeline().markEvent("Single node plan created"); - ArrayList<PlanFragment> fragments = null; - - // Determine the maximum number of rows processed by any node in the plan tree - MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor(); - singleNodePlan.accept(visitor); - long maxRowsProcessed = visitor.get() == -1 ?
Long.MAX_VALUE : visitor.get(); - boolean isSmallQuery = - maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold; - if (isSmallQuery) { - // Execute on a single node and disable codegen for small results - ctx_.getQueryOptions().setNum_nodes(1); - ctx_.getQueryOptions().setDisable_codegen(true); - if (maxRowsProcessed < ctx_.getQueryOptions().batch_size || - maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) { - // Only one scanner thread for small queries - ctx_.getQueryOptions().setNum_scanner_threads(1); - } - // disable runtime filters - ctx_.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF); - } - - // Join rewrites. - invertJoins(singleNodePlan, ctx_.isSingleNodeExec()); - singleNodePlan = useNljForSingularRowBuilds(singleNodePlan, ctx_.getRootAnalyzer()); - - // create runtime filters - if (ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) { - // Always compute filters, even if the BE won't always use all of them. - RuntimeFilterGenerator.generateRuntimeFilters(ctx_.getRootAnalyzer(), - singleNodePlan, ctx_.getQueryOptions().getMax_num_runtime_filters()); - ctx_.getRootAnalyzer().getTimeline().markEvent( - "Runtime filters computed"); - } - - if (ctx_.isSingleNodeExec()) { - // create one fragment containing the entire single-node plan tree - fragments = Lists.newArrayList(new PlanFragment( - ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED)); - } else { - singleNodePlanner.validatePlan(singleNodePlan); - // create distributed plan - fragments = distributedPlanner.createPlanFragments(singleNodePlan); - } - - PlanFragment rootFragment = fragments.get(fragments.size() - 1); - rootFragment.verifyTree(); - ExprSubstitutionMap rootNodeSmap = rootFragment.getPlanRoot().getOutputSmap(); - List<Expr> resultExprs = null; - if (ctx_.isInsertOrCtas()) { - InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt(); - insertStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer()); - if (!ctx_.isSingleNodeExec()) { - // repartition on partition keys - rootFragment = distributedPlanner.createInsertFragment( - rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments); - } - // set up table sink for root fragment - rootFragment.setSink(insertStmt.createDataSink()); - resultExprs = insertStmt.getResultExprs(); - } else { - if (ctx_.isUpdate()) { - // Set up update sink for root fragment - rootFragment.setSink(ctx_.getAnalysisResult().getUpdateStmt().createDataSink()); - } else if (ctx_.isDelete()) { - // Set up delete sink for root fragment - rootFragment.setSink(ctx_.getAnalysisResult().getDeleteStmt().createDataSink()); - } - QueryStmt queryStmt = ctx_.getQueryStmt(); - queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer()); - resultExprs = queryStmt.getResultExprs(); - } - rootFragment.setOutputExprs(resultExprs); - - LOG.debug("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString()); - LOG.debug("resultexprs: " + Expr.debugString(rootFragment.getOutputExprs())); - LOG.debug("finalize plan fragments"); - for (PlanFragment fragment: fragments) { - fragment.finalize(ctx_.getRootAnalyzer()); - } - - Collections.reverse(fragments); - ctx_.getRootAnalyzer().getTimeline().markEvent("Distributed plan created"); - - ColumnLineageGraph graph = ctx_.getRootAnalyzer().getColumnLineageGraph(); - if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) { - // Compute the column lineage graph - if (ctx_.isInsertOrCtas()) { - Table targetTable = 
ctx_.getAnalysisResult().getInsertStmt().getTargetTable(); - graph.addTargetColumnLabels(targetTable); - Preconditions.checkNotNull(targetTable); - List<Expr> exprs = Lists.newArrayList(); - if (targetTable instanceof HBaseTable) { - exprs.addAll(resultExprs); - } else { - exprs.addAll(ctx_.getAnalysisResult().getInsertStmt().getPartitionKeyExprs()); - exprs.addAll(resultExprs.subList(0, - targetTable.getNonClusteringColumns().size())); - } - graph.computeLineageGraph(exprs, ctx_.getRootAnalyzer()); - } else { - graph.addTargetColumnLabels(ctx_.getQueryStmt().getColLabels()); - graph.computeLineageGraph(resultExprs, ctx_.getRootAnalyzer()); - } - LOG.trace("lineage: " + graph.debugString()); - ctx_.getRootAnalyzer().getTimeline().markEvent("Lineage info computed"); - } - - return fragments; - } - - /** - * Return a list of plans, each represented by the root of their fragment trees. - * TODO: roll into createPlan() - */ - public List<PlanFragment> createParallelPlans() throws ImpalaException { - ArrayList<PlanFragment> distrPlan = createPlan(); - Preconditions.checkNotNull(distrPlan); - ParallelPlanner planner = new ParallelPlanner(ctx_); - List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0)); - ctx_.getRootAnalyzer().getTimeline().markEvent("Parallel plans created"); - return parallelPlans; - } - - /** - * Return combined explain string for all plan fragments. - * Includes the estimated resource requirements from the request if set. - */ - public String getExplainString(ArrayList<PlanFragment> fragments, - TQueryExecRequest request, TExplainLevel explainLevel) { - StringBuilder str = new StringBuilder(); - boolean hasHeader = false; - if (request.isSetPer_host_mem_req() && request.isSetPer_host_vcores()) { - str.append( - String.format("Estimated Per-Host Requirements: Memory=%s VCores=%s\n", - PrintUtils.printBytes(request.getPer_host_mem_req()), - request.per_host_vcores)); - hasHeader = true; - } - - // IMPALA-1983 In the case of corrupt stats, issue a warning for all queries except - // child queries of 'compute stats'. - if (!request.query_ctx.isSetParent_query_id() && - request.query_ctx.isSetTables_with_corrupt_stats() && - !request.query_ctx.getTables_with_corrupt_stats().isEmpty()) { - List<String> tableNames = Lists.newArrayList(); - for (TTableName tableName: request.query_ctx.getTables_with_corrupt_stats()) { - tableNames.add(tableName.db_name + "." + tableName.table_name); - } - str.append( - "WARNING: The following tables have potentially corrupt table statistics.\n" + - "Drop and re-compute statistics to resolve this problem.\n" + - Joiner.on(", ").join(tableNames) + "\n"); - hasHeader = true; - } - - // Append warning about tables missing stats except for child queries of - // 'compute stats'. The parent_query_id is only set for compute stats child queries. - if (!request.query_ctx.isSetParent_query_id() && - request.query_ctx.isSetTables_missing_stats() && - !request.query_ctx.getTables_missing_stats().isEmpty()) { - List<String> tableNames = Lists.newArrayList(); - for (TTableName tableName: request.query_ctx.getTables_missing_stats()) { - tableNames.add(tableName.db_name + "." 
-            + tableName.table_name);
-      }
-      str.append("WARNING: The following tables are missing relevant table " +
-          "and/or column statistics.\n" + Joiner.on(", ").join(tableNames) + "\n");
-      hasHeader = true;
-    }
-
-    if (request.query_ctx.isDisable_spilling()) {
-      str.append("WARNING: Spilling is disabled for this query as a safety guard.\n" +
-          "Reason: Query option disable_unsafe_spills is set, at least one table\n" +
-          "is missing relevant stats, and no plan hints were given.\n");
-      hasHeader = true;
-    }
-    if (hasHeader) str.append("\n");
-
-    if (explainLevel.ordinal() < TExplainLevel.VERBOSE.ordinal()) {
-      // Print the non-fragmented parallel plan.
-      str.append(fragments.get(0).getExplainString(explainLevel));
-    } else {
-      // Print the fragmented parallel plan.
-      for (int i = 0; i < fragments.size(); ++i) {
-        PlanFragment fragment = fragments.get(i);
-        str.append(fragment.getExplainString(explainLevel));
-        if (explainLevel == TExplainLevel.VERBOSE && i + 1 != fragments.size()) {
-          str.append("\n");
-        }
-      }
-    }
-    return str.toString();
-  }
-
-  /**
-   * Returns true if the fragments are for a trivial, coordinator-only query:
-   * Case 1: Only an EmptySetNode, e.g. query has a limit 0.
-   * Case 2: Query has only constant exprs.
-   */
-  private static boolean isTrivialCoordOnlyPlan(List<PlanFragment> fragments) {
-    Preconditions.checkNotNull(fragments);
-    Preconditions.checkState(!fragments.isEmpty());
-    if (fragments.size() > 1) return false;
-    PlanNode root = fragments.get(0).getPlanRoot();
-    if (root instanceof EmptySetNode) return true;
-    if (root instanceof UnionNode && ((UnionNode) root).isConstantUnion()) return true;
-    return false;
-  }
-
-  /**
-   * Estimates the per-host memory and CPU requirements for the given plan fragments,
-   * and sets the results in request.
-   * Optionally excludes the requirements for unpartitioned fragments.
-   * TODO: The LOG.warn() messages should eventually become Preconditions checks
-   * once resource estimation is more robust.
-   * TODO: Revisit and possibly remove during MT work, particularly references to vcores.
-   */
-  public void computeResourceReqs(List<PlanFragment> fragments,
-      boolean excludeUnpartitionedFragments,
-      TQueryExecRequest request) {
-    Preconditions.checkState(!fragments.isEmpty());
-    Preconditions.checkNotNull(request);
-
-    // Compute pipelined plan node sets.
-    ArrayList<PipelinedPlanNodeSet> planNodeSets =
-        PipelinedPlanNodeSet.computePlanNodeSets(fragments.get(0).getPlanRoot());
-
-    // Compute the max of the per-host mem and vcores requirement.
-    // Note that the max mem and vcores may come from different plan node sets.
-    long maxPerHostMem = Long.MIN_VALUE;
-    int maxPerHostVcores = Integer.MIN_VALUE;
-    for (PipelinedPlanNodeSet planNodeSet: planNodeSets) {
-      if (!planNodeSet.computeResourceEstimates(
-          excludeUnpartitionedFragments, ctx_.getQueryOptions())) {
-        continue;
-      }
-      long perHostMem = planNodeSet.getPerHostMem();
-      int perHostVcores = planNodeSet.getPerHostVcores();
-      if (perHostMem > maxPerHostMem) maxPerHostMem = perHostMem;
-      if (perHostVcores > maxPerHostVcores) maxPerHostVcores = perHostVcores;
-    }
-
-    // Do not ask for more cores than are in the RuntimeEnv.
-    maxPerHostVcores = Math.min(maxPerHostVcores, RuntimeEnv.INSTANCE.getNumCores());
-
-    // Special case for some trivial coordinator-only queries (IMPALA-3053, IMPALA-1092).
-    if (isTrivialCoordOnlyPlan(fragments)) {
-      maxPerHostMem = 1024;
-      maxPerHostVcores = 1;
-    }
-
-    // Set costs to zero if there are only unpartitioned fragments and
-    // excludeUnpartitionedFragments is true.
-    // TODO: handle this case with a better indication for unknown, e.g. -1 or not set.
-    if (maxPerHostMem == Long.MIN_VALUE || maxPerHostVcores == Integer.MIN_VALUE) {
-      boolean allUnpartitioned = true;
-      for (PlanFragment fragment: fragments) {
-        if (fragment.isPartitioned()) {
-          allUnpartitioned = false;
-          break;
-        }
-      }
-      if (allUnpartitioned && excludeUnpartitionedFragments) {
-        maxPerHostMem = 0;
-        maxPerHostVcores = 0;
-      }
-    }
-
-    if (maxPerHostMem < 0 || maxPerHostMem == Long.MIN_VALUE) {
-      LOG.warn("Invalid per-host memory requirement: " + maxPerHostMem);
-    }
-    if (maxPerHostVcores < 0 || maxPerHostVcores == Integer.MIN_VALUE) {
-      LOG.warn("Invalid per-host virtual cores requirement: " + maxPerHostVcores);
-    }
-    request.setPer_host_mem_req(maxPerHostMem);
-    request.setPer_host_vcores((short) maxPerHostVcores);
-
-    LOG.debug("Estimated per-host peak memory requirement: " + maxPerHostMem);
-    LOG.debug("Estimated per-host virtual cores requirement: " + maxPerHostVcores);
-  }
-
-  /**
-   * Traverses the plan tree rooted at 'root' and inverts outer and semi joins
-   * in the following situations:
-   * 1. If the left-hand side is a SingularRowSrcNode then we invert the join because
-   *    the build side is then guaranteed to have only a single row.
-   * 2. There is no backend support for distributed non-equi right outer/semi joins,
-   *    so we invert them (any distributed left semi/outer join is ok).
-   * 3. Invert semi/outer joins if the right-hand side is estimated to have a higher
-   *    cardinality*avgSerializedSize. Do not invert if relevant stats are missing.
-   * The first two inversion rules are independent of the presence/absence of stats.
-   * Left Null Aware Anti Joins are never inverted due to lack of backend support.
-   * Joins that originate from query blocks with a straight join hint are not inverted.
-   * The 'isLocalPlan' parameter indicates whether the plan tree rooted at 'root'
-   * will be executed locally within one machine, i.e., without any data exchanges.
-   */
-  private void invertJoins(PlanNode root, boolean isLocalPlan) {
-    if (root instanceof SubplanNode) {
-      invertJoins(root.getChild(0), isLocalPlan);
-      invertJoins(root.getChild(1), true);
-    } else {
-      for (PlanNode child: root.getChildren()) invertJoins(child, isLocalPlan);
-    }
-
-    if (root instanceof JoinNode) {
-      JoinNode joinNode = (JoinNode) root;
-      JoinOperator joinOp = joinNode.getJoinOp();
-
-      // 1. No inversion allowed due to straight join.
-      // 2. The null-aware left anti-join operator is not considered for inversion.
-      //    There is no backend support for a null-aware right anti-join because
-      //    we cannot execute it efficiently.
-      if (joinNode.isStraightJoin() || joinOp.isNullAwareLeftAntiJoin()) {
-        // Re-compute tuple ids since their order must correspond to the order
-        // of children.
-        root.computeTupleIds();
-        return;
-      }
-
-      if (joinNode.getChild(0) instanceof SingularRowSrcNode) {
-        // Always place a singular row src on the build side because it
-        // only produces a single row.
-        joinNode.invertJoin();
-      } else if (!isLocalPlan && joinNode instanceof NestedLoopJoinNode &&
-          (joinOp.isRightSemiJoin() || joinOp.isRightOuterJoin())) {
-        // The current join is a distributed non-equi right outer or semi join,
-        // which has no backend support. Invert the join to make it executable.
-        joinNode.invertJoin();
-      } else {
-        // Invert the join if doing so reduces the size of the materialized rhs
-        // (may also reduce network costs depending on the join strategy).
-        // Only consider this optimization if both the lhs/rhs cardinalities are known.
-        long lhsCard = joinNode.getChild(0).getCardinality();
-        long rhsCard = joinNode.getChild(1).getCardinality();
-        float lhsAvgRowSize = joinNode.getChild(0).getAvgRowSize();
-        float rhsAvgRowSize = joinNode.getChild(1).getAvgRowSize();
-        if (lhsCard != -1 && rhsCard != -1 &&
-            lhsCard * lhsAvgRowSize < rhsCard * rhsAvgRowSize) {
-          joinNode.invertJoin();
-        }
-      }
-    }
-
-    // Re-compute tuple ids because the backend assumes that their order corresponds to
-    // the order of children.
-    root.computeTupleIds();
-  }
-
-  /**
-   * Converts hash joins to nested-loop joins if the right-hand side is a
-   * SingularRowSrcNode. Does not convert Null Aware Anti Joins because we only
-   * support that join op with a hash join.
-   * Throws if JoinNode.init() fails on the new nested-loop join node.
-   */
-  private PlanNode useNljForSingularRowBuilds(PlanNode root, Analyzer analyzer)
-      throws ImpalaException {
-    for (int i = 0; i < root.getChildren().size(); ++i) {
-      root.setChild(i, useNljForSingularRowBuilds(root.getChild(i), analyzer));
-    }
-    if (!(root instanceof JoinNode)) return root;
-    if (root instanceof NestedLoopJoinNode) return root;
-    if (!(root.getChild(1) instanceof SingularRowSrcNode)) return root;
-    JoinNode joinNode = (JoinNode) root;
-    if (joinNode.getJoinOp().isNullAwareLeftAntiJoin()) {
-      Preconditions.checkState(joinNode instanceof HashJoinNode);
-      return root;
-    }
-    List<Expr> otherJoinConjuncts = Lists.newArrayList(joinNode.getOtherJoinConjuncts());
-    otherJoinConjuncts.addAll(joinNode.getEqJoinConjuncts());
-    JoinNode newJoinNode = new NestedLoopJoinNode(joinNode.getChild(0),
-        joinNode.getChild(1), joinNode.isStraightJoin(),
-        joinNode.getDistributionModeHint(), joinNode.getJoinOp(), otherJoinConjuncts);
-    newJoinNode.getConjuncts().addAll(joinNode.getConjuncts());
-    newJoinNode.setId(joinNode.getId());
-    newJoinNode.init(analyzer);
-    return newJoinNode;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlannerContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlannerContext.java b/fe/src/main/java/com/cloudera/impala/planner/PlannerContext.java
deleted file mode 100644
index fc11287..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/PlannerContext.java
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.LinkedList;
-
-import com.cloudera.impala.analysis.AnalysisContext;
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.QueryStmt;
-import com.cloudera.impala.common.IdGenerator;
-import com.cloudera.impala.thrift.TQueryCtx;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.collect.Lists;
-
-/**
- * Contains the analysis result of a query as well as planning-specific
- * parameters and state such as plan-node and plan-fragment id generators.
- */
-public class PlannerContext {
-  // Estimate of the overhead imposed by storing data in a hash tbl;
-  // used for determining whether a broadcast join is feasible.
-  public final static double HASH_TBL_SPACE_OVERHEAD = 1.1;
-
-  // The maximum fraction of remaining memory that a sort node can use during execution.
-  public final static double SORT_MEM_MAX_FRACTION = 0.80;
-
-  // Assumed average number of items in a nested collection, since we currently have no
-  // statistics on nested fields. The motivation for this constant is to avoid
-  // pathological plan choices that could result from a SubplanNode having an unknown
-  // cardinality (due to UnnestNodes not knowing their cardinality), or from a ScanNode
-  // significantly underestimating its output cardinality because intermediate collections
-  // are not accounted for at all. For example, we will place a table ref plan with a
-  // SubplanNode on the build side of a join due to an unknown cardinality if the other
-  // input is a base table scan with stats.
-  // The constant value was chosen arbitrarily to not be "too high" or "too low".
-  // TODO: Compute stats for nested types and pick them up here.
-  public static final long AVG_COLLECTION_SIZE = 10;
-
-  private final IdGenerator<PlanNodeId> nodeIdGenerator_ = PlanNodeId.createGenerator();
-  private final IdGenerator<PlanFragmentId> fragmentIdGenerator_ =
-      PlanFragmentId.createGenerator();
-
-  // Keeps track of subplan nesting. Maintained with push/popSubplan().
-  private final LinkedList<SubplanNode> subplans_ = Lists.newLinkedList();
-
-  private final TQueryCtx queryCtx_;
-  private final AnalysisContext.AnalysisResult analysisResult_;
-  private final QueryStmt queryStmt_;
-
-  public PlannerContext(AnalysisContext.AnalysisResult analysisResult,
-      TQueryCtx queryCtx) {
-    analysisResult_ = analysisResult;
-    queryCtx_ = queryCtx;
-    if (isInsertOrCtas()) {
-      queryStmt_ = analysisResult.getInsertStmt().getQueryStmt();
-    } else if (analysisResult.isUpdateStmt()) {
-      queryStmt_ = analysisResult.getUpdateStmt().getQueryStmt();
-    } else if (analysisResult.isDeleteStmt()) {
-      queryStmt_ = analysisResult.getDeleteStmt().getQueryStmt();
-    } else {
-      queryStmt_ = analysisResult.getQueryStmt();
-    }
-  }
-
-  public QueryStmt getQueryStmt() { return queryStmt_; }
-  public TQueryCtx getQueryCtx() { return queryCtx_; }
-  public TQueryOptions getQueryOptions() {
-    return queryCtx_.getRequest().getQuery_options();
-  }
-  public AnalysisContext.AnalysisResult getAnalysisResult() { return analysisResult_; }
-  public Analyzer getRootAnalyzer() { return analysisResult_.getAnalyzer(); }
-  public boolean isSingleNodeExec() { return getQueryOptions().num_nodes == 1; }
-  public PlanNodeId getNextNodeId() { return nodeIdGenerator_.getNextId(); }
-  public PlanFragmentId getNextFragmentId() { return fragmentIdGenerator_.getNextId(); }
-  public boolean isInsertOrCtas() {
-    return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt();
-  }
-
-  public boolean hasSubplan() { return !subplans_.isEmpty(); }
-  public SubplanNode getSubplan() { return subplans_.getFirst(); }
-  public boolean pushSubplan(SubplanNode n) { return subplans_.offerFirst(n); }
-  public void popSubplan() { subplans_.removeFirst(); }
-  public boolean isUpdate() { return analysisResult_.isUpdateStmt(); }
-  public boolean isDelete() { return analysisResult_.isDeleteStmt(); }
-}
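
For readers tracing the deleted Planner.java above, the small-query path in createPlan() keys off a single estimate: the maximum number of rows any plan node processes. Below is a minimal standalone Java sketch of that decision. SimpleQueryOptions and its default values are hypothetical stand-ins for the generated TQueryOptions, not the real Thrift API.

    // Illustrative sketch only; SimpleQueryOptions stands in for TQueryOptions.
    public class SmallQuerySketch {
      static class SimpleQueryOptions {
        long execSingleNodeRowsThreshold = 100;  // hypothetical default
        int batchSize = 0;                       // 0 means "use the backend default"
        int numNodes = 0;
        boolean disableCodegen = false;
        int numScannerThreads = 0;
      }

      // Mirrors the shape of the deleted logic: run tiny queries on one node,
      // skip codegen, and use one scanner thread when a single batch suffices.
      static void applySmallQueryOptimization(SimpleQueryOptions opts,
          long maxRowsProcessed) {
        if (maxRowsProcessed >= opts.execSingleNodeRowsThreshold) return;
        opts.numNodes = 1;
        opts.disableCodegen = true;
        if (maxRowsProcessed < opts.batchSize
            || (maxRowsProcessed < 1024 && opts.batchSize == 0)) {
          opts.numScannerThreads = 1;
        }
      }

      public static void main(String[] args) {
        SimpleQueryOptions opts = new SimpleQueryOptions();
        applySmallQueryOptimization(opts, 50);
        // prints: 1 true 1
        System.out.println(
            opts.numNodes + " " + opts.disableCodegen + " " + opts.numScannerThreads);
      }
    }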
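
computeResourceReqs() takes a max over per-pipeline estimates rather than summing across all fragments, because only plan nodes that can be open concurrently consume memory at the same time. A condensed sketch of that aggregation, with each PipelinedPlanNodeSet reduced to a hypothetical {mem, vcores} pair:

    import java.util.List;

    // Illustrative sketch: each long[]{mem, vcores} stands in for one
    // PipelinedPlanNodeSet's per-host estimate.
    public class ResourceMaxSketch {
      static long[] maxPerHost(List<long[]> planNodeSets, int numCoresOnHost) {
        long maxMem = Long.MIN_VALUE;
        long maxVcores = Long.MIN_VALUE;
        for (long[] est : planNodeSets) {
          // The max mem and max vcores may come from different sets.
          maxMem = Math.max(maxMem, est[0]);
          maxVcores = Math.max(maxVcores, est[1]);
        }
        // Never ask for more cores than the host actually has.
        maxVcores = Math.min(maxVcores, numCoresOnHost);
        return new long[] {maxMem, maxVcores};
      }

      public static void main(String[] args) {
        long[] req = maxPerHost(
            List.of(new long[] {256L << 20, 3}, new long[] {512L << 20, 2}), 8);
        // 512MB comes from one set, 3 vcores from the other.
        System.out.println(req[0] + " bytes, " + req[1] + " vcores");
      }
    }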
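
The cost-based rule (case 3) in the deleted invertJoins() boils down to comparing the estimated materialized size of the two inputs and putting the cheaper one on the build side. A minimal standalone sketch of just that comparison; the class name and literal row counts are hypothetical, while -1 is the unknown-cardinality sentinel the deleted code uses:

    // Illustrative sketch of the cost-based inversion rule in invertJoins().
    public class InversionHeuristicSketch {
      static boolean shouldInvert(long lhsCard, float lhsAvgRowSize,
          long rhsCard, float rhsAvgRowSize) {
        // Do not invert when relevant stats are missing (-1 == unknown).
        if (lhsCard == -1 || rhsCard == -1) return false;
        // Invert when the current left side would be the cheaper build side.
        return lhsCard * lhsAvgRowSize < rhsCard * rhsAvgRowSize;
      }

      public static void main(String[] args) {
        System.out.println(shouldInvert(10, 16f, 1_000_000, 16f));  // true
        System.out.println(shouldInvert(1_000_000, 16f, 10, 16f));  // false
        System.out.println(shouldInvert(-1, 16f, 10, 16f));         // false: no stats
      }
    }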
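
Finally, PlannerContext tracks subplan nesting with a LinkedList used as a stack via offerFirst/getFirst/removeFirst. A small sketch of the intended push/pop discipline; String elements stand in for SubplanNode, and planNestedCollection() is a hypothetical wrapper, not a method of the deleted class:

    import java.util.LinkedList;

    // Illustrative sketch of the subplan stack in PlannerContext.
    public class SubplanStackSketch {
      private final LinkedList<String> subplans_ = new LinkedList<>();

      boolean hasSubplan() { return !subplans_.isEmpty(); }
      String getSubplan() { return subplans_.getFirst(); }
      boolean pushSubplan(String n) { return subplans_.offerFirst(n); }
      void popSubplan() { subplans_.removeFirst(); }

      void planNestedCollection(String subplan, Runnable plan) {
        pushSubplan(subplan);  // enter the subplan's scope
        try {
          plan.run();          // inner planning sees getSubplan() == subplan
        } finally {
          popSubplan();        // always restore the enclosing scope
        }
      }

      public static void main(String[] args) {
        SubplanStackSketch ctx = new SubplanStackSketch();
        ctx.planNestedCollection("outer", () ->
            ctx.planNestedCollection("inner",
                () -> System.out.println(ctx.getSubplan())));  // prints "inner"
      }
    }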
