http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterGenerator.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterGenerator.java deleted file mode 100644 index f0cafd5..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterGenerator.java +++ /dev/null @@ -1,590 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.ExprSubstitutionMap; -import com.cloudera.impala.analysis.Predicate; -import com.cloudera.impala.analysis.SlotDescriptor; -import com.cloudera.impala.analysis.SlotId; -import com.cloudera.impala.analysis.SlotRef; -import com.cloudera.impala.analysis.TupleDescriptor; -import com.cloudera.impala.analysis.TupleId; -import com.cloudera.impala.analysis.TupleIsNullPredicate; -import com.cloudera.impala.catalog.Table; -import com.cloudera.impala.common.IdGenerator; -import com.cloudera.impala.planner.PlanNode; -import com.cloudera.impala.thrift.TRuntimeFilterDesc; -import com.cloudera.impala.thrift.TRuntimeFilterTargetDesc; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Class used for generating and assigning runtime filters to a query plan using - * runtime filter propagation. Runtime filter propagation is an optimization technique - * used to filter scanned tuples or scan ranges based on information collected at - * runtime. A runtime filter is constructed during the build phase of a join node, and is - * applied at, potentially, multiple scan nodes on the probe side of that join node. - * Runtime filters are generated from equi-join predicates but they do not replace the - * original predicates. - * - * Example: select * from T1, T2 where T1.a = T2.b and T2.c = '1'; - * Assuming that T1 is a fact table and T2 is a significantly smaller dimension table, a - * runtime filter is constructed at the join node between tables T1 and T2 while building - * the hash table on the values of T2.b (rhs of the join condition) from the tuples of T2 - * that satisfy predicate T2.c = '1'. 
The runtime filter is subsequently sent to the - * scan node of table T1 and is applied on the values of T1.a (lhs of the join condition) - * to prune tuples of T1 that cannot be part of the join result. - * - * TODO: Consider combining multiple filters that are applied to the same scan node - * into a single filter. - */ -public final class RuntimeFilterGenerator { - private final static Logger LOG = - LoggerFactory.getLogger(RuntimeFilterGenerator.class); - - // Map of base table tuple ids to a list of runtime filters that - // can be applied at the corresponding scan nodes. - private final Map<TupleId, List<RuntimeFilter>> runtimeFiltersByTid_ = - Maps.newHashMap(); - - // Generator for filter ids - private final IdGenerator<RuntimeFilterId> filterIdGenerator = - RuntimeFilterId.createGenerator(); - - private RuntimeFilterGenerator() {} - - /** - * Internal representation of a runtime filter. A runtime filter is generated from - * an equi-join predicate of the form <lhs_expr> = <rhs_expr>, where lhs_expr is the - * expr on which the filter is applied and must be bound by a single tuple id from - * the left plan subtree of the associated join node, while rhs_expr is the expr on - * which the filter is built and can be bound by any number of tuple ids from the - * right plan subtree. Every runtime filter must record the join node that constructs - * the filter and the scan nodes that apply the filter (destination nodes). - */ - public static class RuntimeFilter { - // Identifier of the filter (unique within a query) - private final RuntimeFilterId id_; - // Join node that builds the filter - private final JoinNode src_; - // Expr (rhs of join predicate) on which the filter is built - private final Expr srcExpr_; - // Expr (lhs of join predicate) from which the targetExprs_ are generated. - private final Expr origTargetExpr_; - // Runtime filter targets - private final List<RuntimeFilterTarget> targets_ = Lists.newArrayList(); - // Slots from base table tuples that have value transfer from the slots - // of 'origTargetExpr_'. The slots are grouped by tuple id. - private final Map<TupleId, List<SlotId>> targetSlotsByTid_; - // If true, the join node building this filter is executed using a broadcast join; - // set in DistributedPlanner.createHashJoinFragment() - private boolean isBroadcastJoin_; - // Estimate of the number of distinct values that will be inserted into this filter, - // globally across all instances of the source node. Used to compute an optimal size - // for the filter. A value of -1 means no estimate is available, and default filter - // parameters should be used. - private long ndvEstimate_ = -1; - // If true, the filter is produced by a broadcast join and there is at least one - // destination scan node which is in the same fragment as the join; set in - // DistributedPlanner.createHashJoinFragment(). - private boolean hasLocalTargets_ = false; - // If true, there is at least one destination scan node which is not in the same - // fragment as the join that produced the filter; set in - // DistributedPlanner.createHashJoinFragment(). - private boolean hasRemoteTargets_ = false; - // If set, indicates that the filter can't be assigned to another scan node. - // Once set, it can't be unset. - private boolean finalized_ = false; - - /** - * Internal representation of a runtime filter target.
- */ - private static class RuntimeFilterTarget { - // Scan node that applies the filter - public ScanNode node; - // Expr on which the filter is applied - public Expr expr; - // Indicates if 'expr' is bound only by partition columns - public boolean isBoundByPartitionColumns = false; - // Indicates if 'node' is in the same fragment as the join that produces the - // filter - public boolean isLocalTarget = false; - - public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr) { - node = targetNode; - expr = targetExpr; - } - - public TRuntimeFilterTargetDesc toThrift() { - TRuntimeFilterTargetDesc tFilterTarget = new TRuntimeFilterTargetDesc(); - tFilterTarget.setNode_id(node.getId().asInt()); - tFilterTarget.setTarget_expr(expr.treeToThrift()); - List<SlotId> sids = Lists.newArrayList(); - expr.getIds(null, sids); - List<Integer> tSlotIds = Lists.newArrayListWithCapacity(sids.size()); - for (SlotId sid: sids) tSlotIds.add(sid.asInt()); - tFilterTarget.setTarget_expr_slotids(tSlotIds); - tFilterTarget.setIs_bound_by_partition_columns(isBoundByPartitionColumns); - tFilterTarget.setIs_local_target(isLocalTarget); - return tFilterTarget; - } - - @Override - public String toString() { - StringBuilder output = new StringBuilder(); - return output.append("Target Id: " + node.getId() + " ") - .append("Target expr: " + expr.debugString() + " ") - .append("Partition columns: " + isBoundByPartitionColumns) - .append("Is local: " + isLocalTarget) - .toString(); - } - } - - private RuntimeFilter(RuntimeFilterId filterId, JoinNode filterSrcNode, - Expr srcExpr, Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots) { - id_ = filterId; - src_ = filterSrcNode; - srcExpr_ = srcExpr; - origTargetExpr_ = origTargetExpr; - targetSlotsByTid_ = targetSlots; - computeNdvEstimate(); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof RuntimeFilter)) return false; - return ((RuntimeFilter) obj).id_.equals(id_); - } - - @Override - public int hashCode() { return id_.hashCode(); } - - public void markFinalized() { finalized_ = true; } - public boolean isFinalized() { return finalized_; } - - /** - * Serializes a runtime filter to Thrift. - */ - public TRuntimeFilterDesc toThrift() { - TRuntimeFilterDesc tFilter = new TRuntimeFilterDesc(); - tFilter.setFilter_id(id_.asInt()); - tFilter.setSrc_expr(srcExpr_.treeToThrift()); - tFilter.setIs_broadcast_join(isBroadcastJoin_); - tFilter.setNdv_estimate(ndvEstimate_); - tFilter.setHas_local_targets(hasLocalTargets_); - tFilter.setHas_remote_targets(hasRemoteTargets_); - boolean appliedOnPartitionColumns = true; - for (int i = 0; i < targets_.size(); ++i) { - RuntimeFilterTarget target = targets_.get(i); - tFilter.addToTargets(target.toThrift()); - tFilter.putToPlanid_to_target_ndx(target.node.getId().asInt(), i); - appliedOnPartitionColumns = - appliedOnPartitionColumns && target.isBoundByPartitionColumns; - } - tFilter.setApplied_on_partition_columns(appliedOnPartitionColumns); - return tFilter; - } - - /** - * Static function to create a RuntimeFilter from 'joinPredicate' that is assigned - * to the join node 'filterSrcNode'. Returns an instance of RuntimeFilter - * or null if a runtime filter cannot be generated from the specified predicate. 
- */ - public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen, - Analyzer analyzer, Expr joinPredicate, JoinNode filterSrcNode) { - Preconditions.checkNotNull(idGen); - Preconditions.checkNotNull(joinPredicate); - Preconditions.checkNotNull(filterSrcNode); - // Only consider binary equality predicates - if (!Predicate.isEquivalencePredicate(joinPredicate)) return null; - - BinaryPredicate normalizedJoinConjunct = - SingleNodePlanner.getNormalizedEqPred(joinPredicate, - filterSrcNode.getChild(0).getTupleIds(), - filterSrcNode.getChild(1).getTupleIds(), analyzer); - if (normalizedJoinConjunct == null) return null; - - Expr targetExpr = normalizedJoinConjunct.getChild(0); - Expr srcExpr = normalizedJoinConjunct.getChild(1); - - Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, targetExpr); - Preconditions.checkNotNull(targetSlots); - if (targetSlots.isEmpty()) return null; - - // Ensure that the target expr does not contain TupleIsNull predicates as these - // can't be evaluated at a scan node. - targetExpr = TupleIsNullPredicate.unwrapExpr(targetExpr.clone()); - LOG.trace("Generating runtime filter from predicate " + joinPredicate); - return new RuntimeFilter(idGen.getNextId(), filterSrcNode, - srcExpr, targetExpr, targetSlots); - } - - /** - * Returns the ids of base table tuple slots on which a runtime filter expr can be - * applied. Due to the existence of equivalence classes, a filter expr may be - * applicable at multiple scan nodes. The returned slot ids are grouped by tuple id. - * Returns an empty collection if the filter expr cannot be applied at a base table. - */ - private static Map<TupleId, List<SlotId>> getTargetSlots(Analyzer analyzer, - Expr expr) { - // 'expr' is not a SlotRef and may contain multiple SlotRefs - List<TupleId> tids = Lists.newArrayList(); - List<SlotId> sids = Lists.newArrayList(); - expr.getIds(tids, sids); - Map<TupleId, List<SlotId>> slotsByTid = Maps.newHashMap(); - // We need to iterate over all the slots of 'expr' and check if they have - // equivalent slots that are bound by the same base table tuple(s). - for (SlotId slotId: sids) { - Map<TupleId, List<SlotId>> currSlotsByTid = - getBaseTblEquivSlots(analyzer, slotId); - if (currSlotsByTid.isEmpty()) return Collections.emptyMap(); - if (slotsByTid.isEmpty()) { - slotsByTid.putAll(currSlotsByTid); - continue; - } - - // Compute the intersection between tuple ids from 'slotsByTid' and - // 'currSlotsByTid'. If the intersection is empty, an empty collection - // is returned. - Iterator<Map.Entry<TupleId, List<SlotId>>> iter = - slotsByTid.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry<TupleId, List<SlotId>> entry = iter.next(); - List<SlotId> slotIds = currSlotsByTid.get(entry.getKey()); - if (slotIds == null) { - iter.remove(); - } else { - entry.getValue().addAll(slotIds); - } - } - if (slotsByTid.isEmpty()) return Collections.emptyMap(); - } - return slotsByTid; - } - - /** - * Static function that returns the ids of slots bound by base table tuples for which - * there is a value transfer from 'srcSid'. The slots are grouped by tuple id.
- */ - private static Map<TupleId, List<SlotId>> getBaseTblEquivSlots(Analyzer analyzer, - SlotId srcSid) { - Map<TupleId, List<SlotId>> slotsByTid = Maps.newHashMap(); - for (SlotId targetSid: analyzer.getValueTransferTargets(srcSid)) { - TupleDescriptor tupleDesc = analyzer.getSlotDesc(targetSid).getParent(); - if (tupleDesc.getTable() == null) continue; - List<SlotId> sids = slotsByTid.get(tupleDesc.getId()); - if (sids == null) { - sids = Lists.newArrayList(); - slotsByTid.put(tupleDesc.getId(), sids); - } - sids.add(targetSid); - } - return slotsByTid; - } - - public Expr getTargetExpr(PlanNodeId targetPlanNodeId) { - for (RuntimeFilterTarget target: targets_) { - if (target.node.getId() != targetPlanNodeId) continue; - return target.expr; - } - return null; - } - - public List<RuntimeFilterTarget> getTargets() { return targets_; } - public boolean hasTargets() { return !targets_.isEmpty(); } - public Expr getSrcExpr() { return srcExpr_; } - public Expr getOrigTargetExpr() { return origTargetExpr_; } - public Map<TupleId, List<SlotId>> getTargetSlots() { return targetSlotsByTid_; } - public RuntimeFilterId getFilterId() { return id_; } - - /** - * Estimates the selectivity of a runtime filter as the cardinality of the - * associated source join node over the cardinality of that join node's left - * child. - */ - public double getSelectivity() { - if (src_.getCardinality() == -1 || src_.getChild(0).getCardinality() == -1) { - return -1; - } - return src_.getCardinality() / (double) src_.getChild(0).getCardinality(); - } - - public void addTarget(ScanNode node, Analyzer analyzer, Expr targetExpr) { - Preconditions.checkState(targetExpr.isBoundByTupleIds(node.getTupleIds())); - RuntimeFilterTarget target = new RuntimeFilterTarget(node, targetExpr); - targets_.add(target); - // Check if all the slots of targetExpr_ are bound by partition columns - TupleDescriptor baseTblDesc = node.getTupleDesc(); - Table tbl = baseTblDesc.getTable(); - if (tbl.getNumClusteringCols() == 0) return; - List<SlotId> sids = Lists.newArrayList(); - targetExpr.getIds(null, sids); - for (SlotId sid: sids) { - SlotDescriptor slotDesc = analyzer.getSlotDesc(sid); - if (slotDesc.getColumn() == null - || slotDesc.getColumn().getPosition() >= tbl.getNumClusteringCols()) { - return; - } - } - target.isBoundByPartitionColumns = true; - } - - public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin_ = isBroadcast; } - - public void computeNdvEstimate() { ndvEstimate_ = src_.getChild(1).getCardinality(); } - - public void computeHasLocalTargets() { - Preconditions.checkNotNull(src_.getFragment()); - Preconditions.checkState(hasTargets()); - for (RuntimeFilterTarget target: targets_) { - Preconditions.checkNotNull(target.node.getFragment()); - boolean isLocal = - src_.getFragment().getId().equals(target.node.getFragment().getId()); - target.isLocalTarget = isLocal; - hasLocalTargets_ = hasLocalTargets_ || isLocal; - hasRemoteTargets_ = hasRemoteTargets_ || !isLocal; - } - } - - /** - * Assigns this runtime filter to the corresponding plan nodes. 
- */ - public void assignToPlanNodes() { - Preconditions.checkState(hasTargets()); - src_.addRuntimeFilter(this); - for (RuntimeFilterTarget target: targets_) target.node.addRuntimeFilter(this); - } - - public String debugString() { - StringBuilder output = new StringBuilder(); - return output.append("FilterID: " + id_ + " ") - .append("Source: " + src_.getId() + " ") - .append("SrcExpr: " + getSrcExpr().debugString() + " ") - .append("Target(s): ") - .append(Joiner.on(", ").join(targets_) + " ") - .append("Selectivity: " + getSelectivity()).toString(); - } - } - - /** - * Generates and assigns runtime filters to a query plan tree. - */ - public static void generateRuntimeFilters(Analyzer analyzer, PlanNode plan, - int maxNumFilters) { - Preconditions.checkArgument(maxNumFilters >= 0); - RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator(); - filterGenerator.generateFilters(analyzer, plan); - List<RuntimeFilter> filters = Lists.newArrayList(filterGenerator.getRuntimeFilters()); - if (filters.size() > maxNumFilters) { - // If more than 'maxNumFilters' were generated, sort them by increasing selectivity - // and keep the 'maxNumFilters' most selective. - Collections.sort(filters, new Comparator<RuntimeFilter>() { - public int compare(RuntimeFilter a, RuntimeFilter b) { - double aSelectivity = - a.getSelectivity() == -1 ? Double.MAX_VALUE : a.getSelectivity(); - double bSelectivity = - b.getSelectivity() == -1 ? Double.MAX_VALUE : b.getSelectivity(); - double diff = aSelectivity - bSelectivity; - return (diff < 0.0 ? -1 : (diff > 0.0 ? 1 : 0)); - } - } - ); - } - for (RuntimeFilter filter: - filters.subList(0, Math.min(filters.size(), maxNumFilters))) { - LOG.trace("Runtime filter: " + filter.debugString()); - filter.assignToPlanNodes(); - } - } - - /** - * Returns a set of all the registered runtime filters. - */ - public Set<RuntimeFilter> getRuntimeFilters() { - Set<RuntimeFilter> result = Sets.newHashSet(); - for (List<RuntimeFilter> filters: runtimeFiltersByTid_.values()) { - result.addAll(filters); - } - return result; - } - - /** - * Generates the runtime filters for a query by recursively traversing the single-node - * plan tree rooted at 'root'. In the top-down traversal of the plan tree, candidate - * runtime filters are generated from equi-join predicates. In the bottom-up traversal - * of the plan tree, the filters are assigned to destination (scan) nodes. Filters - * that cannot be assigned to a scan node are discarded. - */ - private void generateFilters(Analyzer analyzer, PlanNode root) { - if (root instanceof JoinNode) { - JoinNode joinNode = (JoinNode) root; - List<Expr> joinConjuncts = Lists.newArrayList(); - if (!joinNode.getJoinOp().isLeftOuterJoin() - && !joinNode.getJoinOp().isFullOuterJoin() - && !joinNode.getJoinOp().isAntiJoin()) { - // It's not correct to push runtime filters to the left side of a left outer, - // full outer or anti join if the filter corresponds to an equi-join predicate - // from the ON clause. - joinConjuncts.addAll(joinNode.getEqJoinConjuncts()); - } - joinConjuncts.addAll(joinNode.getConjuncts()); - List<RuntimeFilter> filters = Lists.newArrayList(); - for (Expr conjunct: joinConjuncts) { - RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator, analyzer, - conjunct, joinNode); - if (filter == null) continue; - registerRuntimeFilter(filter); - filters.add(filter); - } - generateFilters(analyzer, root.getChild(0)); - // Finalize every runtime filter of that join. 
This is to ensure that we don't - // assign a filter to a scan node from the right subtree of joinNode or ancestor - // join nodes in case we don't find a destination node in the left subtree. - for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter); - generateFilters(analyzer, root.getChild(1)); - } else if (root instanceof ScanNode) { - assignRuntimeFilters(analyzer, (ScanNode) root); - } else { - for (PlanNode childNode: root.getChildren()) { - generateFilters(analyzer, childNode); - } - } - } - - /** - * Registers a runtime filter with the tuple id of every scan node that is a candidate - * destination node for that filter. - */ - private void registerRuntimeFilter(RuntimeFilter filter) { - Map<TupleId, List<SlotId>> targetSlotsByTid = filter.getTargetSlots(); - Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty()); - for (TupleId tupleId: targetSlotsByTid.keySet()) { - registerRuntimeFilter(filter, tupleId); - } - } - - /** - * Registers a runtime filter with a specific target tuple id. - */ - private void registerRuntimeFilter(RuntimeFilter filter, TupleId targetTid) { - Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid)); - List<RuntimeFilter> filters = runtimeFiltersByTid_.get(targetTid); - if (filters == null) { - filters = Lists.newArrayList(); - runtimeFiltersByTid_.put(targetTid, filters); - } - Preconditions.checkState(!filter.isFinalized()); - filters.add(filter); - } - - /** - * Finalizes a runtime filter by disassociating it from all the candidate target scan - * nodes that haven't been used as destinations for that filter. Also sets the - * finalized_ flag of that filter so that it can't be assigned to any other scan nodes. - */ - private void finalizeRuntimeFilter(RuntimeFilter runtimeFilter) { - Set<TupleId> targetTupleIds = Sets.newHashSet(); - for (RuntimeFilter.RuntimeFilterTarget target: runtimeFilter.getTargets()) { - targetTupleIds.addAll(target.node.getTupleIds()); - } - for (TupleId tupleId: runtimeFilter.getTargetSlots().keySet()) { - if (!targetTupleIds.contains(tupleId)) { - runtimeFiltersByTid_.get(tupleId).remove(runtimeFilter); - } - } - runtimeFilter.markFinalized(); - } - - /** - * Assigns runtime filters to a specific scan node 'scanNode'. - * The assigned filters are the ones for which 'scanNode' can be used as a destination - * node. A scan node may be used as a destination node for multiple runtime filters. - * Currently, runtime filters can only be assigned to HdfsScanNodes. - */ - private void assignRuntimeFilters(Analyzer analyzer, ScanNode scanNode) { - if (!(scanNode instanceof HdfsScanNode)) return; - TupleId tid = scanNode.getTupleIds().get(0); - if (!runtimeFiltersByTid_.containsKey(tid)) return; - for (RuntimeFilter filter: runtimeFiltersByTid_.get(tid)) { - if (filter.isFinalized()) continue; - Expr targetExpr = computeTargetExpr(filter, tid, analyzer); - if (targetExpr == null) continue; - filter.addTarget(scanNode, analyzer, targetExpr); - } - } - - /** - * Computes the target expr for a specified runtime filter 'filter' to be applied at - * the scan node with target tuple id 'targetTid'.
- */ - private Expr computeTargetExpr(RuntimeFilter filter, TupleId targetTid, - Analyzer analyzer) { - Expr targetExpr = filter.getOrigTargetExpr(); - if (!targetExpr.isBound(targetTid)) { - Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid)); - // Modify the filter target expr using the equivalent slots from the scan node - // on which the filter will be applied. - ExprSubstitutionMap smap = new ExprSubstitutionMap(); - List<SlotRef> exprSlots = Lists.newArrayList(); - targetExpr.collect(SlotRef.class, exprSlots); - List<SlotId> sids = filter.getTargetSlots().get(targetTid); - for (SlotRef slotRef: exprSlots) { - for (SlotId sid: sids) { - if (analyzer.hasValueTransfer(slotRef.getSlotId(), sid)) { - SlotRef newSlotRef = new SlotRef(analyzer.getSlotDesc(sid)); - newSlotRef.analyzeNoThrow(analyzer); - smap.put(slotRef, newSlotRef); - break; - } - } - } - Preconditions.checkState(exprSlots.size() == smap.size()); - try { - targetExpr = targetExpr.substitute(smap, analyzer, true); - } catch (Exception e) { - // An exception is thrown if we cannot generate a target expr from this - // scan node that has the same type as the lhs expr of the join predicate - // from which the runtime filter was generated. We skip that scan node and will - // try to assign the filter to a different scan node. - // - // TODO: Investigate if we can generate a type-compatible source/target expr - // pair from that scan node instead of skipping it. - return null; - } - } - Preconditions.checkState( - targetExpr.getType().matchesType(filter.getSrcExpr().getType())); - return targetExpr; - } -}
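As a concrete illustration of the mechanism described in the class comment of RuntimeFilterGenerator, here is a minimal, self-contained Java sketch of runtime filter propagation for the example query select * from T1, T2 where T1.a = T2.b and T2.c = '1'. The class and method names are hypothetical, and an exact HashSet stands in for the approximate (Bloom-filter-style) structure that the backend actually builds, sized from ndvEstimate_ above:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: the join's build side collects the join-key values that
// survive its own predicates, and the probe-side scan tests membership before
// emitting rows, so most non-matching tuples never reach the join.
public class RuntimeFilterSketch {
  /** Build phase: hash T2.b values from the rows that satisfy T2.c = '1'. */
  static Set<Long> buildFilter(List<long[]> t2Rows) {
    Set<Long> filter = new HashSet<>();
    for (long[] row : t2Rows) {
      long b = row[0];
      long c = row[1];
      if (c == 1) filter.add(b);  // only build-side rows passing T2.c = '1'
    }
    return filter;
  }

  /** Probe phase: the scan of T1 applies the filter on T1.a before the join. */
  static List<Long> scanT1(List<Long> t1aValues, Set<Long> filter) {
    return t1aValues.stream()
        .filter(filter::contains)  // prune tuples of T1 that cannot join
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Set<Long> f = buildFilter(List.of(new long[] {10, 1}, new long[] {20, 2}));
    System.out.println(scanT1(List.of(10L, 30L), f));  // prints [10]
  }
}

Note that generateRuntimeFilters() additionally caps the number of filters per query: when more than 'maxNumFilters' candidates are generated, they are sorted by getSelectivity() (join output cardinality over probe-side cardinality, lower being more selective) and only the most selective ones are assigned to plan nodes.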
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterId.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterId.java b/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterId.java deleted file mode 100644 index 32af78c..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterId.java +++ /dev/null @@ -1,45 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import com.cloudera.impala.common.Id; -import com.cloudera.impala.common.IdGenerator; - -public class RuntimeFilterId extends Id<RuntimeFilterId> { - // Construction only allowed via an IdGenerator. - protected RuntimeFilterId(int id) { - super(id); - } - - public static IdGenerator<RuntimeFilterId> createGenerator() { - return new IdGenerator<RuntimeFilterId>() { - @Override - public RuntimeFilterId getNextId() { return new RuntimeFilterId(nextId_++); } - @Override - public RuntimeFilterId getMaxId() { return new RuntimeFilterId(nextId_ - 1); } - }; - } - - @Override - public String toString() { - return String.format("RF%03d", id_); - } - - @Override - public int hashCode() { return id_; } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java deleted file mode 100644 index b442b4a..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java +++ /dev/null @@ -1,191 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package com.cloudera.impala.planner; - -import java.util.List; - -import com.cloudera.impala.analysis.SlotDescriptor; -import com.cloudera.impala.analysis.TupleDescriptor; -import com.cloudera.impala.catalog.HdfsFileFormat; -import com.cloudera.impala.catalog.Type; -import com.cloudera.impala.common.NotImplementedException; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TNetworkAddress; -import com.cloudera.impala.thrift.TScanRangeLocations; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Representation of the common elements of all scan nodes. - */ -abstract public class ScanNode extends PlanNode { - protected final TupleDescriptor desc_; - - // Total number of rows this node is expected to process - protected long inputCardinality_ = -1; - - // Counter indicating if partitions have missing statistics - protected int numPartitionsMissingStats_ = 0; - - // List of scan-range locations. Populated in init(). - protected List<TScanRangeLocations> scanRanges_; - - public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) { - super(id, desc.getId().asList(), displayName); - desc_ = desc; - } - - public TupleDescriptor getTupleDesc() { return desc_; } - - /** - * Checks if this scan is supported based on the types of scanned columns and the - * underlying file formats, in particular, whether complex types are supported. - * - * The default implementation throws if this scan would need to materialize a nested - * field or collection. The scan is ok if the table schema contains complex types, as - * long as the query does not reference them. - * - * Subclasses should override this function as appropriate. - */ - protected void checkForSupportedFileFormats() throws NotImplementedException { - Preconditions.checkNotNull(desc_); - Preconditions.checkNotNull(desc_.getTable()); - for (SlotDescriptor slotDesc: desc_.getSlots()) { - if (slotDesc.getType().isComplexType() || slotDesc.getColumn() == null) { - Preconditions.checkNotNull(slotDesc.getPath()); - throw new NotImplementedException(String.format( - "Scan of table '%s' is not supported because '%s' references a nested " + - "field/collection.\nComplex types are supported for these file formats: %s.", - desc_.getAlias(), slotDesc.getPath().toString(), - Joiner.on(", ").join(HdfsFileFormat.complexTypesFormats()))); - } - } - } - - /** - * Returns all scan ranges plus their locations. - */ - public List<TScanRangeLocations> getScanRangeLocations() { - Preconditions.checkNotNull(scanRanges_, "Need to call init() first."); - return scanRanges_; - } - - @Override - protected String debugString() { - return Objects.toStringHelper(this) - .add("tid", desc_.getId().asInt()) - .add("tblName", desc_.getTable().getFullName()) - .add("keyRanges", "") - .addValue(super.debugString()) - .toString(); - } - - /** - * Returns the explain string for table and column stats to be included in - * a ScanNode's explain string. The given prefix is prepended to each of the lines. - * The prefix is used for proper formatting when the string returned by this method - * is embedded in a query's explain plan. - */ - protected String getStatsExplainString(String prefix, TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - // Table stats.
- if (desc_.getTable().getNumRows() == -1) { - output.append(prefix + "table stats: unavailable"); - } else { - output.append(prefix + "table stats: " + desc_.getTable().getNumRows() + - " rows total"); - if (numPartitionsMissingStats_ > 0) { - output.append(" (" + numPartitionsMissingStats_ + " partition(s) missing stats)"); - } - } - output.append("\n"); - - // Column stats. - List<String> columnsMissingStats = Lists.newArrayList(); - for (SlotDescriptor slot: desc_.getSlots()) { - if (!slot.getStats().hasStats() && slot.getColumn() != null) { - columnsMissingStats.add(slot.getColumn().getName()); - } - } - if (columnsMissingStats.isEmpty()) { - output.append(prefix + "column stats: all"); - } else if (columnsMissingStats.size() == desc_.getSlots().size()) { - output.append(prefix + "column stats: unavailable"); - } else { - output.append(String.format("%scolumns missing stats: %s", prefix, - Joiner.on(", ").join(columnsMissingStats))); - } - return output.toString(); - } - - /** - * Returns true if the table underlying this scan is missing table stats - * or column stats relevant to this scan node. - */ - public boolean isTableMissingStats() { - return isTableMissingColumnStats() || isTableMissingTableStats(); - } - - public boolean isTableMissingTableStats() { - if (desc_.getTable().getNumRows() == -1) return true; - return numPartitionsMissingStats_ > 0; - } - - /** - * Returns true if the tuple descriptor references a path with a collection type. - */ - public boolean isAccessingCollectionType() { - for (Type t: desc_.getPath().getMatchedTypes()) { - if (t.isCollectionType()) return true; - } - return false; - } - - public boolean isTableMissingColumnStats() { - for (SlotDescriptor slot: desc_.getSlots()) { - if (!slot.getStats().hasStats()) return true; - } - return false; - } - - /** - * Returns true if the scanned table is suspected to have corrupt table stats, - * in particular, if the scan is non-empty and 'numRows' is 0 or negative (but not -1). - */ - public boolean hasCorruptTableStats() { return false; } - - /** - * Helper function to parse a "host:port" address string into TNetworkAddress. - * This is called with ipaddress:port when doing scan range assignment. - */ - protected static TNetworkAddress addressToTNetworkAddress(String address) { - TNetworkAddress result = new TNetworkAddress(); - String[] hostPort = address.split(":"); - result.hostname = hostPort[0]; - result.port = Integer.parseInt(hostPort[1]); - return result; - } - - @Override - public long getInputCardinality() { - if (getConjuncts().isEmpty() && hasLimit()) return getLimit(); - return inputCardinality_; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/SelectNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/SelectNode.java b/fe/src/main/java/com/cloudera/impala/planner/SelectNode.java deleted file mode 100644 index b418224..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/SelectNode.java +++ /dev/null @@ -1,93 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License.
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.google.common.base.Preconditions; - -/** - * Node that applies conjuncts and a limit clause. Has exactly one child. - */ -public class SelectNode extends PlanNode { - private final static Logger LOG = LoggerFactory.getLogger(SelectNode.class); - - protected SelectNode(PlanNodeId id, PlanNode child, List<Expr> conjuncts) { - super(id, "SELECT"); - addChild(child); - conjuncts_.addAll(conjuncts); - computeTupleIds(); - } - - @Override - public void computeTupleIds() { - clearTupleIds(); - tblRefIds_.addAll(getChild(0).getTblRefIds()); - tupleIds_.addAll(getChild(0).getTupleIds()); - nullableTupleIds_.addAll(getChild(0).getNullableTupleIds()); - } - - @Override - protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.SELECT_NODE; - } - - @Override - public void init(Analyzer analyzer) { - analyzer.markConjunctsAssigned(conjuncts_); - conjuncts_ = orderConjunctsByCost(conjuncts_); - computeStats(analyzer); - createDefaultSmap(analyzer); - } - - @Override - public void computeStats(Analyzer analyzer) { - super.computeStats(analyzer); - if (getChild(0).cardinality_ == -1) { - cardinality_ = -1; - } else { - cardinality_ = - Math.round(((double) getChild(0).cardinality_) * computeSelectivity()); - Preconditions.checkState(cardinality_ >= 0); - } - cardinality_ = capAtLimit(cardinality_); - LOG.debug("stats Select: cardinality=" + Long.toString(cardinality_)); - } - - @Override - protected String getNodeExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - output.append(String.format("%s%s:%s\n", prefix, id_.toString(), displayName_)); - if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { - if (!conjuncts_.isEmpty()) { - output.append(detailPrefix + "predicates: " + - getExplainString(conjuncts_) + "\n"); - } - } - return output.toString(); - } -}
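To make the arithmetic in SelectNode.computeStats() above concrete: the node's cardinality is the child's cardinality scaled by the combined selectivity of its conjuncts, then capped at the node's limit, with -1 denoting an unknown cardinality throughout. Below is a small, self-contained sketch of that estimate; the helper names are hypothetical, and capAtLimit() only mirrors the intent of the planner method of the same name:

// Hypothetical stand-alone version of SelectNode's cardinality estimate.
// A cardinality of -1 means "unknown" and is propagated rather than scaled.
public final class SelectCardinalitySketch {
  static long estimate(long childCardinality, double selectivity, long limit) {
    long cardinality =
        childCardinality == -1 ? -1 : Math.round(childCardinality * selectivity);
    return capAtLimit(cardinality, limit);
  }

  // With a LIMIT set, at most 'limit' rows can be returned, even when the
  // input estimate is unknown; limit < 0 means no LIMIT clause.
  static long capAtLimit(long cardinality, long limit) {
    if (limit < 0) return cardinality;
    if (cardinality == -1) return limit;
    return Math.min(cardinality, limit);
  }

  public static void main(String[] args) {
    // 1,000,000 child rows at 10% selectivity with LIMIT 50,000 -> 50,000.
    System.out.println(estimate(1_000_000L, 0.10, 50_000L));
  }
}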