http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/DataPartition.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataPartition.java b/fe/src/main/java/com/cloudera/impala/planner/DataPartition.java deleted file mode 100644 index 3320c2b..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/DataPartition.java +++ /dev/null @@ -1,131 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.ExprSubstitutionMap; -import com.cloudera.impala.thrift.TDataPartition; -import com.cloudera.impala.thrift.TPartitionType; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Specification of the partition of a single stream of data. - * Examples of those streams of data are: the scan of a table; the output - * of a plan fragment; etc. (ie, this is not restricted to direct exchanges - * between two fragments, which in the backend is facilitated by the classes - * DataStreamSender/DataStreamMgr/DataStreamRecvr). 
- */ -public class DataPartition { - private final static Logger LOG = LoggerFactory.getLogger(DataPartition.class); - - private final TPartitionType type_; - - // for hash partition: exprs used to compute hash value - private List<Expr> partitionExprs_; - - private DataPartition(TPartitionType type, List<Expr> exprs) { - Preconditions.checkNotNull(exprs); - Preconditions.checkState(!exprs.isEmpty()); - Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED - || type == TPartitionType.RANGE_PARTITIONED); - type_ = type; - partitionExprs_ = exprs; - } - - private DataPartition(TPartitionType type) { - Preconditions.checkState(type == TPartitionType.UNPARTITIONED - || type == TPartitionType.RANDOM); - type_ = type; - partitionExprs_ = Lists.newArrayList(); - } - - public final static DataPartition UNPARTITIONED = - new DataPartition(TPartitionType.UNPARTITIONED); - - public final static DataPartition RANDOM = - new DataPartition(TPartitionType.RANDOM); - - public static DataPartition hashPartitioned(List<Expr> exprs) { - return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs); - } - - public boolean isPartitioned() { return type_ != TPartitionType.UNPARTITIONED; } - public boolean isHashPartitioned() { return type_ == TPartitionType.HASH_PARTITIONED; } - public TPartitionType getType() { return type_; } - public List<Expr> getPartitionExprs() { return partitionExprs_; } - - public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) { - partitionExprs_ = Expr.substituteList(partitionExprs_, smap, analyzer, false); - } - - public TDataPartition toThrift() { - TDataPartition result = new TDataPartition(type_); - if (partitionExprs_ != null) { - result.setPartition_exprs(Expr.treesToThrift(partitionExprs_)); - } - return result; - } - - @Override - public boolean equals(Object obj) { - if (obj == null) return false; - if (obj.getClass() != this.getClass()) return false; - DataPartition other = (DataPartition) obj; - if (type_ != other.type_) return false; - return Expr.equalLists(partitionExprs_, other.partitionExprs_); - } - - public String debugString() { - return Objects.toStringHelper(this) - .add("type_", type_) - .addValue(Expr.debugString(partitionExprs_)) - .toString(); - } - - public String getExplainString() { - StringBuilder str = new StringBuilder(); - str.append(getPartitionShortName(type_)); - if (!partitionExprs_.isEmpty()) { - List<String> strings = Lists.newArrayList(); - for (Expr expr: partitionExprs_) { - strings.add(expr.toSql()); - } - str.append("(" + Joiner.on(",").join(strings) +")"); - } - return str.toString(); - } - - private String getPartitionShortName(TPartitionType partition) { - switch (partition) { - case RANDOM: return "RANDOM"; - case HASH_PARTITIONED: return "HASH"; - case RANGE_PARTITIONED: return "RANGE"; - case UNPARTITIONED: return "UNPARTITIONED"; - default: return ""; - } - } -}
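For illustration, the explain-string behavior of the deleted DataPartition class can be mirrored in a small standalone sketch. SimplePartition and its String-based expression list below are stand-ins invented for this example; the real class operates on Expr trees and the Thrift TPartitionType enum, so this is a minimal sketch of the output format, not the planner's actual API.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone sketch of DataPartition.getExplainString(). "SimplePartition" and
// its String exprs are hypothetical stand-ins; the real class renders each
// partition expr via Expr.toSql().
public class SimplePartition {
  enum Type { UNPARTITIONED, RANDOM, HASH_PARTITIONED, RANGE_PARTITIONED }

  private final Type type;
  private final List<String> partitionExprs;  // SQL strings stand in for Expr.toSql()

  SimplePartition(Type type, List<String> exprs) {
    this.type = type;
    this.partitionExprs = exprs;
  }

  // Mirrors getExplainString(): the short partition name, plus "(expr,expr)"
  // when partition exprs are present.
  String getExplainString() {
    StringBuilder str = new StringBuilder(shortName());
    if (!partitionExprs.isEmpty()) {
      str.append("(").append(String.join(",", partitionExprs)).append(")");
    }
    return str.toString();
  }

  private String shortName() {
    switch (type) {
      case RANDOM: return "RANDOM";
      case HASH_PARTITIONED: return "HASH";
      case RANGE_PARTITIONED: return "RANGE";
      case UNPARTITIONED: return "UNPARTITIONED";
      default: return "";
    }
  }

  public static void main(String[] args) {
    SimplePartition hash =
        new SimplePartition(Type.HASH_PARTITIONED, Arrays.asList("a", "b"));
    System.out.println(hash.getExplainString());   // prints: HASH(a,b)
    SimplePartition unpart =
        new SimplePartition(Type.UNPARTITIONED, Collections.emptyList());
    System.out.println(unpart.getExplainString()); // prints: UNPARTITIONED
  }
}

Run directly, this prints HASH(a,b) and UNPARTITIONED, matching the short names DataPartition emits into plan explain output.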
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/DataSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataSink.java b/fe/src/main/java/com/cloudera/impala/planner/DataSink.java deleted file mode 100644 index ff81b50..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/DataSink.java +++ /dev/null @@ -1,64 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.List; - -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.catalog.HBaseTable; -import com.cloudera.impala.catalog.HdfsTable; -import com.cloudera.impala.catalog.KuduTable; -import com.cloudera.impala.catalog.Table; -import com.cloudera.impala.thrift.TDataSink; -import com.cloudera.impala.thrift.TExplainLevel; - -/** - * A DataSink describes the destination of a plan fragment's output rows. - * The destination could be another plan fragment on a remote machine, - * or a table into which the rows are to be inserted - * (i.e., the destination of the last fragment of an INSERT statement). - */ -public abstract class DataSink { - - // estimated per-host memory requirement for sink; - // set in computeCosts(); invalid: -1 - protected long perHostMemCost_ = -1; - - // Fragment that this DataSink belongs to. Set by the PlanFragment enclosing this sink. - protected PlanFragment fragment_; - - /** - * Return an explain string for the DataSink. Each line of the explain will be prefixed - * by "prefix". - */ - public abstract String getExplainString(String prefix, String detailPrefix, - TExplainLevel explainLevel); - - protected abstract TDataSink toThrift(); - - public void setFragment(PlanFragment fragment) { fragment_ = fragment; } - public PlanFragment getFragment() { return fragment_; } - public long getPerHostMemCost() { return perHostMemCost_; } - - /** - * Estimates the cost of executing this DataSink. Currently only sets perHostMemCost. - */ - public void computeCosts() { - perHostMemCost_ = 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java deleted file mode 100644 index ab92605..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java +++ /dev/null @@ -1,371 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.BoolLiteral; -import com.cloudera.impala.analysis.CompoundPredicate; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.LiteralExpr; -import com.cloudera.impala.analysis.NumericLiteral; -import com.cloudera.impala.analysis.SlotRef; -import com.cloudera.impala.analysis.StringLiteral; -import com.cloudera.impala.analysis.TupleDescriptor; -import com.cloudera.impala.catalog.DataSource; -import com.cloudera.impala.catalog.DataSourceTable; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.extdatasource.ExternalDataSourceExecutor; -import com.cloudera.impala.extdatasource.thrift.TBinaryPredicate; -import com.cloudera.impala.extdatasource.thrift.TColumnDesc; -import com.cloudera.impala.extdatasource.thrift.TComparisonOp; -import com.cloudera.impala.extdatasource.thrift.TPrepareParams; -import com.cloudera.impala.extdatasource.thrift.TPrepareResult; -import com.cloudera.impala.service.FeSupport; -import com.cloudera.impala.thrift.TCacheJarResult; -import com.cloudera.impala.thrift.TColumnValue; -import com.cloudera.impala.thrift.TDataSourceScanNode; -import com.cloudera.impala.thrift.TErrorCode; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TNetworkAddress; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.cloudera.impala.thrift.TQueryOptions; -import com.cloudera.impala.thrift.TScanRange; -import com.cloudera.impala.thrift.TScanRangeLocation; -import com.cloudera.impala.thrift.TScanRangeLocations; -import com.cloudera.impala.thrift.TStatus; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -/** - * Scan of a table provided by an external data source. - */ -public class DataSourceScanNode extends ScanNode { - private final static Logger LOG = LoggerFactory.getLogger(DataSourceScanNode.class); - private final TupleDescriptor desc_; - private final DataSourceTable table_; - - // The converted conjuncts_ that were accepted by the data source. A conjunct can - // be converted if it contains only disjunctive predicates of the form - // <slotref> <op> <constant>. - private List<List<TBinaryPredicate>> acceptedPredicates_; - - // The conjuncts that were accepted by the data source and removed from conjuncts_ in - // removeAcceptedConjuncts(). 
Only used in getNodeExplainString() to print the - // conjuncts applied by the data source. - private List<Expr> acceptedConjuncts_; - - // The number of rows estimate as returned by prepare(). - private long numRowsEstimate_; - - public DataSourceScanNode(PlanNodeId id, TupleDescriptor desc) { - super(id, desc, "SCAN DATA SOURCE"); - desc_ = desc; - table_ = (DataSourceTable) desc_.getTable(); - acceptedPredicates_ = null; - acceptedConjuncts_ = null; - } - - @Override - public void init(Analyzer analyzer) throws ImpalaException { - checkForSupportedFileFormats(); - assignConjuncts(analyzer); - analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_); - prepareDataSource(); - conjuncts_ = orderConjunctsByCost(conjuncts_); - computeStats(analyzer); - // materialize slots in remaining conjuncts_ - analyzer.materializeSlots(conjuncts_); - computeMemLayout(analyzer); - computeScanRangeLocations(analyzer); - } - - /** - * Returns a thrift TColumnValue representing the literal from a binary - * predicate, or null if the type cannot be represented. - */ - public static TColumnValue literalToColumnValue(LiteralExpr expr) { - switch (expr.getType().getPrimitiveType()) { - case BOOLEAN: - return new TColumnValue().setBool_val(((BoolLiteral) expr).getValue()); - case TINYINT: - return new TColumnValue().setByte_val( - (byte) ((NumericLiteral) expr).getLongValue()); - case SMALLINT: - return new TColumnValue().setShort_val( - (short) ((NumericLiteral) expr).getLongValue()); - case INT: - return new TColumnValue().setInt_val( - (int) ((NumericLiteral) expr).getLongValue()); - case BIGINT: - return new TColumnValue().setLong_val(((NumericLiteral) expr).getLongValue()); - case FLOAT: - case DOUBLE: - return new TColumnValue().setDouble_val( - ((NumericLiteral) expr).getDoubleValue()); - case STRING: - return new TColumnValue().setString_val(((StringLiteral) expr).getValue()); - case DECIMAL: - case DATE: - case DATETIME: - case TIMESTAMP: - // TODO: we support DECIMAL and TIMESTAMP but no way to specify it in SQL. - return null; - default: - Preconditions.checkState(false); - return null; - } - } - - /** - * Calls prepare() on the data source to determine accepted predicates and get - * stats. The accepted predicates are moved from conjuncts_ into acceptedConjuncts_ - * and the associated TBinaryPredicates are set in acceptedPredicates_. - */ - private void prepareDataSource() throws InternalException { - // Binary predicates that will be offered to the data source. - List<List<TBinaryPredicate>> offeredPredicates = Lists.newArrayList(); - // The index into conjuncts_ for each element in offeredPredicates. - List<Integer> conjunctsIdx = Lists.newArrayList(); - for (int i = 0; i < conjuncts_.size(); ++i) { - Expr conjunct = conjuncts_.get(i); - List<TBinaryPredicate> disjuncts = getDisjuncts(conjunct); - if (disjuncts != null) { - offeredPredicates.add(disjuncts); - conjunctsIdx.add(i); - } - } - - String hdfsLocation = table_.getDataSource().getHdfs_location(); - TCacheJarResult cacheResult = FeSupport.CacheJar(hdfsLocation); - TStatus cacheJarStatus = cacheResult.getStatus(); - if (cacheJarStatus.getStatus_code() != TErrorCode.OK) { - throw new InternalException(String.format( - "Unable to cache data source library at location '%s'. Check that the file " + - "exists and is readable. 
Message: %s", - hdfsLocation, Joiner.on("\n").join(cacheJarStatus.getError_msgs()))); - } - String localPath = cacheResult.getLocal_path(); - String className = table_.getDataSource().getClass_name(); - String apiVersion = table_.getDataSource().getApi_version(); - TPrepareResult prepareResult; - TStatus prepareStatus; - try { - ExternalDataSourceExecutor executor = new ExternalDataSourceExecutor( - localPath, className, apiVersion, table_.getInitString()); - TPrepareParams prepareParams = new TPrepareParams(); - prepareParams.setInit_string(table_.getInitString()); - prepareParams.setPredicates(offeredPredicates); - // TODO: Include DB (i.e. getFullName())? - prepareParams.setTable_name(table_.getName()); - prepareResult = executor.prepare(prepareParams); - prepareStatus = prepareResult.getStatus(); - } catch (Exception e) { - throw new InternalException(String.format( - "Error calling prepare() on data source %s", - DataSource.debugString(table_.getDataSource())), e); - } - if (prepareStatus.getStatus_code() != TErrorCode.OK) { - throw new InternalException(String.format( - "Data source %s returned an error from prepare(): %s", - DataSource.debugString(table_.getDataSource()), - Joiner.on("\n").join(prepareStatus.getError_msgs()))); - } - - numRowsEstimate_ = prepareResult.getNum_rows_estimate(); - acceptedPredicates_ = Lists.newArrayList(); - List<Integer> acceptedPredicatesIdx = prepareResult.isSetAccepted_conjuncts() ? - prepareResult.getAccepted_conjuncts() : ImmutableList.<Integer>of(); - for (Integer acceptedIdx: acceptedPredicatesIdx) { - acceptedPredicates_.add(offeredPredicates.get(acceptedIdx)); - } - removeAcceptedConjuncts(acceptedPredicatesIdx, conjunctsIdx); - } - - /** - * Converts the conjunct to a list of TBinaryPredicates if it contains only - * disjunctive predicates of the form {slotref} {op} {constant} that can be represented - * by TBinaryPredicates. If the Expr cannot be converted, null is returned. - * TODO: Move this to Expr. - */ - private List<TBinaryPredicate> getDisjuncts(Expr conjunct) { - List<TBinaryPredicate> disjuncts = Lists.newArrayList(); - if (getDisjunctsHelper(conjunct, disjuncts)) return disjuncts; - return null; - } - - // Recursive helper method for getDisjuncts(). - private boolean getDisjunctsHelper(Expr conjunct, - List<TBinaryPredicate> predicates) { - if (conjunct instanceof BinaryPredicate) { - if (conjunct.getChildren().size() != 2) return false; - SlotRef slotRef = null; - LiteralExpr literalExpr = null; - TComparisonOp op = null; - if ((conjunct.getChild(0).unwrapSlotRef(true) instanceof SlotRef) && - (conjunct.getChild(1) instanceof LiteralExpr)) { - slotRef = conjunct.getChild(0).unwrapSlotRef(true); - literalExpr = (LiteralExpr) conjunct.getChild(1); - op = ((BinaryPredicate) conjunct).getOp().getThriftOp(); - } else if ((conjunct.getChild(1).unwrapSlotRef(true) instanceof SlotRef) && - (conjunct.getChild(0) instanceof LiteralExpr)) { - slotRef = conjunct.getChild(1).unwrapSlotRef(true); - literalExpr = (LiteralExpr) conjunct.getChild(0); - op = ((BinaryPredicate) conjunct).getOp().converse().getThriftOp(); - } else { - return false; - } - - TColumnValue val = literalToColumnValue(literalExpr); - if (val == null) return false; // false if unsupported type, e.g. 
- - String colName = Joiner.on(".").join(slotRef.getResolvedPath().getRawPath()); - TColumnDesc col = new TColumnDesc().setName(colName).setType( - slotRef.getType().toThrift()); - predicates.add(new TBinaryPredicate().setCol(col).setOp(op).setValue(val)); - return true; - } else if (conjunct instanceof CompoundPredicate) { - CompoundPredicate compoundPredicate = ((CompoundPredicate) conjunct); - if (compoundPredicate.getOp() != CompoundPredicate.Operator.OR) return false; - if (!getDisjunctsHelper(conjunct.getChild(0), predicates)) return false; - if (!getDisjunctsHelper(conjunct.getChild(1), predicates)) return false; - return true; - } else { - return false; - } - } - - @Override - public void computeStats(Analyzer analyzer) { - super.computeStats(analyzer); - inputCardinality_ = numRowsEstimate_; - cardinality_ = numRowsEstimate_; - cardinality_ *= computeSelectivity(); - cardinality_ = Math.max(1, cardinality_); - cardinality_ = capAtLimit(cardinality_); - - LOG.debug("computeStats DataSourceScan: cardinality=" + Long.toString(cardinality_)); - - numNodes_ = table_.getNumNodes(); - LOG.debug("computeStats DataSourceScan: #nodes=" + Integer.toString(numNodes_)); - } - - @Override - protected String debugString() { - return Objects.toStringHelper(this) - .add("tid", desc_.getId().asInt()) - .add("tblName", table_.getFullName()) - .add("dataSource", DataSource.debugString(table_.getDataSource())) - .add("initString", table_.getInitString()) - .addValue(super.debugString()) - .toString(); - } - - /** - * Removes the predicates from conjuncts_ that were accepted by the data source. - * Stores the accepted conjuncts in acceptedConjuncts_. - */ - private void removeAcceptedConjuncts(List<Integer> acceptedPredicatesIdx, - List<Integer> conjunctsIdx) { - acceptedConjuncts_ = Lists.newArrayList(); - // Because conjuncts_ is modified in place using positional indexes from - // conjunctsIdx, we remove the accepted predicates in reverse order. - for (int i = acceptedPredicatesIdx.size() - 1; i >= 0; --i) { - int acceptedPredIdx = acceptedPredicatesIdx.get(i); - int conjunctIdx = conjunctsIdx.get(acceptedPredIdx); - acceptedConjuncts_.add(conjuncts_.remove(conjunctIdx)); - } - // Returns a view of the list in the original order as we will print these - // in the explain string and it's convenient to have predicates printed - // in the same order that they're specified. - acceptedConjuncts_ = Lists.reverse(acceptedConjuncts_); - } - - @Override - protected void toThrift(TPlanNode msg) { - Preconditions.checkNotNull(acceptedPredicates_); - msg.node_type = TPlanNodeType.DATA_SOURCE_NODE; - msg.data_source_node = new TDataSourceScanNode(desc_.getId().asInt(), - table_.getDataSource(), table_.getInitString(), acceptedPredicates_); - } - - /** - * Create a single scan range for the localhost. - */ - private void computeScanRangeLocations(Analyzer analyzer) { - // TODO: Does the port matter? - TNetworkAddress networkAddress = addressToTNetworkAddress("localhost:12345"); - Integer hostIndex = analyzer.getHostIndex().getIndex(networkAddress); - scanRanges_ = Lists.newArrayList( - new TScanRangeLocations( - new TScanRange(), Lists.newArrayList(new TScanRangeLocation(hostIndex)))); - } - - @Override - public void computeCosts(TQueryOptions queryOptions) { - // TODO: What's a good estimate of memory consumption? - perHostMemCost_ = 1024L * 1024L * 1024L; - } - - /** - * Returns the per-host upper bound of memory that any number of concurrent scan nodes - * will use. 
Used for estimating the per-host memory requirement of queries. - */ - public static long getPerHostMemUpperBound() { - // TODO: What's a good estimate of memory consumption? - return 1024L * 1024L * 1024L; - } - - @Override - protected String getNodeExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - String aliasStr = ""; - if (!table_.getFullName().equalsIgnoreCase(desc_.getAlias()) && - !table_.getName().equalsIgnoreCase(desc_.getAlias())) { - aliasStr = " " + desc_.getAlias(); - } - output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(), - displayName_, table_.getFullName(), aliasStr)); - - if (!acceptedConjuncts_.isEmpty()) { - output.append(prefix + "data source predicates: " + - getExplainString(acceptedConjuncts_) + "\n"); - } - if (!conjuncts_.isEmpty()) { - output.append(prefix + "predicates: " + getExplainString(conjuncts_) + "\n"); - } - - // Add table and column stats in verbose mode. - if (detailLevel == TExplainLevel.VERBOSE) { - output.append(getStatsExplainString(prefix, detailLevel)); - output.append("\n"); - } - return output.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/DataStreamSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataStreamSink.java b/fe/src/main/java/com/cloudera/impala/planner/DataStreamSink.java deleted file mode 100644 index 514a791..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/DataStreamSink.java +++ /dev/null @@ -1,61 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import com.cloudera.impala.thrift.TDataSink; -import com.cloudera.impala.thrift.TDataSinkType; -import com.cloudera.impala.thrift.TDataStreamSink; -import com.cloudera.impala.thrift.TExplainLevel; -import com.google.common.base.Preconditions; - -/** - * Data sink that forwards data to an exchange node. 
- */ -public class DataStreamSink extends DataSink { - private final ExchangeNode exchNode_; - private final DataPartition outputPartition_; - - public DataStreamSink(ExchangeNode exchNode, DataPartition partition) { - Preconditions.checkNotNull(exchNode); - Preconditions.checkNotNull(partition); - exchNode_ = exchNode; - outputPartition_ = partition; - } - - @Override - public String getExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - output.append( - String.format("%sDATASTREAM SINK [FRAGMENT=%s, EXCHANGE=%s, %s]", - prefix, exchNode_.getFragment().getId().toString(), - exchNode_.getId().toString(), exchNode_.getDisplayLabelDetail())); - return output.toString(); - } - - @Override - protected TDataSink toThrift() { - TDataSink result = new TDataSink(TDataSinkType.DATA_STREAM_SINK); - TDataStreamSink tStreamSink = - new TDataStreamSink(exchNode_.getId().asInt(), outputPartition_.toThrift()); - result.setStream_sink(tStreamSink); - return result; - } - - public DataPartition getOutputPartition() { return outputPartition_; } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java deleted file mode 100644 index b38b018..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java +++ /dev/null @@ -1,1019 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.AggregateInfo; -import com.cloudera.impala.analysis.AnalysisContext; -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.InsertStmt; -import com.cloudera.impala.analysis.JoinOperator; -import com.cloudera.impala.analysis.QueryStmt; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.planner.JoinNode.DistributionMode; -import com.cloudera.impala.planner.RuntimeFilterGenerator.RuntimeFilter; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - - -/** - * The distributed planner is responsible for creating an executable, distributed plan - * from a single-node plan that can be sent to the backend. 
- */ -public class DistributedPlanner { - private final static Logger LOG = LoggerFactory.getLogger(DistributedPlanner.class); - - private final PlannerContext ctx_; - - public DistributedPlanner(PlannerContext ctx) { - ctx_ = ctx; - } - - /** - * Create plan fragments for a single-node plan considering a set of execution options. - * The fragments are returned in a list such that element i of that list can - * only consume output of the following fragments j > i. - * - * TODO: take data partition of the plan fragments into account; in particular, - * coordinate between hash partitioning for aggregation and hash partitioning - * for analytic computation more generally than what createQueryPlan() does - * right now (the coordination only happens if the same select block does both - * the aggregation and analytic computation). - */ - public ArrayList<PlanFragment> createPlanFragments( - PlanNode singleNodePlan) throws ImpalaException { - Preconditions.checkState(!ctx_.isSingleNodeExec()); - AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult(); - QueryStmt queryStmt = ctx_.getQueryStmt(); - ArrayList<PlanFragment> fragments = Lists.newArrayList(); - // For inserts or CTAS, unless there is a limit, leave the root fragment - // partitioned, otherwise merge everything into a single coordinator fragment, - // so we can pass it back to the client. - boolean isPartitioned = false; - if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt() - || analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt()) - && !singleNodePlan.hasLimit()) { - Preconditions.checkState(!queryStmt.hasOffset()); - isPartitioned = true; - } - LOG.debug("create plan fragments"); - long perNodeMemLimit = ctx_.getQueryOptions().mem_limit; - LOG.debug("memlimit=" + Long.toString(perNodeMemLimit)); - createPlanFragments(singleNodePlan, isPartitioned, perNodeMemLimit, fragments); - return fragments; - } - - /** - * Return plan fragment that produces result of 'root'; recursively creates - * all input fragments to the returned fragment. - * If a new fragment is created, it is appended to 'fragments', so that - * each fragment is preceded by those from which it consumes the output. - * If 'isPartitioned' is false, the returned fragment is unpartitioned; - * otherwise it may be partitioned, depending on whether its inputs are - * partitioned; the partition function is derived from the inputs. - */ - private PlanFragment createPlanFragments( - PlanNode root, boolean isPartitioned, - long perNodeMemLimit, ArrayList<PlanFragment> fragments) - throws ImpalaException { - ArrayList<PlanFragment> childFragments = Lists.newArrayList(); - for (PlanNode child: root.getChildren()) { - // allow child fragments to be partitioned, unless they contain a limit clause - // (the result set with the limit constraint needs to be computed centrally); - // merge later if needed - boolean childIsPartitioned = !child.hasLimit(); - // Do not fragment the subplan of a SubplanNode since it is executed locally. 
- if (root instanceof SubplanNode && child == root.getChild(1)) continue; - childFragments.add( - createPlanFragments( - child, childIsPartitioned, perNodeMemLimit, fragments)); - } - - PlanFragment result = null; - if (root instanceof ScanNode) { - result = createScanFragment(root); - fragments.add(result); - } else if (root instanceof HashJoinNode) { - Preconditions.checkState(childFragments.size() == 2); - result = createHashJoinFragment( - (HashJoinNode) root, childFragments.get(1), childFragments.get(0), - perNodeMemLimit, fragments); - } else if (root instanceof NestedLoopJoinNode) { - Preconditions.checkState(childFragments.size() == 2); - result = createNestedLoopJoinFragment( - (NestedLoopJoinNode) root, childFragments.get(1), childFragments.get(0), - perNodeMemLimit, fragments); - } else if (root instanceof SubplanNode) { - Preconditions.checkState(childFragments.size() == 1); - result = createSubplanNodeFragment((SubplanNode) root, childFragments.get(0)); - } else if (root instanceof SelectNode) { - result = createSelectNodeFragment((SelectNode) root, childFragments); - } else if (root instanceof UnionNode) { - result = createUnionNodeFragment((UnionNode) root, childFragments, fragments); - } else if (root instanceof AggregationNode) { - result = createAggregationFragment( - (AggregationNode) root, childFragments.get(0), fragments); - } else if (root instanceof SortNode) { - if (((SortNode) root).isAnalyticSort()) { - // don't parallelize this like a regular SortNode - result = createAnalyticFragment( - root, childFragments.get(0), fragments); - } else { - result = createOrderByFragment( - (SortNode) root, childFragments.get(0), fragments); - } - } else if (root instanceof AnalyticEvalNode) { - result = createAnalyticFragment(root, childFragments.get(0), fragments); - } else if (root instanceof EmptySetNode) { - result = new PlanFragment( - ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED); - } else { - throw new InternalException( - "Cannot create plan fragment for this node type: " + root.getExplainString()); - } - // move 'result' to end, it depends on all of its children - fragments.remove(result); - fragments.add(result); - - if (!isPartitioned && result.isPartitioned()) { - result = createMergeFragment(result); - fragments.add(result); - } - - return result; - } - - /** - * Returns the product of the distinct value estimates of the individual exprs - * or -1 if any of them doesn't have a distinct value estimate. - */ - private long getNumDistinctValues(List<Expr> exprs) { - long result = 1; - for (Expr expr: exprs) { - result *= expr.getNumDistinctValues(); - if (result < 0) return -1; - } - return result; - } - - /** - * Decides whether to repartition the output of 'inputFragment' before feeding its - * data into the table sink of the given 'insertStmt'. The decision obeys the - * shuffle/noshuffle plan hints if present. Otherwise, returns a plan fragment that - * partitions the output of 'inputFragment' on the partition exprs of 'insertStmt', - * unless the expected number of partitions is less than the number of nodes on which - * inputFragment runs, or the target table is unpartitioned. - * For inserts into unpartitioned tables or inserts with only constant partition exprs, - * the shuffle hint leads to a plan that merges all rows at the coordinator where - * the table sink is executed. - * If this functions ends up creating a new fragment, appends that to 'fragments'. 
- */ - public PlanFragment createInsertFragment( - PlanFragment inputFragment, InsertStmt insertStmt, Analyzer analyzer, - ArrayList<PlanFragment> fragments) - throws ImpalaException { - if (insertStmt.hasNoShuffleHint()) return inputFragment; - - List<Expr> partitionExprs = Lists.newArrayList(insertStmt.getPartitionKeyExprs()); - // Ignore constants for the sake of partitioning. - Expr.removeConstants(partitionExprs); - - // Do nothing if the input fragment is already appropriately partitioned. - DataPartition inputPartition = inputFragment.getDataPartition(); - if (!partitionExprs.isEmpty() && - analyzer.equivSets(inputPartition.getPartitionExprs(), partitionExprs)) { - return inputFragment; - } - - // Make a cost-based decision only if no user hint was supplied. - if (!insertStmt.hasShuffleHint()) { - // If the existing partition exprs are a subset of the table partition exprs, check - // if it is distributed across all nodes. If so, don't repartition. - if (Expr.isSubset(inputPartition.getPartitionExprs(), partitionExprs)) { - long numPartitions = getNumDistinctValues(inputPartition.getPartitionExprs()); - if (numPartitions >= inputFragment.getNumNodes()) return inputFragment; - } - - // Don't repartition if we know we have fewer partitions than nodes - // (ie, default to repartitioning if col stats are missing). - // TODO: We want to repartition if the resulting files would otherwise - // be very small (less than some reasonable multiple of the recommended block size). - // In order to do that, we need to come up with an estimate of the avg row size - // in the particular file format of the output table/partition. - // We should always know on how many nodes our input is running. - long numPartitions = getNumDistinctValues(partitionExprs); - Preconditions.checkState(inputFragment.getNumNodes() != -1); - if (numPartitions > 0 && numPartitions <= inputFragment.getNumNodes()) { - return inputFragment; - } - } - - ExchangeNode exchNode = - new ExchangeNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot()); - exchNode.init(analyzer); - Preconditions.checkState(exchNode.hasValidStats()); - DataPartition partition; - if (partitionExprs.isEmpty()) { - partition = DataPartition.UNPARTITIONED; - } else { - partition = DataPartition.hashPartitioned(partitionExprs); - } - PlanFragment fragment = - new PlanFragment(ctx_.getNextFragmentId(), exchNode, partition); - inputFragment.setDestination(exchNode); - inputFragment.setOutputPartition(partition); - fragments.add(fragment); - return fragment; - } - - /** - * Return unpartitioned fragment that merges the input fragment's output via - * an ExchangeNode. - * Requires that input fragment be partitioned. - */ - private PlanFragment createMergeFragment(PlanFragment inputFragment) - throws ImpalaException { - Preconditions.checkState(inputFragment.isPartitioned()); - ExchangeNode mergePlan = - new ExchangeNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot()); - mergePlan.init(ctx_.getRootAnalyzer()); - Preconditions.checkState(mergePlan.hasValidStats()); - PlanFragment fragment = new PlanFragment(ctx_.getNextFragmentId(), mergePlan, - DataPartition.UNPARTITIONED); - inputFragment.setDestination(mergePlan); - return fragment; - } - - /** - * Create new randomly-partitioned fragment containing a single scan node. 
- * TODO: take bucketing into account to produce a naturally hash-partitioned - * fragment - * TODO: hbase scans are range-partitioned on the row key - */ - private PlanFragment createScanFragment(PlanNode node) { - return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.RANDOM); - } - - /** - * Adds the SubplanNode as the new plan root to the child fragment and returns - * the child fragment. - */ - private PlanFragment createSubplanNodeFragment(SubplanNode node, - PlanFragment childFragment) { - node.setChild(0, childFragment.getPlanRoot()); - childFragment.setPlanRoot(node); - return childFragment; - } - - /** - * Modifies the leftChildFragment to execute a cross join. The right child input is - * provided by an ExchangeNode, which is the destination of the rightChildFragment's - * output. - */ - private PlanFragment createNestedLoopJoinFragment(NestedLoopJoinNode node, - PlanFragment rightChildFragment, PlanFragment leftChildFragment, - long perNodeMemLimit, ArrayList<PlanFragment> fragments) - throws ImpalaException { - node.setDistributionMode(DistributionMode.BROADCAST); - node.setChild(0, leftChildFragment.getPlanRoot()); - connectChildFragment(node, 1, leftChildFragment, rightChildFragment); - leftChildFragment.setPlanRoot(node); - return leftChildFragment; - } - - /** - * Helper function to produce a partitioning hash-join fragment - */ - private PlanFragment createPartitionedHashJoinFragment(HashJoinNode node, - Analyzer analyzer, boolean lhsHasCompatPartition, boolean rhsHasCompatPartition, - PlanFragment leftChildFragment, PlanFragment rightChildFragment, - List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs, - ArrayList<PlanFragment> fragments) throws ImpalaException { - node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED); - // The lhs and rhs input fragments are already partitioned on the join exprs. - // Combine the lhs/rhs input fragments into leftChildFragment by placing the join - // node into leftChildFragment and setting its lhs/rhs children to the plan root of - // the lhs/rhs child fragment, respectively. No new child fragments or exchanges - // are created, and the rhs fragment is removed. - // TODO: Relax the isCompatPartition() check below. The check is conservative and - // may reject partitions that could be made physically compatible. Fix this by - // removing equivalent duplicates from partition exprs and impose a canonical order - // on partition exprs (both using the canonical equivalence class representatives). - if (lhsHasCompatPartition - && rhsHasCompatPartition - && isCompatPartition( - leftChildFragment.getDataPartition(), - rightChildFragment.getDataPartition(), - lhsJoinExprs, rhsJoinExprs, analyzer)) { - node.setChild(0, leftChildFragment.getPlanRoot()); - node.setChild(1, rightChildFragment.getPlanRoot()); - // fix up PlanNode.fragment_ for the migrated PlanNode tree of the rhs child - leftChildFragment.setFragmentInPlanTree(node.getChild(1)); - // Relocate input fragments of rightChildFragment to leftChildFragment. - for (PlanFragment rhsInput: rightChildFragment.getChildren()) { - leftChildFragment.getChildren().add(rhsInput); - } - // Remove right fragment because its plan tree has been merged into leftFragment. - fragments.remove(rightChildFragment); - leftChildFragment.setPlanRoot(node); - return leftChildFragment; - } - - // The lhs input fragment is already partitioned on the join exprs. - // Make the HashJoin the new root of leftChildFragment and set the join's - // first child to the lhs plan root. 
The second child of the join is an - // ExchangeNode that is fed by the rhsInputFragment whose sink repartitions - // its data by the rhs join exprs. - DataPartition rhsJoinPartition = null; - if (lhsHasCompatPartition) { - rhsJoinPartition = getCompatPartition(lhsJoinExprs, - leftChildFragment.getDataPartition(), rhsJoinExprs, analyzer); - if (rhsJoinPartition != null) { - node.setChild(0, leftChildFragment.getPlanRoot()); - connectChildFragment(node, 1, leftChildFragment, rightChildFragment); - rightChildFragment.setOutputPartition(rhsJoinPartition); - leftChildFragment.setPlanRoot(node); - return leftChildFragment; - } - } - - // Same as above but with rhs and lhs reversed. - DataPartition lhsJoinPartition = null; - if (rhsHasCompatPartition) { - lhsJoinPartition = getCompatPartition(rhsJoinExprs, - rightChildFragment.getDataPartition(), lhsJoinExprs, analyzer); - if (lhsJoinPartition != null) { - node.setChild(1, rightChildFragment.getPlanRoot()); - connectChildFragment(node, 0, rightChildFragment, leftChildFragment); - leftChildFragment.setOutputPartition(lhsJoinPartition); - rightChildFragment.setPlanRoot(node); - return rightChildFragment; - } - } - - Preconditions.checkState(lhsJoinPartition == null); - Preconditions.checkState(rhsJoinPartition == null); - lhsJoinPartition = DataPartition.hashPartitioned(Expr.cloneList(lhsJoinExprs)); - rhsJoinPartition = DataPartition.hashPartitioned(Expr.cloneList(rhsJoinExprs)); - - // Neither lhs nor rhs are already partitioned on the join exprs. - // Create a new parent fragment containing a HashJoin node with two - // ExchangeNodes as inputs; the latter are the destinations of the - // left- and rightChildFragments, which now partition their output - // on their respective join exprs. - // The new fragment is hash-partitioned on the lhs input join exprs. - ExchangeNode lhsExchange = - new ExchangeNode(ctx_.getNextNodeId(), leftChildFragment.getPlanRoot()); - lhsExchange.computeStats(null); - node.setChild(0, lhsExchange); - ExchangeNode rhsExchange = - new ExchangeNode(ctx_.getNextNodeId(), rightChildFragment.getPlanRoot()); - rhsExchange.computeStats(null); - node.setChild(1, rhsExchange); - - // Connect the child fragments in a new fragment, and set the data partition - // of the new fragment and its child fragments. - PlanFragment joinFragment = - new PlanFragment(ctx_.getNextFragmentId(), node, lhsJoinPartition); - leftChildFragment.setDestination(lhsExchange); - leftChildFragment.setOutputPartition(lhsJoinPartition); - rightChildFragment.setDestination(rhsExchange); - rightChildFragment.setOutputPartition(rhsJoinPartition); - return joinFragment; - } - - /** - * Creates either a broadcast join or a repartitioning join, depending on the - * expected cost. - * If any of the inputs to the cost computation is unknown, it assumes the cost - * will be 0. Costs being equal, it'll favor partitioned over broadcast joins. - * If perNodeMemLimit > 0 and the size of the hash table for a broadcast join is - * expected to exceed that mem limit, switches to partitioned join instead. - * TODO: revisit the choice of broadcast as the default - * TODO: don't create a broadcast join if we already anticipate that this will - * exceed the query's memory budget. 
- */ - private PlanFragment createHashJoinFragment( - HashJoinNode node, PlanFragment rightChildFragment, - PlanFragment leftChildFragment, long perNodeMemLimit, - ArrayList<PlanFragment> fragments) - throws ImpalaException { - // For both join types, the total cost is calculated as the amount of data - // sent over the network, plus the amount of data inserted into the hash table. - // broadcast: send the rightChildFragment's output to each node executing - // the leftChildFragment, and build a hash table with it on each node. - Analyzer analyzer = ctx_.getRootAnalyzer(); - PlanNode rhsTree = rightChildFragment.getPlanRoot(); - long rhsDataSize = 0; - long broadcastCost = Long.MAX_VALUE; - if (rhsTree.getCardinality() != -1) { - rhsDataSize = Math.round( - rhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(rhsTree)); - if (leftChildFragment.getNumNodes() != -1) { - broadcastCost = 2 * rhsDataSize * leftChildFragment.getNumNodes(); - } - } - LOG.debug("broadcast: cost=" + Long.toString(broadcastCost)); - LOG.debug("card=" + Long.toString(rhsTree.getCardinality()) + " row_size=" - + Float.toString(rhsTree.getAvgRowSize()) + " #nodes=" - + Integer.toString(leftChildFragment.getNumNodes())); - - // repartition: both left- and rightChildFragment are partitioned on the - // join exprs, and a hash table is built with the rightChildFragment's output. - PlanNode lhsTree = leftChildFragment.getPlanRoot(); - long partitionCost = Long.MAX_VALUE; - List<Expr> lhsJoinExprs = Lists.newArrayList(); - List<Expr> rhsJoinExprs = Lists.newArrayList(); - for (Expr joinConjunct: node.getEqJoinConjuncts()) { - // no remapping necessary - lhsJoinExprs.add(joinConjunct.getChild(0).clone()); - rhsJoinExprs.add(joinConjunct.getChild(1).clone()); - } - boolean lhsHasCompatPartition = false; - boolean rhsHasCompatPartition = false; - if (lhsTree.getCardinality() != -1 && rhsTree.getCardinality() != -1) { - lhsHasCompatPartition = analyzer.equivSets(lhsJoinExprs, - leftChildFragment.getDataPartition().getPartitionExprs()); - rhsHasCompatPartition = analyzer.equivSets(rhsJoinExprs, - rightChildFragment.getDataPartition().getPartitionExprs()); - - double lhsNetworkCost = (lhsHasCompatPartition) ? 0.0 : - Math.round( - lhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(lhsTree)); - double rhsNetworkCost = (rhsHasCompatPartition) ? 
0.0 : rhsDataSize; - partitionCost = Math.round(lhsNetworkCost + rhsNetworkCost + rhsDataSize); - } - LOG.debug("partition: cost=" + Long.toString(partitionCost)); - LOG.debug("lhs card=" + Long.toString(lhsTree.getCardinality()) + " row_size=" - + Float.toString(lhsTree.getAvgRowSize())); - LOG.debug("rhs card=" + Long.toString(rhsTree.getCardinality()) + " row_size=" - + Float.toString(rhsTree.getAvgRowSize())); - LOG.debug(rhsTree.getExplainString()); - - boolean doBroadcast = false; - // we do a broadcast join if - // - we're explicitly told to do so - // - or if it's cheaper and we weren't explicitly told to do a partitioned join - // - and we're not doing a full outer or right outer/semi join (those require the - // left-hand side to be partitioned for correctness) - // - and the expected size of the hash tbl doesn't exceed perNodeMemLimit - // - or we are doing a null-aware left anti join (broadcast is required for - // correctness) - // we do a "<=" comparison of the costs so that we default to broadcast joins if - // we're unable to estimate the cost - if ((node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN - && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN - && node.getJoinOp() != JoinOperator.RIGHT_SEMI_JOIN - && node.getJoinOp() != JoinOperator.RIGHT_ANTI_JOIN - // a broadcast join hint overides the check to see if the hash table - // size is less than the pernode memlimit - && (node.getDistributionModeHint() == DistributionMode.BROADCAST - || perNodeMemLimit == 0 - || Math.round(rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD) - <= perNodeMemLimit) - // a broadcast join hint overrides the check to see if performing a broadcast - // join is more costly than a partitioned join - && (node.getDistributionModeHint() == DistributionMode.BROADCAST - || (node.getDistributionModeHint() != DistributionMode.PARTITIONED - && broadcastCost <= partitionCost))) - || node.getJoinOp().isNullAwareLeftAntiJoin()) { - doBroadcast = true; - } - - PlanFragment hjFragment = null; - if (doBroadcast) { - node.setDistributionMode(HashJoinNode.DistributionMode.BROADCAST); - // Doesn't create a new fragment, but modifies leftChildFragment to execute - // the join; the build input is provided by an ExchangeNode, which is the - // destination of the rightChildFragment's output - node.setChild(0, leftChildFragment.getPlanRoot()); - connectChildFragment(node, 1, leftChildFragment, rightChildFragment); - leftChildFragment.setPlanRoot(node); - hjFragment = leftChildFragment; - } else { - hjFragment = createPartitionedHashJoinFragment(node, analyzer, - lhsHasCompatPartition, rhsHasCompatPartition, leftChildFragment, - rightChildFragment, lhsJoinExprs, rhsJoinExprs, fragments); - } - - for (RuntimeFilter filter: node.getRuntimeFilters()) { - filter.setIsBroadcast(doBroadcast); - filter.computeHasLocalTargets(); - // Work around IMPALA-3450, where cardinalities might be wrong in single-node plans - // with UNION and LIMITs. - // TODO: Remove. - filter.computeNdvEstimate(); - } - return hjFragment; - } - - /** - * Returns true if the lhs and rhs partitions are physically compatible for executing - * a partitioned join with the given lhs/rhs join exprs. Physical compatibility means - * that lhs/rhs exchange nodes hashing on exactly those partition expressions are - * guaranteed to send two rows with identical partition-expr values to the same node. - * The requirements for physical compatibility are: - * 1. Number of exprs must be the same - * 2. 
The lhs partition exprs are identical to the lhs join exprs and the rhs partition - * exprs are identical to the rhs join exprs - * 3. Or for each expr in the lhs partition, there must be an equivalent expr in the - * rhs partition at the same ordinal position within the expr list - * (4. The expr types must be identical, but that is enforced later in PlanFragment) - * Conditions 2 and 3 are similar but not the same due to outer joins, e.g., for full - * outer joins condition 3 can never be met, but condition 2 can. - * TODO: Move parts of this function into DataPartition as appropriate. - */ - private boolean isCompatPartition(DataPartition lhsPartition, - DataPartition rhsPartition, List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs, - Analyzer analyzer) { - List<Expr> lhsPartExprs = lhsPartition.getPartitionExprs(); - List<Expr> rhsPartExprs = rhsPartition.getPartitionExprs(); - // 1. Sizes must be equal. - if (lhsPartExprs.size() != rhsPartExprs.size()) return false; - // 2. Lhs/rhs join exprs are identical to lhs/rhs partition exprs. - Preconditions.checkState(lhsJoinExprs.size() == rhsJoinExprs.size()); - if (lhsJoinExprs.size() == lhsPartExprs.size()) { - if (lhsJoinExprs.equals(lhsPartExprs) && rhsJoinExprs.equals(rhsPartExprs)) { - return true; - } - } - // 3. Each lhs part expr must have an equivalent expr at the same position - // in the rhs part exprs. - for (int i = 0; i < lhsPartExprs.size(); ++i) { - if (!analyzer.equivExprs(lhsPartExprs.get(i), rhsPartExprs.get(i))) return false; - } - return true; - } - - /** - * Returns a new data partition that is suitable for creating an exchange node to feed - * a partitioned hash join. The hash join is assumed to be placed in a fragment with an - * existing data partition that is compatible with either the lhs or rhs join exprs - * (srcPartition belongs to the fragment and srcJoinExprs are the compatible exprs). - * The returned partition uses the given joinExprs which are assumed to be the lhs or - * rhs join exprs, whichever srcJoinExprs are not. - * The returned data partition has two important properties to ensure correctness: - * 1. It has exactly the same number of hash exprs as the srcPartition (IMPALA-1307), - * possibly by removing redundant exprs from joinExprs or adding some joinExprs - * multiple times to match the srcPartition - * 2. The hash exprs are ordered based on their corresponding 'matches' in - * the existing srcPartition (IMPALA-1324). - * Returns null if no compatible data partition could be constructed. - * TODO: Move parts of this function into DataPartition as appropriate. - * TODO: Make comment less operational and more semantic. - */ - private DataPartition getCompatPartition(List<Expr> srcJoinExprs, - DataPartition srcPartition, List<Expr> joinExprs, Analyzer analyzer) { - Preconditions.checkState(srcPartition.isHashPartitioned()); - List<Expr> srcPartExprs = srcPartition.getPartitionExprs(); - List<Expr> resultPartExprs = Lists.newArrayList(); - for (int i = 0; i < srcPartExprs.size(); ++i) { - for (int j = 0; j < srcJoinExprs.size(); ++j) { - if (analyzer.equivExprs(srcPartExprs.get(i), srcJoinExprs.get(j))) { - resultPartExprs.add(joinExprs.get(j).clone()); - break; - } - } - } - if (resultPartExprs.size() != srcPartExprs.size()) return null; - return DataPartition.hashPartitioned(resultPartExprs); - } - - /** - * Returns a new fragment with a UnionNode as its root. 
The data partition of the - * returned fragment and how the data of the child fragments is consumed depends on the - * data partitions of the child fragments: - * - All child fragments are unpartitioned or partitioned: The returned fragment has an - * UNPARTITIONED or RANDOM data partition, respectively. The UnionNode absorbs the - * plan trees of all child fragments. - * - Mixed partitioned/unpartitioned child fragments: The returned fragment is - * RANDOM partitioned. The plan trees of all partitioned child fragments are absorbed - * into the UnionNode. All unpartitioned child fragments are connected to the - * UnionNode via a RANDOM exchange, and remain unchanged otherwise. - */ - private PlanFragment createUnionNodeFragment(UnionNode unionNode, - ArrayList<PlanFragment> childFragments, ArrayList<PlanFragment> fragments) - throws ImpalaException { - Preconditions.checkState(unionNode.getChildren().size() == childFragments.size()); - - // A UnionNode could have no children or constant selects if all of its operands - // were dropped because of constant predicates that evaluated to false. - if (unionNode.getChildren().isEmpty()) { - return new PlanFragment( - ctx_.getNextFragmentId(), unionNode, DataPartition.UNPARTITIONED); - } - - Preconditions.checkState(!childFragments.isEmpty()); - int numUnpartitionedChildFragments = 0; - for (int i = 0; i < childFragments.size(); ++i) { - if (!childFragments.get(i).isPartitioned()) ++numUnpartitionedChildFragments; - } - - // remove all children to avoid them being tagged with the wrong - // fragment (in the PlanFragment c'tor; we haven't created ExchangeNodes yet) - unionNode.clearChildren(); - - // If all child fragments are unpartitioned, return a single unpartitioned fragment - // with a UnionNode that merges all child fragments. - if (numUnpartitionedChildFragments == childFragments.size()) { - PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(), - unionNode, DataPartition.UNPARTITIONED); - // Absorb the plan trees of all childFragments into unionNode - // and fix up the fragment tree in the process. - for (int i = 0; i < childFragments.size(); ++i) { - unionNode.addChild(childFragments.get(i).getPlanRoot()); - unionFragment.setFragmentInPlanTree(unionNode.getChild(i)); - unionFragment.addChildren(childFragments.get(i).getChildren()); - } - unionNode.init(ctx_.getRootAnalyzer()); - // All child fragments have been absorbed into unionFragment. - fragments.removeAll(childFragments); - return unionFragment; - } - - // There is at least one partitioned child fragment. - PlanFragment unionFragment = new PlanFragment( - ctx_.getNextFragmentId(), unionNode, DataPartition.RANDOM); - for (int i = 0; i < childFragments.size(); ++i) { - PlanFragment childFragment = childFragments.get(i); - if (childFragment.isPartitioned()) { - // absorb the plan trees of all partitioned child fragments into unionNode - unionNode.addChild(childFragment.getPlanRoot()); - unionFragment.setFragmentInPlanTree(unionNode.getChild(i)); - unionFragment.addChildren(childFragment.getChildren()); - fragments.remove(childFragment); - } else { - // dummy entry for subsequent addition of the ExchangeNode - unionNode.addChild(null); - // Connect the unpartitioned child fragments to unionNode via a random exchange. 
- connectChildFragment(unionNode, i, unionFragment, childFragment); - childFragment.setOutputPartition(DataPartition.RANDOM); - } - } - unionNode.reorderOperands(ctx_.getRootAnalyzer()); - unionNode.init(ctx_.getRootAnalyzer()); - return unionFragment; - } - - /** - * Adds the SelectNode as the new plan root to the child fragment and returns - * the child fragment. - */ - private PlanFragment createSelectNodeFragment(SelectNode selectNode, - ArrayList<PlanFragment> childFragments) { - Preconditions.checkState(selectNode.getChildren().size() == childFragments.size()); - PlanFragment childFragment = childFragments.get(0); - // set the child explicitly, an ExchangeNode might have been inserted - // (whereas selectNode.child[0] would point to the original child) - selectNode.setChild(0, childFragment.getPlanRoot()); - childFragment.setPlanRoot(selectNode); - return childFragment; - } - - /** - * Replace node's child at index childIdx with an ExchangeNode that receives its - * input from childFragment. ParentFragment contains node and the new ExchangeNode. - */ - private void connectChildFragment(PlanNode node, int childIdx, - PlanFragment parentFragment, PlanFragment childFragment) throws ImpalaException { - ExchangeNode exchangeNode = - new ExchangeNode(ctx_.getNextNodeId(), childFragment.getPlanRoot()); - exchangeNode.init(ctx_.getRootAnalyzer()); - exchangeNode.setFragment(parentFragment); - node.setChild(childIdx, exchangeNode); - childFragment.setDestination(exchangeNode); - } - - /** - * Create a new fragment containing a single ExchangeNode that consumes the output - * of childFragment, set the destination of childFragment to the new parent - * and the output partition of childFragment to that of the new parent. - * TODO: the output partition of a child isn't necessarily the same as the data - * partition of the receiving parent (if there is more materialization happening - * in the parent, such as during distinct aggregation). Do we care about the data - * partition of the parent being applicable to the *output* of the parent (it's - * correct for the input). - */ - private PlanFragment createParentFragment( - PlanFragment childFragment, DataPartition parentPartition) - throws ImpalaException { - ExchangeNode exchangeNode = - new ExchangeNode(ctx_.getNextNodeId(), childFragment.getPlanRoot()); - exchangeNode.init(ctx_.getRootAnalyzer()); - PlanFragment parentFragment = new PlanFragment(ctx_.getNextFragmentId(), - exchangeNode, parentPartition); - childFragment.setDestination(exchangeNode); - childFragment.setOutputPartition(parentPartition); - return parentFragment; - } - - /** - * Returns a fragment that materializes the aggregation result of 'node'. - * If the child fragment is partitioned, the result fragment will be partitioned on - * the grouping exprs of 'node'. - * If 'node' is phase 1 of a 2-phase DISTINCT aggregation, this will simply - * add 'node' to the child fragment and return the child fragment; the new - * fragment will be created by the subsequent call of createAggregationFragment() - * for the phase 2 AggregationNode. 
- */ - private PlanFragment createAggregationFragment(AggregationNode node, - PlanFragment childFragment, ArrayList<PlanFragment> fragments) - throws ImpalaException { - if (!childFragment.isPartitioned()) { - // nothing to distribute; do full aggregation directly within childFragment - childFragment.addPlanRoot(node); - return childFragment; - } - - if (node.getAggInfo().isDistinctAgg()) { - // 'node' is phase 1 of a DISTINCT aggregation; the actual agg fragment - // will get created in the next createAggregationFragment() call - // for the parent AggregationNode - childFragment.addPlanRoot(node); - return childFragment; - } - - // Check if 'node' is phase 2 of a DISTINCT aggregation. - boolean isDistinct = node.getChild(0) instanceof AggregationNode - && ((AggregationNode)(node.getChild(0))).getAggInfo().isDistinctAgg(); - if (isDistinct) { - return createPhase2DistinctAggregationFragment(node, childFragment, fragments); - } else { - return createMergeAggregationFragment(node, childFragment); - } - } - - /** - * Returns a fragment that materializes the final result of an aggregation where - * 'childFragment' is a partitioned fragment and 'node' is not part of a distinct - * aggregation. - */ - private PlanFragment createMergeAggregationFragment( - AggregationNode node, PlanFragment childFragment) - throws ImpalaException { - Preconditions.checkArgument(childFragment.isPartitioned()); - ArrayList<Expr> groupingExprs = node.getAggInfo().getGroupingExprs(); - boolean hasGrouping = !groupingExprs.isEmpty(); - - DataPartition parentPartition = null; - if (hasGrouping) { - // the parent fragment is partitioned on the grouping exprs; - // substitute grouping exprs to reference the *output* of the agg, not the input - List<Expr> partitionExprs = node.getAggInfo().getPartitionExprs(); - if (partitionExprs == null) partitionExprs = groupingExprs; - partitionExprs = Expr.substituteList(partitionExprs, - node.getAggInfo().getIntermediateSmap(), ctx_.getRootAnalyzer(), false); - boolean childHasCompatPartition = ctx_.getRootAnalyzer().equivSets(partitionExprs, - childFragment.getDataPartition().getPartitionExprs()); - if (childHasCompatPartition) { - // The data is already partitioned on the required expressions, we can just do - // the aggregation in the child fragment without an extra merge step. 
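      // (Illustrative note, not part of the original source: e.g., for
      //   SELECT x, count(*) FROM t GROUP BY x
      // with a child fragment already hash-partitioned on x, all rows of a
      // group sit on one node, so the aggregation added below is final and
      // no merge fragment is created.)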
- childFragment.addPlanRoot(node); - return childFragment; - } - parentPartition = DataPartition.hashPartitioned(partitionExprs); - } else { - // the parent fragment is unpartitioned - parentPartition = DataPartition.UNPARTITIONED; - } - - // the original aggregation materializes the intermediate agg tuple and goes - // into the child fragment; merge aggregation materializes the output agg tuple - // and goes into a parent fragment - childFragment.addPlanRoot(node); - node.setIntermediateTuple(); - node.setIsPreagg(ctx_); - - // if there is a limit, we need to transfer it from the pre-aggregation - // node in the child fragment to the merge aggregation node in the parent - long limit = node.getLimit(); - node.unsetLimit(); - node.unsetNeedsFinalize(); - - // place a merge aggregation step in a new fragment - PlanFragment mergeFragment = createParentFragment(childFragment, parentPartition); - AggregationNode mergeAggNode = new AggregationNode(ctx_.getNextNodeId(), - mergeFragment.getPlanRoot(), node.getAggInfo().getMergeAggInfo()); - mergeAggNode.init(ctx_.getRootAnalyzer()); - mergeAggNode.setLimit(limit); - - // HAVING predicates can only be evaluated after the merge agg step - node.transferConjuncts(mergeAggNode); - // Recompute stats after transferring the conjuncts_ (order is important). - node.computeStats(ctx_.getRootAnalyzer()); - mergeFragment.getPlanRoot().computeStats(ctx_.getRootAnalyzer()); - mergeAggNode.computeStats(ctx_.getRootAnalyzer()); - // Set new plan root after updating stats. - mergeFragment.addPlanRoot(mergeAggNode); - - return mergeFragment; - } - - /** - * Returns a fragment that materialises the final result of a distinct aggregation - * where 'childFragment' is a partitioned fragment with the first phase aggregation - * as its root and 'node' is the second phase of the distinct aggregation. - */ - private PlanFragment createPhase2DistinctAggregationFragment(AggregationNode node, - PlanFragment childFragment, ArrayList<PlanFragment> fragments) - throws ImpalaException { - ArrayList<Expr> groupingExprs = node.getAggInfo().getGroupingExprs(); - boolean hasGrouping = !groupingExprs.isEmpty(); - - // The first-phase aggregation node is already in the child fragment. - Preconditions.checkState(node.getChild(0) == childFragment.getPlanRoot()); - - AggregateInfo firstPhaseAggInfo = ((AggregationNode) node.getChild(0)).getAggInfo(); - List<Expr> partitionExprs = null; - if (hasGrouping) { - // We need to do - // - child fragment: - // * phase-1 aggregation - // - merge fragment, hash-partitioned on grouping exprs: - // * merge agg of phase 1 - // * phase 2 agg - // The output partition exprs of the child are the (input) grouping exprs of the - // parent. The grouping exprs reference the output tuple of the 1st phase, but the - // partitioning happens on the intermediate tuple of the 1st phase. 
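      // (Illustrative note, not part of the original source: e.g., for
      //   SELECT x, count(DISTINCT y) FROM t GROUP BY x
      // the substitution below maps the grouping expr x from the phase-1
      // output tuple to the phase-1 intermediate tuple, so the merge fragment
      // is hash-partitioned on x and all rows of a group meet on one node.)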
- partitionExprs = Expr.substituteList( - groupingExprs, firstPhaseAggInfo.getOutputToIntermediateSmap(), - ctx_.getRootAnalyzer(), false); - } else { - // We need to do - // - child fragment: - // * phase-1 aggregation - // - merge fragment 1, hash-partitioned on distinct exprs: - // * merge agg of phase 1 - // * phase 2 agg - // - merge fragment 2, unpartitioned: - // * merge agg of phase 2 - partitionExprs = Expr.substituteList(firstPhaseAggInfo.getGroupingExprs(), - firstPhaseAggInfo.getIntermediateSmap(), ctx_.getRootAnalyzer(), false); - } - - PlanFragment mergeFragment = null; - boolean childHasCompatPartition = ctx_.getRootAnalyzer().equivSets(partitionExprs, - childFragment.getDataPartition().getPartitionExprs()); - if (childHasCompatPartition) { - // The data is already partitioned on the required expressions, we can skip the - // phase 1 merge step. - childFragment.addPlanRoot(node); - mergeFragment = childFragment; - } else { - DataPartition mergePartition = DataPartition.hashPartitioned(partitionExprs); - // Convert the existing node to a preaggregation. - AggregationNode preaggNode = (AggregationNode)node.getChild(0); - preaggNode.setIsPreagg(ctx_); - - // place a merge aggregation step for the 1st phase in a new fragment - mergeFragment = createParentFragment(childFragment, mergePartition); - AggregateInfo phase1MergeAggInfo = firstPhaseAggInfo.getMergeAggInfo(); - AggregationNode phase1MergeAggNode = - new AggregationNode(ctx_.getNextNodeId(), preaggNode, phase1MergeAggInfo); - phase1MergeAggNode.init(ctx_.getRootAnalyzer()); - phase1MergeAggNode.unsetNeedsFinalize(); - phase1MergeAggNode.setIntermediateTuple(); - mergeFragment.addPlanRoot(phase1MergeAggNode); - - // the 2nd-phase aggregation consumes the output of the merge agg; - // if there is a limit, it had already been placed with the 2nd aggregation - // step (which is where it should be) - mergeFragment.addPlanRoot(node); - } - - if (!hasGrouping) { - // place the merge aggregation of the 2nd phase in an unpartitioned fragment; - // add preceding merge fragment at end - if (mergeFragment != childFragment) fragments.add(mergeFragment); - - node.unsetNeedsFinalize(); - node.setIntermediateTuple(); - // Any limit should be placed in the final merge aggregation node - long limit = node.getLimit(); - node.unsetLimit(); - mergeFragment = createParentFragment(mergeFragment, DataPartition.UNPARTITIONED); - AggregateInfo phase2MergeAggInfo = node.getAggInfo().getMergeAggInfo(); - AggregationNode phase2MergeAggNode = new AggregationNode(ctx_.getNextNodeId(), node, - phase2MergeAggInfo); - phase2MergeAggNode.init(ctx_.getRootAnalyzer()); - // Transfer having predicates. If hasGrouping == true, the predicates should - // instead be evaluated by the 2nd phase agg (the predicates are already there). - node.transferConjuncts(phase2MergeAggNode); - phase2MergeAggNode.setLimit(limit); - mergeFragment.addPlanRoot(phase2MergeAggNode); - } - return mergeFragment; - } - - /** - * Returns a fragment that produces the output of either an AnalyticEvalNode - * or of the SortNode that provides the input to an AnalyticEvalNode. - * ('node' can be either an AnalyticEvalNode or a SortNode). - * The returned fragment is either partitioned on the Partition By exprs or - * unpartitioned in the absence of such exprs. 
- */ - private PlanFragment createAnalyticFragment(PlanNode node, - PlanFragment childFragment, ArrayList<PlanFragment> fragments) - throws ImpalaException { - Preconditions.checkState( - node instanceof SortNode || node instanceof AnalyticEvalNode); - if (node instanceof AnalyticEvalNode) { - AnalyticEvalNode analyticNode = (AnalyticEvalNode) node; - if (analyticNode.getPartitionExprs().isEmpty() - && analyticNode.getOrderByElements().isEmpty()) { - // no Partition-By/Order-By exprs: compute analytic exprs in single - // unpartitioned fragment - PlanFragment fragment = childFragment; - if (childFragment.isPartitioned()) { - fragment = createParentFragment(childFragment, DataPartition.UNPARTITIONED); - } - fragment.addPlanRoot(analyticNode); - return fragment; - } else { - childFragment.addPlanRoot(analyticNode); - return childFragment; - } - } - - SortNode sortNode = (SortNode) node; - Preconditions.checkState(sortNode.isAnalyticSort()); - PlanFragment analyticFragment = childFragment; - if (sortNode.getInputPartition() != null) { - // make sure the childFragment's output is partitioned as required by the sortNode - sortNode.getInputPartition().substitute( - childFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer()); - if (!childFragment.getDataPartition().equals(sortNode.getInputPartition())) { - analyticFragment = - createParentFragment(childFragment, sortNode.getInputPartition()); - } - } - analyticFragment.addPlanRoot(sortNode); - return analyticFragment; - } - - /** - * Returns a new unpartitioned fragment that materializes the result of the given - * SortNode. If the child fragment is partitioned, returns a new fragment with a - * sort-merging exchange that merges the results of the partitioned sorts. - * The offset and limit are adjusted in the child and parent plan nodes to produce - * the correct result. - */ - private PlanFragment createOrderByFragment(SortNode node, - PlanFragment childFragment, ArrayList<PlanFragment> fragments) - throws ImpalaException { - node.setChild(0, childFragment.getPlanRoot()); - childFragment.addPlanRoot(node); - if (!childFragment.isPartitioned()) return childFragment; - - // Remember original offset and limit. - boolean hasLimit = node.hasLimit(); - long limit = node.getLimit(); - long offset = node.getOffset(); - - // Create a new fragment for a sort-merging exchange. - PlanFragment mergeFragment = - createParentFragment(childFragment, DataPartition.UNPARTITIONED); - ExchangeNode exchNode = (ExchangeNode) mergeFragment.getPlanRoot(); - - // Set limit, offset and merge parameters in the exchange node. - exchNode.unsetLimit(); - if (hasLimit) exchNode.setLimit(limit); - exchNode.setMergeInfo(node.getSortInfo(), offset); - - // Child nodes should not process the offset. If there is a limit, - // the child nodes need only return (offset + limit) rows. 
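    // (Worked example, not part of the original source: for
    //   ORDER BY c LIMIT 10 OFFSET 5
    // each partitioned sort below keeps its local top 10 + 5 = 15 rows with
    // offset 0, and the merging exchange above skips the first 5 merged rows
    // and then returns 10.)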
- SortNode childSortNode = (SortNode) childFragment.getPlanRoot(); - Preconditions.checkState(node == childSortNode); - if (hasLimit) { - childSortNode.unsetLimit(); - childSortNode.setLimit(limit + offset); - } - childSortNode.setOffset(0); - childSortNode.computeStats(ctx_.getRootAnalyzer()); - exchNode.computeStats(ctx_.getRootAnalyzer()); - - return mergeFragment; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/EmptySetNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/EmptySetNode.java b/fe/src/main/java/com/cloudera/impala/planner/EmptySetNode.java deleted file mode 100644 index ed9dc70..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/EmptySetNode.java +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.ArrayList; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.TupleId; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.google.common.base.Preconditions; - -/** - * Node that returns an empty result set. Used for planning query blocks with a constant - * predicate evaluating to false or a limit 0. The result set will have zero rows, but - * the row descriptor must still include a materialized tuple so that the backend can - * construct a valid row empty batch. - */ -public class EmptySetNode extends PlanNode { - public EmptySetNode(PlanNodeId id, ArrayList<TupleId> tupleIds) { - super(id, tupleIds, "EMPTYSET"); - Preconditions.checkArgument(tupleIds.size() > 0); - } - - @Override - public void computeStats(Analyzer analyzer) { - avgRowSize_ = 0; - cardinality_ = 0; - perHostMemCost_ = 0; - numNodes_ = 1; - } - - @Override - public void init(Analyzer analyzer) { - Preconditions.checkState(conjuncts_.isEmpty()); - // If the physical output tuple produced by an AnalyticEvalNode wasn't created - // the logical output tuple is returned by getMaterializedTupleIds(). It needs - // to be set as materialized (even though it isn't) to avoid failing precondition - // checks generating the thrift for slot refs that may reference this tuple. 
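    // (Illustrative note, not part of the original source: an EmptySetNode is
    // planned for blocks like SELECT c FROM t WHERE false or ... LIMIT 0; the
    // loop below force-marks each tuple as materialized so that generating
    // thrift for slot refs against these tuples passes its precondition
    // checks.)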
- for (TupleId id: tupleIds_) analyzer.getTupleDesc(id).setIsMaterialized(true); - computeMemLayout(analyzer); - computeStats(analyzer); - } - - @Override - protected String getNodeExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - return String.format("%s%s:%s\n", prefix, id_.toString(), displayName_); - } - - @Override - protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.EMPTY_SET_NODE; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java b/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java deleted file mode 100644 index eeef5fe..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java +++ /dev/null @@ -1,204 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.SortInfo; -import com.cloudera.impala.analysis.TupleId; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.thrift.TExchangeNode; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.cloudera.impala.thrift.TSortInfo; -import com.google.common.base.Preconditions; - -/** - * Receiver side of a 1:n data stream. Logically, an ExchangeNode consumes the data - * produced by its children. For each of the sending child nodes the actual data - * transmission is performed by the DataStreamSink of the PlanFragment housing - * that child node. Typically, an ExchangeNode only has a single sender child but, - * e.g., for distributed union queries an ExchangeNode may have one sender child per - * union operand. - * - * If a (optional) SortInfo field is set, the ExchangeNode will merge its - * inputs on the parameters specified in the SortInfo object. It is assumed that the - * inputs are also sorted individually on the same SortInfo parameter. - */ -public class ExchangeNode extends PlanNode { - private final static Logger LOG = LoggerFactory.getLogger(ExchangeNode.class); - - // The serialization overhead per tuple in bytes when sent over an exchange. - // Currently it accounts only for the tuple_offset entry per tuple (4B) in a - // BE TRowBatch. If we modify the RowBatch serialization, then we need to - // update this constant as well. 
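  // (Worked example, not part of the original source: with this constant,
  // getAvgSerializedRowSize() below estimates a row that averages 20 bytes
  // and spans 2 tuples at 20 + 2 * 4.0 = 28 bytes on the wire.)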
- private static final double PER_TUPLE_SERIALIZATION_OVERHEAD = 4.0; - - // The parameters based on which sorted input streams are merged by this - // exchange node. Null if this exchange does not merge sorted streams - private SortInfo mergeInfo_; - - // Offset after which the exchange begins returning rows. Currently valid - // only if mergeInfo_ is non-null, i.e. this is a merging exchange node. - private long offset_; - - public ExchangeNode(PlanNodeId id, PlanNode input) { - super(id, "EXCHANGE"); - offset_ = 0; - children_.add(input); - // Only apply the limit at the receiver if there are multiple senders. - if (input.getFragment().isPartitioned()) limit_ = input.limit_; - computeTupleIds(); - } - - @Override - public void computeTupleIds() { - clearTupleIds(); - tupleIds_.addAll(getChild(0).getTupleIds()); - tblRefIds_.addAll(getChild(0).getTblRefIds()); - nullableTupleIds_.addAll(getChild(0).getNullableTupleIds()); - } - - @Override - public void init(Analyzer analyzer) throws ImpalaException { - super.init(analyzer); - Preconditions.checkState(conjuncts_.isEmpty()); - } - - @Override - public void computeStats(Analyzer analyzer) { - Preconditions.checkState(!children_.isEmpty(), - "ExchangeNode must have at least one child"); - cardinality_ = 0; - for (PlanNode child: children_) { - if (child.getCardinality() == -1) { - cardinality_ = -1; - break; - } - cardinality_ = addCardinalities(cardinality_, child.getCardinality()); - } - - if (hasLimit()) { - if (cardinality_ == -1) { - cardinality_ = limit_; - } else { - cardinality_ = Math.min(limit_, cardinality_); - } - } - - // Apply the offset correction if there's a valid cardinality - if (cardinality_ > -1) { - cardinality_ = Math.max(0, cardinality_ - offset_); - } - - // Pick the max numNodes_ and avgRowSize_ of all children. - numNodes_ = Integer.MIN_VALUE; - avgRowSize_ = Integer.MIN_VALUE; - for (PlanNode child: children_) { - numNodes_ = Math.max(child.numNodes_, numNodes_); - avgRowSize_ = Math.max(child.avgRowSize_, avgRowSize_); - } - } - - /** - * Set the parameters used to merge sorted input streams. This can be called - * after init(). - */ - public void setMergeInfo(SortInfo info, long offset) { - mergeInfo_ = info; - offset_ = offset; - displayName_ = "MERGING-EXCHANGE"; - } - - @Override - protected String getNodeExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - output.append(String.format("%s%s [%s]\n", prefix, - getDisplayLabel(), getDisplayLabelDetail())); - - if (offset_ > 0) { - output.append(detailPrefix + "offset: ").append(offset_).append("\n"); - } - - if (mergeInfo_ != null && detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) { - output.append(detailPrefix + "order by: "); - for (int i = 0; i < mergeInfo_.getOrderingExprs().size(); ++i) { - if (i > 0) output.append(", "); - output.append(mergeInfo_.getOrderingExprs().get(i).toSql() + " "); - output.append(mergeInfo_.getIsAscOrder().get(i) ? "ASC" : "DESC"); - - Boolean nullsFirstParam = mergeInfo_.getNullsFirstParams().get(i); - if (nullsFirstParam != null) { - output.append(nullsFirstParam ? " NULLS FIRST" : " NULLS LAST"); - } - } - output.append("\n"); - } - return output.toString(); - } - - @Override - protected String getDisplayLabelDetail() { - // For the non-fragmented explain levels, print the data partition - // of the data stream sink that sends to this exchange node. 
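    // (Illustrative note, not part of the original source: e.g., an
    // unpartitioned sender feeding a partitioned fragment is printed as
    // BROADCAST by the branch below, while a hash-partitioned sender prints
    // its partition's explain string, such as HASH(t.id).)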
- Preconditions.checkState(!children_.isEmpty()); - DataSink sink = getChild(0).getFragment().getSink(); - if (sink == null) return ""; - Preconditions.checkState(sink instanceof DataStreamSink); - DataStreamSink streamSink = (DataStreamSink) sink; - if (!streamSink.getOutputPartition().isPartitioned() && - fragment_.isPartitioned()) { - // If the output of the sink is not partitioned but the target fragment is - // partitioned, then the data exchange is broadcast. - return "BROADCAST"; - } else { - return streamSink.getOutputPartition().getExplainString(); - } - } - - /** - * Returns the average size of rows produced by 'exchInput' when serialized for - * being sent through an exchange. - */ - public static double getAvgSerializedRowSize(PlanNode exchInput) { - return exchInput.getAvgRowSize() + - (exchInput.getTupleIds().size() * PER_TUPLE_SERIALIZATION_OVERHEAD); - } - - @Override - protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.EXCHANGE_NODE; - msg.exchange_node = new TExchangeNode(); - for (TupleId tid: tupleIds_) { - msg.exchange_node.addToInput_row_tuples(tid.asInt()); - } - - if (mergeInfo_ != null) { - TSortInfo sortInfo = new TSortInfo( - Expr.treesToThrift(mergeInfo_.getOrderingExprs()), mergeInfo_.getIsAscOrder(), - mergeInfo_.getNullsFirst()); - msg.exchange_node.setSort_info(sortInfo); - msg.exchange_node.setOffset(offset_); - } - } -}
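The cardinality estimate in ExchangeNode.computeStats() above composes three steps: sum the child cardinalities (propagating -1 for "unknown"), cap the sum by any limit, then subtract the merge offset. Below is a minimal standalone sketch of that arithmetic; the class, method, and variable names are illustrative only and do not appear in the Impala sources, and the saturating addCardinalities() helper of the original is simplified to a plain sum.

import java.util.List;

public class ExchangeCardinalitySketch {
  // -1 means "unknown", matching the planner's cardinality convention.
  static long estimate(List<Long> childCardinalities, long limit, long offset) {
    long cardinality = 0;
    for (long c : childCardinalities) {
      if (c == -1) { cardinality = -1; break; }  // one unknown child => unknown sum
      cardinality += c;  // the original uses a saturating addCardinalities()
    }
    // A limit caps the estimate; with an unknown sum the limit itself is the cap.
    if (limit >= 0) {
      cardinality = (cardinality == -1) ? limit : Math.min(limit, cardinality);
    }
    // A merging exchange skips 'offset' rows before returning any.
    if (cardinality > -1) cardinality = Math.max(0, cardinality - offset);
    return cardinality;
  }

  public static void main(String[] args) {
    // Two senders of 1000 and 500 rows, LIMIT 100 OFFSET 10 => 90 rows.
    System.out.println(estimate(List.of(1000L, 500L), 100, 10));
    // An unknown child makes the sum unknown, so the limit becomes the cap.
    System.out.println(estimate(List.of(1000L, -1L), 100, 10));
  }
}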