Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Wed Sep 3 10:46:04 2014
@@ -22,16 +22,14 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 
 /**
- * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks
+ * FileSinkProcessor is a simple rule to remember seen file sinks for later
+ * processing.
  *
  */
 public class FileSinkProcessor implements NodeProcessor {
@@ -39,12 +37,6 @@ public class FileSinkProcessor implement
   static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName());
 
   @Override
-  /*
-   * (non-Javadoc)
-   * we should ideally not modify the tree we traverse.
-   * However, since we need to walk the tree at any time when we modify the
-   * operator, we might as well do it here.
-   */
   public Object process(Node nd, Stack<Node> stack,
       NodeProcessorCtx procCtx, Object... nodeOutputs)
       throws SemanticException {
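Note: FileSinkProcessor, like the new UnionProcessor added further down, follows a collect-now, process-later pattern: the rule fired by the graph walker only records the operator it matched on the shared context, and the real work (merge/move/stats tasks, AM event wiring) happens after the walk completes. A minimal sketch of such a collect-only rule, using only types visible in this patch (the class name SinkCollector is hypothetical):

    import java.util.Stack;

    import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
    import org.apache.hadoop.hive.ql.lib.Node;
    import org.apache.hadoop.hive.ql.lib.NodeProcessor;
    import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
    import org.apache.hadoop.hive.ql.parse.GenTezProcContext;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    public class SinkCollector implements NodeProcessor {
      @Override
      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
          Object... nodeOutputs) throws SemanticException {
        // only remember the sink; GenTezUtils.processFileSink() does the work later
        ((GenTezProcContext) procCtx).fileSinkSet.add((FileSinkOperator) nd);
        return null;
      }
    }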
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1622216&r1=1622215&r2=1622216&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Wed Sep 3 10:46:04 2014 @@ -26,29 +26,28 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; -import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; /** @@ -134,6 +133,15 @@ public class GenTezProcContext implement // remember which reducesinks we've already connected public final Set<ReduceSinkOperator> connectedReduceSinks; + // remember the event operators we've seen + public final Set<AppMasterEventOperator> eventOperatorSet; + + // remember the event operators we've abandoned. + public final Set<AppMasterEventOperator> abandonedEventOperatorSet; + + // remember the connections between ts and event + public final Map<TableScanOperator, List<AppMasterEventOperator>> tsToEventMap; + @SuppressWarnings("unchecked") public GenTezProcContext(HiveConf conf, ParseContext parseContext, List<Task<MoveWork>> moveTask, List<Task<? 
extends Serializable>> rootTasks, @@ -165,6 +173,9 @@ public class GenTezProcContext implement this.linkedFileSinks = new LinkedHashMap<Path, List<FileSinkDesc>>(); this.fileSinkSet = new LinkedHashSet<FileSinkOperator>(); this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>(); + this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>(); + this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>(); + this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>(); rootTasks.add(currentTask); } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1622216&r1=1622215&r2=1622216&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Wed Sep 3 10:46:04 2014 @@ -20,38 +20,43 @@ package org.apache.hadoop.hive.ql.parse; import java.util.ArrayList; import java.util.Deque; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.LinkedList; -import java.util.Map; +import java.util.List; import java.util.Set; -import org.apache.hadoop.fs.Path; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.FetchTask; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.UnionWork; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; + /** * GenTezUtils is a collection of shared helper methods to produce * TezWork @@ -119,12 +124,12 @@ public class GenTezUtils { int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS); // min we allow tez to pick - int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers() + int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers() * minPartitionFactor)); minPartition = (minPartition > maxReducers) ? 
maxReducers : minPartition; // max we allow tez to pick - int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor); + int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor); maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition; reduceWork.setMinReduceTasks(minPartition); @@ -210,18 +215,20 @@ public class GenTezUtils { BaseWork work) throws SemanticException { - Set<Operator<?>> roots = work.getAllRootOperators(); + List<Operator<?>> roots = new ArrayList<Operator<?>>(); + roots.addAll(work.getAllRootOperators()); if (work.getDummyOps() != null) { roots.addAll(work.getDummyOps()); } + roots.addAll(context.eventOperatorSet); // need to clone the plan. - Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots); + List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots); // we're cloning the operator plan but we're retaining the original work. That means // that root operators have to be replaced with the cloned ops. The replacement map // tells you what that mapping is. - Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>(); + BiMap<Operator<?>, Operator<?>> replacementMap = HashBiMap.create(); // there's some special handling for dummyOps required. Mapjoins won't be properly // initialized if their dummy parents aren't initialized. Since we cloned the plan @@ -231,11 +238,35 @@ public class GenTezUtils { Iterator<Operator<?>> it = newRoots.iterator(); for (Operator<?> orig: roots) { Operator<?> newRoot = it.next(); + + replacementMap.put(orig, newRoot); + if (newRoot instanceof HashTableDummyOperator) { - dummyOps.add((HashTableDummyOperator)newRoot); + // dummy ops need to be updated to the cloned ones. + dummyOps.add((HashTableDummyOperator) newRoot); + it.remove(); + } else if (newRoot instanceof AppMasterEventOperator) { + // event operators point to table scan operators. When cloning these we + // need to restore the original scan. + if (newRoot.getConf() instanceof DynamicPruningEventDesc) { + TableScanOperator ts = ((DynamicPruningEventDesc) orig.getConf()).getTableScan(); + if (ts == null) { + throw new AssertionError("No table scan associated with dynamic event pruning. " + orig); + } + ((DynamicPruningEventDesc) newRoot.getConf()).setTableScan(ts); + } it.remove(); } else { - replacementMap.put(orig,newRoot); + if (newRoot instanceof TableScanOperator) { + if (context.tsToEventMap.containsKey(orig)) { + // we need to update event operators with the cloned table scan + for (AppMasterEventOperator event : context.tsToEventMap.get(orig)) { + ((DynamicPruningEventDesc) event.getConf()).setTableScan((TableScanOperator) newRoot); + } + } + } + context.rootToWorkMap.remove(orig); + context.rootToWorkMap.put(newRoot, work); } } @@ -272,6 +303,15 @@ public class GenTezUtils { desc.setLinkedFileSinkDesc(linked); } + if (current instanceof AppMasterEventOperator) { + // remember for additional processing later + context.eventOperatorSet.add((AppMasterEventOperator) current); + + // mark the original as abandoned. Don't need it anymore. + context.abandonedEventOperatorSet.add((AppMasterEventOperator) replacementMap.inverse() + .get(current)); + } + if (current instanceof UnionOperator) { Operator<?> parent = null; int count = 0; @@ -337,4 +377,87 @@ public class GenTezUtils { } } } + + /** + * processAppMasterEvent sets up the event descriptor and the MapWork. 
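+   * It points the event descriptor at the vertex and input of the MapWork that
+   * scans the pruned table, and registers in that MapWork, keyed by the source
+   * vertex name, the table descriptor, target column and partition key
+   * expression the runtime events will refer to.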
+   *
+   * @param procCtx
+   * @param event
+   */
+  public void processAppMasterEvent(GenTezProcContext procCtx, AppMasterEventOperator event) {
+
+    if (procCtx.abandonedEventOperatorSet.contains(event)) {
+      // don't need this anymore
+      return;
+    }
+
+    DynamicPruningEventDesc eventDesc = (DynamicPruningEventDesc)event.getConf();
+    TableScanOperator ts = eventDesc.getTableScan();
+
+    MapWork work = (MapWork) procCtx.rootToWorkMap.get(ts);
+    if (work == null) {
+      throw new AssertionError("No work found for tablescan " + ts);
+    }
+
+    BaseWork enclosingWork = getEnclosingWork(event, procCtx);
+    if (enclosingWork == null) {
+      throw new AssertionError("Cannot find work for operator " + event);
+    }
+    String sourceName = enclosingWork.getName();
+
+    // store the vertex name in the operator pipeline
+    eventDesc.setVertexName(work.getName());
+    eventDesc.setInputName(work.getAliases().get(0));
+
+    // store table descriptor in map-work
+    if (!work.getEventSourceTableDescMap().containsKey(sourceName)) {
+      work.getEventSourceTableDescMap().put(sourceName, new LinkedList<TableDesc>());
+    }
+    List<TableDesc> tables = work.getEventSourceTableDescMap().get(sourceName);
+    tables.add(event.getConf().getTable());
+
+    // store column name in map-work
+    if (!work.getEventSourceColumnNameMap().containsKey(sourceName)) {
+      work.getEventSourceColumnNameMap().put(sourceName, new LinkedList<String>());
+    }
+    List<String> columns = work.getEventSourceColumnNameMap().get(sourceName);
+    columns.add(eventDesc.getTargetColumnName());
+
+    // store partition key expr in map-work
+    if (!work.getEventSourcePartKeyExprMap().containsKey(sourceName)) {
+      work.getEventSourcePartKeyExprMap().put(sourceName, new LinkedList<ExprNodeDesc>());
+    }
+    List<ExprNodeDesc> keys = work.getEventSourcePartKeyExprMap().get(sourceName);
+    keys.add(eventDesc.getPartKey());
+
+  }
+
+  /**
+   * getEnclosingWork finds the BaseWork a given operator belongs to.
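+   * It walks up to the operator's root(s) via findRoots and returns the first
+   * root that is registered in the context's rootToWorkMap.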
+ */ + public BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) { + List<Operator<?>> ops = new ArrayList<Operator<?>>(); + findRoots(op, ops); + for (Operator<?> r : ops) { + BaseWork work = procCtx.rootToWorkMap.get(r); + if (work != null) { + return work; + } + } + return null; + } + + /* + * findRoots returns all root operators (in ops) that result in operator op + */ + private void findRoots(Operator<?> op, List<Operator<?>> ops) { + List<Operator<?>> parents = op.getParentOperators(); + if (parents == null || parents.isEmpty()) { + ops.add(op); + return; + } + for (Operator<?> p : parents) { + findRoots(p, ops); + } + } } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java?rev=1622216&r1=1622215&r2=1622216&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java Wed Sep 3 10:46:04 2014 @@ -23,13 +23,18 @@ import java.util.HashSet; import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + /** * OptimizeTezProcContext. OptimizeTezProcContext maintains information * about the current operator plan as we walk the operator tree @@ -47,19 +52,23 @@ public class OptimizeTezProcContext impl public final Set<ReduceSinkOperator> visitedReduceSinks = new HashSet<ReduceSinkOperator>(); + public final Multimap<AppMasterEventOperator, TableScanOperator> eventOpToTableScanMap = + HashMultimap.create(); + // rootOperators are all the table scan operators in sequence // of traversal - public final Deque<Operator<? extends OperatorDesc>> rootOperators; + public Deque<Operator<? extends OperatorDesc>> rootOperators; - @SuppressWarnings("unchecked") - public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext, - Set<ReadEntity> inputs, Set<WriteEntity> outputs, - Deque<Operator<?>> rootOperators) { + public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext, Set<ReadEntity> inputs, + Set<WriteEntity> outputs) { this.conf = conf; this.parseContext = parseContext; this.inputs = inputs; this.outputs = outputs; - this.rootOperators = rootOperators; + } + + public void setRootOperators(Deque<Operator<? 
extends OperatorDesc>> roots) { + this.rootOperators = roots; } } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1622216&r1=1622215&r2=1622216&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Wed Sep 3 10:46:04 2014 @@ -21,20 +21,24 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Deque; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Stack; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -52,20 +56,25 @@ import org.apache.hadoop.hive.ql.lib.For import org.apache.hadoop.hive.ql.lib.GraphWalker; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; -import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate; import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin; +import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization; import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc; +import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize; import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism; +import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits; import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck; import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; -import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer; import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger; +import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer; +import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -85,7 +94,7 @@ public class TezCompiler extends TaskCom @Override public void init(HiveConf conf, LogHelper console, Hive db) { super.init(conf, console, db); - + // Tez requires us to use RPC for the query plan HiveConf.setBoolVar(conf, 
ConfVars.HIVE_RPC_QUERY_PLAN, true); @@ -98,31 +107,203 @@ public class TezCompiler extends TaskCom protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException { - // Sequence of TableScan operators to be walked + // Create the context for the walker + OptimizeTezProcContext procCtx = new OptimizeTezProcContext(conf, pCtx, inputs, outputs); + + // setup dynamic partition pruning where possible + runDynamicPartitionPruning(procCtx, inputs, outputs); + + // setup stats in the operator plan + runStatsAnnotation(procCtx); + + // run the optimizations that use stats for optimization + runStatsDependentOptimizations(procCtx, inputs, outputs); + + // after the stats phase we might have some cyclic dependencies that we need + // to take care of. + runCycleAnalysisForPartitionPruning(procCtx, inputs, outputs); + + } + + private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx, + Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException { + + if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) { + return; + } + + boolean cycleFree = false; + while (!cycleFree) { + cycleFree = true; + Set<Set<Operator<?>>> components = getComponents(procCtx); + for (Set<Operator<?>> component : components) { + if (LOG.isDebugEnabled()) { + LOG.debug("Component: "); + for (Operator<?> co : component) { + LOG.debug("Operator: " + co.getName() + ", " + co.getIdentifier()); + } + } + if (component.size() != 1) { + LOG.info("Found cycle in operator plan..."); + cycleFree = false; + removeEventOperator(component); + } + } + LOG.info("Cycle free: " + cycleFree); + } + } + + private void removeEventOperator(Set<Operator<?>> component) { + AppMasterEventOperator victim = null; + for (Operator<?> o : component) { + if (o instanceof AppMasterEventOperator) { + if (victim == null + || o.getConf().getStatistics().getDataSize() < victim.getConf().getStatistics() + .getDataSize()) { + victim = (AppMasterEventOperator) o; + } + } + } + + Operator<?> child = victim; + Operator<?> curr = victim; + + while (curr.getChildOperators().size() <= 1) { + child = curr; + curr = curr.getParentOperators().get(0); + } + + // at this point we've found the fork in the op pipeline that has the + // pruning as a child plan. + LOG.info("Disabling dynamic pruning for: " + + ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString() + + ". 
Needed to break cyclic dependency");
+    curr.removeChild(child);
+  }
+
+  // Tarjan's algo
+  private Set<Set<Operator<?>>> getComponents(OptimizeTezProcContext procCtx) {
     Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
-    deque.addAll(pCtx.getTopOps().values());
+    deque.addAll(procCtx.parseContext.getTopOps().values());
 
-    // Create the context for the walker
-    OptimizeTezProcContext procCtx
-        = new OptimizeTezProcContext(conf, pCtx, inputs, outputs, deque);
+    AtomicInteger index = new AtomicInteger();
+    Map<Operator<?>, Integer> indexes = new HashMap<Operator<?>, Integer>();
+    Map<Operator<?>, Integer> lowLinks = new HashMap<Operator<?>, Integer>();
+    Stack<Operator<?>> nodes = new Stack<Operator<?>>();
+    Set<Set<Operator<?>>> components = new HashSet<Set<Operator<?>>>();
+
+    for (Operator<?> o : deque) {
+      if (!indexes.containsKey(o)) {
+        connect(o, index, nodes, indexes, lowLinks, components);
+      }
+    }
+
+    return components;
+  }
+
+  private void connect(Operator<?> o, AtomicInteger index, Stack<Operator<?>> nodes,
+      Map<Operator<?>, Integer> indexes, Map<Operator<?>, Integer> lowLinks,
+      Set<Set<Operator<?>>> components) {
+
+    indexes.put(o, index.get());
+    lowLinks.put(o, index.get());
+    index.incrementAndGet();
+    nodes.push(o);
+
+    List<Operator<?>> children;
+    if (o instanceof AppMasterEventOperator) {
+      children = new ArrayList<Operator<?>>();
+      children.addAll(o.getChildOperators());
+      TableScanOperator ts = ((DynamicPruningEventDesc) o.getConf()).getTableScan();
+      LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
+      children.add(ts);
+    } else {
+      children = o.getChildOperators();
+    }
+
+    for (Operator<?> child : children) {
+      if (!indexes.containsKey(child)) {
+        connect(child, index, nodes, indexes, lowLinks, components);
+        // Tarjan's algorithm propagates the child's low-link to the parent o
+        lowLinks.put(o, Math.min(lowLinks.get(o), lowLinks.get(child)));
+      } else if (nodes.contains(child)) {
+        lowLinks.put(o, Math.min(lowLinks.get(o), indexes.get(child)));
+      }
+    }
+
+    if (lowLinks.get(o).equals(indexes.get(o))) {
+      Set<Operator<?>> component = new HashSet<Operator<?>>();
+      components.add(component);
+      Operator<?> current;
+      do {
+        current = nodes.pop();
+        component.add(current);
+      } while (current != o);
+    }
+  }
+
+  private void runStatsAnnotation(OptimizeTezProcContext procCtx) throws SemanticException {
+    new AnnotateWithStatistics().transform(procCtx.parseContext);
+    new AnnotateWithOpTraits().transform(procCtx.parseContext);
+  }
+
+  private void runStatsDependentOptimizations(OptimizeTezProcContext procCtx,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+
+    // Sequence of TableScan operators to be walked
+    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+    deque.addAll(procCtx.parseContext.getTopOps().values());
 
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); - opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"), + opRules.put(new RuleRegExp("Set parallelism - ReduceSink", ReduceSinkOperator.getOperatorName() + "%"), new SetReducerParallelism()); - opRules.put(new RuleRegExp(new String("Convert Join to Map-join"), + opRules.put(new RuleRegExp("Convert Join to Map-join", JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin()); + opRules.put( + new RuleRegExp("Remove dynamic pruning by size", + AppMasterEventOperator.getOperatorName() + "%"), + new RemoveDynamicPruningBySize()); + // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx); List<Node> topNodes = new ArrayList<Node>(); - topNodes.addAll(pCtx.getTopOps().values()); + topNodes.addAll(procCtx.parseContext.getTopOps().values()); + GraphWalker ogw = new ForwardWalker(disp); + ogw.startWalking(topNodes, null); + } + + private void runDynamicPartitionPruning(OptimizeTezProcContext procCtx, Set<ReadEntity> inputs, + Set<WriteEntity> outputs) throws SemanticException { + + if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) { + return; + } + + // Sequence of TableScan operators to be walked + Deque<Operator<?>> deque = new LinkedList<Operator<?>>(); + deque.addAll(procCtx.parseContext.getTopOps().values()); + + Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); + opRules.put( + new RuleRegExp(new String("Dynamic Partition Pruning"), FilterOperator.getOperatorName() + + "%"), new DynamicPartitionPruningOptimization()); + + // The dispatcher fires the processor corresponding to the closest matching + // rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx); + List<Node> topNodes = new ArrayList<Node>(); + topNodes.addAll(procCtx.parseContext.getTopOps().values()); GraphWalker ogw = new ForwardWalker(disp); ogw.startWalking(topNodes, null); + + // need a new run of the constant folding because we might have created lots + // of "and true and true" conditions. + new ConstantPropagate().transform(procCtx.parseContext); } @Override @@ -158,19 +339,12 @@ public class TezCompiler extends TaskCom new ProcessAnalyzeTable(GenTezUtils.getUtils())); opRules.put(new RuleRegExp("Remember union", - UnionOperator.getOperatorName() + "%"), new NodeProcessor() - { - @Override - public Object process(Node n, Stack<Node> s, - NodeProcessorCtx procCtx, Object... os) throws SemanticException { - GenTezProcContext context = (GenTezProcContext) procCtx; - UnionOperator union = (UnionOperator) n; - - // simply need to remember that we've seen a union. 
-        context.currentUnionOperators.add(union);
-        return null;
-      }
-    });
+        UnionOperator.getOperatorName() + "%"),
+        new UnionProcessor());
+
+    opRules.put(new RuleRegExp("AppMasterEventOperator",
+        AppMasterEventOperator.getOperatorName() + "%"),
+        new AppMasterEventProcessor());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -185,10 +359,17 @@
       GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
     }
 
-    // finally make sure the file sink operators are set up right
+    // then we make sure the file sink operators are set up right
     for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
       GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
     }
+
+    // and finally we hook up any events that need to be sent to the tez AM
+    LOG.debug("There are " + procCtx.eventOperatorSet.size() + " app master events.");
+    for (AppMasterEventOperator event : procCtx.eventOperatorSet) {
+      LOG.debug("Handling AppMasterEventOperator: " + event);
+      GenTezUtils.getUtils().processAppMasterEvent(procCtx, event);
+    }
   }
 
   @Override

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java Wed Sep 3 10:46:04 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+
+/**
+ * UnionProcessor is a simple rule to remember seen unions for later
+ * processing.
+ *
+ */
+public class UnionProcessor implements NodeProcessor {
+
+  static final private Log LOG = LogFactory.getLog(UnionProcessor.class.getName());
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+      throws SemanticException {
+    GenTezProcContext context = (GenTezProcContext) procCtx;
+    UnionOperator union = (UnionOperator) nd;
+
+    // simply need to remember that we've seen a union.
+ context.currentUnionOperators.add(union); + return null; + } +} Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java?rev=1622216&view=auto ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java (added) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java Wed Sep 3 10:46:04 2014 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.IOException; + +import org.apache.hadoop.io.DataOutputBuffer; + + +@SuppressWarnings("serial") +@Explain(displayName = "Application Master Event Operator") +public class AppMasterEventDesc extends AbstractOperatorDesc { + + private TableDesc table; + private String vertexName; + private String inputName; + + @Explain(displayName = "Target Vertex") + public String getVertexName() { + return vertexName; + } + + @Explain(displayName = "Target Input") + public String getInputName() { + return inputName; + } + + public void setInputName(String inputName) { + this.inputName = inputName; + } + + public void setVertexName(String vertexName) { + this.vertexName = vertexName; + } + + public TableDesc getTable() { + return table; + } + + public void setTable(TableDesc table) { + this.table = table; + } + + public void writeEventHeader(DataOutputBuffer buffer) throws IOException { + // nothing to add + } +} Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java?rev=1622216&view=auto ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java (added) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java Wed Sep 3 10:46:04 2014 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.IOException; + +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.io.DataOutputBuffer; + +@SuppressWarnings("serial") +@Explain(displayName = "Dynamic Partitioning Event Operator") +public class DynamicPruningEventDesc extends AppMasterEventDesc { + + // column in the target table that will be pruned against + private String targetColumnName; + + // tableScan is only available during compile + private transient TableScanOperator tableScan; + + // the partition column we're interested in + private ExprNodeDesc partKey; + + public TableScanOperator getTableScan() { + return tableScan; + } + + public void setTableScan(TableScanOperator tableScan) { + this.tableScan = tableScan; + } + + @Explain(displayName = "Target column") + public String getTargetColumnName() { + return targetColumnName; + } + + public void setTargetColumnName(String columnName) { + this.targetColumnName = columnName; + } + + @Override + public void writeEventHeader(DataOutputBuffer buffer) throws IOException { + super.writeEventHeader(buffer); + buffer.writeUTF(targetColumnName); + } + + public void setPartKey(ExprNodeDesc partKey) { + this.partKey = partKey; + } + + @Explain(displayName = "Partition key expr") + public String getPartKeyString() { + return this.partKey.getExprString(); + } + + public ExprNodeDesc getPartKey() { + return this.partKey; + } +} Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java?rev=1622216&view=auto ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java (added) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java Wed Sep 3 10:46:04 2014 @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +/** + * This expression represents a list that will be available at runtime. 
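+ * It is created by SyntheticJoinPredicate from the other join side's
+ * ReduceSink keys and serves as the right-hand side of a synthetic IN
+ * predicate; it is a compile-time placeholder rather than an evaluable
+ * expression.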
+ */ +@SuppressWarnings("serial") +public class ExprNodeDynamicListDesc extends ExprNodeDesc { + + Operator<? extends OperatorDesc> source; + int keyIndex; + + public ExprNodeDynamicListDesc() { + } + + public ExprNodeDynamicListDesc(TypeInfo typeInfo, Operator<? extends OperatorDesc> source, int keyIndex) { + super(typeInfo); + this.source = source; + this.keyIndex = keyIndex; + } + + public void setSource(Operator<? extends OperatorDesc> source) { + this.source = source; + } + + public Operator<? extends OperatorDesc> getSource() { + return source; + } + + public void setKeyIndex(int keyIndex) { + this.keyIndex = keyIndex; + } + + public int getKeyIndex() { + return this.keyIndex; + } + + @Override + public ExprNodeDesc clone() { + ExprNodeDynamicListDesc clone = new ExprNodeDynamicListDesc(typeInfo, source, keyIndex); + return clone; + } + + @Override + public boolean isSame(Object o) { + if (o instanceof ExprNodeDynamicListDesc) { + return source.equals(((ExprNodeDynamicListDesc)o).getSource()); + } + return false; + } + + @Override + public String getExprString() { + return source.toString(); + } + + @Override + public String toString() { + return source.toString(); + } +} Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1622216&r1=1622215&r2=1622216&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Wed Sep 3 10:46:04 2014 @@ -26,9 +26,9 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; -import java.util.Set; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -118,6 +118,14 @@ public class MapWork extends BaseWork { private boolean dummyTableScan = false; + // used for dynamic partitioning + private Map<String, List<TableDesc>> eventSourceTableDescMap = + new LinkedHashMap<String, List<TableDesc>>(); + private Map<String, List<String>> eventSourceColumnNameMap = + new LinkedHashMap<String, List<String>>(); + private Map<String, List<ExprNodeDesc>> eventSourcePartKeyExprMap = + new LinkedHashMap<String, List<ExprNodeDesc>>(); + public MapWork() {} public MapWork(String name) { @@ -535,4 +543,28 @@ public class MapWork extends BaseWork { public boolean getDummyTableScan() { return dummyTableScan; } + + public void setEventSourceTableDescMap(Map<String, List<TableDesc>> map) { + this.eventSourceTableDescMap = map; + } + + public Map<String, List<TableDesc>> getEventSourceTableDescMap() { + return eventSourceTableDescMap; + } + + public void setEventSourceColumnNameMap(Map<String, List<String>> map) { + this.eventSourceColumnNameMap = map; + } + + public Map<String, List<String>> getEventSourceColumnNameMap() { + return eventSourceColumnNameMap; + } + + public Map<String, List<ExprNodeDesc>> getEventSourcePartKeyExprMap() { + return eventSourcePartKeyExprMap; + } + + public void setEventSourcePartKeyExprMap(Map<String, List<ExprNodeDesc>> map) { + this.eventSourcePartKeyExprMap = map; + } } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java?rev=1622216&r1=1622215&r2=1622216&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java Wed Sep 3 10:46:04 2014 @@ -20,8 +20,8 @@ package org.apache.hadoop.hive.ql.plan; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedHashSet; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -32,13 +32,9 @@ import org.apache.hadoop.hive.ql.exec.Fi import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.lib.NodeProcessor; -import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.mapred.JobConf; @@ -99,7 +95,7 @@ public class ReduceWork extends BaseWork private ObjectInspector keyObjectInspector = null; private ObjectInspector valueObjectInspector = null; - private Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>(); + private final Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>(); /** * If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing @@ -118,7 +114,7 @@ public class ReduceWork extends BaseWork private ObjectInspector getObjectInspector(TableDesc desc) { ObjectInspector objectInspector; try { - Deserializer deserializer = (SerDe) ReflectionUtils.newInstance(desc + Deserializer deserializer = ReflectionUtils.newInstance(desc .getDeserializerClass(), null); SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), null); objectInspector = deserializer.getObjectInspector(); @@ -239,7 +235,6 @@ public class ReduceWork extends BaseWork @Override public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) { - assert replacementMap.size() == 1; setReducer(replacementMap.get(getReducer())); } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=1622216&r1=1622215&r2=1622216&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Wed Sep 3 10:46:04 2014 @@ -906,11 +906,8 @@ public final class OpProcFactory { } ExprNodeDesc condn = ExprNodeDescUtils.mergePredicates(preds); - if(!(condn instanceof ExprNodeGenericFuncDesc)) { - return null; - } - if (op instanceof TableScanOperator) { + if (op instanceof TableScanOperator && condn instanceof ExprNodeGenericFuncDesc) { boolean pushFilterToStorage; HiveConf hiveConf = owi.getParseContext().getConf(); 
pushFilterToStorage = Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java?rev=1622216&view=auto ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java (added) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java Wed Sep 3 10:46:04 2014 @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.ppd; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.PreOrderWalker; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.optimizer.Transform; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; + +/** + * creates synthetic predicates that represent "IN (keylist other table)" + */ +public class SyntheticJoinPredicate implements Transform { + + private static transient Log LOG = 
LogFactory.getLog(SyntheticJoinPredicate.class.getName()); + + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + + if (!pctx.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") + || !pctx.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) { + return pctx; + } + + Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); + opRules.put(new RuleRegExp("R1", "(" + + TableScanOperator.getOperatorName() + "%" + ".*" + + ReduceSinkOperator.getOperatorName() + "%" + + JoinOperator.getOperatorName() + "%)"), new JoinSynthetic()); + + // The dispatcher fires the processor corresponding to the closest matching + // rule and passes the context along + SyntheticContext context = new SyntheticContext(pctx); + Dispatcher disp = new DefaultRuleDispatcher(null, opRules, context); + GraphWalker ogw = new PreOrderWalker(disp); + + // Create a list of top op nodes + List<Node> topNodes = new ArrayList<Node>(); + topNodes.addAll(pctx.getTopOps().values()); + ogw.startWalking(topNodes, null); + + return pctx; + } + + // insert filter operator between target(child) and input(parent) + private static Operator<FilterDesc> createFilter(Operator<?> target, Operator<?> parent, + RowResolver parentRR, ExprNodeDesc filterExpr) { + Operator<FilterDesc> filter = OperatorFactory.get(new FilterDesc(filterExpr, false), + new RowSchema(parentRR.getColumnInfos())); + filter.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>()); + filter.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>()); + filter.getParentOperators().add(parent); + filter.getChildOperators().add(target); + parent.replaceChild(target, filter); + target.replaceParent(parent, filter); + return filter; + } + + private static class SyntheticContext implements NodeProcessorCtx { + + ParseContext parseContext; + + public SyntheticContext(ParseContext pCtx) { + parseContext = pCtx; + } + + public ParseContext getParseContext() { + return parseContext; + } + } + + private static class JoinSynthetic implements NodeProcessor { + @Override + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + ParseContext pCtx = ((SyntheticContext) procCtx).getParseContext(); + + @SuppressWarnings("unchecked") + CommonJoinOperator<JoinDesc> join = (CommonJoinOperator<JoinDesc>) nd; + + ReduceSinkOperator source = (ReduceSinkOperator) stack.get(stack.size() - 2); + int srcPos = join.getParentOperators().indexOf(source); + + List<Operator<? extends OperatorDesc>> parents = join.getParentOperators(); + + int[][] targets = getTargets(join); + + Operator<? extends OperatorDesc> parent = source.getParentOperators().get(0); + RowResolver parentRR = pCtx.getOpParseCtx().get(parent).getRowResolver(); + + // don't generate for null-safes. 
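+      // (IN never matches a NULL key, so a synthetic IN filter would
+      // incorrectly drop rows that a null-safe <=> join must keep)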
+ if (join.getConf().getNullSafes() != null) { + for (boolean b : join.getConf().getNullSafes()) { + if (b) { + return null; + } + } + } + + for (int targetPos: targets[srcPos]) { + if (srcPos == targetPos) { + continue; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Synthetic predicate: " + srcPos + " --> " + targetPos); + } + ReduceSinkOperator target = (ReduceSinkOperator) parents.get(targetPos); + List<ExprNodeDesc> sourceKeys = source.getConf().getKeyCols(); + List<ExprNodeDesc> targetKeys = target.getConf().getKeyCols(); + + if (sourceKeys.size() < 1) { + continue; + } + + ExprNodeDesc syntheticExpr = null; + + for (int i = 0; i < sourceKeys.size(); ++i) { + List<ExprNodeDesc> inArgs = new ArrayList<ExprNodeDesc>(); + inArgs.add(sourceKeys.get(i)); + + ExprNodeDynamicListDesc dynamicExpr = + new ExprNodeDynamicListDesc(targetKeys.get(i).getTypeInfo(), target, i); + + inArgs.add(dynamicExpr); + + ExprNodeDesc syntheticInExpr = + ExprNodeGenericFuncDesc.newInstance(FunctionRegistry.getFunctionInfo("in") + .getGenericUDF(), inArgs); + + if (syntheticExpr != null) { + List<ExprNodeDesc> andArgs = new ArrayList<ExprNodeDesc>(); + andArgs.add(syntheticExpr); + andArgs.add(syntheticInExpr); + + syntheticExpr = + ExprNodeGenericFuncDesc.newInstance(FunctionRegistry.getFunctionInfo("and") + .getGenericUDF(), andArgs); + } else { + syntheticExpr = syntheticInExpr; + } + } + + Operator<FilterDesc> newFilter = createFilter(source, parent, parentRR, syntheticExpr); + pCtx.getOpParseCtx().put(newFilter, new OpParseContext(parentRR)); + parent = newFilter; + } + + return null; + } + + // calculate filter propagation directions for each alias + // L<->R for inner/semi join, L<-R for left outer join, R<-L for right outer + // join + private int[][] getTargets(CommonJoinOperator<JoinDesc> join) { + JoinCondDesc[] conds = join.getConf().getConds(); + + int aliases = conds.length + 1; + Vectors vector = new Vectors(aliases); + for (JoinCondDesc cond : conds) { + int left = cond.getLeft(); + int right = cond.getRight(); + switch (cond.getType()) { + case JoinDesc.INNER_JOIN: + case JoinDesc.LEFT_SEMI_JOIN: + vector.add(left, right); + vector.add(right, left); + break; + case JoinDesc.LEFT_OUTER_JOIN: + vector.add(right, left); + break; + case JoinDesc.RIGHT_OUTER_JOIN: + vector.add(left, right); + break; + case JoinDesc.FULL_OUTER_JOIN: + break; + } + } + int[][] result = new int[aliases][]; + for (int pos = 0 ; pos < aliases; pos++) { + // find all targets recursively + result[pos] = vector.traverse(pos); + } + return result; + } + } + + private static class Vectors { + + private final Set<Integer>[] vector; + + @SuppressWarnings("unchecked") + public Vectors(int length) { + vector = new Set[length]; + } + + public void add(int from, int to) { + if (vector[from] == null) { + vector[from] = new HashSet<Integer>(); + } + vector[from].add(to); + } + + public int[] traverse(int pos) { + Set<Integer> targets = new HashSet<Integer>(); + traverse(targets, pos); + return toArray(targets); + } + + private int[] toArray(Set<Integer> values) { + int index = 0; + int[] result = new int[values.size()]; + for (int value : values) { + result[index++] = value; + } + return result; + } + + private void traverse(Set<Integer> targets, int pos) { + if (vector[pos] == null) { + return; + } + for (int target : vector[pos]) { + if (targets.add(target)) { + traverse(targets, target); + } + } + } + } +} Added: hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q?rev=1622216&view=auto ============================================================================== --- hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q (added) +++ hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q Wed Sep 3 10:46:04 2014 @@ -0,0 +1,191 @@ +set hive.optimize.ppd=true; +set hive.ppd.remove.duplicatefilters=true; +set hive.tez.dynamic.partition.pruning=true; +set hive.optimize.metadataonly=false; +set hive.optimize.index.filter=true; + + +select distinct ds from srcpart; +select distinct hr from srcpart; + +EXPLAIN create table srcpart_date as select ds as ds, ds as date from srcpart group by ds; +create table srcpart_date as select ds as ds, ds as date from srcpart group by ds; +create table srcpart_hour as select hr as hr, hr as hour from srcpart group by hr; +create table srcpart_date_hour as select ds as ds, ds as date, hr as hr, hr as hour from srcpart group by ds, hr; +create table srcpart_double_hour as select (hr*2) as hr, hr as hour from srcpart group by hr; + +-- single column, single key +EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08'; +select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08'; +set hive.tez.dynamic.partition.pruning=false; +EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08'; +select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08'; +set hive.tez.dynamic.partition.pruning=true; +select count(*) from srcpart where ds = '2008-04-08'; + +-- multiple sources, single key +EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) +where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11; +select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) +where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11; +set hive.tez.dynamic.partition.pruning=false; +EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) +where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11; +select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) +where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11; +set hive.tez.dynamic.partition.pruning=true; +select count(*) from srcpart where hr = 11 and ds = '2008-04-08'; + +-- multiple columns single source +EXPLAIN select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11; +select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11; +set hive.tez.dynamic.partition.pruning=false; +EXPLAIN select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = 
+EXPLAIN select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where ds = '2008-04-08' and hr = 11;
+
+-- empty set
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where ds = 'I DONT EXIST';
+
+-- expressions
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where hr = 11;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (cast(srcpart.hr*2 as string) = cast(srcpart_double_hour.hr as string)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (cast(srcpart.hr*2 as string) = cast(srcpart_double_hour.hr as string)) where srcpart_double_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where cast(hr as string) = 11;
+
+
+-- parent is reduce tasks
+EXPLAIN select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart where ds = '2008-04-08';
+
+-- non-equi join
+EXPLAIN select count(*) from srcpart, srcpart_date_hour where (srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11) and (srcpart.ds = srcpart_date_hour.ds or srcpart.hr = srcpart_date_hour.hr);
+select count(*) from srcpart, srcpart_date_hour where (srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11) and (srcpart.ds = srcpart_date_hour.ds or srcpart.hr = srcpart_date_hour.hr);
+
+-- old style join syntax
+EXPLAIN select count(*) from srcpart, srcpart_date_hour where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11 and srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr;
+select count(*) from srcpart, srcpart_date_hour where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11 and srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr;
+
+-- left join
+EXPLAIN select count(*) from srcpart left join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+EXPLAIN select count(*) from srcpart_date left join srcpart on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- full outer
+EXPLAIN select count(*) from srcpart full outer join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- with static pruning
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+
+-- union + subquery
+EXPLAIN select count(*) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select count(*) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+EXPLAIN select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+EXPLAIN select ds from (select distinct(ds) as ds from srcpart union all select distinct(ds) as ds from srcpart) s where s.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select ds from (select distinct(ds) as ds from srcpart union all select distinct(ds) as ds from srcpart) s where s.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask = true;
+set hive.auto.convert.join.noconditionaltask.size = 10000000;
+
+-- single column, single key
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+select count(*) from srcpart where ds = '2008-04-08';
+
+-- multiple sources, single key
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+select count(*) from srcpart where hr = 11 and ds = '2008-04-08';
+
+-- multiple columns single source
+EXPLAIN select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+select count(*) from srcpart where ds = '2008-04-08' and hr = 11;
+
+-- empty set
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+-- Disabled until TEZ-1486 is fixed
+-- select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+
+-- expressions
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart where hr = 11;
+
+-- parent is reduce tasks
+EXPLAIN select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart where ds = '2008-04-08';
+
+-- left join
+EXPLAIN select count(*) from srcpart left join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+EXPLAIN select count(*) from srcpart_date left join srcpart on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- full outer
+EXPLAIN select count(*) from srcpart full outer join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- with static pruning
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+-- Disabled until TEZ-1486 is fixed
+-- select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+-- where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+
+-- union + subquery
+EXPLAIN select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+
+
+-- different file format
+create table srcpart_orc (key int, value string) partitioned by (ds string, hr int) stored as orc;
+
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.vectorized.execution.enabled=false;
+set hive.exec.max.dynamic.partitions=1000;
+
+insert into table srcpart_orc partition (ds, hr) select key, value, ds, hr from srcpart;
+EXPLAIN select count(*) from srcpart_orc join srcpart_date_hour on (srcpart_orc.ds = srcpart_date_hour.ds and srcpart_orc.hr = srcpart_date_hour.hr) where srcpart_date_hour.hour = 11 and (srcpart_date_hour.date = '2008-04-08' or srcpart_date_hour.date = '2008-04-09');
+select count(*) from srcpart_orc join srcpart_date_hour on (srcpart_orc.ds = srcpart_date_hour.ds and srcpart_orc.hr = srcpart_date_hour.hr) where srcpart_date_hour.hour = 11 and (srcpart_date_hour.date = '2008-04-08' or srcpart_date_hour.date = '2008-04-09');
+select count(*) from srcpart where (ds = '2008-04-08' or ds = '2008-04-09') and hr = 11;
+
+drop table srcpart_orc;
+drop table srcpart_date;
+drop table srcpart_hour;
+drop table srcpart_date_hour;
+drop table srcpart_double_hour;

Added: hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q (added)
+++ hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q Wed Sep 3 10:46:04 2014
@@ -0,0 +1,41 @@
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask = true;
+set hive.auto.convert.join.noconditionaltask.size = 10000000;
+
+create table dim_shops (id int, label string) row format delimited fields terminated by ',' stored as textfile;
+load data local inpath '../../data/files/dim_shops.txt' into table dim_shops;
+
+create table agg_01 (amount decimal) partitioned by (dim_shops_id int) row format delimited fields terminated by ',' stored as textfile;
+alter table agg_01 add partition (dim_shops_id = 1);
+alter table agg_01 add partition (dim_shops_id = 2);
+alter table agg_01 add partition (dim_shops_id = 3);
+
+load data local inpath '../../data/files/agg_01-p1.txt' into table agg_01 partition (dim_shops_id=1);
+load data local inpath '../../data/files/agg_01-p2.txt' into table agg_01 partition (dim_shops_id=2);
+load data local inpath '../../data/files/agg_01-p3.txt' into table agg_01 partition (dim_shops_id=3);
+
+select * from dim_shops;
+select * from agg_01;
+
+EXPLAIN SELECT d1.label, count(*), sum(agg.amount)
+FROM agg_01 agg,
+dim_shops d1
+WHERE agg.dim_shops_id = d1.id
+and
+d1.label in ('foo', 'bar')
+GROUP BY d1.label
+ORDER BY d1.label;
+
+SELECT d1.label, count(*), sum(agg.amount)
+FROM agg_01 agg,
+dim_shops d1
+WHERE agg.dim_shops_id = d1.id
+and
+d1.label in ('foo', 'bar')
+GROUP BY d1.label
+ORDER BY d1.label;
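For intuition about what the new test above exercises (illustrative only, not part of the patch, and not the plan Hive actually generates): at run time, dynamic partition pruning has roughly the same effect as adding a static IN filter on agg_01's partition column, computed from the filtered dimension table. A hand-written approximation of that effect would be:

-- Illustrative only: a static-subquery approximation of the run-time
-- partition filter that dynamic partition pruning derives from dim_shops.
SELECT d1.label, count(*), sum(agg.amount)
FROM agg_01 agg, dim_shops d1
WHERE agg.dim_shops_id = d1.id
  AND d1.label IN ('foo', 'bar')
  AND agg.dim_shops_id IN (SELECT id FROM dim_shops WHERE label IN ('foo', 'bar'))
GROUP BY d1.label
ORDER BY d1.label;

The real mechanism is applied inside the Tez DAG while the query runs, so only the surviving partitions of agg_01 are scanned; the subquery form above is only meant to convey the intended semantics.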
Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
Files hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out (original) and hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out Wed Sep 3 10:46:04 2014 differ

Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
Files hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out (original) and hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out Wed Sep 3 10:46:04 2014 differ

Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
Files hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out (original) and hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out Wed Sep 3 10:46:04 2014 differ

Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out Wed Sep 3 10:46:04 2014
@@ -83,7 +83,7 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-Warning: Map Join MAPJOIN[16][bigTable=a] in task 'Map 3' is a cross product
+Warning: Map Join MAPJOIN[18][bigTable=a] in task 'Map 3' is a cross product
 PREHOOK: query: explain select * from B d1 join B d2 on d1.key = d2.key join A
 PREHOOK: type: QUERY
 POSTHOOK: query: explain select * from B d1 join B d2 on d1.key = d2.key join A
@@ -171,7 +171,7 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-Warning: Map Join MAPJOIN[23][bigTable=a] in task 'Map 4' is a cross product
+Warning: Map Join MAPJOIN[25][bigTable=a] in task 'Map 4' is a cross product
 PREHOOK: query: explain select * from A join
   (select d1.key
   from B d1 join B d2 on d1.key = d2.key
@@ -396,7 +396,7 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-Warning: Map Join MAPJOIN[28][bigTable=?] in task 'Reducer 5' is a cross product
+Warning: Map Join MAPJOIN[30][bigTable=?] in task 'Reducer 5' is a cross product
 PREHOOK: query: explain select * from
 (select A.key from A group by key) ss join
 (select d1.key from B d1 join B d2 on d1.key = d2.key where 1 = 1 group by d1.key) od1
