http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java deleted file mode 100644 index e31372d..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java +++ /dev/null @@ -1,510 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.util.Bytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.SlotDescriptor; -import com.cloudera.impala.analysis.StringLiteral; -import com.cloudera.impala.analysis.TupleDescriptor; -import com.cloudera.impala.catalog.HBaseColumn; -import com.cloudera.impala.catalog.HBaseTable; -import com.cloudera.impala.catalog.PrimitiveType; -import com.cloudera.impala.catalog.Type; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.common.Pair; -import com.cloudera.impala.service.FeSupport; -import com.cloudera.impala.thrift.TColumnValue; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.THBaseFilter; -import com.cloudera.impala.thrift.THBaseKeyRange; -import com.cloudera.impala.thrift.THBaseScanNode; -import com.cloudera.impala.thrift.TNetworkAddress; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.cloudera.impala.thrift.TQueryOptions; -import com.cloudera.impala.thrift.TScanRange; -import com.cloudera.impala.thrift.TScanRangeLocation; -import com.cloudera.impala.thrift.TScanRangeLocations; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * Full scan of an HBase table. - * Only families/qualifiers specified in TupleDescriptor will be retrieved in the backend. 
- */ -public class HBaseScanNode extends ScanNode { - private final static Logger LOG = LoggerFactory.getLogger(HBaseScanNode.class); - private final TupleDescriptor desc_; - - // One range per clustering column. The range bounds are expected to be constants. - // A null entry means there's no range restriction for that particular key. - // If keyRanges is non-null it always contains as many entries as there are clustering - // cols. - private List<ValueRange> keyRanges_; - - // derived from keyRanges_; empty means unbounded; - // initialize start/stopKey_ to be unbounded. - private byte[] startKey_ = HConstants.EMPTY_START_ROW; - private byte[] stopKey_ = HConstants.EMPTY_END_ROW; - - // True if this scan node is not going to scan anything. If the row key filter - // evaluates to null, or if the lower bound > upper bound, then this scan node won't - // scan at all. - private boolean isEmpty_ = false; - - // List of HBase Filters for generating thrift message. Filled in finalize(). - private final List<THBaseFilter> filters_ = new ArrayList<THBaseFilter>(); - - // The suggested value for "hbase.client.scan.setCaching", which batches maxCaching - // rows per fetch request to the HBase region server. If the value is too high, - // then the hbase region server will have a hard time (GC pressure and long response - // times). If the value is too small, then there will be extra trips to the hbase - // region server. - // Default to 1024 and update it based on row size estimate such that each batch size - // won't exceed 500MB. - private final static int MAX_HBASE_FETCH_BATCH_SIZE = 500 * 1024 * 1024; - private final static int DEFAULT_SUGGESTED_CACHING = 1024; - private int suggestedCaching_ = DEFAULT_SUGGESTED_CACHING; - - // HBase config; common across all object instances. - private static Configuration hbaseConf_ = HBaseConfiguration.create(); - - public HBaseScanNode(PlanNodeId id, TupleDescriptor desc) { - super(id, desc, "SCAN HBASE"); - desc_ = desc; - } - - public void setKeyRanges(List<ValueRange> keyRanges) { - Preconditions.checkNotNull(keyRanges); - keyRanges_ = keyRanges; - } - - @Override - public void init(Analyzer analyzer) throws ImpalaException { - checkForSupportedFileFormats(); - assignConjuncts(analyzer); - conjuncts_ = orderConjunctsByCost(conjuncts_); - setStartStopKey(analyzer); - // Convert predicates to HBase filters_. - createHBaseFilters(analyzer); - - // materialize slots in remaining conjuncts_ - analyzer.materializeSlots(conjuncts_); - computeMemLayout(analyzer); - computeScanRangeLocations(analyzer); - - // Call computeStats() after materializing slots and computing the mem layout. - computeStats(analyzer); - } - - /** - * Convert keyRanges_ to startKey_ and stopKey_. - * If ValueRange is not null, transform it into start/stopKey_ by evaluating the - * expression. Analysis has checked that the expression is string type. If the - * expression evaluates to null, then there's nothing to scan because an HBase row key - * cannot be null. - * At present, we only do row key filtering for string-mapped keys. String-mapped keys - * are always encoded as ASCII. - * ValueRange is null if there is no predicate on the row-key.
- */ - private void setStartStopKey(Analyzer analyzer) throws InternalException { - Preconditions.checkNotNull(keyRanges_); - Preconditions.checkState(keyRanges_.size() == 1); - - ValueRange rowRange = keyRanges_.get(0); - if (rowRange != null) { - if (rowRange.getLowerBound() != null) { - Preconditions.checkState(rowRange.getLowerBound().isConstant()); - Preconditions.checkState( - rowRange.getLowerBound().getType().equals(Type.STRING)); - TColumnValue val = FeSupport.EvalConstExpr(rowRange.getLowerBound(), - analyzer.getQueryCtx()); - if (!val.isSetString_val()) { - // lower bound is null. - isEmpty_ = true; - return; - } else { - startKey_ = convertToBytes(val.getString_val(), - !rowRange.getLowerBoundInclusive()); - } - } - if (rowRange.getUpperBound() != null) { - Preconditions.checkState(rowRange.getUpperBound().isConstant()); - Preconditions.checkState( - rowRange.getUpperBound().getType().equals(Type.STRING)); - TColumnValue val = FeSupport.EvalConstExpr(rowRange.getUpperBound(), - analyzer.getQueryCtx()); - if (!val.isSetString_val()) { - // upper bound is null. - isEmpty_ = true; - return; - } else { - stopKey_ = convertToBytes(val.getString_val(), - rowRange.getUpperBoundInclusive()); - } - } - } - - boolean endKeyIsEndOfTable = Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW); - if ((Bytes.compareTo(startKey_, stopKey_) > 0) && !endKeyIsEndOfTable) { - // Lower bound is greater than upper bound. - isEmpty_ = true; - } - } - - /** - * Also sets suggestedCaching_. - */ - @Override - public void computeStats(Analyzer analyzer) { - super.computeStats(analyzer); - HBaseTable tbl = (HBaseTable) desc_.getTable(); - - ValueRange rowRange = keyRanges_.get(0); - if (isEmpty_) { - cardinality_ = 0; - } else if (rowRange != null && rowRange.isEqRange()) { - cardinality_ = 1; - } else { - // Set maxCaching so that each fetch from hbase won't return a batch of more than - // MAX_HBASE_FETCH_BATCH_SIZE bytes. - Pair<Long, Long> estimate = tbl.getEstimatedRowStats(startKey_, stopKey_); - cardinality_ = estimate.first.longValue(); - if (estimate.second.longValue() > 0) { - suggestedCaching_ = (int) - Math.max(MAX_HBASE_FETCH_BATCH_SIZE / estimate.second.longValue(), 1); - } - } - inputCardinality_ = cardinality_; - - cardinality_ *= computeSelectivity(); - cardinality_ = Math.max(1, cardinality_); - cardinality_ = capAtLimit(cardinality_); - LOG.debug("computeStats HbaseScan: cardinality=" + Long.toString(cardinality_)); - - // TODO: take actual regions into account - numNodes_ = tbl.getNumNodes(); - LOG.debug("computeStats HbaseScan: #nodes=" + Integer.toString(numNodes_)); - } - - @Override - protected String debugString() { - HBaseTable tbl = (HBaseTable) desc_.getTable(); - return Objects.toStringHelper(this) - .add("tid", desc_.getId().asInt()) - .add("hiveTblName", tbl.getFullName()) - .add("hbaseTblName", tbl.getHBaseTableName()) - .add("startKey", ByteBuffer.wrap(startKey_).toString()) - .add("stopKey", ByteBuffer.wrap(stopKey_).toString()) - .add("isEmpty", isEmpty_) - .addValue(super.debugString()) - .toString(); - } - - // We convert predicates of the form <slotref> op <constant> where slotref is of - // type string to HBase filters. All these predicates are also evaluated at - // the HBaseScanNode. To properly filter out NULL values HBaseScanNode treats all - // predicates as disjunctive, thereby requiring re-evaluation when there are multiple - // attributes. We explicitly materialize the referenced slots, otherwise our hbase - // scans don't return correct data. 
- // TODO: expand this to generate nested filter lists for arbitrary conjunctions - // and disjunctions. - private void createHBaseFilters(Analyzer analyzer) { - for (Expr e: conjuncts_) { - // We only consider binary predicates - if (!(e instanceof BinaryPredicate)) continue; - BinaryPredicate bp = (BinaryPredicate) e; - CompareFilter.CompareOp hbaseOp = impalaOpToHBaseOp(bp.getOp()); - // Ignore unsupported ops - if (hbaseOp == null) continue; - - for (SlotDescriptor slot: desc_.getSlots()) { - // Only push down predicates on string columns - if (slot.getType().getPrimitiveType() != PrimitiveType.STRING) continue; - - Expr bindingExpr = bp.getSlotBinding(slot.getId()); - if (bindingExpr == null || !(bindingExpr instanceof StringLiteral)) continue; - - StringLiteral literal = (StringLiteral) bindingExpr; - HBaseColumn col = (HBaseColumn) slot.getColumn(); - filters_.add(new THBaseFilter( - col.getColumnFamily(), col.getColumnQualifier(), - (byte) hbaseOp.ordinal(), literal.getUnescapedValue())); - analyzer.materializeSlots(Lists.newArrayList(e)); - } - } - } - - @Override - protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.HBASE_SCAN_NODE; - HBaseTable tbl = (HBaseTable) desc_.getTable(); - msg.hbase_scan_node = - new THBaseScanNode(desc_.getId().asInt(), tbl.getHBaseTableName()); - if (!filters_.isEmpty()) { - msg.hbase_scan_node.setFilters(filters_); - } - msg.hbase_scan_node.setSuggested_max_caching(suggestedCaching_); - } - - /** - * We create a TScanRange for each region server that contains at least one - * relevant region, and the created TScanRange will contain all the relevant regions - * of that region server. - */ - private void computeScanRangeLocations(Analyzer analyzer) { - scanRanges_ = Lists.newArrayList(); - - // For empty scan node, return an empty list. - if (isEmpty_) return; - - // Retrieve relevant HBase regions and their region servers - HBaseTable tbl = (HBaseTable) desc_.getTable(); - org.apache.hadoop.hbase.client.Table hbaseTbl = null; - List<HRegionLocation> regionsLoc; - try { - hbaseTbl = tbl.getHBaseTable(); - regionsLoc = HBaseTable.getRegionsInRange(hbaseTbl, startKey_, stopKey_); - hbaseTbl.close(); - } catch (IOException e) { - throw new RuntimeException( - "couldn't retrieve HBase table (" + tbl.getHBaseTableName() + ") info:\n" - + e.getMessage(), e); - } - - // Convert list of HRegionLocation to Map<hostport, List<HRegionLocation>>. - // The List<HRegionLocations>'s end up being sorted by start key/end key, because - // regionsLoc is sorted that way. - Map<String, List<HRegionLocation>> locationMap = Maps.newHashMap(); - for (HRegionLocation regionLoc: regionsLoc) { - String locHostPort = regionLoc.getHostnamePort(); - if (locationMap.containsKey(locHostPort)) { - locationMap.get(locHostPort).add(regionLoc); - } else { - locationMap.put(locHostPort, Lists.newArrayList(regionLoc)); - } - } - - for (Map.Entry<String, List<HRegionLocation>> locEntry: locationMap.entrySet()) { - // HBaseTableScanner(backend) initializes a result scanner for each key range. - // To minimize # of result scanner re-init, create only a single HBaseKeyRange - // for all adjacent regions on this server. 
- THBaseKeyRange keyRange = null; - byte[] prevEndKey = null; - for (HRegionLocation regionLoc: locEntry.getValue()) { - byte[] curRegStartKey = regionLoc.getRegionInfo().getStartKey(); - byte[] curRegEndKey = regionLoc.getRegionInfo().getEndKey(); - if (prevEndKey != null && - Bytes.compareTo(prevEndKey, curRegStartKey) == 0) { - // the current region starts where the previous one left off; - // extend the key range - setKeyRangeEnd(keyRange, curRegEndKey); - } else { - // create a new HBaseKeyRange (and TScanRange2/TScanRangeLocations to go - // with it). - keyRange = new THBaseKeyRange(); - setKeyRangeStart(keyRange, curRegStartKey); - setKeyRangeEnd(keyRange, curRegEndKey); - - TScanRangeLocations scanRangeLocation = new TScanRangeLocations(); - TNetworkAddress networkAddress = addressToTNetworkAddress(locEntry.getKey()); - scanRangeLocation.addToLocations( - new TScanRangeLocation(analyzer.getHostIndex().getIndex(networkAddress))); - scanRanges_.add(scanRangeLocation); - - TScanRange scanRange = new TScanRange(); - scanRange.setHbase_key_range(keyRange); - scanRangeLocation.setScan_range(scanRange); - } - prevEndKey = curRegEndKey; - } - } - } - - /** - * Set the start key of keyRange using the provided key, bounded by startKey_ - * @param keyRange the keyRange to be updated - * @param rangeStartKey the start key value to be set to - */ - private void setKeyRangeStart(THBaseKeyRange keyRange, byte[] rangeStartKey) { - keyRange.unsetStartKey(); - // use the max(startKey, rangeStartKey) for scan start - if (!Bytes.equals(rangeStartKey, HConstants.EMPTY_START_ROW) || - !Bytes.equals(startKey_, HConstants.EMPTY_START_ROW)) { - byte[] partStart = (Bytes.compareTo(rangeStartKey, startKey_) < 0) ? - startKey_ : rangeStartKey; - keyRange.setStartKey(Bytes.toString(partStart)); - } - } - - /** - * Set the end key of keyRange using the provided key, bounded by stopKey_ - * @param keyRange the keyRange to be updated - * @param rangeEndKey the end key value to be set to - */ - private void setKeyRangeEnd(THBaseKeyRange keyRange, byte[] rangeEndKey) { - keyRange.unsetStopKey(); - // use the min(stopkey, regionStopKey) for scan stop - if (!Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW) || - !Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) { - if (Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) { - keyRange.setStopKey(Bytes.toString(rangeEndKey)); - } else if (Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW)) { - keyRange.setStopKey(Bytes.toString(stopKey_)); - } else { - byte[] partEnd = (Bytes.compareTo(rangeEndKey, stopKey_) < 0) ? 
- rangeEndKey : stopKey_; - keyRange.setStopKey(Bytes.toString(partEnd)); - } - } - } - - @Override - protected String getNodeExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - HBaseTable table = (HBaseTable) desc_.getTable(); - StringBuilder output = new StringBuilder(); - if (isEmpty_) { - output.append(prefix + "empty scan node\n"); - return output.toString(); - } - String aliasStr = ""; - if (!table.getFullName().equalsIgnoreCase(desc_.getAlias()) && - !table.getName().equalsIgnoreCase(desc_.getAlias())) { - aliasStr = " " + desc_.getAlias(); - } - output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(), - displayName_, table.getFullName(), aliasStr)); - if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { - if (!Bytes.equals(startKey_, HConstants.EMPTY_START_ROW)) { - output.append(detailPrefix + "start key: " + printKey(startKey_) + "\n"); - } - if (!Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) { - output.append(detailPrefix + "stop key: " + printKey(stopKey_) + "\n"); - } - if (!filters_.isEmpty()) { - output.append(detailPrefix + "hbase filters:"); - if (filters_.size() == 1) { - THBaseFilter filter = filters_.get(0); - output.append(" " + filter.family + ":" + filter.qualifier + " " + - CompareFilter.CompareOp.values()[filter.op_ordinal].toString() + " " + - "'" + filter.filter_constant + "'"); - } else { - for (int i = 0; i < filters_.size(); ++i) { - THBaseFilter filter = filters_.get(i); - output.append("\n " + filter.family + ":" + filter.qualifier + " " + - CompareFilter.CompareOp.values()[filter.op_ordinal].toString() + " " + - "'" + filter.filter_constant + "'"); - } - } - output.append('\n'); - } - if (!conjuncts_.isEmpty()) { - output.append( - detailPrefix + "predicates: " + getExplainString(conjuncts_) + "\n"); - } - } - if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { - output.append(getStatsExplainString(detailPrefix, detailLevel)); - output.append("\n"); - } - return output.toString(); - } - - /** - * Convert key into byte array and append a '\0' if 'nextKey' is true. - */ - private byte[] convertToBytes(String rowKey, boolean nextKey) { - byte[] keyBytes = Bytes.toBytes(rowKey); - if (!nextKey) { - return keyBytes; - } else { - // append \0 - return Arrays.copyOf(keyBytes, keyBytes.length + 1); - } - } - - /** - * Prints non-printable characters in escaped octal, otherwise outputs - * the characters. - */ - public static String printKey(byte[] key) { - StringBuilder result = new StringBuilder(); - for (int i = 0; i < key.length; ++i) { - if (!Character.isISOControl(key[i])) { - result.append((char) key[i]); - } else { - result.append("\\"); - result.append(Integer.toOctalString(key[i])); - } - } - return result.toString(); - } - - private static CompareFilter.CompareOp impalaOpToHBaseOp( - BinaryPredicate.Operator impalaOp) { - switch(impalaOp) { - case EQ: return CompareFilter.CompareOp.EQUAL; - case NE: return CompareFilter.CompareOp.NOT_EQUAL; - case GT: return CompareFilter.CompareOp.GREATER; - case GE: return CompareFilter.CompareOp.GREATER_OR_EQUAL; - case LT: return CompareFilter.CompareOp.LESS; - case LE: return CompareFilter.CompareOp.LESS_OR_EQUAL; - // TODO: Add support for pushing LIKE/REGEX down to HBase with a different Filter. - default: throw new IllegalArgumentException( - "HBase: Unsupported Impala compare operator: " + impalaOp); - } - } - - @Override - public void computeCosts(TQueryOptions queryOptions) { - // TODO: What's a good estimate of memory consumption? 
- perHostMemCost_ = 1024L * 1024L * 1024L; - } - - /** - * Returns the per-host upper bound of memory that any number of concurrent scan nodes - * will use. Used for estimating the per-host memory requirement of queries. - */ - public static long getPerHostMemUpperBound() { - // TODO: What's a good estimate of memory consumption? - return 1024L * 1024L * 1024L; - } -}
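For context, the planner code above only builds the THBaseScanNode/THBaseFilter thrift structures; the backend scanner is what actually drives the HBase client. The following standalone Java sketch is illustrative only (it is not Impala code) and shows roughly how the planner's outputs, namely the start/stop row key, the suggested caching value, and a single pushed-down string-column filter, would map onto the HBase 1.x client API. The table name, column family, qualifier, and literal used here are hypothetical.

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HBaseScanSketch {
      public static void main(String[] args) throws IOException {
        try (Connection conn =
                 ConnectionFactory.createConnection(HBaseConfiguration.create());
             // Hypothetical table name; Impala would use tbl.getHBaseTableName().
             Table table = conn.getTable(TableName.valueOf("example_hbase_table"))) {
          Scan scan = new Scan();
          // Equivalent of startKey_/stopKey_ derived from the row-key ValueRange.
          scan.setStartRow(Bytes.toBytes("1000"));
          scan.setStopRow(Bytes.toBytes("2000"));
          // Equivalent of suggestedCaching_ (THBaseScanNode.suggested_max_caching).
          scan.setCaching(1024);
          // Equivalent of one THBaseFilter: family:qualifier op 'constant' on a
          // STRING column. Family "d", qualifier "string_col" are made up.
          scan.setFilter(new SingleColumnValueFilter(
              Bytes.toBytes("d"), Bytes.toBytes("string_col"),
              CompareFilter.CompareOp.EQUAL, Bytes.toBytes("some_value")));
          try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result r : scanner) {
              System.out.println(Bytes.toString(r.getRow()));
            }
          }
        }
      }
    }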
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HBaseTableSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/HBaseTableSink.java b/fe/src/main/java/com/cloudera/impala/planner/HBaseTableSink.java deleted file mode 100644 index 2a0d1b7..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/HBaseTableSink.java +++ /dev/null @@ -1,59 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -package com.cloudera.impala.planner; - -import com.cloudera.impala.catalog.Table; -import com.cloudera.impala.common.PrintUtils; -import com.cloudera.impala.thrift.TDataSink; -import com.cloudera.impala.thrift.TDataSinkType; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TTableSink; -import com.cloudera.impala.thrift.TTableSinkType; - -/** - * Class used to represent a Sink that will transport - * data from a plan fragment into an HBase table using HTable. - */ -public class HBaseTableSink extends TableSink { - public HBaseTableSink(Table targetTable) { - super(targetTable, Op.INSERT); - } - - @Override - public String getExplainString(String prefix, String detailPrefix, - TExplainLevel explainLevel) { - StringBuilder output = new StringBuilder(); - output.append(prefix + "WRITE TO HBASE table=" + targetTable_.getFullName() + "\n"); - if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { - output.append(PrintUtils.printHosts(detailPrefix, fragment_.getNumNodes())); - output.append(PrintUtils.printMemCost(" ", perHostMemCost_)); - output.append("\n"); - } - return output.toString(); - } - - @Override - protected TDataSink toThrift() { - TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK); - TTableSink tTableSink = new TTableSink(targetTable_.getId().asInt(), - TTableSinkType.HBASE, sinkOp_.toThrift()); - result.table_sink = tTableSink; - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java deleted file mode 100644 index 906f732..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java +++ /dev/null @@ -1,193 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.ExprSubstitutionMap; -import com.cloudera.impala.analysis.JoinOperator; -import com.cloudera.impala.catalog.Type; -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.thrift.TEqJoinCondition; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.THashJoinNode; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.cloudera.impala.thrift.TQueryOptions; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Hash join between left child (outer) and right child (inner). One child must be the - * plan generated for a table ref. Typically, that is the right child, but due to join - * inversion (for outer/semi/cross joins) it could also be the left child. - */ -public class HashJoinNode extends JoinNode { - private final static Logger LOG = LoggerFactory.getLogger(HashJoinNode.class); - - public HashJoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin, - DistributionMode distrMode, JoinOperator joinOp, - List<BinaryPredicate> eqJoinConjuncts, List<Expr> otherJoinConjuncts) { - super(outer, inner, isStraightJoin, distrMode, joinOp, eqJoinConjuncts, - otherJoinConjuncts, "HASH JOIN"); - Preconditions.checkNotNull(eqJoinConjuncts); - Preconditions.checkState(joinOp_ != JoinOperator.CROSS_JOIN); - } - - @Override - public List<BinaryPredicate> getEqJoinConjuncts() { return eqJoinConjuncts_; } - - @Override - public void init(Analyzer analyzer) throws ImpalaException { - super.init(analyzer); - List<BinaryPredicate> newEqJoinConjuncts = Lists.newArrayList(); - ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap(); - for (Expr c: eqJoinConjuncts_) { - BinaryPredicate eqPred = - (BinaryPredicate) c.substitute(combinedChildSmap, analyzer, false); - Type t0 = eqPred.getChild(0).getType(); - Type t1 = eqPred.getChild(1).getType(); - if (!t0.matchesType(t1)) { - // With decimal and char types, the child types do not have to match because - // the equality builtin handles it. However, they will not hash correctly so - // insert a cast. 
- boolean bothDecimal = t0.isDecimal() && t1.isDecimal(); - boolean bothString = t0.isStringType() && t1.isStringType(); - if (!bothDecimal && !bothString) { - throw new InternalException("Cannot compare " + - t0.toSql() + " to " + t1.toSql() + " in join predicate."); - } - Type compatibleType = Type.getAssignmentCompatibleType(t0, t1, false); - Preconditions.checkState(compatibleType.isDecimal() || - compatibleType.isStringType()); - try { - if (!t0.equals(compatibleType)) { - eqPred.setChild(0, eqPred.getChild(0).castTo(compatibleType)); - } - if (!t1.equals(compatibleType)) { - eqPred.setChild(1, eqPred.getChild(1).castTo(compatibleType)); - } - } catch (AnalysisException e) { - throw new InternalException("Should not happen", e); - } - } - Preconditions.checkState( - eqPred.getChild(0).getType().matchesType(eqPred.getChild(1).getType())); - BinaryPredicate newEqPred = new BinaryPredicate(eqPred.getOp(), - eqPred.getChild(0), eqPred.getChild(1)); - newEqPred.analyze(analyzer); - newEqJoinConjuncts.add(newEqPred); - } - eqJoinConjuncts_ = newEqJoinConjuncts; - orderJoinConjunctsByCost(); - computeStats(analyzer); - } - - @Override - protected String debugString() { - return Objects.toStringHelper(this) - .add("eqJoinConjuncts_", eqJoinConjunctsDebugString()) - .addValue(super.debugString()) - .toString(); - } - - private String eqJoinConjunctsDebugString() { - Objects.ToStringHelper helper = Objects.toStringHelper(this); - for (Expr entry: eqJoinConjuncts_) { - helper.add("lhs" , entry.getChild(0)).add("rhs", entry.getChild(1)); - } - return helper.toString(); - } - - @Override - protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.HASH_JOIN_NODE; - msg.hash_join_node = new THashJoinNode(); - msg.hash_join_node.join_op = joinOp_.toThrift(); - for (Expr entry: eqJoinConjuncts_) { - BinaryPredicate bp = (BinaryPredicate)entry; - TEqJoinCondition eqJoinCondition = - new TEqJoinCondition(bp.getChild(0).treeToThrift(), - bp.getChild(1).treeToThrift(), - bp.getOp() == BinaryPredicate.Operator.NOT_DISTINCT); - msg.hash_join_node.addToEq_join_conjuncts(eqJoinCondition); - } - for (Expr e: otherJoinConjuncts_) { - msg.hash_join_node.addToOther_join_conjuncts(e.treeToThrift()); - } - } - - @Override - protected String getNodeExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - output.append(String.format("%s%s [%s]\n", prefix, getDisplayLabel(), - getDisplayLabelDetail())); - - if (detailLevel.ordinal() > TExplainLevel.STANDARD.ordinal()) { - if (joinTableId_.isValid()) { - output.append( - detailPrefix + "hash-table-id=" + joinTableId_.toString() + "\n"); - } - } - if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) { - output.append(detailPrefix + "hash predicates: "); - for (int i = 0; i < eqJoinConjuncts_.size(); ++i) { - Expr eqConjunct = eqJoinConjuncts_.get(i); - output.append(eqConjunct.toSql()); - if (i + 1 != eqJoinConjuncts_.size()) output.append(", "); - } - output.append("\n"); - if (!otherJoinConjuncts_.isEmpty()) { - output.append(detailPrefix + "other join predicates: ") - .append(getExplainString(otherJoinConjuncts_) + "\n"); - } - if (!conjuncts_.isEmpty()) { - output.append(detailPrefix + "other predicates: ") - .append(getExplainString(conjuncts_) + "\n"); - } - if (!runtimeFilters_.isEmpty()) { - output.append(detailPrefix + "runtime filters: "); - output.append(getRuntimeFilterExplainString(true)); - } - } - return output.toString(); - } - - @Override - public void 
computeCosts(TQueryOptions queryOptions) { - if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1 - || numNodes_ == 0) { - perHostMemCost_ = DEFAULT_PER_HOST_MEM; - return; - } - perHostMemCost_ = - (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_ - * PlannerContext.HASH_TBL_SPACE_OVERHEAD); - if (distrMode_ == DistributionMode.PARTITIONED) perHostMemCost_ /= numNodes_; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionFilter.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionFilter.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionFilter.java deleted file mode 100644 index d1710aa..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionFilter.java +++ /dev/null @@ -1,127 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.ExprSubstitutionMap; -import com.cloudera.impala.analysis.SlotDescriptor; -import com.cloudera.impala.analysis.SlotId; -import com.cloudera.impala.analysis.SlotRef; -import com.cloudera.impala.catalog.Column; -import com.cloudera.impala.catalog.HdfsPartition; -import com.cloudera.impala.catalog.HdfsTable; -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.service.FeSupport; -import com.cloudera.impala.thrift.TColumnValue; -import com.cloudera.impala.thrift.TResultRow; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * An HdfsPartitionFilter represents a predicate on the partition columns (or a subset) - * of a table. It can be evaluated at plan generation time against an HdfsPartition. 
- */ -public class HdfsPartitionFilter { - private final static Logger LOG = LoggerFactory.getLogger(HdfsPartitionFilter.class); - - private final Expr predicate_; - - // lhs exprs of smap used in isMatch() - private final ArrayList<SlotRef> lhsSlotRefs_ = Lists.newArrayList(); - // indices into Table.getColumns() - private final ArrayList<Integer> refdKeys_ = Lists.newArrayList(); - - public HdfsPartitionFilter(Expr predicate, HdfsTable tbl, Analyzer analyzer) { - predicate_ = predicate; - - // populate lhsSlotRefs_ and refdKeys_ - ArrayList<SlotId> refdSlots = Lists.newArrayList(); - predicate.getIds(null, refdSlots); - HashMap<Column, SlotDescriptor> slotDescsByCol = Maps.newHashMap(); - for (SlotId refdSlot: refdSlots) { - SlotDescriptor slotDesc = analyzer.getDescTbl().getSlotDesc(refdSlot); - slotDescsByCol.put(slotDesc.getColumn(), slotDesc); - } - - for (int i = 0; i < tbl.getNumClusteringCols(); ++i) { - Column col = tbl.getColumns().get(i); - SlotDescriptor slotDesc = slotDescsByCol.get(col); - if (slotDesc != null) { - lhsSlotRefs_.add(new SlotRef(slotDesc)); - refdKeys_.add(i); - } - } - Preconditions.checkState(lhsSlotRefs_.size() == refdKeys_.size()); - } - - /** - * Evaluate a filter against a batch of partitions and return the partition ids - * that pass the filter. - */ - public HashSet<Long> getMatchingPartitionIds(ArrayList<HdfsPartition> partitions, - Analyzer analyzer) throws InternalException { - HashSet<Long> result = new HashSet<Long>(); - // List of predicates to evaluate - ArrayList<Expr> predicates = new ArrayList<Expr>(partitions.size()); - long[] partitionIds = new long[partitions.size()]; - int indx = 0; - for (HdfsPartition p: partitions) { - predicates.add(buildPartitionPredicate(p, analyzer)); - partitionIds[indx++] = p.getId(); - } - // Evaluate the predicates - TResultRow results = FeSupport.EvalPredicateBatch(predicates, - analyzer.getQueryCtx()); - Preconditions.checkState(results.getColValsSize() == partitions.size()); - indx = 0; - for (TColumnValue val: results.getColVals()) { - if (val.isBool_val()) result.add(partitionIds[indx]); - ++indx; - } - return result; - } - - /** - * Construct a predicate for a given partition by substituting the SlotRefs - * for the partition cols with the respective partition-key values. - */ - private Expr buildPartitionPredicate(HdfsPartition partition, Analyzer analyzer) - throws InternalException { - // construct smap - ExprSubstitutionMap sMap = new ExprSubstitutionMap(); - for (int i = 0; i < refdKeys_.size(); ++i) { - sMap.put( - lhsSlotRefs_.get(i), partition.getPartitionValues().get(refdKeys_.get(i))); - } - - Expr literalPredicate = predicate_.substitute(sMap, analyzer, false); - LOG.trace("buildPartitionPredicate: " + literalPredicate.toSql() + " " + - literalPredicate.debugString()); - Preconditions.checkState(literalPredicate.isConstant()); - return literalPredicate; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionPruner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionPruner.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionPruner.java deleted file mode 100644 index 9606dc5..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionPruner.java +++ /dev/null @@ -1,475 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; -import java.util.TreeMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.BetweenPredicate; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.BinaryPredicate.Operator; -import com.cloudera.impala.analysis.CompoundPredicate; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.InPredicate; -import com.cloudera.impala.analysis.IsNullPredicate; -import com.cloudera.impala.analysis.LiteralExpr; -import com.cloudera.impala.analysis.NullLiteral; -import com.cloudera.impala.analysis.SlotDescriptor; -import com.cloudera.impala.analysis.SlotId; -import com.cloudera.impala.analysis.SlotRef; -import com.cloudera.impala.analysis.TupleDescriptor; -import com.cloudera.impala.catalog.HdfsPartition; -import com.cloudera.impala.catalog.HdfsTable; -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.common.InternalException; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - - -/** - * HDFS partitions pruner provides a mechanism to filter out partitions of an HDFS - * table based on the conjuncts provided by the caller. - * - * The pruner is initialized with a TupleDescriptor for the slots being materialized. - * The prunePartitions() method is the external interface exposed by this class. It - * takes a list of conjuncts, loops through all the partitions and prunes them based - * on applicable conjuncts. It returns a list of partitions left after applying all - * the conjuncts and also removes the conjuncts which have been fully evaluated with - * the partition columns. - */ -public class HdfsPartitionPruner { - - private final static Logger LOG = LoggerFactory.getLogger(HdfsPartitionPruner.class); - - // Partition batch size used during partition pruning. - private final static int PARTITION_PRUNING_BATCH_SIZE = 1024; - - private final HdfsTable tbl_; - - private List<SlotId> partitionSlots_ = Lists.newArrayList(); - - public HdfsPartitionPruner(TupleDescriptor tupleDesc) { - Preconditions.checkState(tupleDesc.getTable() instanceof HdfsTable); - tbl_ = (HdfsTable)tupleDesc.getTable(); - - // Collect all the partitioning columns from TupleDescriptor. 
- for (SlotDescriptor slotDesc: tupleDesc.getSlots()) { - if (slotDesc.getColumn() == null) continue; - if (slotDesc.getColumn().getPosition() < tbl_.getNumClusteringCols()) { - partitionSlots_.add(slotDesc.getId()); - } - } - } - - /** - * Return a list of partitions left after applying the conjuncts. Please note - * that conjuncts used for filtering will be removed from the list 'conjuncts'. - */ - public List<HdfsPartition> prunePartitions(Analyzer analyzer, List<Expr> conjuncts) - throws InternalException { - // Start with creating a collection of partition filters for the applicable conjuncts. - List<HdfsPartitionFilter> partitionFilters = Lists.newArrayList(); - // Conjuncts that can be evaluated from the partition key values. - List<Expr> simpleFilterConjuncts = Lists.newArrayList(); - - // Simple predicates (e.g. binary predicates of the form - // <SlotRef> <op> <LiteralExpr>) can be used to derive lists - // of matching partition ids directly from the partition key values. - // Split conjuncts among those that can be evaluated from partition - // key values and those that need to be evaluated in the BE. - Iterator<Expr> it = conjuncts.iterator(); - while (it.hasNext()) { - Expr conjunct = it.next(); - if (conjunct.isBoundBySlotIds(partitionSlots_)) { - // Check if the conjunct can be evaluated from the partition metadata. - // canEvalUsingPartitionMd() operates on a cloned conjunct which may get - // modified if it contains constant expressions. If the cloned conjunct - // cannot be evaluated from the partition metadata, the original unmodified - // conjunct is evaluated in the BE. - Expr clonedConjunct = conjunct.clone(); - if (canEvalUsingPartitionMd(clonedConjunct, analyzer)) { - simpleFilterConjuncts.add(Expr.pushNegationToOperands(clonedConjunct)); - } else { - partitionFilters.add(new HdfsPartitionFilter(conjunct, tbl_, analyzer)); - } - it.remove(); - } - } - - // Set of matching partition ids, i.e. partitions that pass all filters - HashSet<Long> matchingPartitionIds = null; - - // Evaluate the partition filters from the partition key values. - // The result is the intersection of the associated partition id sets. - for (Expr filter: simpleFilterConjuncts) { - // Evaluate the filter - HashSet<Long> matchingIds = evalSlotBindingFilter(filter); - if (matchingPartitionIds == null) { - matchingPartitionIds = matchingIds; - } else { - matchingPartitionIds.retainAll(matchingIds); - } - } - - // Check if we need to initialize the set of valid partition ids. - if (simpleFilterConjuncts.size() == 0) { - Preconditions.checkState(matchingPartitionIds == null); - matchingPartitionIds = Sets.newHashSet(tbl_.getPartitionIds()); - } - - // Evaluate the 'complex' partition filters in the BE. - evalPartitionFiltersInBe(partitionFilters, matchingPartitionIds, analyzer); - - // Populate the list of valid, non-empty partitions to process - List<HdfsPartition> results = Lists.newArrayList(); - Map<Long, HdfsPartition> partitionMap = tbl_.getPartitionMap(); - for (Long id: matchingPartitionIds) { - HdfsPartition partition = partitionMap.get(id); - Preconditions.checkNotNull(partition); - if (partition.hasFileDescriptors()) { - results.add(partition); - analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId()); - } - } - return results; - } - - /** - * Recursive function that checks if a given partition expr can be evaluated - * directly from the partition key values.
If 'expr' contains any constant expressions, - * they are evaluated in the BE and are replaced by their corresponding results, as - * LiteralExprs. - */ - private boolean canEvalUsingPartitionMd(Expr expr, Analyzer analyzer) { - Preconditions.checkNotNull(expr); - if (expr instanceof BinaryPredicate) { - // Evaluate any constant expression in the BE - try { - expr.foldConstantChildren(analyzer); - } catch (AnalysisException e) { - LOG.error("Error evaluating constant expressions in the BE: " + e.getMessage()); - return false; - } - BinaryPredicate bp = (BinaryPredicate)expr; - SlotRef slot = bp.getBoundSlot(); - if (slot == null) return false; - Expr bindingExpr = bp.getSlotBinding(slot.getSlotId()); - if (bindingExpr == null || !bindingExpr.isLiteral()) return false; - return true; - } else if (expr instanceof CompoundPredicate) { - boolean res = canEvalUsingPartitionMd(expr.getChild(0), analyzer); - if (expr.getChild(1) != null) { - res &= canEvalUsingPartitionMd(expr.getChild(1), analyzer); - } - return res; - } else if (expr instanceof IsNullPredicate) { - // Check for SlotRef IS [NOT] NULL case - IsNullPredicate nullPredicate = (IsNullPredicate)expr; - return nullPredicate.getBoundSlot() != null; - } else if (expr instanceof InPredicate) { - // Evaluate any constant expressions in the BE - try { - expr.foldConstantChildren(analyzer); - } catch (AnalysisException e) { - LOG.error("Error evaluating constant expressions in the BE: " + e.getMessage()); - return false; - } - // Check for SlotRef [NOT] IN (Literal, ... Literal) case - SlotRef slot = ((InPredicate)expr).getBoundSlot(); - if (slot == null) return false; - for (int i = 1; i < expr.getChildren().size(); ++i) { - if (!(expr.getChild(i).isLiteral())) return false; - } - return true; - } else if (expr instanceof BetweenPredicate) { - return canEvalUsingPartitionMd(((BetweenPredicate) expr).getRewrittenPredicate(), - analyzer); - } - return false; - } - - /** - * Evaluate a BinaryPredicate filter on a partition column and return the - * ids of the matching partitions. An empty set is returned if there - * are no matching partitions. - */ - private HashSet<Long> evalBinaryPredicate(Expr expr) { - Preconditions.checkNotNull(expr); - Preconditions.checkState(expr instanceof BinaryPredicate); - boolean isSlotOnLeft = true; - if (expr.getChild(0).isLiteral()) isSlotOnLeft = false; - - // Get the operands - BinaryPredicate bp = (BinaryPredicate)expr; - SlotRef slot = bp.getBoundSlot(); - Preconditions.checkNotNull(slot); - Expr bindingExpr = bp.getSlotBinding(slot.getSlotId()); - Preconditions.checkNotNull(bindingExpr); - Preconditions.checkState(bindingExpr.isLiteral()); - LiteralExpr literal = (LiteralExpr)bindingExpr; - Operator op = bp.getOp(); - if ((literal instanceof NullLiteral) && (op != Operator.NOT_DISTINCT) - && (op != Operator.DISTINCT_FROM)) { - return Sets.newHashSet(); - } - - // Get the partition column position and retrieve the associated partition - // value metadata. 
- int partitionPos = slot.getDesc().getColumn().getPosition(); - TreeMap<LiteralExpr, HashSet<Long>> partitionValueMap = - tbl_.getPartitionValueMap(partitionPos); - if (partitionValueMap.isEmpty()) return Sets.newHashSet(); - - HashSet<Long> matchingIds = Sets.newHashSet(); - // Compute the matching partition ids - if (op == Operator.NOT_DISTINCT) { - // Case: SlotRef <=> Literal - if (literal instanceof NullLiteral) { - Set<Long> ids = tbl_.getNullPartitionIds(partitionPos); - if (ids != null) matchingIds.addAll(ids); - return matchingIds; - } - // Punt to equality case: - op = Operator.EQ; - } - if (op == Operator.EQ) { - // Case: SlotRef = Literal - HashSet<Long> ids = partitionValueMap.get(literal); - if (ids != null) matchingIds.addAll(ids); - return matchingIds; - } - if (op == Operator.DISTINCT_FROM) { - // Case: SlotRef IS DISTINCT FROM Literal - if (literal instanceof NullLiteral) { - matchingIds.addAll(tbl_.getPartitionIds()); - Set<Long> nullIds = tbl_.getNullPartitionIds(partitionPos); - matchingIds.removeAll(nullIds); - return matchingIds; - } else { - matchingIds.addAll(tbl_.getPartitionIds()); - HashSet<Long> ids = partitionValueMap.get(literal); - if (ids != null) matchingIds.removeAll(ids); - return matchingIds; - } - } - if (op == Operator.NE) { - // Case: SlotRef != Literal - matchingIds.addAll(tbl_.getPartitionIds()); - Set<Long> nullIds = tbl_.getNullPartitionIds(partitionPos); - matchingIds.removeAll(nullIds); - HashSet<Long> ids = partitionValueMap.get(literal); - if (ids != null) matchingIds.removeAll(ids); - return matchingIds; - } - - // Determine the partition key value range of this predicate. - NavigableMap<LiteralExpr, HashSet<Long>> rangeValueMap = null; - LiteralExpr firstKey = partitionValueMap.firstKey(); - LiteralExpr lastKey = partitionValueMap.lastKey(); - boolean upperInclusive = false; - boolean lowerInclusive = false; - LiteralExpr upperBoundKey = null; - LiteralExpr lowerBoundKey = null; - - if (((op == Operator.LE || op == Operator.LT) && isSlotOnLeft) || - ((op == Operator.GE || op == Operator.GT) && !isSlotOnLeft)) { - // Case: SlotRef <[=] Literal - if (literal.compareTo(firstKey) < 0) return Sets.newHashSet(); - if (op == Operator.LE || op == Operator.GE) upperInclusive = true; - - if (literal.compareTo(lastKey) <= 0) { - upperBoundKey = literal; - } else { - upperBoundKey = lastKey; - upperInclusive = true; - } - lowerBoundKey = firstKey; - lowerInclusive = true; - } else { - // Cases: SlotRef >[=] Literal - if (literal.compareTo(lastKey) > 0) return Sets.newHashSet(); - if (op == Operator.GE || op == Operator.LE) lowerInclusive = true; - - if (literal.compareTo(firstKey) >= 0) { - lowerBoundKey = literal; - } else { - lowerBoundKey = firstKey; - lowerInclusive = true; - } - upperBoundKey = lastKey; - upperInclusive = true; - } - - // Retrieve the submap that corresponds to the computed partition key - // value range. - rangeValueMap = partitionValueMap.subMap(lowerBoundKey, lowerInclusive, - upperBoundKey, upperInclusive); - // Compute the matching partition ids - for (HashSet<Long> idSet: rangeValueMap.values()) { - if (idSet != null) matchingIds.addAll(idSet); - } - return matchingIds; - } - - /** - * Evaluate an InPredicate filter on a partition column and return the ids of - * the matching partitions. 
- */ - private HashSet<Long> evalInPredicate(Expr expr) { - Preconditions.checkNotNull(expr); - Preconditions.checkState(expr instanceof InPredicate); - InPredicate inPredicate = (InPredicate)expr; - HashSet<Long> matchingIds = Sets.newHashSet(); - SlotRef slot = inPredicate.getBoundSlot(); - Preconditions.checkNotNull(slot); - int partitionPos = slot.getDesc().getColumn().getPosition(); - TreeMap<LiteralExpr, HashSet<Long>> partitionValueMap = - tbl_.getPartitionValueMap(partitionPos); - - if (inPredicate.isNotIn()) { - // Case: SlotRef NOT IN (Literal, ..., Literal) - // If there is a NullLiteral, return an empty set. - List<Expr> nullLiterals = Lists.newArrayList(); - inPredicate.collectAll(Predicates.instanceOf(NullLiteral.class), nullLiterals); - if (!nullLiterals.isEmpty()) return matchingIds; - matchingIds.addAll(tbl_.getPartitionIds()); - // Exclude partitions with null partition column values - Set<Long> nullIds = tbl_.getNullPartitionIds(partitionPos); - matchingIds.removeAll(nullIds); - } - // Compute the matching partition ids - for (int i = 1; i < inPredicate.getChildren().size(); ++i) { - LiteralExpr literal = (LiteralExpr)inPredicate.getChild(i); - HashSet<Long> idSet = partitionValueMap.get(literal); - if (idSet != null) { - if (inPredicate.isNotIn()) { - matchingIds.removeAll(idSet); - } else { - matchingIds.addAll(idSet); - } - } - } - return matchingIds; - } - - /** - * Evaluate an IsNullPredicate on a partition column and return the ids of the - * matching partitions. - */ - private HashSet<Long> evalIsNullPredicate(Expr expr) { - Preconditions.checkNotNull(expr); - Preconditions.checkState(expr instanceof IsNullPredicate); - HashSet<Long> matchingIds = Sets.newHashSet(); - IsNullPredicate nullPredicate = (IsNullPredicate)expr; - SlotRef slot = nullPredicate.getBoundSlot(); - Preconditions.checkNotNull(slot); - int partitionPos = slot.getDesc().getColumn().getPosition(); - Set<Long> nullPartitionIds = tbl_.getNullPartitionIds(partitionPos); - - if (nullPredicate.isNotNull()) { - matchingIds.addAll(tbl_.getPartitionIds()); - matchingIds.removeAll(nullPartitionIds); - } else { - matchingIds.addAll(nullPartitionIds); - } - return matchingIds; - } - - /** - * Evaluate a slot binding predicate on a partition key using the partition - * key values; return the matching partition ids. An empty set is returned - * if there are no matching partitions. This function can evaluate the following - * types of predicates: BinaryPredicate, CompoundPredicate, IsNullPredicate, - * InPredicate, and BetweenPredicate. 
- */ - private HashSet<Long> evalSlotBindingFilter(Expr expr) { - Preconditions.checkNotNull(expr); - if (expr instanceof BinaryPredicate) { - return evalBinaryPredicate(expr); - } else if (expr instanceof CompoundPredicate) { - HashSet<Long> leftChildIds = evalSlotBindingFilter(expr.getChild(0)); - CompoundPredicate cp = (CompoundPredicate)expr; - // NOT operators have been eliminated - Preconditions.checkState(cp.getOp() != CompoundPredicate.Operator.NOT); - if (cp.getOp() == CompoundPredicate.Operator.AND) { - HashSet<Long> rightChildIds = evalSlotBindingFilter(expr.getChild(1)); - leftChildIds.retainAll(rightChildIds); - } else if (cp.getOp() == CompoundPredicate.Operator.OR) { - HashSet<Long> rightChildIds = evalSlotBindingFilter(expr.getChild(1)); - leftChildIds.addAll(rightChildIds); - } - return leftChildIds; - } else if (expr instanceof InPredicate) { - return evalInPredicate(expr); - } else if (expr instanceof IsNullPredicate) { - return evalIsNullPredicate(expr); - } else if (expr instanceof BetweenPredicate) { - return evalSlotBindingFilter(((BetweenPredicate) expr).getRewrittenPredicate()); - } - return null; - } - - /** - * Evaluate a list of HdfsPartitionFilters in the BE. These are 'complex' - * filters that could not be evaluated from the partition key values. - */ - private void evalPartitionFiltersInBe(List<HdfsPartitionFilter> filters, - HashSet<Long> matchingPartitionIds, Analyzer analyzer) throws InternalException { - Map<Long, HdfsPartition> partitionMap = tbl_.getPartitionMap(); - // Set of partition ids that pass a filter - HashSet<Long> matchingIds = Sets.newHashSet(); - // Batch of partitions - ArrayList<HdfsPartition> partitionBatch = Lists.newArrayList(); - // Identify the partitions that pass all filters. - for (HdfsPartitionFilter filter: filters) { - // Iterate through the currently valid partitions - for (Long id: matchingPartitionIds) { - HdfsPartition p = partitionMap.get(id); - Preconditions.checkState( - p.getPartitionValues().size() == tbl_.getNumClusteringCols()); - // Add the partition to the current batch - partitionBatch.add(partitionMap.get(id)); - if (partitionBatch.size() == PARTITION_PRUNING_BATCH_SIZE) { - // Batch is full. Evaluate the predicates of this batch in the BE. - matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer)); - partitionBatch.clear(); - } - } - // Check if there are any unprocessed partitions. - if (!partitionBatch.isEmpty()) { - matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer)); - partitionBatch.clear(); - } - // Prune the partitions ids that didn't pass the filter - matchingPartitionIds.retainAll(matchingIds); - matchingIds.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java deleted file mode 100644 index 9b83902..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java +++ /dev/null @@ -1,677 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.SlotDescriptor; -import com.cloudera.impala.analysis.TableRef; -import com.cloudera.impala.analysis.TupleDescriptor; -import com.cloudera.impala.analysis.TupleId; -import com.cloudera.impala.catalog.Column; -import com.cloudera.impala.catalog.HdfsFileFormat; -import com.cloudera.impala.catalog.HdfsPartition; -import com.cloudera.impala.catalog.HdfsPartition.FileBlock; -import com.cloudera.impala.catalog.HdfsTable; -import com.cloudera.impala.catalog.Type; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.NotImplementedException; -import com.cloudera.impala.common.PrintUtils; -import com.cloudera.impala.common.RuntimeEnv; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TExpr; -import com.cloudera.impala.thrift.THdfsFileBlock; -import com.cloudera.impala.thrift.THdfsFileSplit; -import com.cloudera.impala.thrift.THdfsScanNode; -import com.cloudera.impala.thrift.TNetworkAddress; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.cloudera.impala.thrift.TQueryOptions; -import com.cloudera.impala.thrift.TReplicaPreference; -import com.cloudera.impala.thrift.TScanRange; -import com.cloudera.impala.thrift.TScanRangeLocation; -import com.cloudera.impala.thrift.TScanRangeLocations; -import com.cloudera.impala.util.MembershipSnapshot; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.google.common.base.Objects.ToStringHelper; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * Scan of a single table. Currently limited to full-table scans. - * - * It's expected that the creator of this object has already done any necessary - * partition pruning before creating this object. In other words, the 'conjuncts' - * passed to the constructors are conjuncts not fully evaluated by partition pruning - * and 'partitions' are the remaining partitions after pruning. - * - * TODO: pass in range restrictions. - */ -public class HdfsScanNode extends ScanNode { - private final static Logger LOG = LoggerFactory.getLogger(HdfsScanNode.class); - - // Read size of the backend I/O manager. Used in computeCosts(). - private final static long IO_MGR_BUFFER_SIZE = 8L * 1024L * 1024L; - - // Maximum number of I/O buffers per thread executing this scan. - private final static long MAX_IO_BUFFERS_PER_THREAD = 10; - - // Number of scanner threads per core executing this scan.
- private final static int THREADS_PER_CORE = 3; - - // Factor capturing the worst-case deviation from a uniform distribution of scan ranges - // among nodes. The factor of 1.2 means that a particular node may have 20% more - // scan ranges than would have been estimated assuming a uniform distribution. - private final static double SCAN_RANGE_SKEW_FACTOR = 1.2; - - private final HdfsTable tbl_; - - // Partitions that are filtered in for scanning by the key ranges - private final List<HdfsPartition> partitions_; - - private final TReplicaPreference replicaPreference_; - private final boolean randomReplica_; - - // Total number of files from partitions_ - private long totalFiles_ = 0; - - // Total number of bytes from partitions_ - private long totalBytes_ = 0; - - // True if this scan node should use codegen for evaluating conjuncts. - private boolean codegenConjuncts_; - - // Conjuncts that can be evaluated while materializing the items (tuples) of - // collection-typed slots. Maps from tuple descriptor to the conjuncts bound by that - // tuple. Uses a linked hash map for consistent display in explain. - private final Map<TupleDescriptor, List<Expr>> collectionConjuncts_ = - Maps.newLinkedHashMap(); - - // Indicates corrupt table stats based on the number of non-empty scan ranges and - // numRows set to 0. Set in computeStats(). - private boolean hasCorruptTableStats_; - - // Number of header lines to skip at the beginning of each file of this table. Only set - // to values > 0 for hdfs text files. - private int skipHeaderLineCount_ = 0; - - /** - * Construct a node to scan given data files into tuples described by 'desc', - * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and - * 'partitions' being the partitions which need to be included. Please see - * class comments above for details. - */ - public HdfsScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts, - List<HdfsPartition> partitions, TableRef hdfsTblRef) { - super(id, desc, "SCAN HDFS"); - Preconditions.checkState(desc.getTable() instanceof HdfsTable); - tbl_ = (HdfsTable)desc.getTable(); - conjuncts_ = conjuncts; - partitions_ = partitions; - replicaPreference_ = hdfsTblRef.getReplicaPreference(); - randomReplica_ = hdfsTblRef.getRandomReplica(); - HdfsTable hdfsTable = (HdfsTable)hdfsTblRef.getTable(); - Preconditions.checkState(tbl_ == hdfsTable); - StringBuilder error = new StringBuilder(); - skipHeaderLineCount_ = tbl_.parseSkipHeaderLineCount(error); - if (error.length() > 0) { - // Any errors should already have been caught during analysis. - throw new IllegalStateException(error.toString()); - } - } - - @Override - protected String debugString() { - ToStringHelper helper = Objects.toStringHelper(this); - for (HdfsPartition partition: partitions_) { - helper.add("Partition " + partition.getId() + ":", partition.toString()); - } - return helper.addValue(super.debugString()).toString(); - } - - /** - * Populate collectionConjuncts_ and scanRanges_. - */ - @Override - public void init(Analyzer analyzer) throws ImpalaException { - conjuncts_ = orderConjunctsByCost(conjuncts_); - checkForSupportedFileFormats(); - - assignCollectionConjuncts(analyzer); - - computeMemLayout(analyzer); - - // compute scan range locations - computeScanRangeLocations(analyzer); - - // do this at the end so it can take all conjuncts and scan ranges into account - computeStats(analyzer); - - // TODO: do we need this? 
- assignedConjuncts_ = analyzer.getAssignedConjuncts(); - - // Decide whether codegen should be used for evaluating conjuncts. - checkForCodegen(analyzer); - } - - /** - * Throws if the table schema contains a complex type and we need to scan - * a partition that has a format for which we do not support complex types, - * regardless of whether a complex-typed column is actually referenced - * in the query. - */ - @Override - protected void checkForSupportedFileFormats() throws NotImplementedException { - Preconditions.checkNotNull(desc_); - Preconditions.checkNotNull(desc_.getTable()); - Column firstComplexTypedCol = null; - for (Column col: desc_.getTable().getColumns()) { - if (col.getType().isComplexType()) { - firstComplexTypedCol = col; - break; - } - } - if (firstComplexTypedCol == null) return; - - boolean referencesComplexTypedCol = false; - for (SlotDescriptor slotDesc: desc_.getSlots()) { - if (!slotDesc.isMaterialized()) continue; - if (slotDesc.getType().isComplexType() || slotDesc.getColumn() == null) { - referencesComplexTypedCol = true; - break; - } - } - - for (HdfsPartition part: partitions_) { - HdfsFileFormat format = part.getInputFormatDescriptor().getFileFormat(); - if (format.isComplexTypesSupported()) continue; - // If the file format allows querying just scalar typed columns and the query - // doesn't materialize any complex typed columns, it is allowed. - if (format.canSkipComplexTypes() && !referencesComplexTypedCol) { - continue; - } - String errSuffix = String.format( - "Complex types are supported for these file formats: %s", - Joiner.on(", ").join(HdfsFileFormat.complexTypesFormats())); - if (desc_.getTable().getNumClusteringCols() == 0) { - throw new NotImplementedException(String.format( - "Scan of table '%s' in format '%s' is not supported because the table " + - "has a column '%s' with a complex type '%s'.\n%s.", - desc_.getAlias(), format, firstComplexTypedCol.getName(), - firstComplexTypedCol.getType().toSql(), errSuffix)); - } - throw new NotImplementedException(String.format( - "Scan of partition '%s' in format '%s' of table '%s' is not supported " + - "because the table has a column '%s' with a complex type '%s'.\n%s.", - part.getPartitionName(), format, desc_.getAlias(), - firstComplexTypedCol.getName(), firstComplexTypedCol.getType().toSql(), - errSuffix)); - } - } - - public boolean isPartitionedTable() { - return desc_.getTable().getNumClusteringCols() > 0; - } - - /** - * Populates the collection conjuncts, materializes their required slots, and marks - * the conjuncts as assigned, if it is correct to do so. Some conjuncts may have to - * also be evaluated at a subsequent semi or outer join. - */ - private void assignCollectionConjuncts(Analyzer analyzer) { - collectionConjuncts_.clear(); - assignCollectionConjuncts(desc_, analyzer); - } - - /** - * Recursively collects and assigns conjuncts bound by tuples materialized in a - * collection-typed slot. - * - * Limitation: Conjuncts that must first be migrated into inline views and that cannot - * be captured by slot binding will not be assigned here, but in an UnnestNode. - * This limitation applies to conjuncts bound by inline-view slots that are backed by - * non-SlotRef exprs in the inline-view's select list. We only capture value transfers - * between slots, and not between arbitrary exprs. - * - * TODO for 2.3: The logic for gathering conjuncts and deciding which ones should be - * marked as assigned needs to be clarified and consolidated in one place. 
The code - * below is rather different from the code for assigning the top-level conjuncts in - * init() although the performed task is conceptually identical. Refactoring the - * assignment code is tricky/risky for now. - */ - private void assignCollectionConjuncts(TupleDescriptor tupleDesc, Analyzer analyzer) { - for (SlotDescriptor slotDesc: tupleDesc.getSlots()) { - if (!slotDesc.getType().isCollectionType()) continue; - Preconditions.checkNotNull(slotDesc.getItemTupleDesc()); - TupleDescriptor itemTupleDesc = slotDesc.getItemTupleDesc(); - TupleId itemTid = itemTupleDesc.getId(); - // First collect unassigned and binding predicates. Then remove redundant - // predicates based on slot equivalences and enforce slot equivalences by - // generating new predicates. - List<Expr> collectionConjuncts = - analyzer.getUnassignedConjuncts(Lists.newArrayList(itemTid)); - ArrayList<Expr> bindingPredicates = analyzer.getBoundPredicates(itemTid); - for (Expr boundPred: bindingPredicates) { - if (!collectionConjuncts.contains(boundPred)) collectionConjuncts.add(boundPred); - } - analyzer.createEquivConjuncts(itemTid, collectionConjuncts); - // Mark those conjuncts as assigned that do not also need to be evaluated by a - // subsequent semi or outer join. - for (Expr conjunct: collectionConjuncts) { - if (!analyzer.evalByJoin(conjunct)) analyzer.markConjunctAssigned(conjunct); - } - if (!collectionConjuncts.isEmpty()) { - analyzer.materializeSlots(collectionConjuncts); - collectionConjuncts_.put(itemTupleDesc, collectionConjuncts); - } - // Recursively look for collection-typed slots in nested tuple descriptors. - assignCollectionConjuncts(itemTupleDesc, analyzer); - } - } - - /** - * Computes scan ranges (hdfs splits) plus their storage locations, including volume - * ids, based on the given maximum number of bytes each scan range should scan. - */ - private void computeScanRangeLocations(Analyzer analyzer) { - long maxScanRangeLength = analyzer.getQueryCtx().getRequest().getQuery_options() - .getMax_scan_range_length(); - scanRanges_ = Lists.newArrayList(); - for (HdfsPartition partition: partitions_) { - Preconditions.checkState(partition.getId() >= 0); - for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) { - for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) { - HdfsPartition.FileBlock block = FileBlock.fromThrift(thriftBlock); - List<Integer> replicaHostIdxs = block.getReplicaHostIdxs(); - if (replicaHostIdxs.size() == 0) { - // we didn't get locations for this block; for now, just ignore the block - // TODO: do something meaningful with that - continue; - } - // Collect the network address and volume ID of all replicas of this block. - List<TScanRangeLocation> locations = Lists.newArrayList(); - for (int i = 0; i < replicaHostIdxs.size(); ++i) { - TScanRangeLocation location = new TScanRangeLocation(); - // Translate from the host index (local to the HdfsTable) to network address. - Integer tableHostIdx = replicaHostIdxs.get(i); - TNetworkAddress networkAddress = - partition.getTable().getHostIndex().getEntry(tableHostIdx); - Preconditions.checkNotNull(networkAddress); - // Translate from network address to the global (to this request) host index. 
- Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress); - location.setHost_idx(globalHostIdx); - location.setVolume_id(block.getDiskId(i)); - location.setIs_cached(block.isCached(i)); - locations.add(location); - } - // create scan ranges, taking into account maxScanRangeLength - long currentOffset = block.getOffset(); - long remainingLength = block.getLength(); - while (remainingLength > 0) { - long currentLength = remainingLength; - if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) { - currentLength = maxScanRangeLength; - } - TScanRange scanRange = new TScanRange(); - scanRange.setHdfs_file_split(new THdfsFileSplit( - fileDesc.getFileName(), currentOffset, currentLength, partition.getId(), - fileDesc.getFileLength(), fileDesc.getFileCompression(), - fileDesc.getModificationTime())); - TScanRangeLocations scanRangeLocations = new TScanRangeLocations(); - scanRangeLocations.scan_range = scanRange; - scanRangeLocations.locations = locations; - scanRanges_.add(scanRangeLocations); - remainingLength -= currentLength; - currentOffset += currentLength; - } - } - } - } - } - - /** - * Also computes totalBytes_, totalFiles_, numPartitionsMissingStats_, - * and sets hasCorruptTableStats_. - */ - @Override - public void computeStats(Analyzer analyzer) { - super.computeStats(analyzer); - LOG.debug("collecting partitions for table " + tbl_.getName()); - numPartitionsMissingStats_ = 0; - totalFiles_ = 0; - totalBytes_ = 0; - if (tbl_.getNumClusteringCols() == 0) { - cardinality_ = tbl_.getNumRows(); - if (cardinality_ < -1 || (cardinality_ == 0 && tbl_.getTotalHdfsBytes() > 0)) { - hasCorruptTableStats_ = true; - } - if (partitions_.isEmpty()) { - // Nothing to scan. Definitely a cardinality of 0 even if we have no stats. - cardinality_ = 0; - } else { - Preconditions.checkState(partitions_.size() == 1); - totalFiles_ += partitions_.get(0).getFileDescriptors().size(); - totalBytes_ += partitions_.get(0).getSize(); - } - } else { - cardinality_ = 0; - boolean hasValidPartitionCardinality = false; - for (HdfsPartition p: partitions_) { - // Check for corrupt table stats - if (p.getNumRows() < -1 || (p.getNumRows() == 0 && p.getSize() > 0)) { - hasCorruptTableStats_ = true; - } - // ignore partitions with missing stats in the hope they don't matter - // enough to change the planning outcome - if (p.getNumRows() > -1) { - cardinality_ = addCardinalities(cardinality_, p.getNumRows()); - hasValidPartitionCardinality = true; - } else { - ++numPartitionsMissingStats_; - } - totalFiles_ += p.getFileDescriptors().size(); - totalBytes_ += p.getSize(); - } - if (!partitions_.isEmpty() && !hasValidPartitionCardinality) { - // if none of the partitions knew its number of rows, we fall back on - // the table stats - cardinality_ = tbl_.getNumRows(); - } - } - // Adjust cardinality for all collections referenced along the tuple's path. - if (cardinality_ != -1) { - for (Type t: desc_.getPath().getMatchedTypes()) { - if (t.isCollectionType()) cardinality_ *= PlannerContext.AVG_COLLECTION_SIZE; - } - } - inputCardinality_ = cardinality_; - - // Sanity check scan node cardinality. - if (cardinality_ < -1) { - hasCorruptTableStats_ = true; - cardinality_ = -1; - } - - if (cardinality_ > 0) { - LOG.debug("cardinality_=" + Long.toString(cardinality_) + - " sel=" + Double.toString(computeSelectivity())); - cardinality_ = Math.round(cardinality_ * computeSelectivity()); - // IMPALA-2165: Avoid setting the cardinality to 0 after rounding. 
- cardinality_ = Math.max(cardinality_, 1); - } - cardinality_ = capAtLimit(cardinality_); - LOG.debug("computeStats HdfsScan: cardinality_=" + Long.toString(cardinality_)); - - computeNumNodes(analyzer, cardinality_); - LOG.debug("computeStats HdfsScan: #nodes=" + Integer.toString(numNodes_)); - } - - /** - * Estimate the number of impalad nodes that this scan node will execute on (which is - * ultimately determined by the scheduling done by the backend's SimpleScheduler). - * Assume that scan ranges that can be scheduled locally will be, and that scan - * ranges that cannot will be round-robined across the cluster. - */ - protected void computeNumNodes(Analyzer analyzer, long cardinality) { - Preconditions.checkNotNull(scanRanges_); - MembershipSnapshot cluster = MembershipSnapshot.getCluster(); - HashSet<TNetworkAddress> localHostSet = Sets.newHashSet(); - int totalNodes = 0; - int numLocalRanges = 0; - int numRemoteRanges = 0; - for (TScanRangeLocations range: scanRanges_) { - boolean anyLocal = false; - for (TScanRangeLocation loc: range.locations) { - TNetworkAddress dataNode = analyzer.getHostIndex().getEntry(loc.getHost_idx()); - if (cluster.contains(dataNode)) { - anyLocal = true; - // Use the full datanode address (including port) to account for the test - // minicluster where there are multiple datanodes and impalads on a single - // host. This assumes that when an impalad is colocated with a datanode, - // there are the same number of impalads as datanodes on this host in this - // cluster. - localHostSet.add(dataNode); - } - } - // This range has at least one replica with a colocated impalad, so assume it - // will be scheduled on one of those nodes. - if (anyLocal) { - ++numLocalRanges; - } else { - ++numRemoteRanges; - } - // Approximate the number of nodes that will execute locally assigned ranges to - // be the smaller of the number of locally assigned ranges and the number of - // hosts that hold block replicas for those ranges. - int numLocalNodes = Math.min(numLocalRanges, localHostSet.size()); - // The remote ranges are round-robined across all the impalads. - int numRemoteNodes = Math.min(numRemoteRanges, cluster.numNodes()); - // The local and remote assignments may overlap, but we don't know by how much so - // conservatively assume no overlap. - totalNodes = Math.min(numLocalNodes + numRemoteNodes, cluster.numNodes()); - // Exit early if all hosts have a scan range assignment, to avoid extraneous work - // in case the number of scan ranges dominates the number of nodes. - if (totalNodes == cluster.numNodes()) break; - } - // Tables can reside on 0 nodes (empty table), but a plan node must always be - // executed on at least one node. - numNodes_ = (cardinality == 0 || totalNodes == 0) ? 1 : totalNodes; - LOG.debug("computeNumNodes totalRanges=" + scanRanges_.size() + - " localRanges=" + numLocalRanges + " remoteRanges=" + numRemoteRanges + - " localHostSet.size=" + localHostSet.size() + - " clusterNodes=" + cluster.numNodes()); - } - - /** - * Approximate the cost of evaluating all conjuncts bound by this node by - * aggregating the total number of nodes in the expression trees of all conjuncts. - */ - private int computeConjunctsCost() { - int cost = 0; - for (Expr expr: getConjuncts()) { - cost += expr.numNodes(); - } - for (List<Expr> exprs: collectionConjuncts_.values()) { - for (Expr expr: exprs) { - cost += expr.numNodes(); - } - } - return cost; - } - - /** - * Scan node is not a codegen-enabled operator. 
Decide whether to use codegen for - * conjuncts evaluation by estimating the cost of interpretation. - */ - private void checkForCodegen(Analyzer analyzer) { - long conjunctsCost = computeConjunctsCost(); - long inputCardinality = getInputCardinality(); - long threshold = - analyzer.getQueryCtx().getRequest().query_options.scan_node_codegen_threshold; - if (inputCardinality == -1) { - codegenConjuncts_ = conjunctsCost > 0; - } else { - codegenConjuncts_ = inputCardinality * conjunctsCost > threshold; - } - } - - @Override - protected void toThrift(TPlanNode msg) { - msg.hdfs_scan_node = new THdfsScanNode(desc_.getId().asInt()); - if (replicaPreference_ != null) { - msg.hdfs_scan_node.setReplica_preference(replicaPreference_); - } - msg.hdfs_scan_node.setRandom_replica(randomReplica_); - msg.node_type = TPlanNodeType.HDFS_SCAN_NODE; - msg.hdfs_scan_node.setCodegen_conjuncts(codegenConjuncts_); - if (!collectionConjuncts_.isEmpty()) { - Map<Integer, List<TExpr>> tcollectionConjuncts = Maps.newLinkedHashMap(); - for (Map.Entry<TupleDescriptor, List<Expr>> entry: - collectionConjuncts_.entrySet()) { - tcollectionConjuncts.put(entry.getKey().getId().asInt(), - Expr.treesToThrift(entry.getValue())); - } - msg.hdfs_scan_node.setCollection_conjuncts(tcollectionConjuncts); - } - if (skipHeaderLineCount_ > 0) { - msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_); - } - } - - @Override - protected String getDisplayLabelDetail() { - HdfsTable table = (HdfsTable) desc_.getTable(); - List<String> path = Lists.newArrayList(); - path.add(table.getDb().getName()); - path.add(table.getName()); - Preconditions.checkNotNull(desc_.getPath()); - if (desc_.hasExplicitAlias()) { - return desc_.getPath().toString() + " " + desc_.getAlias(); - } else { - return desc_.getPath().toString(); - } - } - - @Override - protected String getNodeExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - HdfsTable table = (HdfsTable) desc_.getTable(); - output.append(String.format("%s%s [%s", prefix, getDisplayLabel(), - getDisplayLabelDetail())); - if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() && - fragment_.isPartitioned()) { - output.append(", " + fragment_.getDataPartition().getExplainString()); - } - output.append("]\n"); - if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { - int numPartitions = partitions_.size(); - if (tbl_.getNumClusteringCols() == 0) numPartitions = 1; - output.append(String.format("%spartitions=%s/%s files=%s size=%s", detailPrefix, - numPartitions, table.getPartitions().size() - 1, totalFiles_, - PrintUtils.printBytes(totalBytes_))); - output.append("\n"); - if (!conjuncts_.isEmpty()) { - output.append( - detailPrefix + "predicates: " + getExplainString(conjuncts_) + "\n"); - } - if (!collectionConjuncts_.isEmpty()) { - for (Map.Entry<TupleDescriptor, List<Expr>> entry: - collectionConjuncts_.entrySet()) { - String alias = entry.getKey().getAlias(); - output.append(String.format("%spredicates on %s: %s\n", - detailPrefix, alias, getExplainString(entry.getValue()))); - } - } - if (!runtimeFilters_.isEmpty()) { - output.append(detailPrefix + "runtime filters: "); - output.append(getRuntimeFilterExplainString(false)); - } - } - if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { - output.append(getStatsExplainString(detailPrefix, detailLevel)); - output.append("\n"); - } - return output.toString(); - } - - @Override - public void computeCosts(TQueryOptions queryOptions) { - 
Preconditions.checkNotNull(scanRanges_, "Cost estimation requires scan ranges."); - if (scanRanges_.isEmpty()) { - perHostMemCost_ = 0; - return; - } - Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRanges_.size()); - Preconditions.checkNotNull(desc_); - Preconditions.checkState(desc_.getTable() instanceof HdfsTable); - HdfsTable table = (HdfsTable) desc_.getTable(); - int perHostScanRanges; - if (table.getMajorityFormat() == HdfsFileFormat.PARQUET) { - // For the purpose of this estimation, the number of per-host scan ranges for - // Parquet files is equal to the number of non-partition columns scanned. - perHostScanRanges = 0; - for (SlotDescriptor slot: desc_.getSlots()) { - if (slot.getColumn() == null || - slot.getColumn().getPosition() >= table.getNumClusteringCols()) { - ++perHostScanRanges; - } - } - } else { - perHostScanRanges = (int) Math.ceil(( - (double) scanRanges_.size() / (double) numNodes_) * SCAN_RANGE_SKEW_FACTOR); - } - - // TODO: The total memory consumption for a particular query depends on the number - // of *available* cores, i.e., it depends on the resource consumption of other - // concurrent queries. Figure out how to account for that. - int maxScannerThreads = Math.min(perHostScanRanges, - RuntimeEnv.INSTANCE.getNumCores() * THREADS_PER_CORE); - // Account for the max scanner threads query option. - if (queryOptions.isSetNum_scanner_threads() && - queryOptions.getNum_scanner_threads() > 0) { - maxScannerThreads = - Math.min(maxScannerThreads, queryOptions.getNum_scanner_threads()); - } - - long avgScanRangeBytes = (long) Math.ceil(totalBytes_ / (double) scanRanges_.size()); - // The +1 accounts for an extra I/O buffer to read past the scan range due to a - // trailing record spanning Hdfs blocks. - long perThreadIoBuffers = - Math.min((long) Math.ceil(avgScanRangeBytes / (double) IO_MGR_BUFFER_SIZE), - MAX_IO_BUFFERS_PER_THREAD) + 1; - perHostMemCost_ = maxScannerThreads * perThreadIoBuffers * IO_MGR_BUFFER_SIZE; - - // Sanity check: the tighter estimation should not exceed the per-host maximum. - long perHostUpperBound = getPerHostMemUpperBound(); - if (perHostMemCost_ > perHostUpperBound) { - LOG.warn(String.format("Per-host mem cost %s exceeded per-host upper bound %s.", - PrintUtils.printBytes(perHostMemCost_), - PrintUtils.printBytes(perHostUpperBound))); - perHostMemCost_ = perHostUpperBound; - } - } - - /** - * Hdfs scans use a shared pool of buffers managed by the I/O manager. Intuitively, - * the maximum number of I/O buffers is limited by the total disk bandwidth of a node. - * Therefore, this upper bound is independent of the number of concurrent scans and - * queries and helps to derive a tighter per-host memory estimate for queries with - * multiple concurrent scans. - */ - public static long getPerHostMemUpperBound() { - // Assumes THREADS_PER_CORE scanner threads per core, each using a default of - // MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE bytes. - return (long) RuntimeEnv.INSTANCE.getNumCores() * (long) THREADS_PER_CORE * - MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE; - } - - @Override - public boolean hasCorruptTableStats() { return hasCorruptTableStats_; } -}
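
The per-host memory estimate in the deleted computeCosts(), and its cap in getPerHostMemUpperBound(), reduce to the product cores * THREADS_PER_CORE * MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE. The following standalone sketch reproduces only that arithmetic, using the constants shown in the diff and a hypothetical 16-core host; the class name and core count are made up for illustration, and this is not the Impala implementation.

// Standalone sketch (not Impala code): mirrors the per-host scan-memory upper-bound
// arithmetic from the deleted getPerHostMemUpperBound(), with a hypothetical core count.
public class PerHostMemUpperBoundSketch {
  private static final long IO_MGR_BUFFER_SIZE = 8L * 1024L * 1024L; // 8 MB, as in the diff
  private static final long MAX_IO_BUFFERS_PER_THREAD = 10;          // as in the diff
  private static final int THREADS_PER_CORE = 3;                     // as in the diff

  public static void main(String[] args) {
    // Hypothetical host; the deleted code reads RuntimeEnv.INSTANCE.getNumCores() instead.
    int numCores = 16;
    long upperBound = (long) numCores * THREADS_PER_CORE
        * MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE;
    // 16 cores * 3 threads/core * 10 buffers/thread * 8 MB = 3840 MB.
    System.out.println("per-host scan memory upper bound = " + upperBound + " bytes");
  }
}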
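
Similarly, the inner loop of the deleted computeScanRangeLocations() cuts each HDFS block into scan ranges of at most max_scan_range_length bytes (0 meaning unbounded). Below is a minimal standalone sketch of just that splitting step with hypothetical sizes; it is illustrative only and omits the replica, volume-id, and Thrift bookkeeping done in the real method.

// Standalone sketch (not Impala code): the block-splitting arithmetic mirrored from the
// deleted computeScanRangeLocations(). Locations and Thrift structs are intentionally omitted.
import java.util.ArrayList;
import java.util.List;

public class ScanRangeSplitSketch {
  /** Splits [offset, offset + length) into pieces of at most maxScanRangeLength bytes. */
  static List<long[]> split(long offset, long length, long maxScanRangeLength) {
    List<long[]> ranges = new ArrayList<>(); // each entry is {offset, length}
    long currentOffset = offset;
    long remainingLength = length;
    while (remainingLength > 0) {
      long currentLength = remainingLength;
      if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) {
        currentLength = maxScanRangeLength;
      }
      ranges.add(new long[] { currentOffset, currentLength });
      remainingLength -= currentLength;
      currentOffset += currentLength;
    }
    return ranges;
  }

  public static void main(String[] args) {
    // Hypothetical 256 MB block with a 100 MB max scan range length:
    // yields ranges of 100 MB, 100 MB, and 56 MB.
    for (long[] r : split(0L, 256L * 1024 * 1024, 100L * 1024 * 1024)) {
      System.out.println("offset=" + r[0] + " length=" + r[1]);
    }
  }
}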
