http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HdfsTableSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsTableSink.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsTableSink.java deleted file mode 100644 index 7b97773..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/HdfsTableSink.java +++ /dev/null @@ -1,157 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.List; - -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.catalog.HdfsFileFormat; -import com.cloudera.impala.catalog.HdfsTable; -import com.cloudera.impala.catalog.Table; -import com.cloudera.impala.common.PrintUtils; -import com.cloudera.impala.thrift.TDataSink; -import com.cloudera.impala.thrift.TDataSinkType; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.THdfsTableSink; -import com.cloudera.impala.thrift.TTableSink; -import com.cloudera.impala.thrift.TTableSinkType; -import com.google.common.base.Preconditions; - -/** - * Base class for Hdfs data sinks such as HdfsTextTableSink. - * - */ -public class HdfsTableSink extends TableSink { - // Default number of partitions used for computeCosts() in the absence of column stats. - protected final long DEFAULT_NUM_PARTITIONS = 10; - - // Exprs for computing the output partition(s). - protected final List<Expr> partitionKeyExprs_; - // Whether to overwrite the existing partition(s). - protected final boolean overwrite_; - - public HdfsTableSink(Table targetTable, List<Expr> partitionKeyExprs, - boolean overwrite) { - super(targetTable, Op.INSERT); - Preconditions.checkState(targetTable instanceof HdfsTable); - partitionKeyExprs_ = partitionKeyExprs; - overwrite_ = overwrite; - } - - @Override - public void computeCosts() { - HdfsTable table = (HdfsTable) targetTable_; - // TODO: Estimate the memory requirements more accurately by partition type. - HdfsFileFormat format = table.getMajorityFormat(); - PlanNode inputNode = fragment_.getPlanRoot(); - int numNodes = fragment_.getNumNodes(); - // Compute the per-host number of partitions, taking the number of nodes - // and the data partition of the fragment executing this sink into account. - long numPartitions = fragment_.getNumDistinctValues(partitionKeyExprs_); - if (numPartitions == -1) numPartitions = DEFAULT_NUM_PARTITIONS; - long perPartitionMemReq = getPerPartitionMemReq(format); - - // The estimate is based purely on the per-partition mem req if the input cardinality_ - // or the avg row size is unknown. 
- if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) { - perHostMemCost_ = numPartitions * perPartitionMemReq; - return; - } - - // The per-partition estimate may be higher than the memory required to buffer - // the entire input data. - long perHostInputCardinality = Math.max(1L, inputNode.getCardinality() / numNodes); - long perHostInputBytes = - (long) Math.ceil(perHostInputCardinality * inputNode.getAvgRowSize()); - perHostMemCost_ = Math.min(perHostInputBytes, numPartitions * perPartitionMemReq); - } - - /** - * Returns the per-partition memory requirement for inserting into the given - * file format. - */ - private long getPerPartitionMemReq(HdfsFileFormat format) { - switch (format) { - // Writing to a Parquet table requires up to 1GB of buffer per partition. - // TODO: The per-partition memory requirement is configurable in the QueryOptions. - case PARQUET: return 1024L * 1024L * 1024L; - case TEXT: return 100L * 1024L; - default: - Preconditions.checkState(false, "Unsupported TableSink format " + - format.toString()); - } - return 0; - } - - @Override - public String getExplainString(String prefix, String detailPrefix, - TExplainLevel explainLevel) { - StringBuilder output = new StringBuilder(); - String overwriteStr = ", OVERWRITE=" + (overwrite_ ? "true" : "false"); - String partitionKeyStr = ""; - if (!partitionKeyExprs_.isEmpty()) { - StringBuilder tmpBuilder = new StringBuilder(", PARTITION-KEYS=("); - for (Expr expr: partitionKeyExprs_) { - tmpBuilder.append(expr.toSql() + ","); - } - tmpBuilder.deleteCharAt(tmpBuilder.length() - 1); - tmpBuilder.append(")"); - partitionKeyStr = tmpBuilder.toString(); - } - output.append(String.format("%sWRITE TO HDFS [%s%s%s]\n", prefix, - targetTable_.getFullName(), overwriteStr, partitionKeyStr)); - // Report the total number of partitions, independent of the number of nodes - // and the data partition of the fragment executing this sink. - if (explainLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) { - long totalNumPartitions = Expr.getNumDistinctValues(partitionKeyExprs_); - if (totalNumPartitions == -1) { - output.append(detailPrefix + "partitions=unavailable"); - } else { - output.append(detailPrefix + "partitions=" - + (totalNumPartitions == 0 ? 1 : totalNumPartitions)); - } - output.append("\n"); - if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { - output.append(PrintUtils.printHosts(detailPrefix, fragment_.getNumNodes())); - output.append(PrintUtils.printMemCost(" ", perHostMemCost_)); - output.append("\n"); - } - } - return output.toString(); - } - - @Override - protected TDataSink toThrift() { - TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK); - THdfsTableSink hdfsTableSink = new THdfsTableSink( - Expr.treesToThrift(partitionKeyExprs_), overwrite_); - HdfsTable table = (HdfsTable) targetTable_; - StringBuilder error = new StringBuilder(); - int skipHeaderLineCount = table.parseSkipHeaderLineCount(error); - // Errors will be caught during analysis. - Preconditions.checkState(error.length() == 0); - if (skipHeaderLineCount > 0) { - hdfsTableSink.setSkip_header_line_count(skipHeaderLineCount); - } - TTableSink tTableSink = new TTableSink(targetTable_.getId().asInt(), - TTableSinkType.HDFS, sinkOp_.toThrift()); - tTableSink.hdfs_table_sink = hdfsTableSink; - result.table_sink = tTableSink; - return result; - } -}
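The deleted HdfsTableSink.computeCosts() above estimates per-host memory as numPartitions * perPartitionMemReq, capped by the number of bytes each host would actually buffer from its input. The following is a minimal standalone sketch of that arithmetic, placed here after the file's diff so the diff itself stays intact; the class name, method signature, and example values are illustrative assumptions and do not appear in the deleted file.

public class HdfsSinkMemEstimateSketch {
  // Mirrors getPerPartitionMemReq() for Parquet and the stats-less fallback above.
  static final long PARQUET_PARTITION_MEM = 1024L * 1024L * 1024L; // 1 GB per partition
  static final long DEFAULT_NUM_PARTITIONS = 10;

  static long estimatePerHostMem(long numPartitions, long perPartitionMemReq,
      long inputCardinality, double avgRowSize, int numNodes) {
    if (numPartitions == -1) numPartitions = DEFAULT_NUM_PARTITIONS;
    // Without input stats, fall back to the pure per-partition estimate.
    if (inputCardinality == -1 || avgRowSize == -1) {
      return numPartitions * perPartitionMemReq;
    }
    // Otherwise cap the estimate by the bytes each host buffers from its input.
    long perHostInputCardinality = Math.max(1L, inputCardinality / numNodes);
    long perHostInputBytes = (long) Math.ceil(perHostInputCardinality * avgRowSize);
    return Math.min(perHostInputBytes, numPartitions * perPartitionMemReq);
  }

  public static void main(String[] args) {
    // Example inputs: 3 output partitions into a Parquet table, 1,000,000 input rows
    // of ~100 bytes spread over 4 nodes. Per-host input is roughly 25 MB, far below
    // 3 * 1 GB of partition buffers, so the input-size cap determines the estimate.
    System.out.println(
        estimatePerHostMem(3, PARQUET_PARTITION_MEM, 1000000L, 100.0, 4));
  }
}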
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java b/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java deleted file mode 100644 index 25da277..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java +++ /dev/null @@ -1,103 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.thrift.TDataSink; -import com.cloudera.impala.thrift.TDataSinkType; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TJoinBuildSink; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.cloudera.impala.thrift.TQueryOptions; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Sink to materialize the build side of a join. - */ -public class JoinBuildSink extends DataSink { - private final static Logger LOG = LoggerFactory.getLogger(JoinBuildSink.class); - - // id of join's build-side table assigned during planning - private final JoinTableId joinTableId_; - - private final List<Expr> buildExprs_ = Lists.newArrayList(); - - /** - * Creates sink for build side of 'joinNode' (extracts buildExprs_ from joinNode). 
- */ - public JoinBuildSink(JoinTableId joinTableId, JoinNode joinNode) { - Preconditions.checkState(joinTableId.isValid()); - joinTableId_ = joinTableId; - Preconditions.checkNotNull(joinNode); - Preconditions.checkState(joinNode instanceof JoinNode); - if (!(joinNode instanceof HashJoinNode)) return; - for (Expr eqJoinConjunct: joinNode.getEqJoinConjuncts()) { - BinaryPredicate p = (BinaryPredicate) eqJoinConjunct; - // by convention the build exprs are the rhs of the join conjuncts - buildExprs_.add(p.getChild(1).clone()); - } - } - - public JoinTableId getJoinTableId() { return joinTableId_; } - - @Override - protected TDataSink toThrift() { - TDataSink result = new TDataSink(TDataSinkType.JOIN_BUILD_SINK); - TJoinBuildSink tBuildSink = new TJoinBuildSink(); - tBuildSink.setJoin_table_id(joinTableId_.asInt()); - for (Expr buildExpr: buildExprs_) { - tBuildSink.addToBuild_exprs(buildExpr.treeToThrift()); - } - result.setJoin_build_sink(tBuildSink); - return result; - } - - @Override - public String getExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - output.append(String.format("%s%s\n", prefix, "JOIN BUILD")); - if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) { - output.append( - detailPrefix + "join-table-id=" + joinTableId_.toString() - + " plan-id=" + fragment_.getPlanId().toString() - + " cohort-id=" + fragment_.getCohortId().toString() + "\n"); - if (!buildExprs_.isEmpty()) { - output.append(detailPrefix + "build expressions: ") - .append(Expr.toSql(buildExprs_) + "\n"); - } - } - return output.toString(); - } - - @Override - public void computeCosts() { - // TODO: implement? - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java deleted file mode 100644 index ebc9b51..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java +++ /dev/null @@ -1,508 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package com.cloudera.impala.planner; - -import java.util.Collections; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.JoinOperator; -import com.cloudera.impala.analysis.SlotDescriptor; -import com.cloudera.impala.analysis.SlotRef; -import com.cloudera.impala.catalog.ColumnStats; -import com.cloudera.impala.catalog.Table; -import com.cloudera.impala.common.ImpalaException; -import com.google.common.base.Preconditions; - -/** - * Logical join operator. Subclasses correspond to implementations of the join operator - * (e.g. hash join, nested-loop join, etc). - */ -public abstract class JoinNode extends PlanNode { - private final static Logger LOG = LoggerFactory.getLogger(JoinNode.class); - - // Default per-host memory requirement used if no valid stats are available. - // TODO: Come up with a more useful heuristic (e.g., based on scanned partitions). - protected final static long DEFAULT_PER_HOST_MEM = 2L * 1024L * 1024L * 1024L; - - // Slop in percent allowed when comparing stats for the purpose of determining whether - // an equi-join condition is a foreign/primary key join. - protected final static double FK_PK_MAX_STATS_DELTA_PERC = 0.05; - - protected JoinOperator joinOp_; - - // Indicates if this join originates from a query block with a straight join hint. - protected final boolean isStraightJoin_; - - // User-provided hint for the distribution mode. Set to 'NONE' if no hints were given. - protected final DistributionMode distrModeHint_; - - protected DistributionMode distrMode_ = DistributionMode.NONE; - - // Join conjuncts. eqJoinConjuncts_ are conjuncts of the form <lhs> = <rhs>; - // otherJoinConjuncts_ are non-equi join conjuncts. For an inner join, join conjuncts - // are conjuncts from the ON, USING or WHERE clauses. For other join types (e.g. outer - // and semi joins) these include only conjuncts from the ON and USING clauses. - protected List<BinaryPredicate> eqJoinConjuncts_; - protected List<Expr> otherJoinConjuncts_; - - // if valid, the rhs input is materialized outside of this node and is assigned - // joinTableId_ - protected JoinTableId joinTableId_ = JoinTableId.INVALID; - - public enum DistributionMode { - NONE("NONE"), - BROADCAST("BROADCAST"), - PARTITIONED("PARTITIONED"); - - private final String description_; - - private DistributionMode(String description) { - description_ = description; - } - - @Override - public String toString() { return description_; } - } - - public JoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin, - DistributionMode distrMode, JoinOperator joinOp, - List<BinaryPredicate> eqJoinConjuncts, List<Expr> otherJoinConjuncts, - String displayName) { - super(displayName); - Preconditions.checkNotNull(otherJoinConjuncts); - isStraightJoin_ = isStraightJoin; - distrModeHint_ = distrMode; - joinOp_ = joinOp; - children_.add(outer); - children_.add(inner); - eqJoinConjuncts_ = eqJoinConjuncts; - otherJoinConjuncts_ = otherJoinConjuncts; - computeTupleIds(); - } - - @Override - public void computeTupleIds() { - Preconditions.checkState(children_.size() == 2); - clearTupleIds(); - PlanNode outer = children_.get(0); - PlanNode inner = children_.get(1); - - // Only retain the non-semi-joined tuples of the inputs. 
- switch (joinOp_) { - case LEFT_ANTI_JOIN: - case LEFT_SEMI_JOIN: - case NULL_AWARE_LEFT_ANTI_JOIN: { - tupleIds_.addAll(outer.getTupleIds()); - break; - } - case RIGHT_ANTI_JOIN: - case RIGHT_SEMI_JOIN: { - tupleIds_.addAll(inner.getTupleIds()); - break; - } - default: { - tupleIds_.addAll(outer.getTupleIds()); - tupleIds_.addAll(inner.getTupleIds()); - break; - } - } - tblRefIds_.addAll(outer.getTblRefIds()); - tblRefIds_.addAll(inner.getTblRefIds()); - - // Inherits all the nullable tuple from the children - // Mark tuples that form the "nullable" side of the outer join as nullable. - nullableTupleIds_.addAll(inner.getNullableTupleIds()); - nullableTupleIds_.addAll(outer.getNullableTupleIds()); - if (joinOp_.equals(JoinOperator.FULL_OUTER_JOIN)) { - nullableTupleIds_.addAll(outer.getTupleIds()); - nullableTupleIds_.addAll(inner.getTupleIds()); - } else if (joinOp_.equals(JoinOperator.LEFT_OUTER_JOIN)) { - nullableTupleIds_.addAll(inner.getTupleIds()); - } else if (joinOp_.equals(JoinOperator.RIGHT_OUTER_JOIN)) { - nullableTupleIds_.addAll(outer.getTupleIds()); - } - } - - public JoinOperator getJoinOp() { return joinOp_; } - public List<BinaryPredicate> getEqJoinConjuncts() { return eqJoinConjuncts_; } - public List<Expr> getOtherJoinConjuncts() { return otherJoinConjuncts_; } - public boolean isStraightJoin() { return isStraightJoin_; } - public DistributionMode getDistributionModeHint() { return distrModeHint_; } - public DistributionMode getDistributionMode() { return distrMode_; } - public void setDistributionMode(DistributionMode distrMode) { distrMode_ = distrMode; } - public JoinTableId getJoinTableId() { return joinTableId_; } - public void setJoinTableId(JoinTableId id) { joinTableId_ = id; } - - @Override - public void init(Analyzer analyzer) throws ImpalaException { - // Do not call super.init() to defer computeStats() until all conjuncts - // have been collected. - assignConjuncts(analyzer); - createDefaultSmap(analyzer); - assignedConjuncts_ = analyzer.getAssignedConjuncts(); - otherJoinConjuncts_ = Expr.substituteList(otherJoinConjuncts_, - getCombinedChildSmap(), analyzer, false); - } - - /** - * Returns the estimated cardinality of an inner or outer join. - * - * We estimate the cardinality based on equality join predicates of the form - * "L.c = R.d", with L being a table from child(0) and R a table from child(1). - * For each such join predicate we try to determine whether it is a foreign/primary - * key (FK/PK) join condition, and either use a special FK/PK estimation or a generic - * estimation method. We maintain the minimum cardinality for each method separately, - * and finally return in order of preference: - * - the FK/PK estimate, if there was at least one FP/PK predicate - * - the generic estimate, if there was at least one predicate with sufficient stats - * - otherwise, we optimistically assume a FK/PK join with a join selectivity of 1, - * and return |child(0)| - * - * FK/PK estimation: - * cardinality = |child(0)| * (|child(1)| / |R|) * (NDV(R.d) / NDV(L.c)) - * - the cardinality of a FK/PK must be <= |child(0)| - * - |child(1)| / |R| captures the reduction in join cardinality due to - * predicates on the PK side - * - NDV(R.d) / NDV(L.c) adjusts the join cardinality to avoid underestimation - * due to an independence assumption if the PK side has a higher NDV than the FK - * side. The rationale is that rows filtered from the PK side do not necessarily - * have a match on the FK side, and therefore would not affect the join cardinality. 
- * TODO: Revisit this pessimistic adjustment that tends to overestimate. - * - * Generic estimation: - * cardinality = |child(0)| * |child(1)| / max(NDV(L.c), NDV(R.d)) - * - case A: NDV(L.c) <= NDV(R.d) - * every row from child(0) joins with |child(1)| / NDV(R.d) rows - * - case B: NDV(L.c) > NDV(R.d) - * every row from child(1) joins with |child(0)| / NDV(L.c) rows - * - we adjust the NDVs from both sides to account for predicates that may - * might have reduce the cardinality and NDVs - */ - private long getJoinCardinality(Analyzer analyzer) { - Preconditions.checkState( - joinOp_ == JoinOperator.INNER_JOIN || joinOp_.isOuterJoin()); - - long lhsCard = getChild(0).cardinality_; - long rhsCard = getChild(1).cardinality_; - if (lhsCard == -1 || rhsCard == -1) return -1; - - // Minimum of estimated join cardinalities for FK/PK join conditions. - long fkPkJoinCard = -1; - // Minimum of estimated join cardinalities for other join conditions. - long genericJoinCard = -1; - for (Expr eqJoinConjunct: eqJoinConjuncts_) { - SlotStats lhsStats = SlotStats.create(eqJoinConjunct.getChild(0)); - SlotStats rhsStats = SlotStats.create(eqJoinConjunct.getChild(1)); - // Ignore the equi-join conjunct if we have no relevant table or column stats. - if (lhsStats == null || rhsStats == null) continue; - - // We assume a FK/PK join based on the following intuitions: - // 1. NDV(L.c) <= NDV(R.d) - // The reasoning is that a FK/PK join is unlikely if the foreign key - // side has a higher NDV than the primary key side. We may miss true - // FK/PK joins due to inaccurate and/or stale stats. - // 2. R.d is probably a primary key. - // Requires that NDV(R.d) is very close to |R|. - // The idea is that, by default, we assume that every join is a FK/PK join unless - // we have compelling evidence that suggests otherwise, so by using || we give the - // FK/PK assumption more chances to succeed. - if (lhsStats.ndv <= rhsStats.ndv * (1.0 + FK_PK_MAX_STATS_DELTA_PERC) || - Math.abs(rhsStats.numRows - rhsStats.ndv) / (double) rhsStats.numRows - <= FK_PK_MAX_STATS_DELTA_PERC) { - // Adjust the join selectivity based on the NDV ratio to avoid underestimating - // the cardinality if the PK side has a higher NDV than the FK side. - double ndvRatio = (double) rhsStats.ndv / (double) lhsStats.ndv; - double rhsSelectivity = (double) rhsCard / (double) rhsStats.numRows; - long joinCard = (long) Math.ceil(lhsCard * rhsSelectivity * ndvRatio); - // FK/PK join cardinality must be <= the lhs cardinality. - joinCard = Math.min(lhsCard, joinCard); - if (fkPkJoinCard == -1) { - fkPkJoinCard = joinCard; - } else { - fkPkJoinCard = Math.min(fkPkJoinCard, joinCard); - } - } else { - // Adjust the NDVs on both sides to account for predicates. Intuitively, the NDVs - // should only decrease, so we bail if the adjustment would lead to an increase. - // TODO: Adjust the NDVs more systematically throughout the plan tree to - // get a more accurate NDV at this plan node. - if (lhsCard > lhsStats.numRows || rhsCard > rhsStats.numRows) continue; - double lhsAdjNdv = lhsStats.ndv * ((double)lhsCard / lhsStats.numRows); - double rhsAdjNdv = rhsStats.ndv * ((double)rhsCard / rhsStats.numRows); - // Generic join cardinality estimation. 
- long joinCard = (long) Math.ceil( - (lhsCard / Math.max(lhsAdjNdv, rhsAdjNdv)) * rhsCard); - if (genericJoinCard == -1) { - genericJoinCard = joinCard; - } else { - genericJoinCard = Math.min(genericJoinCard, joinCard); - } - } - } - - if (fkPkJoinCard != -1) { - return fkPkJoinCard; - } else if (genericJoinCard != -1) { - return genericJoinCard; - } else { - // Optimistic FK/PK assumption with join selectivity of 1. - return lhsCard; - } - } - - /** - * Class combining column and table stats for a particular slot. Contains the NDV - * for the slot and the number of rows in the originating table. - */ - private static class SlotStats { - // Number of distinct values of the slot. - public final long ndv; - // Number of rows in the originating table. - public final long numRows; - - public SlotStats(long ndv, long numRows) { - // Cap NDV at num rows of the table. - this.ndv = Math.min(ndv, numRows); - this.numRows = numRows; - } - - /** - * Returns a new SlotStats object from the given expr that is guaranteed - * to have valid stats. - * Returns null if 'e' is not a SlotRef or a cast SlotRef, or if there are no - * valid table/column stats for 'e'. - */ - public static SlotStats create(Expr e) { - // We need both the table and column stats, but 'e' might not directly reference - // a scan slot, e.g., if 'e' references a grouping slot of an agg. So we look for - // that source scan slot, traversing through materialization points if necessary. - SlotDescriptor slotDesc = e.findSrcScanSlot(); - if (slotDesc == null) return null; - Table table = slotDesc.getParent().getTable(); - if (table == null || table.getNumRows() == -1) return null; - if (!slotDesc.getStats().hasNumDistinctValues()) return null; - return new SlotStats( - slotDesc.getStats().getNumDistinctValues(), table.getNumRows()); - } - } - - /** - * Returns the estimated cardinality of a semi join node. - * For a left semi join between child(0) and child(1), we look for equality join - * conditions "L.c = R.d" (with L being from child(0) and R from child(1)) and use as - * the cardinality estimate the minimum of - * |child(0)| * Min(NDV(L.c), NDV(R.d)) / NDV(L.c) - * over all suitable join conditions. The reasoning is that: - * - each row in child(0) is returned at most once - * - the probability of a row in child(0) having a match in R is - * Min(NDV(L.c), NDV(R.d)) / NDV(L.c) - * - * For a left anti join we estimate the cardinality as the minimum of: - * |L| * Max(NDV(L.c) - NDV(R.d), NDV(L.c)) / NDV(L.c) - * over all suitable join conditions. The reasoning is that: - * - each row in child(0) is returned at most once - * - if NDV(L.c) > NDV(R.d) then the probability of row in L having a match - * in child(1) is (NDV(L.c) - NDV(R.d)) / NDV(L.c) - * - otherwise, we conservatively use |L| to avoid underestimation - * - * We analogously estimate the cardinality for right semi/anti joins, and treat the - * null-aware anti join like a regular anti join - * - * TODO: In order to take into account additional conjuncts in the child child subtrees - * adjust NDV(L.c) by |child(0)| / |L| and the NDV(R.d) by |child(1)| / |R|. - * The adjustment is currently too dangerous due to the other planner bugs compounding - * to bad plans causing perf regressions (IMPALA-976). - */ - private long getSemiJoinCardinality() { - Preconditions.checkState(joinOp_.isSemiJoin()); - - // Return -1 if the cardinality of the returned side is unknown. 
- long cardinality; - if (joinOp_ == JoinOperator.RIGHT_SEMI_JOIN - || joinOp_ == JoinOperator.RIGHT_ANTI_JOIN) { - if (getChild(1).cardinality_ == -1) return -1; - cardinality = getChild(1).cardinality_; - } else { - if (getChild(0).cardinality_ == -1) return -1; - cardinality = getChild(0).cardinality_; - } - double minSelectivity = 1.0; - for (Expr eqJoinPredicate: eqJoinConjuncts_) { - long lhsNdv = getNdv(eqJoinPredicate.getChild(0)); - lhsNdv = Math.min(lhsNdv, getChild(0).cardinality_); - long rhsNdv = getNdv(eqJoinPredicate.getChild(1)); - rhsNdv = Math.min(rhsNdv, getChild(1).cardinality_); - - // Skip conjuncts with unknown NDV on either side. - if (lhsNdv == -1 || rhsNdv == -1) continue; - - double selectivity = 1.0; - switch (joinOp_) { - case LEFT_SEMI_JOIN: { - selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) (lhsNdv); - break; - } - case RIGHT_SEMI_JOIN: { - selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) (rhsNdv); - break; - } - case LEFT_ANTI_JOIN: - case NULL_AWARE_LEFT_ANTI_JOIN: { - selectivity = (double) Math.max(lhsNdv - rhsNdv, lhsNdv) / (double) lhsNdv; - break; - } - case RIGHT_ANTI_JOIN: { - selectivity = (double) Math.max(rhsNdv - lhsNdv, rhsNdv) / (double) rhsNdv; - break; - } - default: Preconditions.checkState(false); - } - minSelectivity = Math.min(minSelectivity, selectivity); - } - - Preconditions.checkState(cardinality != -1); - return Math.round(cardinality * minSelectivity); - } - - /** - * Unwraps the SlotRef in expr and returns the NDVs of it. - * Returns -1 if the NDVs are unknown or if expr is not a SlotRef. - */ - private long getNdv(Expr expr) { - SlotRef slotRef = expr.unwrapSlotRef(false); - if (slotRef == null) return -1; - SlotDescriptor slotDesc = slotRef.getDesc(); - if (slotDesc == null) return -1; - ColumnStats stats = slotDesc.getStats(); - if (!stats.hasNumDistinctValues()) return -1; - return stats.getNumDistinctValues(); - } - - @Override - public void computeStats(Analyzer analyzer) { - super.computeStats(analyzer); - if (joinOp_.isSemiJoin()) { - cardinality_ = getSemiJoinCardinality(); - } else if (joinOp_.isInnerJoin() || joinOp_.isOuterJoin()){ - cardinality_ = getJoinCardinality(analyzer); - } else { - Preconditions.checkState(joinOp_.isCrossJoin()); - long leftCard = getChild(0).cardinality_; - long rightCard = getChild(1).cardinality_; - if (leftCard != -1 && rightCard != -1) { - cardinality_ = multiplyCardinalities(leftCard, rightCard); - } - } - - // Impose lower/upper bounds on the cardinality based on the join type. 
- long leftCard = getChild(0).cardinality_; - long rightCard = getChild(1).cardinality_; - switch (joinOp_) { - case LEFT_SEMI_JOIN: { - if (leftCard != -1) { - cardinality_ = Math.min(leftCard, cardinality_); - } - break; - } - case RIGHT_SEMI_JOIN: { - if (rightCard != -1) { - cardinality_ = Math.min(rightCard, cardinality_); - } - break; - } - case LEFT_OUTER_JOIN: { - if (leftCard != -1) { - cardinality_ = Math.max(leftCard, cardinality_); - } - break; - } - case RIGHT_OUTER_JOIN: { - if (rightCard != -1) { - cardinality_ = Math.max(rightCard, cardinality_); - } - break; - } - case FULL_OUTER_JOIN: { - if (leftCard != -1 && rightCard != -1) { - long cardinalitySum = addCardinalities(leftCard, rightCard); - cardinality_ = Math.max(cardinalitySum, cardinality_); - } - break; - } - case LEFT_ANTI_JOIN: - case NULL_AWARE_LEFT_ANTI_JOIN: { - if (leftCard != -1) { - cardinality_ = Math.min(leftCard, cardinality_); - } - break; - } - case RIGHT_ANTI_JOIN: { - if (rightCard != -1) { - cardinality_ = Math.min(rightCard, cardinality_); - } - break; - } - case CROSS_JOIN: { - if (getChild(0).cardinality_ == -1 || getChild(1).cardinality_ == -1) { - cardinality_ = -1; - } else { - cardinality_ = multiplyCardinalities(getChild(0).cardinality_, - getChild(1).cardinality_); - } - break; - } - } - cardinality_ = capAtLimit(cardinality_); - Preconditions.checkState(hasValidStats()); - LOG.debug("stats Join: cardinality=" + Long.toString(cardinality_)); - } - - /** - * Inverts the join op, swaps our children, and swaps the children - * of all eqJoinConjuncts_. All modifications are in place. - */ - public void invertJoin() { - joinOp_ = joinOp_.invert(); - Collections.swap(children_, 0, 1); - for (BinaryPredicate p: eqJoinConjuncts_) p.reverse(); - } - - public boolean hasConjuncts() { - return !eqJoinConjuncts_.isEmpty() || !otherJoinConjuncts_.isEmpty() || - !conjuncts_.isEmpty(); - } - - @Override - protected String getDisplayLabelDetail() { - StringBuilder output = new StringBuilder(joinOp_.toString()); - if (distrMode_ != DistributionMode.NONE) output.append(", " + distrMode_.toString()); - return output.toString(); - } - - protected void orderJoinConjunctsByCost() { - conjuncts_ = orderConjunctsByCost(conjuncts_); - eqJoinConjuncts_ = orderConjunctsByCost(eqJoinConjuncts_); - otherJoinConjuncts_ = orderConjunctsByCost(otherJoinConjuncts_); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java b/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java deleted file mode 100644 index 5cf7a2b..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java +++ /dev/null @@ -1,47 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import com.cloudera.impala.common.Id; -import com.cloudera.impala.common.IdGenerator; - -public class JoinTableId extends Id<JoinTableId> { - // Construction only allowed via an IdGenerator. - protected JoinTableId(int id) { - super(id); - } - - public static JoinTableId INVALID; - static { - INVALID = new JoinTableId(Id.INVALID_ID); - } - - public static IdGenerator<JoinTableId> createGenerator() { - return new IdGenerator<JoinTableId>() { - @Override - public JoinTableId getNextId() { return new JoinTableId(nextId_++); } - @Override - public JoinTableId getMaxId() { return new JoinTableId(nextId_ - 1); } - }; - } - - @Override - public String toString() { - return String.format("%02d", id_); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java deleted file mode 100644 index 4f654a9..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java +++ /dev/null @@ -1,358 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package com.cloudera.impala.planner; - -import java.io.IOException; -import java.util.List; -import java.util.ListIterator; -import java.util.Set; - -import org.apache.kudu.ColumnSchema; -import org.apache.kudu.Schema; -import org.apache.kudu.client.KuduClient; -import org.apache.kudu.client.KuduClient.KuduClientBuilder; -import org.apache.kudu.client.KuduPredicate; -import org.apache.kudu.client.KuduPredicate.ComparisonOp; -import org.apache.kudu.client.KuduScanToken; -import org.apache.kudu.client.KuduScanToken.KuduScanTokenBuilder; -import org.apache.kudu.client.LocatedTablet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.BoolLiteral; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.LiteralExpr; -import com.cloudera.impala.analysis.NullLiteral; -import com.cloudera.impala.analysis.NumericLiteral; -import com.cloudera.impala.analysis.SlotDescriptor; -import com.cloudera.impala.analysis.SlotRef; -import com.cloudera.impala.analysis.StringLiteral; -import com.cloudera.impala.analysis.TupleDescriptor; -import com.cloudera.impala.catalog.KuduTable; -import com.cloudera.impala.common.ImpalaRuntimeException; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TKuduScanNode; -import com.cloudera.impala.thrift.TNetworkAddress; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.cloudera.impala.thrift.TScanRange; -import com.cloudera.impala.thrift.TScanRangeLocation; -import com.cloudera.impala.thrift.TScanRangeLocations; -import com.google.common.base.Charsets; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/** - * Scan of a single Kudu table. - * - * Extracts predicates that can be pushed down to Kudu. Currently only binary predicates - * that have a constant expression on one side and a slot ref on the other can be - * evaluated by Kudu. - * - * Uses the Kudu ScanToken API to generate a set of Kudu "scan tokens" which are used for - * scheduling and initializing the scanners. Scan tokens are opaque objects that represent - * a scan for some Kudu data on a tablet (currently one token represents one tablet), and - * it contains the tablet locations and all information needed to produce a Kudu scanner, - * including the projected columns and predicates that are pushed down. - * - * After KUDU-1065 is resolved, Kudu will also prune the tablets that don't need to be - * scanned, and only the tokens for those tablets will be returned. - */ -public class KuduScanNode extends ScanNode { - private final static Logger LOG = LoggerFactory.getLogger(KuduScanNode.class); - - private final KuduTable kuduTable_; - - // Indexes for the set of hosts that will be used for the query. - // From analyzer.getHostIndex().getIndex(address) - private final Set<Integer> hostIndexSet_ = Sets.newHashSet(); - - // List of conjuncts that can be pushed down to Kudu, after they have been normalized - // by BinaryPredicate.normalizeSlotRefComparison(). Used for computing stats and - // explain strings. - private final List<Expr> kuduConjuncts_ = Lists.newArrayList(); - - // Exprs in kuduConjuncts_ converted to KuduPredicates. 
- private final List<KuduPredicate> kuduPredicates_ = Lists.newArrayList(); - - public KuduScanNode(PlanNodeId id, TupleDescriptor desc) { - super(id, desc, "SCAN KUDU"); - kuduTable_ = (KuduTable) desc_.getTable(); - } - - @Override - public void init(Analyzer analyzer) throws ImpalaRuntimeException { - assignConjuncts(analyzer); - analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_); - conjuncts_ = orderConjunctsByCost(conjuncts_); - - try (KuduClient client = - new KuduClientBuilder(kuduTable_.getKuduMasterAddresses()).build()) { - org.apache.kudu.client.KuduTable rpcTable = - client.openTable(kuduTable_.getKuduTableName()); - validateSchema(rpcTable); - - // Extract predicates that can be evaluated by Kudu. - extractKuduConjuncts(analyzer, client, rpcTable); - - // Materialize the slots of the remaining conjuncts (i.e. those not pushed to Kudu) - analyzer.materializeSlots(conjuncts_); - - // Creates Kudu scan tokens and sets the scan range locations. - computeScanRangeLocations(analyzer, client, rpcTable); - } catch (Exception e) { - throw new ImpalaRuntimeException("Unable to initialize the Kudu scan node", e); - } - - computeMemLayout(analyzer); - computeStats(analyzer); - } - - /** - * Validate the columns Impala expects are actually in the Kudu table. - */ - private void validateSchema(org.apache.kudu.client.KuduTable rpcTable) - throws ImpalaRuntimeException { - Schema tableSchema = rpcTable.getSchema(); - for (SlotDescriptor desc: getTupleDesc().getSlots()) { - String colName = desc.getColumn().getName(); - try { - tableSchema.getColumn(colName); - } catch (Exception e) { - throw new ImpalaRuntimeException("Column '" + colName + "' not found in kudu " + - "table " + rpcTable.getName()); - } - } - } - - /** - * Compute the scan range locations for the given table using the scan tokens. - */ - private void computeScanRangeLocations(Analyzer analyzer, - KuduClient client, org.apache.kudu.client.KuduTable rpcTable) - throws ImpalaRuntimeException { - scanRanges_ = Lists.newArrayList(); - - List<KuduScanToken> scanTokens = createScanTokens(client, rpcTable); - for (KuduScanToken token: scanTokens) { - LocatedTablet tablet = token.getTablet(); - List<TScanRangeLocation> locations = Lists.newArrayList(); - if (tablet.getReplicas().isEmpty()) { - throw new ImpalaRuntimeException(String.format( - "At least one tablet does not have any replicas. Tablet ID: %s", - new String(tablet.getTabletId(), Charsets.UTF_8))); - } - - for (LocatedTablet.Replica replica: tablet.getReplicas()) { - TNetworkAddress address = - new TNetworkAddress(replica.getRpcHost(), replica.getRpcPort()); - // Use the network address to look up the host in the global list - Integer hostIndex = analyzer.getHostIndex().getIndex(address); - locations.add(new TScanRangeLocation(hostIndex)); - hostIndexSet_.add(hostIndex); - } - - TScanRange scanRange = new TScanRange(); - try { - scanRange.setKudu_scan_token(token.serialize()); - } catch (IOException e) { - throw new ImpalaRuntimeException("Unable to serialize Kudu scan token=" + - token.toString(), e); - } - - TScanRangeLocations locs = new TScanRangeLocations(); - locs.setScan_range(scanRange); - locs.locations = locations; - scanRanges_.add(locs); - } - } - - /** - * Returns KuduScanTokens for this scan given the projected columns and predicates that - * will be pushed to Kudu. 
- */ - private List<KuduScanToken> createScanTokens(KuduClient client, - org.apache.kudu.client.KuduTable rpcTable) { - List<String> projectedCols = Lists.newArrayList(); - for (SlotDescriptor desc: getTupleDesc().getSlots()) { - if (desc.isMaterialized()) projectedCols.add(desc.getColumn().getName()); - } - - KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(rpcTable); - tokenBuilder.setProjectedColumnNames(projectedCols); - for (KuduPredicate predicate: kuduPredicates_) tokenBuilder.addPredicate(predicate); - return tokenBuilder.build(); - } - - @Override - protected double computeSelectivity() { - List<Expr> allConjuncts = Lists.newArrayList( - Iterables.concat(conjuncts_, kuduConjuncts_)); - return computeCombinedSelectivity(allConjuncts); - } - - @Override - protected void computeStats(Analyzer analyzer) { - super.computeStats(analyzer); - // Update the number of nodes to reflect the hosts that have relevant data. - numNodes_ = hostIndexSet_.size(); - - // Update the cardinality - inputCardinality_ = cardinality_ = kuduTable_.getNumRows(); - cardinality_ *= computeSelectivity(); - cardinality_ = Math.min(Math.max(1, cardinality_), kuduTable_.getNumRows()); - cardinality_ = capAtLimit(cardinality_); - LOG.debug("computeStats KuduScan: cardinality=" + Long.toString(cardinality_)); - } - - @Override - protected String getNodeExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - StringBuilder result = new StringBuilder(); - - String aliasStr = desc_.hasExplicitAlias() ? " " + desc_.getAlias() : ""; - result.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(), displayName_, - kuduTable_.getFullName(), aliasStr)); - - switch (detailLevel) { - case MINIMAL: break; - case STANDARD: // Fallthrough intended. - case EXTENDED: // Fallthrough intended. - case VERBOSE: { - if (!conjuncts_.isEmpty()) { - result.append(detailPrefix + "predicates: " + getExplainString(conjuncts_) - + "\n"); - } - if (!kuduConjuncts_.isEmpty()) { - result.append(detailPrefix + "kudu predicates: " + getExplainString( - kuduConjuncts_) + "\n"); - } - } - } - return result.toString(); - } - - @Override - protected void toThrift(TPlanNode node) { - node.node_type = TPlanNodeType.KUDU_SCAN_NODE; - node.kudu_scan_node = new TKuduScanNode(desc_.getId().asInt()); - } - - /** - * Extracts predicates from conjuncts_ that can be pushed down to Kudu. Currently only - * binary predicates that have a constant expression on one side and a slot ref on the - * other can be evaluated by Kudu. Only looks at comparisons of constants (i.e., the - * bounds of the result can be evaluated with Expr::GetValue(NULL)). If a conjunct can - * be converted into this form, the normalized expr is added to kuduConjuncts_, a - * KuduPredicate is added to kuduPredicates_, and the original expr from conjuncts_ is - * removed. - */ - private void extractKuduConjuncts(Analyzer analyzer, - KuduClient client, org.apache.kudu.client.KuduTable rpcTable) { - ListIterator<Expr> it = conjuncts_.listIterator(); - while (it.hasNext()) { - if (tryConvertKuduPredicate(analyzer, rpcTable, it.next())) it.remove(); - } - } - - /** - * If 'expr' can be converted to a KuduPredicate, returns true and updates - * kuduPredicates_ and kuduConjuncts_. 
- */ - private boolean tryConvertKuduPredicate(Analyzer analyzer, - org.apache.kudu.client.KuduTable table, Expr expr) { - if (!(expr instanceof BinaryPredicate)) return false; - BinaryPredicate predicate = (BinaryPredicate) expr; - - // TODO KUDU-931 look into handling implicit/explicit casts on the SlotRef. - predicate = BinaryPredicate.normalizeSlotRefComparison(predicate, analyzer); - if (predicate == null) return false; - ComparisonOp op = getKuduOperator(((BinaryPredicate)predicate).getOp()); - if (op == null) return false; - - SlotRef ref = (SlotRef) predicate.getChild(0); - LiteralExpr literal = (LiteralExpr) predicate.getChild(1); - - // Cannot push prediates with null literal values (KUDU-1595). - if (literal instanceof NullLiteral) return false; - - String colName = ref.getDesc().getColumn().getName(); - ColumnSchema column = table.getSchema().getColumn(colName); - KuduPredicate kuduPredicate = null; - switch (literal.getType().getPrimitiveType()) { - case BOOLEAN: { - kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, - ((BoolLiteral)literal).getValue()); - break; - } - case TINYINT: - case SMALLINT: - case INT: { - kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, - ((NumericLiteral)literal).getIntValue()); - break; - } - case BIGINT: { - kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, - ((NumericLiteral)literal).getLongValue()); - break; - } - case FLOAT: { - kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, - (float)((NumericLiteral)literal).getDoubleValue()); - break; - } - case DOUBLE: { - kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, - ((NumericLiteral)literal).getDoubleValue()); - break; - } - case STRING: - case VARCHAR: - case CHAR: { - kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, - ((StringLiteral)literal).getStringValue()); - break; - } - default: break; - } - if (kuduPredicate == null) return false; - - kuduConjuncts_.add(predicate); - kuduPredicates_.add(kuduPredicate); - return true; - } - - /** - * Returns a Kudu comparison operator for the BinaryPredicate operator, or null if - * the operation is not supported by Kudu. - */ - private static KuduPredicate.ComparisonOp getKuduOperator(BinaryPredicate.Operator op) { - switch (op) { - case GT: return ComparisonOp.GREATER; - case LT: return ComparisonOp.LESS; - case GE: return ComparisonOp.GREATER_EQUAL; - case LE: return ComparisonOp.LESS_EQUAL; - case EQ: return ComparisonOp.EQUAL; - default: return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/KuduTableSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/KuduTableSink.java b/fe/src/main/java/com/cloudera/impala/planner/KuduTableSink.java deleted file mode 100644 index 8e8ac63..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/KuduTableSink.java +++ /dev/null @@ -1,88 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -package com.cloudera.impala.planner; - -import java.util.ArrayList; -import java.util.List; - -import com.cloudera.impala.catalog.Table; -import com.cloudera.impala.common.PrintUtils; -import com.cloudera.impala.thrift.TDataSink; -import com.cloudera.impala.thrift.TDataSinkType; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TKuduTableSink; -import com.cloudera.impala.thrift.TTableSink; -import com.cloudera.impala.thrift.TTableSinkType; -import com.google.common.collect.Lists; - -/** - * Class used to represent a Sink that will transport - * data from a plan fragment into an Kudu table using a Kudu client. - */ -public class KuduTableSink extends TableSink { - - // Optional list of referenced Kudu table column indices. The position of a result - // expression i matches a column index into the Kudu schema at targetColdIdxs[i]. - private ArrayList<Integer> targetColIdxs_; - - private final boolean ignoreNotFoundOrDuplicate_; - - public KuduTableSink(Table targetTable, Op sinkOp, - List<Integer> referencedColumns, boolean ignoreNotFoundOrDuplicate) { - super(targetTable, sinkOp); - targetColIdxs_ = referencedColumns != null - ? Lists.newArrayList(referencedColumns) : null; - ignoreNotFoundOrDuplicate_ = ignoreNotFoundOrDuplicate; - } - - @Override - public String getExplainString(String prefix, String detailPrefix, - TExplainLevel explainLevel) { - StringBuilder output = new StringBuilder(); - output.append(prefix + sinkOp_.toExplainString()); - output.append(" KUDU [" + targetTable_.getFullName() + "]\n"); - output.append(detailPrefix); - if (sinkOp_ == Op.INSERT) { - output.append("check unique keys: "); - } else { - output.append("check keys exist: "); - } - output.append(ignoreNotFoundOrDuplicate_); - output.append("\n"); - if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { - output.append(PrintUtils.printHosts(detailPrefix, fragment_.getNumNodes())); - output.append(PrintUtils.printMemCost(" ", perHostMemCost_)); - output.append("\n"); - } - return output.toString(); - } - - @Override - protected TDataSink toThrift() { - TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK); - TTableSink tTableSink = new TTableSink(targetTable_.getId().asInt(), - TTableSinkType.KUDU, sinkOp_.toThrift()); - TKuduTableSink tKuduSink = new TKuduTableSink(); - tKuduSink.setReferenced_columns(targetColIdxs_); - tKuduSink.setIgnore_not_found_or_duplicate(ignoreNotFoundOrDuplicate_); - tTableSink.setKudu_table_sink(tKuduSink); - result.table_sink = tTableSink; - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java deleted file mode 100644 index e989438..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java +++ /dev/null @@ -1,133 +0,0 @@ -// Licensed to the Apache Software 
Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.Collections; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.JoinOperator; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TNestedLoopJoinNode; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.cloudera.impala.thrift.TQueryOptions; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; - -/** - * Nested-loop join between left child and right child. - * Initially, the join operator fully materializes the right input in memory. - * Subsequently, for every row from the left input it identifies the matching rows - * from the right hand side and produces the join result according to the join operator. - * The nested-loop join is used when there are no equi-join predicates. Hence, - * eqJoinConjuncts_ should be empty and all the join conjuncts are stored in - * otherJoinConjuncts_. Currrently, all join operators are supported except for - * null-aware anti join. - * - * Note: The operator does not spill to disk when there is not enough memory to hold the - * right input. - */ -public class NestedLoopJoinNode extends JoinNode { - private final static Logger LOG = LoggerFactory.getLogger(NestedLoopJoinNode.class); - - public NestedLoopJoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin, - DistributionMode distrMode, JoinOperator joinOp, List<Expr> otherJoinConjuncts) { - super(outer, inner, isStraightJoin, distrMode, joinOp, - Collections.<BinaryPredicate>emptyList(), otherJoinConjuncts, - "NESTED LOOP JOIN"); - } - - @Override - public void init(Analyzer analyzer) throws ImpalaException { - super.init(analyzer); - Preconditions.checkState(eqJoinConjuncts_.isEmpty()); - // Set the proper join operator based on whether predicates are assigned or not. - if (conjuncts_.isEmpty() && otherJoinConjuncts_.isEmpty() && !joinOp_.isSemiJoin() && - !joinOp_.isOuterJoin()) { - joinOp_ = JoinOperator.CROSS_JOIN; - } else if (joinOp_.isCrossJoin()) { - // A cross join with predicates is an inner join. 
- joinOp_ = JoinOperator.INNER_JOIN; - } - orderJoinConjunctsByCost(); - computeStats(analyzer); - } - - @Override - public void computeCosts(TQueryOptions queryOptions) { - if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1 - || numNodes_ == 0) { - perHostMemCost_ = DEFAULT_PER_HOST_MEM; - return; - } - perHostMemCost_ = - (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_); - } - - @Override - protected String getNodeExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - String labelDetail = getDisplayLabelDetail(); - if (labelDetail == null) { - output.append(prefix + getDisplayLabel() + "\n"); - } else { - output.append(String.format("%s%s:%s [%s]\n", prefix, id_.toString(), - displayName_, getDisplayLabelDetail())); - } - if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { - if (joinTableId_.isValid()) { - output.append( - detailPrefix + "join table id: " + joinTableId_.toString() + "\n"); - } - if (!otherJoinConjuncts_.isEmpty()) { - output.append(detailPrefix + "join predicates: ") - .append(getExplainString(otherJoinConjuncts_) + "\n"); - } - if (!conjuncts_.isEmpty()) { - output.append(detailPrefix + "predicates: ") - .append(getExplainString(conjuncts_) + "\n"); - } - } - return output.toString(); - } - - @Override - protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.NESTED_LOOP_JOIN_NODE; - msg.nested_loop_join_node = new TNestedLoopJoinNode(); - msg.nested_loop_join_node.join_op = joinOp_.toThrift(); - for (Expr e: otherJoinConjuncts_) { - msg.nested_loop_join_node.addToJoin_conjuncts(e.treeToThrift()); - } - } - - @Override - protected String debugString() { - return Objects.toStringHelper(this) - .addValue(super.debugString()) - .toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java deleted file mode 100644 index 905d68d..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java +++ /dev/null @@ -1,205 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package com.cloudera.impala.planner; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.common.IdGenerator; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * The parallel planner is responsible for breaking up a single distributed plan - * (= tree of PlanFragments) into a (logical) tree of distributed plans. The root - * of that tree produces the query result, all the other ones produce intermediate - * join build sides. All plans that produce intermediate join build sides (one per join - * node in the recipient) for a single recipient plan are grouped together into a - * cohort. Since each plan may only produce a build side for at most one recipient - * plan, each plan belongs to exactly one cohort. - * - * TODO: if the input to the JoinBuildSink is the result of a grouping aggregation - * on the join keys, the AggregationNode should materialize the final hash table - * directly (instead of reading the hash table content and feeding it into a - * JoinBuildSink to build another hash table) - * - * TODO: instead of cohort ids, create a Plan class that is a subclass of TreeNode? - */ -public class ParallelPlanner { - private final static Logger LOG = LoggerFactory.getLogger(ParallelPlanner.class); - - private final IdGenerator<JoinTableId> joinTableIdGenerator_ = - JoinTableId.createGenerator(); - private final IdGenerator<PlanId> planIdGenerator_ = PlanId.createGenerator(); - private final IdGenerator<CohortId> cohortIdGenerator_ = CohortId.createGenerator(); - private final PlannerContext ctx_; - - private List<PlanFragment> planRoots_ = Lists.newArrayList(); - - public ParallelPlanner(PlannerContext ctx) { ctx_ = ctx; } - - /** - * Given a distributed plan, return list of plans ready for parallel execution. - * The last plan in the sequence materializes the query result, the preceding - * plans materialize the build sides of joins. - * Assigns cohortId and planId for all fragments. - * TODO: create class DistributedPlan with a PlanFragment member, so we don't - * need to distinguish PlanFragment and Plan through comments? - */ - public List<PlanFragment> createPlans(PlanFragment root) { - root.setPlanId(planIdGenerator_.getNextId()); - root.setCohortId(cohortIdGenerator_.getNextId()); - planRoots_.add(root); - createBuildPlans(root, null); - return planRoots_; - } - - /** - * Recursively traverse tree of fragments of 'plan' from top to bottom and - * move all build inputs of joins into separate plans. 'buildCohortId' is the - * cohort id of the build plans of 'fragment' and may be null if the plan - * to which 'fragment' belongs has so far not required any build plans. - * Assign fragment's plan id and cohort id to children. 
- */ - private void createBuildPlans(PlanFragment fragment, CohortId buildCohortId) { - LOG.info("createbuildplans fragment " + fragment.getId().toString()); - List<JoinNode> joins = Lists.newArrayList(); - collectJoins(fragment.getPlanRoot(), joins); - if (!joins.isEmpty()) { - List<String> joinIds = Lists.newArrayList(); - for (JoinNode join: joins) joinIds.add(join.getId().toString()); - LOG.info("collected joins " + Joiner.on(" ").join(joinIds)); - - if (buildCohortId == null) buildCohortId = cohortIdGenerator_.getNextId(); - for (JoinNode join: joins) createBuildPlan(join, buildCohortId); - } - - if (!fragment.getChildren().isEmpty()) { - List<String> ids = Lists.newArrayList(); - for (PlanFragment c: fragment.getChildren()) ids.add(c.getId().toString()); - LOG.info("collected children " + Joiner.on(" ").join(ids) + " parent " - + fragment.getId().toString()); - } - for (PlanFragment child: fragment.getChildren()) { - child.setPlanId(fragment.getPlanId()); - child.setCohortId(fragment.getCohortId()); - createBuildPlans(child, buildCohortId); - } - } - - /** - * Collect all JoinNodes that aren't themselves the build side of a join node - * in this fragment or the rhs of a SubplanNode. - */ - private void collectJoins(PlanNode node, List<JoinNode> result) { - if (node instanceof JoinNode) { - result.add((JoinNode)node); - // for joins, only descend through the probe side; - // we're recursively traversing the build side when constructing the build plan - // in createBuildPlan() - collectJoins(node.getChild(0), result); - return; - } - if (node instanceof ExchangeNode) return; - if (node instanceof SubplanNode) { - collectJoins(node.getChild(0), result); - return; - } - for (PlanNode child: node.getChildren()) collectJoins(child, result); - } - - /** - * Collect all ExchangeNodes in this fragment. - */ - private void collectExchangeNodes(PlanNode node, List<ExchangeNode> result) { - if (node instanceof ExchangeNode) { - result.add((ExchangeNode)node); - return; - } - for (PlanNode child: node.getChildren()) collectExchangeNodes(child, result); - } - - /** - * Create new plan that materializes build input of 'join' and assign it 'cohortId'. - * In the process, moves all fragments required for this materialization from tree - * rooted at 'join's fragment into the new plan. - * Also assigns the new plan a plan id. - */ - private void createBuildPlan(JoinNode join, CohortId cohortId) { - LOG.info("createbuildplan " + join.getId().toString()); - Preconditions.checkNotNull(cohortId); - // collect all ExchangeNodes on the build side and their corresponding input - // fragments - final List<ExchangeNode> exchNodes = Lists.newArrayList(); - collectExchangeNodes(join.getChild(1), exchNodes); - - com.google.common.base.Predicate<PlanFragment> isInputFragment = - new com.google.common.base.Predicate<PlanFragment>() { - public boolean apply(PlanFragment f) { - // we're starting with the fragment containing the join, which might - // be terminal - if (f.getDestNode() == null) return false; - for (ExchangeNode exch: exchNodes) { - if (exch.getId() == f.getDestNode().getId()) return true; - } - return false; - } - }; - List<PlanFragment> inputFragments = Lists.newArrayList(); - join.getFragment().collect(isInputFragment, inputFragments); - Preconditions.checkState(exchNodes.size() == inputFragments.size()); - - // Create new fragment with JoinBuildSink that consumes the output of the - // join's rhs input (the one that materializes the build side). 
- // The new fragment has the same data partition as the join node's fragment. - JoinBuildSink buildSink = - new JoinBuildSink(joinTableIdGenerator_.getNextId(), join); - join.setJoinTableId(buildSink.getJoinTableId()); - // c'tor fixes up PlanNode.fragment_ - PlanFragment buildFragment = new PlanFragment(ctx_.getNextFragmentId(), - join.getChild(1), join.getFragment().getDataPartition()); - buildFragment.setSink(buildSink); - - // move input fragments - for (int i = 0; i < exchNodes.size(); ++i) { - LOG.info("re-link fragment " + inputFragments.get(i).getId().toString() + " to " - + exchNodes.get(i).getFragment().getId().toString()); - Preconditions.checkState(exchNodes.get(i).getFragment() == buildFragment); - join.getFragment().removeChild(inputFragments.get(i)); - buildFragment.getChildren().add(inputFragments.get(i)); - } - - // assign plan and cohort id - buildFragment.setPlanId(planIdGenerator_.getNextId()); - PlanId parentPlanId = join.getFragment().getPlanId(); - buildFragment.setCohortId(cohortId); - - planRoots_.add(buildFragment); - LOG.info("new build fragment " + buildFragment.getId().toString()); - LOG.info("in cohort " + buildFragment.getCohortId().toString()); - LOG.info("for join node " + join.getId().toString()); - createBuildPlans(buildFragment, null); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PipelinedPlanNodeSet.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/PipelinedPlanNodeSet.java b/fe/src/main/java/com/cloudera/impala/planner/PipelinedPlanNodeSet.java deleted file mode 100644 index 6714213..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/PipelinedPlanNodeSet.java +++ /dev/null @@ -1,215 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.thrift.TQueryOptions; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/** - * Represents a set of PlanNodes and DataSinks that execute and consume resources - * concurrently. PlanNodes and DataSinks in such a pipelined plan node set may belong - * to different plan fragments because data is streamed across fragments. - * - * For example, a series of left-deep joins consists of two plan node sets. The first - * set contains all build-side nodes. The second set contains the leftmost - * scan. Both sets contain all join nodes because they execute and consume - * resources during the build and probe phases. 
Similarly, all nodes below a 'blocking' - * node (e.g., an AggregationNode) are placed into a different plan node set than the - * nodes above it, but the blocking node itself belongs to both sets. - */ -public class PipelinedPlanNodeSet { - private final static Logger LOG = LoggerFactory.getLogger(PipelinedPlanNodeSet.class); - - // Minimum per-host resource requirements to ensure that no plan node set can have - // estimates of zero, even if the contained PlanNodes have estimates of zero. - public static final long MIN_PER_HOST_MEM = 10 * 1024 * 1024; - public static final int MIN_PER_HOST_VCORES = 1; - - // List of plan nodes that execute and consume resources concurrently. - private final ArrayList<PlanNode> planNodes = Lists.newArrayList(); - - // DataSinks that execute and consume resources concurrently. - // Primarily used for estimating the cost of insert queries. - private final List<DataSink> dataSinks = Lists.newArrayList(); - - // Estimated per-host memory and CPU requirements. - // Valid after computeResourceEstimates(). - private long perHostMem = MIN_PER_HOST_MEM; - private int perHostVcores = MIN_PER_HOST_VCORES; - - public void add(PlanNode node) { - Preconditions.checkNotNull(node.getFragment()); - planNodes.add(node); - } - - public void addSink(DataSink sink) { - Preconditions.checkNotNull(sink); - dataSinks.add(sink); - } - - /** - * Computes the estimated per-host memory and CPU requirements of this plan node set. - * Optionally excludes unpartitioned fragments from the estimation. - * Returns true if at least one plan node was included in the estimation. - * Otherwise returns false indicating the estimates are invalid. - */ - public boolean computeResourceEstimates(boolean excludeUnpartitionedFragments, - TQueryOptions queryOptions) { - Set<PlanFragment> uniqueFragments = Sets.newHashSet(); - - // Distinguish the per-host memory estimates for scan nodes and non-scan nodes to - // get a tighter estimate on the amount of memory required by multiple concurrent - // scans. The memory required by all concurrent scans of the same type (Hdfs/Hbase) - // cannot exceed the per-host upper memory bound for that scan type. Intuitively, - // the amount of I/O buffers is limited by the disk bandwidth. - long perHostHbaseScanMem = 0L; - long perHostHdfsScanMem = 0L; - long perHostNonScanMem = 0L; - - for (int i = 0; i < planNodes.size(); ++i) { - PlanNode node = planNodes.get(i); - PlanFragment fragment = node.getFragment(); - if (!fragment.isPartitioned() && excludeUnpartitionedFragments) continue; - node.computeCosts(queryOptions); - uniqueFragments.add(fragment); - if (node.getPerHostMemCost() < 0) { - LOG.warn(String.format("Invalid per-host memory requirement %s of node %s.\n" + - "PlanNode stats are: numNodes_=%s ", node.getPerHostMemCost(), - node.getClass().getSimpleName(), node.getNumNodes())); - } - if (node instanceof HBaseScanNode) { - perHostHbaseScanMem += node.getPerHostMemCost(); - } else if (node instanceof HdfsScanNode) { - perHostHdfsScanMem += node.getPerHostMemCost(); - } else { - perHostNonScanMem += node.getPerHostMemCost(); - } - } - - // The memory required by concurrent scans cannot exceed the upper memory bound - // for that scan type. - // TODO: In the future, we may want to restrict scanner concurrency based on a - // memory limit. This estimation will need to account for that as well.
- perHostHbaseScanMem = - Math.min(perHostHbaseScanMem, HBaseScanNode.getPerHostMemUpperBound()); - perHostHdfsScanMem = - Math.min(perHostHdfsScanMem, HdfsScanNode.getPerHostMemUpperBound()); - - long perHostDataSinkMem = 0L; - for (int i = 0; i < dataSinks.size(); ++i) { - DataSink sink = dataSinks.get(i); - PlanFragment fragment = sink.getFragment(); - if (!fragment.isPartitioned() && excludeUnpartitionedFragments) continue; - // Sanity check that this plan-node set has at least one PlanNode of this fragment. - Preconditions.checkState(uniqueFragments.contains(fragment)); - sink.computeCosts(); - if (sink.getPerHostMemCost() < 0) { - LOG.warn(String.format("Invalid per-host memory requirement %s of sink %s.\n", - sink.getPerHostMemCost(), sink.getClass().getSimpleName())); - } - perHostDataSinkMem += sink.getPerHostMemCost(); - } - - // Combine the memory estimates of all sinks, scan nodes and non-scan nodes. - long perHostMem = perHostHdfsScanMem + perHostHbaseScanMem + perHostNonScanMem + - perHostDataSinkMem; - - // The backend needs at least one thread per fragment. - int perHostVcores = uniqueFragments.size(); - - // This plan node set might only have unpartitioned fragments. - // Only set estimates if they are valid. - if (perHostMem >= 0 && perHostVcores >= 0) { - this.perHostMem = perHostMem; - this.perHostVcores = perHostVcores; - return true; - } - return false; - } - - public long getPerHostMem() { return perHostMem; } - public int getPerHostVcores() { return perHostVcores; } - - /** - * Computes and returns the pipelined plan node sets of the given plan. - */ - public static ArrayList<PipelinedPlanNodeSet> computePlanNodeSets(PlanNode root) { - ArrayList<PipelinedPlanNodeSet> planNodeSets = - Lists.newArrayList(new PipelinedPlanNodeSet()); - computePlanNodeSets(root, planNodeSets.get(0), null, planNodeSets); - return planNodeSets; - } - - /** - * Populates 'planNodeSets' by recursively traversing the plan tree rooted at 'node'. - * The plan node sets are computed top-down. As a result, the plan node sets are added - * in reverse order of their runtime execution. - * - * Nodes are generally added to lhsSet. Joins are treated specially in that their - * left child is added to lhsSet and their right child to rhsSet to make sure - * that concurrent join builds end up in the same plan node set. - */ - private static void computePlanNodeSets(PlanNode node, PipelinedPlanNodeSet lhsSet, - PipelinedPlanNodeSet rhsSet, ArrayList<PipelinedPlanNodeSet> planNodeSets) { - lhsSet.add(node); - if (node == node.getFragment().getPlanRoot() && node.getFragment().hasSink()) { - lhsSet.addSink(node.getFragment().getSink()); - } - - if (node instanceof HashJoinNode) { - // Create a new set for the right-hand sides of joins if necessary. - if (rhsSet == null) { - rhsSet = new PipelinedPlanNodeSet(); - planNodeSets.add(rhsSet); - } - // The join node itself is added to the lhsSet (above) and the rhsSet. - rhsSet.add(node); - computePlanNodeSets(node.getChild(1), rhsSet, null, planNodeSets); - computePlanNodeSets(node.getChild(0), lhsSet, rhsSet, planNodeSets); - return; - } - - if (node.isBlockingNode()) { - // We add blocking nodes to two plan node sets because they require resources while - // consuming their input (execution of the preceding set) and while they - // emit their output (execution of the following set). - lhsSet = new PipelinedPlanNodeSet(); - lhsSet.add(node); - planNodeSets.add(lhsSet); - // Join builds under this blocking node belong in a new rhsSet.
- rhsSet = null; - } - - // Assume that non-join, non-blocking nodes with multiple children - // (e.g., ExchangeNodes) consume their inputs in an arbitrary order, - // i.e., all child subtrees execute concurrently. - // TODO: This is not true for UnionNodes anymore. Fix the estimates accordingly. - for (PlanNode child: node.getChildren()) { - computePlanNodeSets(child, lhsSet, rhsSet, planNodeSets); - } - } -}
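
[Editor's note] The computeResourceEstimates() method in the deleted PipelinedPlanNodeSet above caps the memory attributed to concurrent scans of each storage type at a per-type upper bound before summing it with non-scan and sink memory. The following is a minimal, self-contained sketch of that arithmetic only; the class name, the bound constants and the sample numbers are invented for illustration and are not part of the Impala sources.

// Illustrative sketch (not Impala code): combine per-host memory estimates the
// way PipelinedPlanNodeSet.computeResourceEstimates() describes, with hypothetical
// per-scan-type upper bounds standing in for the HdfsScanNode/HBaseScanNode bounds.
public class PerHostMemEstimateSketch {
  // Hypothetical upper bounds; the real values come from the scan node classes.
  static final long HDFS_SCAN_UPPER_BOUND = 512L * 1024 * 1024;
  static final long HBASE_SCAN_UPPER_BOUND = 256L * 1024 * 1024;

  static long estimate(long[] hdfsScanMem, long[] hbaseScanMem,
      long[] nonScanMem, long[] sinkMem) {
    long hdfs = 0, hbase = 0, other = 0, sinks = 0;
    for (long m : hdfsScanMem) hdfs += m;
    for (long m : hbaseScanMem) hbase += m;
    for (long m : nonScanMem) other += m;
    for (long m : sinkMem) sinks += m;
    // Concurrent scans of one type cannot use more memory than that type's bound.
    hdfs = Math.min(hdfs, HDFS_SCAN_UPPER_BOUND);
    hbase = Math.min(hbase, HBASE_SCAN_UPPER_BOUND);
    // Scans, non-scan nodes and sinks of the set run concurrently, so sum them.
    return hdfs + hbase + other + sinks;
  }

  public static void main(String[] args) {
    // Two 400MB HDFS scans are capped at the hypothetical 512MB bound before
    // being combined with a 64MB non-scan node and a 100KB sink.
    long est = estimate(
        new long[] {400L * 1024 * 1024, 400L * 1024 * 1024},
        new long[] {},
        new long[] {64L * 1024 * 1024},
        new long[] {100L * 1024});
    System.out.println("per-host mem estimate (bytes): " + est);
  }
}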

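[Editor's note] The NestedLoopJoinNode class deleted earlier in this commit only plans the operator; its class comment describes the runtime behaviour (materialize the right input, then evaluate the non-equi join conjuncts for every left row against every materialized right row, falling back to a cross join when there are no predicates). The sketch below illustrates that evaluation order for an inner join under those assumptions; all names in it are invented and it is independent of the Impala code base.

// Illustrative sketch (not Impala code): inner nested-loop join over integer rows,
// with the right (build) side fully materialized in memory as the planner
// comment describes. The predicate stands in for otherJoinConjuncts_.
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class NestedLoopJoinSketch {
  static List<int[]> innerJoin(Iterable<Integer> left, Iterable<Integer> right,
      BiPredicate<Integer, Integer> predicate) {
    // Materialize the right input first; like the real operator, this sketch
    // does not spill if the build side does not fit in memory.
    List<Integer> build = new ArrayList<>();
    for (int r : right) build.add(r);

    List<int[]> result = new ArrayList<>();
    // Probe: compare every left row against every materialized right row.
    for (int l : left) {
      for (int r : build) {
        if (predicate.test(l, r)) result.add(new int[] {l, r});
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Integer> left = List.of(1, 5, 9);
    List<Integer> right = List.of(2, 6, 10);
    // A non-equi predicate (l < r), the kind of join a hash join cannot handle.
    for (int[] pair : innerJoin(left, right, (l, r) -> l < r)) {
      System.out.println(pair[0] + " < " + pair[1]);
    }
  }
}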