Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java Tue Oct 14 19:06:45 2014
@@ -102,13 +102,19 @@ public class NullScanTaskDispatcher impl
   }
 
   private void processAlias(MapWork work, String alias) {
+    List<String> paths = getPathsForAlias(work, alias);
+    if (paths.isEmpty()) {
+      // partitioned table which don't select any partitions
+      // there are no paths to replace with fakePath
+      return;
+    }
     work.setUseOneNullRowInputFormat(true);
 
     // Change the alias partition desc
     PartitionDesc aliasPartn = work.getAliasToPartnInfo().get(alias);
     changePartitionToMetadataOnly(aliasPartn);
 
-    List<String> paths = getPathsForAlias(work, alias);
+
     for (String path : paths) {
       PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
       PartitionDesc newPartition = changePartitionToMetadataOnly(partDesc);
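For orientation before the next file: the hunk above short-circuits processAlias when a partitioned alias resolves to no selected partitions, so the metadata-only rewrite is skipped entirely. Below is a minimal, self-contained sketch of that guard pattern only; the Work type and the alias-to-paths lookup are hypothetical stand-ins, not the real MapWork/PartitionDesc APIs from the commit.

import java.util.Collections;
import java.util.List;
import java.util.Map;

class NullScanGuardSketch {
  // Hypothetical stand-in for MapWork: just the two pieces the guard touches.
  static final class Work {
    boolean useOneNullRowInputFormat;
    Map<String, List<String>> aliasToPaths = Collections.emptyMap();
  }

  // Hypothetical lookup: the candidate paths for an alias; empty when no partition was selected.
  static List<String> pathsForAlias(Work work, String alias) {
    List<String> paths = work.aliasToPaths.get(alias);
    return paths == null ? Collections.<String>emptyList() : paths;
  }

  static void processAlias(Work work, String alias) {
    List<String> paths = pathsForAlias(work, alias);
    if (paths.isEmpty()) {
      // Nothing to rewrite: a partitioned table that selects no partitions
      // has no paths to replace with the fake path, so leave the work untouched.
      return;
    }
    work.useOneNullRowInputFormat = true;
    for (String path : paths) {
      // ... rewrite each path's partition descriptor to be metadata-only here ...
    }
  }
}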
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Tue Oct 14 19:06:45 2014 @@ -411,10 +411,12 @@ public class Vectorizer implements Physi // Check value ObjectInspector. ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector(); - if (valueObjectInspector == null || !(valueObjectInspector instanceof StructObjectInspector)) { + if (valueObjectInspector == null || + !(valueObjectInspector instanceof StructObjectInspector)) { return false; } - StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector; + StructObjectInspector valueStructObjectInspector = + (StructObjectInspector)valueObjectInspector; valueColCount = valueStructObjectInspector.getAllStructFieldRefs().size(); } catch (Exception e) { throw new SemanticException(e); @@ -460,18 +462,20 @@ public class Vectorizer implements Physi LOG.info("Vectorizing ReduceWork..."); reduceWork.setVectorMode(true); - // For some reason, the DefaultGraphWalker does not descend down from the reducer Operator as expected. - // We need to descend down, otherwise it breaks our algorithm that determines VectorizationContext... - // Do we use PreOrderWalker instead of DefaultGraphWalker. + // For some reason, the DefaultGraphWalker does not descend down from the reducer Operator as + // expected. We need to descend down, otherwise it breaks our algorithm that determines + // VectorizationContext... Do we use PreOrderWalker instead of DefaultGraphWalker. Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); - ReduceWorkVectorizationNodeProcessor vnp = new ReduceWorkVectorizationNodeProcessor(reduceWork, keyColCount, valueColCount); + ReduceWorkVectorizationNodeProcessor vnp = + new ReduceWorkVectorizationNodeProcessor(reduceWork, keyColCount, valueColCount); addReduceWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new PreOrderWalker(disp); // iterator the reduce operator tree ArrayList<Node> topNodes = new ArrayList<Node>(); topNodes.add(reduceWork.getReducer()); - LOG.info("vectorizeReduceWork reducer Operator: " + reduceWork.getReducer().getName() + "..."); + LOG.info("vectorizeReduceWork reducer Operator: " + + reduceWork.getReducer().getName() + "..."); HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>(); ogw.startWalking(topNodes, nodeOutput); @@ -550,7 +554,7 @@ public class Vectorizer implements Physi protected final Map<String, VectorizationContext> scratchColumnContext = new HashMap<String, VectorizationContext>(); - protected final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByTSOp = + protected final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByOp = new HashMap<Operator<? extends OperatorDesc>, VectorizationContext>(); protected final Set<Operator<? 
extends OperatorDesc>> opsDone = @@ -578,28 +582,30 @@ public class Vectorizer implements Physi return scratchColumnMap; } - public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack, Operator<? extends OperatorDesc> op) - throws SemanticException { + public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack, + Operator<? extends OperatorDesc> op) throws SemanticException { VectorizationContext vContext = null; if (stack.size() <= 1) { - throw new SemanticException(String.format("Expected operator stack for operator %s to have at least 2 operators", op.getName())); + throw new SemanticException( + String.format("Expected operator stack for operator %s to have at least 2 operators", + op.getName())); } // Walk down the stack of operators until we found one willing to give us a context. // At the bottom will be the root operator, guaranteed to have a context int i= stack.size()-2; while (vContext == null) { if (i < 0) { - throw new SemanticException(String.format("Did not find vectorization context for operator %s in operator stack", op.getName())); + return null; } Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i); - vContext = vContextsByTSOp.get(opParent); + vContext = vContextsByOp.get(opParent); --i; } return vContext; } - public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, VectorizationContext vContext) - throws SemanticException { + public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, + VectorizationContext vContext) throws SemanticException { Operator<? extends OperatorDesc> vectorOp = op; try { if (!opsDone.contains(op)) { @@ -611,7 +617,7 @@ public class Vectorizer implements Physi if (vectorOp instanceof VectorizationContextRegion) { VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp; VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext(); - vContextsByTSOp.put(op, vOutContext); + vContextsByOp.put(op, vOutContext); scratchColumnContext.put(vOutContext.getFileKey(), vOutContext); } } @@ -658,13 +664,24 @@ public class Vectorizer implements Physi // vContext.setFileKey(onefile); scratchColumnContext.put(onefile, vContext); + if (LOG.isDebugEnabled()) { + LOG.debug("Vectorized MapWork operator " + op.getName() + + " with vectorization context key=" + vContext.getFileKey() + + ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() + + ", columnMap: " + vContext.getColumnMap().toString()); + } break; } } } - vContextsByTSOp.put(op, vContext); + vContextsByOp.put(op, vContext); } else { vContext = walkStackToFindVectorizationContext(stack, op); + if (vContext == null) { + throw new SemanticException( + String.format("Did not find vectorization context for operator %s in operator stack", + op.getName())); + } } assert vContext != null; @@ -679,7 +696,22 @@ public class Vectorizer implements Physi return null; } - doVectorize(op, vContext); + Operator<? 
extends OperatorDesc> vectorOp = doVectorize(op, vContext); + + if (LOG.isDebugEnabled()) { + LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + + " with vectorization context key=" + vContext.getFileKey() + + ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() + + ", columnMap: " + vContext.getColumnMap().toString()); + if (vectorOp instanceof VectorizationContextRegion) { + VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp; + VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext(); + LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + + " added new vectorization context key=" + vOutContext.getFileKey() + + ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() + + ", columnMap: " + vOutContext.getColumnMap().toString()); + } + } return null; } @@ -691,6 +723,8 @@ public class Vectorizer implements Physi private int keyColCount; private int valueColCount; private Map<String, Integer> reduceColumnNameMap; + + private VectorizationContext reduceShuffleVectorizationContext; private Operator<? extends OperatorDesc> rootVectorOp; @@ -698,12 +732,14 @@ public class Vectorizer implements Physi return rootVectorOp; } - public ReduceWorkVectorizationNodeProcessor(ReduceWork rWork, int keyColCount, int valueColCount) { + public ReduceWorkVectorizationNodeProcessor(ReduceWork rWork, int keyColCount, + int valueColCount) { this.rWork = rWork; reduceColumnNameMap = rWork.getReduceColumnNameMap(); this.keyColCount = keyColCount; this.valueColCount = valueColCount; rootVectorOp = null; + reduceShuffleVectorizationContext = null; } @Override @@ -711,7 +747,8 @@ public class Vectorizer implements Physi Object... nodeOutputs) throws SemanticException { Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd; - LOG.info("ReduceWorkVectorizationNodeProcessor processing Operator: " + op.getName() + "..."); + LOG.info("ReduceWorkVectorizationNodeProcessor processing Operator: " + + op.getName() + "..."); VectorizationContext vContext = null; @@ -719,10 +756,24 @@ public class Vectorizer implements Physi if (op.getParentOperators().size() == 0) { vContext = getReduceVectorizationContext(reduceColumnNameMap); - vContextsByTSOp.put(op, vContext); + vContext.setFileKey("_REDUCE_SHUFFLE_"); + scratchColumnContext.put("_REDUCE_SHUFFLE_", vContext); + reduceShuffleVectorizationContext = vContext; saveRootVectorOp = true; + + if (LOG.isDebugEnabled()) { + LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context key=" + + vContext.getFileKey() + + ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() + + ", columnMap: " + vContext.getColumnMap().toString()); + } } else { vContext = walkStackToFindVectorizationContext(stack, op); + if (vContext == null) { + // If we didn't find a context among the operators, assume the top -- reduce shuffle's + // vectorization context. + vContext = reduceShuffleVectorizationContext; + } } assert vContext != null; @@ -738,6 +789,21 @@ public class Vectorizer implements Physi } Operator<? 
extends OperatorDesc> vectorOp = doVectorize(op, vContext); + + if (LOG.isDebugEnabled()) { + LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + + " with vectorization context key=" + vContext.getFileKey() + + ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() + + ", columnMap: " + vContext.getColumnMap().toString()); + if (vectorOp instanceof VectorizationContextRegion) { + VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp; + VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext(); + LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + + " added new vectorization context key=" + vOutContext.getFileKey() + + ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() + + ", columnMap: " + vOutContext.getColumnMap().toString()); + } + } if (vectorOp instanceof VectorGroupByOperator) { VectorGroupByOperator groupBy = (VectorGroupByOperator) vectorOp; VectorGroupByDesc vectorDesc = groupBy.getConf().getVectorDesc(); @@ -816,6 +882,7 @@ public class Vectorizer implements Physi break; case FILESINK: case LIMIT: + case EVENT: ret = true; break; default: @@ -855,6 +922,7 @@ public class Vectorizer implements Physi ret = validateFileSinkOperator((FileSinkOperator) op); break; case LIMIT: + case EVENT: ret = true; break; default: @@ -994,11 +1062,6 @@ public class Vectorizer implements Physi } private boolean validateFileSinkOperator(FileSinkOperator op) { - // HIVE-7557: For now, turn off dynamic partitioning to give more time to - // figure out how to make VectorFileSink work correctly with it... - if (op.getConf().getDynPartCtx() != null) { - return false; - } return true; } @@ -1006,7 +1069,8 @@ public class Vectorizer implements Physi return validateExprNodeDesc(descs, VectorExpressionDescriptor.Mode.PROJECTION); } - private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, VectorExpressionDescriptor.Mode mode) { + private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, + VectorExpressionDescriptor.Mode mode) { for (ExprNodeDesc d : descs) { boolean ret = validateExprNodeDesc(d, mode); if (!ret) { @@ -1098,8 +1162,8 @@ public class Vectorizer implements Physi if (!supportedAggregationUdfs.contains(aggDesc.getGenericUDAFName().toLowerCase())) { return false; } - if (aggDesc.getParameters() != null) { - return validateExprNodeDesc(aggDesc.getParameters()); + if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters())) { + return false; } // See if we can vectorize the aggregation. try { @@ -1164,11 +1228,13 @@ public class Vectorizer implements Physi return new VectorizationContext(cmap, columnCount); } - private VectorizationContext getReduceVectorizationContext(Map<String, Integer> reduceColumnNameMap) { + private VectorizationContext getReduceVectorizationContext( + Map<String, Integer> reduceColumnNameMap) { return new VectorizationContext(reduceColumnNameMap, reduceColumnNameMap.size()); } - private void fixupParentChildOperators(Operator<? extends OperatorDesc> op, Operator<? extends OperatorDesc> vectorOp) { + private void fixupParentChildOperators(Operator<? extends OperatorDesc> op, + Operator<? extends OperatorDesc> vectorOp) { if (op.getParentOperators() != null) { vectorOp.setParentOperators(op.getParentOperators()); for (Operator<? 
extends OperatorDesc> p : op.getParentOperators()) { @@ -1196,6 +1262,7 @@ public class Vectorizer implements Physi case REDUCESINK: case LIMIT: case EXTRACT: + case EVENT: vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext); break; default: Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java Tue Oct 14 19:06:45 2014 @@ -57,6 +57,7 @@ import org.apache.hadoop.hive.ql.udf.gen import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; /** * The transformation step that does partition pruning. @@ -155,27 +156,85 @@ public class PartitionPruner implements * pruner condition. * @throws HiveException */ - private static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr, + public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr, HiveConf conf, String alias, Map<String, PrunedPartitionList> prunedPartitionsMap) throws SemanticException { + LOG.trace("Started pruning partiton"); LOG.trace("dbname = " + tab.getDbName()); LOG.trace("tabname = " + tab.getTableName()); - LOG.trace("prune Expression = " + prunerExpr); + LOG.trace("prune Expression = " + prunerExpr == null ? "" : prunerExpr); String key = tab.getDbName() + "." + tab.getTableName() + ";"; - if (prunerExpr != null) { - key = key + prunerExpr.getExprString(); + if (!tab.isPartitioned()) { + // If the table is not partitioned, return empty list. + return getAllPartsFromCacheOrServer(tab, key, false, prunedPartitionsMap); + } + + if ("strict".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEMAPREDMODE)) + && !hasColumnExpr(prunerExpr)) { + // If the "strict" mode is on, we have to provide partition pruner for each table. + throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE + .getMsg("for Alias \"" + alias + "\" Table \"" + tab.getTableName() + "\"")); + } + + if (prunerExpr == null) { + // In non-strict mode and there is no predicates at all - get everything. + return getAllPartsFromCacheOrServer(tab, key, false, prunedPartitionsMap); + } + + Set<String> partColsUsedInFilter = new LinkedHashSet<String>(); + // Replace virtual columns with nulls. See javadoc for details. + prunerExpr = removeNonPartCols(prunerExpr, extractPartColNames(tab), partColsUsedInFilter); + // Remove all parts that are not partition columns. See javadoc for details. + ExprNodeGenericFuncDesc compactExpr = (ExprNodeGenericFuncDesc)compactExpr(prunerExpr.clone()); + String oldFilter = prunerExpr.getExprString(); + if (compactExpr == null) { + // Non-strict mode, and all the predicates are on non-partition columns - get everything. 
+ LOG.debug("Filter " + oldFilter + " was null after compacting"); + return getAllPartsFromCacheOrServer(tab, key, true, prunedPartitionsMap); + } + LOG.debug("Filter w/ compacting: " + compactExpr.getExprString() + + "; filter w/o compacting: " + oldFilter); + + key = key + compactExpr.getExprString(); + PrunedPartitionList ppList = prunedPartitionsMap.get(key); + if (ppList != null) { + return ppList; + } + + ppList = getPartitionsFromServer(tab, compactExpr, conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString())); + prunedPartitionsMap.put(key, ppList); + return ppList; + } + + private static PrunedPartitionList getAllPartsFromCacheOrServer(Table tab, String key, boolean unknownPartitions, + Map<String, PrunedPartitionList> partsCache) throws SemanticException { + PrunedPartitionList ppList = partsCache.get(key); + if (ppList != null) { + return ppList; } - PrunedPartitionList ret = prunedPartitionsMap.get(key); - if (ret != null) { - return ret; + Set<Partition> parts; + try { + parts = getAllPartitions(tab); + } catch (HiveException e) { + throw new SemanticException(e); } + ppList = new PrunedPartitionList(tab, parts, null, unknownPartitions); + partsCache.put(key, ppList); + return ppList; + } - ret = getPartitionsFromServer(tab, prunerExpr, conf, alias); - prunedPartitionsMap.put(key, ret); - return ret; + private static ExprNodeDesc removeTruePredciates(ExprNodeDesc e) { + if (e instanceof ExprNodeConstantDesc) { + ExprNodeConstantDesc eC = (ExprNodeConstantDesc) e; + if (e.getTypeInfo() == TypeInfoFactory.booleanTypeInfo + && eC.getValue() == Boolean.TRUE) { + return null; + } + } + return e; } /** @@ -187,7 +246,8 @@ public class PartitionPruner implements */ static private ExprNodeDesc compactExpr(ExprNodeDesc expr) { if (expr instanceof ExprNodeConstantDesc) { - if (((ExprNodeConstantDesc)expr).getValue() == null) { + expr = removeTruePredciates(expr); + if (expr == null || ((ExprNodeConstantDesc)expr).getValue() == null) { return null; } else { throw new IllegalStateException("Unexpected non-null ExprNodeConstantDesc: " @@ -198,10 +258,11 @@ public class PartitionPruner implements boolean isAnd = udf instanceof GenericUDFOPAnd; if (isAnd || udf instanceof GenericUDFOPOr) { List<ExprNodeDesc> children = expr.getChildren(); - ExprNodeDesc left = children.get(0); - children.set(0, compactExpr(left)); - ExprNodeDesc right = children.get(1); - children.set(1, compactExpr(right)); + ExprNodeDesc left = removeTruePredciates(children.get(0)); + children.set(0, left == null ? null : compactExpr(left)); + ExprNodeDesc right = removeTruePredciates(children.get(1)); + children.set(1, right == null ? null : compactExpr(right)); + // Note that one does not simply compact (not-null or null) to not-null. // Only if we have an "and" is it valid to send one side to metastore. if (children.get(0) == null && children.get(1) == null) { @@ -267,40 +328,8 @@ public class PartitionPruner implements } private static PrunedPartitionList getPartitionsFromServer(Table tab, - ExprNodeDesc prunerExpr, HiveConf conf, String alias) throws SemanticException { + final ExprNodeGenericFuncDesc compactExpr, HiveConf conf, String alias, Set<String> partColsUsedInFilter, boolean isPruningByExactFilter) throws SemanticException { try { - if (!tab.isPartitioned()) { - // If the table is not partitioned, return everything. 
- return new PrunedPartitionList(tab, getAllPartitions(tab), null, false); - } - LOG.debug("tabname = " + tab.getTableName() + " is partitioned"); - - if ("strict".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEMAPREDMODE)) - && !hasColumnExpr(prunerExpr)) { - // If the "strict" mode is on, we have to provide partition pruner for each table. - throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE - .getMsg("for Alias \"" + alias + "\" Table \"" + tab.getTableName() + "\"")); - } - - if (prunerExpr == null) { - // Non-strict mode, and there is no predicates at all - get everything. - return new PrunedPartitionList(tab, getAllPartitions(tab), null, false); - } - - Set<String> referred = new LinkedHashSet<String>(); - // Replace virtual columns with nulls. See javadoc for details. - prunerExpr = removeNonPartCols(prunerExpr, extractPartColNames(tab), referred); - // Remove all parts that are not partition columns. See javadoc for details. - ExprNodeGenericFuncDesc compactExpr = (ExprNodeGenericFuncDesc)compactExpr(prunerExpr.clone()); - String oldFilter = prunerExpr.getExprString(); - if (compactExpr == null) { - // Non-strict mode, and all the predicates are on non-partition columns - get everything. - LOG.debug("Filter " + oldFilter + " was null after compacting"); - return new PrunedPartitionList(tab, getAllPartitions(tab), null, true); - } - - LOG.debug("Filter w/ compacting: " + compactExpr.getExprString() - + "; filter w/o compacting: " + oldFilter); // Finally, check the filter for non-built-in UDFs. If these are present, we cannot // do filtering on the server, and have to fall back to client path. @@ -330,9 +359,8 @@ public class PartitionPruner implements // The partitions are "unknown" if the call says so due to the expression // evaluator returning null for a partition, or if we sent a partial expression to // metastore and so some partitions may have no data based on other filters. 
- boolean isPruningByExactFilter = oldFilter.equals(compactExpr.getExprString()); return new PrunedPartitionList(tab, new LinkedHashSet<Partition>(partitions), - new ArrayList<String>(referred), + new ArrayList<String>(partColsUsedInFilter), hasUnknownPartitions || !isPruningByExactFilter); } catch (SemanticException e) { throw e; Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Tue Oct 14 19:06:45 2014 @@ -18,8 +18,14 @@ package org.apache.hadoop.hive.ql.optimizer.stats.annotation; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -31,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Fi import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.LimitOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.SelectOperator; @@ -48,10 +55,12 @@ import org.apache.hadoop.hive.ql.plan.Ex import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.Statistics; import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; @@ -66,17 +75,15 @@ import org.apache.hadoop.hive.ql.udf.gen import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; public class StatsRulesProcFactory { private static final Log LOG = LogFactory.getLog(StatsRulesProcFactory.class.getName()); + private static final boolean isDebugEnabled = LOG.isDebugEnabled(); /** * Collect basic statistics like number of rows, data size and column level statistics from the @@ -103,9 +110,9 @@ public class StatsRulesProcFactory { Statistics stats 
= StatsUtils.collectStatistics(aspCtx.getConf(), partList, table, tsop); tsop.setStatistics(stats.clone()); - if (LOG.isDebugEnabled()) { - LOG.debug("[0] STATS-" + tsop.toString() + " (" + table.getTableName() - + "): " + stats.extendedToString()); + if (isDebugEnabled) { + LOG.debug("[0] STATS-" + tsop.toString() + " (" + table.getTableName() + "): " + + stats.extendedToString()); } } catch (CloneNotSupportedException e) { throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg()); @@ -167,14 +174,14 @@ public class StatsRulesProcFactory { stats.setDataSize(setMaxIfInvalid(dataSize)); sop.setStatistics(stats); - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug("[0] STATS-" + sop.toString() + ": " + stats.extendedToString()); } } else { if (parentStats != null) { sop.setStatistics(parentStats.clone()); - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug("[1] STATS-" + sop.toString() + ": " + parentStats.extendedToString()); } } @@ -264,7 +271,7 @@ public class StatsRulesProcFactory { updateStats(st, newNumRows, true, fop); } - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug("[0] STATS-" + fop.toString() + ": " + st.extendedToString()); } } else { @@ -274,7 +281,7 @@ public class StatsRulesProcFactory { updateStats(st, newNumRows, false, fop); } - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug("[1] STATS-" + fop.toString() + ": " + st.extendedToString()); } } @@ -576,52 +583,103 @@ public class StatsRulesProcFactory { @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + GroupByOperator gop = (GroupByOperator) nd; Operator<? extends OperatorDesc> parent = gop.getParentOperators().get(0); Statistics parentStats = parent.getStatistics(); + + // parent stats are not populated yet + if (parentStats == null) { + return null; + } + AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx; HiveConf conf = aspCtx.getConf(); - int mapSideParallelism = - HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_STATS_MAP_SIDE_PARALLELISM); + long maxSplitSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE); List<AggregationDesc> aggDesc = gop.getConf().getAggregators(); Map<String, ExprNodeDesc> colExprMap = gop.getColumnExprMap(); RowSchema rs = gop.getSchema(); Statistics stats = null; + List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(conf, parentStats, + colExprMap, rs); + long cardinality; + long parallelism = 1L; boolean mapSide = false; - int multiplier = mapSideParallelism; - long newNumRows; - long newDataSize; + boolean mapSideHashAgg = false; + long inputSize = 1L; + boolean containsGroupingSet = gop.getConf().isGroupingSetsPresent(); + long sizeOfGroupingSet = + containsGroupingSet ? gop.getConf().getListGroupingSets().size() : 1L; + + // There are different cases for Group By depending on map/reduce side, hash aggregation, + // grouping sets and column stats. If we don't have column stats, we just assume hash + // aggregation is disabled. 
Following are the possible cases and rule for cardinality + // estimation + + // MAP SIDE: + // Case 1: NO column stats, NO hash aggregation, NO grouping sets â numRows + // Case 2: NO column stats, NO hash aggregation, grouping sets â numRows * sizeOfGroupingSet + // Case 3: column stats, hash aggregation, NO grouping sets â Min(numRows / 2, ndvProduct * parallelism) + // Case 4: column stats, hash aggregation, grouping sets â Min((numRows * sizeOfGroupingSet) / 2, ndvProduct * parallelism * sizeOfGroupingSet) + // Case 5: column stats, NO hash aggregation, NO grouping sets â numRows + // Case 6: column stats, NO hash aggregation, grouping sets â numRows * sizeOfGroupingSet + + // REDUCE SIDE: + // Case 7: NO column stats â numRows / 2 + // Case 8: column stats, grouping sets â Min(numRows, ndvProduct * sizeOfGroupingSet) + // Case 9: column stats, NO grouping sets - Min(numRows, ndvProduct) - // map side if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator || gop.getChildOperators().get(0) instanceof AppMasterEventOperator) { - mapSide = true; + mapSide = true; - // map-side grouping set present. if grouping set is present then - // multiply the number of rows by number of elements in grouping set - if (gop.getConf().isGroupingSetsPresent()) { - multiplier *= gop.getConf().getListGroupingSets().size(); + // consider approximate map side parallelism to be table data size + // divided by max split size + TableScanOperator top = OperatorUtils.findSingleOperatorUpstream(gop, + TableScanOperator.class); + // if top is null then there are multiple parents (RS as well), hence + // lets use parent statistics to get data size. Also maxSplitSize should + // be updated to bytes per reducer (1GB default) + if (top == null) { + inputSize = parentStats.getDataSize(); + maxSplitSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.BYTESPERREDUCER); + } else { + inputSize = top.getConf().getStatistics().getDataSize(); } + parallelism = (int) Math.ceil((double) inputSize / maxSplitSize); + } + + if (isDebugEnabled) { + LOG.debug("STATS-" + gop.toString() + ": inputSize: " + inputSize + " maxSplitSize: " + + maxSplitSize + " parallelism: " + parallelism + " containsGroupingSet: " + + containsGroupingSet + " sizeOfGroupingSet: " + sizeOfGroupingSet); } try { + // satisfying precondition means column statistics is available if (satisfyPrecondition(parentStats)) { - stats = parentStats.clone(); - List<ColStatistics> colStats = - StatsUtils.getColStatisticsFromExprMap(conf, parentStats, colExprMap, rs); + // check if map side aggregation is possible or not based on column stats + mapSideHashAgg = checkMapSideAggregation(gop, colStats, conf); + + if (isDebugEnabled) { + LOG.debug("STATS-" + gop.toString() + " mapSideHashAgg: " + mapSideHashAgg); + } + + stats = parentStats.clone(); stats.setColumnStats(colStats); - long dvProd = 1; + long ndvProduct = 1; + final long parentNumRows = stats.getNumRows(); // compute product of distinct values of grouping columns for (ColStatistics cs : colStats) { if (cs != null) { - long dv = cs.getCountDistint(); + long ndv = cs.getCountDistint(); if (cs.getNumNulls() > 0) { - dv += 1; + ndv += 1; } - dvProd *= dv; + ndvProduct *= ndv; } else { if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) { // the column must be an aggregate column inserted by GBY. We @@ -632,65 +690,130 @@ public class StatsRulesProcFactory { // partial column statistics on grouping attributes case. 
// if column statistics on grouping attribute is missing, then // assume worst case. - // GBY rule will emit half the number of rows if dvProd is 0 - dvProd = 0; + // GBY rule will emit half the number of rows if ndvProduct is 0 + ndvProduct = 0; } break; } } - // map side + // if ndvProduct is 0 then column stats state must be partial and we are missing + // column stats for a group by column + if (ndvProduct == 0) { + ndvProduct = parentNumRows / 2; + + if (isDebugEnabled) { + LOG.debug("STATS-" + gop.toString() + ": ndvProduct became 0 as some column does not" + + " have stats. ndvProduct changed to: " + ndvProduct); + } + } + if (mapSide) { + // MAP SIDE - // since we do not know if hash-aggregation will be enabled or disabled - // at runtime we will assume that map-side group by does not do any - // reduction.hence no group by rule will be applied - - // map-side grouping set present. if grouping set is present then - // multiply the number of rows by number of elements in grouping set - if (gop.getConf().isGroupingSetsPresent()) { - newNumRows = setMaxIfInvalid(multiplier * stats.getNumRows()); - newDataSize = setMaxIfInvalid(multiplier * stats.getDataSize()); - stats.setNumRows(newNumRows); - stats.setDataSize(newDataSize); - for (ColStatistics cs : colStats) { - if (cs != null) { - long oldNumNulls = cs.getNumNulls(); - long newNumNulls = multiplier * oldNumNulls; - cs.setNumNulls(newNumNulls); + if (mapSideHashAgg) { + if (containsGroupingSet) { + // Case 4: column stats, hash aggregation, grouping sets + cardinality = Math.min((parentNumRows * sizeOfGroupingSet) / 2, + ndvProduct * parallelism * sizeOfGroupingSet); + + if (isDebugEnabled) { + LOG.debug("[Case 4] STATS-" + gop.toString() + ": cardinality: " + cardinality); + } + } else { + // Case 3: column stats, hash aggregation, NO grouping sets + cardinality = Math.min(parentNumRows / 2, ndvProduct * parallelism); + + if (isDebugEnabled) { + LOG.debug("[Case 3] STATS-" + gop.toString() + ": cardinality: " + cardinality); } } } else { + if (containsGroupingSet) { + // Case 6: column stats, NO hash aggregation, grouping sets + cardinality = parentNumRows * sizeOfGroupingSet; + + if (isDebugEnabled) { + LOG.debug("[Case 6] STATS-" + gop.toString() + ": cardinality: " + cardinality); + } + } else { + // Case 5: column stats, NO hash aggregation, NO grouping sets + cardinality = parentNumRows; - // map side no grouping set - newNumRows = stats.getNumRows() * multiplier; - updateStats(stats, newNumRows, true, gop); + if (isDebugEnabled) { + LOG.debug("[Case 5] STATS-" + gop.toString() + ": cardinality: " + cardinality); + } + } } } else { + // REDUCE SIDE + + // in reduce side GBY, we don't know if the grouping set was present or not. 
so get it + // from map side GBY + GroupByOperator mGop = OperatorUtils.findSingleOperatorUpstream(parent, GroupByOperator.class); + if (mGop != null) { + containsGroupingSet = mGop.getConf().isGroupingSetsPresent(); + sizeOfGroupingSet = mGop.getConf().getListGroupingSets().size(); + } + + if (containsGroupingSet) { + // Case 8: column stats, grouping sets + cardinality = Math.min(parentNumRows, ndvProduct * sizeOfGroupingSet); + + if (isDebugEnabled) { + LOG.debug("[Case 8] STATS-" + gop.toString() + ": cardinality: " + cardinality); + } + } else { + // Case 9: column stats, NO grouping sets + cardinality = Math.min(parentNumRows, ndvProduct); - // reduce side - newNumRows = applyGBYRule(stats.getNumRows(), dvProd); - updateStats(stats, newNumRows, true, gop); + if (isDebugEnabled) { + LOG.debug("[Case 9] STATS-" + gop.toString() + ": cardinality: " + cardinality); + } + } } + + // update stats, but don't update NDV as it will not change + updateStats(stats, cardinality, true, gop, false); } else { + + // NO COLUMN STATS if (parentStats != null) { stats = parentStats.clone(); + final long parentNumRows = stats.getNumRows(); - // worst case, in the absence of column statistics assume half the rows are emitted + // if we don't have column stats, we just assume hash aggregation is disabled if (mapSide) { + // MAP SIDE + + if (containsGroupingSet) { + // Case 2: NO column stats, NO hash aggregation, grouping sets + cardinality = parentNumRows * sizeOfGroupingSet; + + if (isDebugEnabled) { + LOG.debug("[Case 2] STATS-" + gop.toString() + ": cardinality: " + cardinality); + } + } else { + // Case 1: NO column stats, NO hash aggregation, NO grouping sets + cardinality = parentNumRows; - // map side - newNumRows = multiplier * stats.getNumRows(); - newDataSize = multiplier * stats.getDataSize(); - stats.setNumRows(newNumRows); - stats.setDataSize(newDataSize); + if (isDebugEnabled) { + LOG.debug("[Case 1] STATS-" + gop.toString() + ": cardinality: " + cardinality); + } + } } else { + // REDUCE SIDE + + // Case 7: NO column stats + cardinality = parentNumRows / 2; - // reduce side - newNumRows = parentStats.getNumRows() / 2; - updateStats(stats, newNumRows, false, gop); + if (isDebugEnabled) { + LOG.debug("[Case 7] STATS-" + gop.toString() + ": cardinality: " + cardinality); + } } + + updateStats(stats, cardinality, false, gop); } } @@ -738,7 +861,7 @@ public class StatsRulesProcFactory { gop.setStatistics(stats); - if (LOG.isDebugEnabled() && stats != null) { + if (isDebugEnabled && stats != null) { LOG.debug("[0] STATS-" + gop.toString() + ": " + stats.extendedToString()); } } catch (CloneNotSupportedException e) { @@ -747,6 +870,103 @@ public class StatsRulesProcFactory { return null; } + /** + * This method does not take into account many configs used at runtime to + * disable hash aggregation like HIVEMAPAGGRHASHMINREDUCTION. This method + * roughly estimates the number of rows and size of each row to see if it + * can fit in hashtable for aggregation. 
+ * @param gop - group by operator + * @param colStats - column stats for key columns + * @param conf - hive conf + * @return + */ + private boolean checkMapSideAggregation(GroupByOperator gop, + List<ColStatistics> colStats, HiveConf conf) { + + List<AggregationDesc> aggDesc = gop.getConf().getAggregators(); + GroupByDesc desc = gop.getConf(); + GroupByDesc.Mode mode = desc.getMode(); + + if (mode.equals(GroupByDesc.Mode.HASH)) { + float hashAggMem = conf.getFloatVar(HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); + float hashAggMaxThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + + // get available map memory + long totalMemory = StatsUtils.getAvailableMemory(conf) * 1000L * 1000L; + long maxMemHashAgg = Math.round(totalMemory * hashAggMem * hashAggMaxThreshold); + + // estimated number of rows will be product of NDVs + long numEstimatedRows = 1; + + // estimate size of key from column statistics + long avgKeySize = 0; + for (ColStatistics cs : colStats) { + if (cs != null) { + numEstimatedRows *= cs.getCountDistint(); + avgKeySize += Math.ceil(cs.getAvgColLen()); + } + } + + // average value size will be sum of all sizes of aggregation buffers + long avgValSize = 0; + // go over all aggregation buffers and see they implement estimable + // interface if so they aggregate the size of the aggregation buffer + GenericUDAFEvaluator[] aggregationEvaluators; + aggregationEvaluators = new GenericUDAFEvaluator[aggDesc.size()]; + + // get aggregation evaluators + for (int i = 0; i < aggregationEvaluators.length; i++) { + AggregationDesc agg = aggDesc.get(i); + aggregationEvaluators[i] = agg.getGenericUDAFEvaluator(); + } + + // estimate size of aggregation buffer + for (int i = 0; i < aggregationEvaluators.length; i++) { + + // each evaluator has constant java object overhead + avgValSize += gop.javaObjectOverHead; + GenericUDAFEvaluator.AggregationBuffer agg = null; + try { + agg = aggregationEvaluators[i].getNewAggregationBuffer(); + } catch (HiveException e) { + // in case of exception assume unknown type (256 bytes) + avgValSize += gop.javaSizeUnknownType; + } + + // aggregate size from aggregation buffers + if (agg != null) { + if (GenericUDAFEvaluator.isEstimable(agg)) { + avgValSize += ((GenericUDAFEvaluator.AbstractAggregationBuffer) agg) + .estimate(); + } else { + // if the aggregation buffer is not estimable then get all the + // declared fields and compute the sizes from field types + Field[] fArr = ObjectInspectorUtils + .getDeclaredNonStaticFields(agg.getClass()); + for (Field f : fArr) { + long avgSize = StatsUtils + .getAvgColLenOfFixedLengthTypes(f.getType().getName()); + avgValSize += avgSize == 0 ? 
gop.javaSizeUnknownType : avgSize; + } + } + } + } + + // total size of each hash entry + long hashEntrySize = gop.javaHashEntryOverHead + avgKeySize + avgValSize; + + // estimated hash table size + long estHashTableSize = numEstimatedRows * hashEntrySize; + + if (estHashTableSize < maxMemHashAgg) { + return true; + } + } + + // worst-case, hash aggregation disabled + return false; + } + private long applyGBYRule(long numRows, long dvProd) { long newNumRows = numRows; @@ -967,7 +1187,7 @@ public class StatsRulesProcFactory { outInTabAlias); jop.setStatistics(stats); - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug("[0] STATS-" + jop.toString() + ": " + stats.extendedToString()); } } else { @@ -1001,7 +1221,7 @@ public class StatsRulesProcFactory { wcStats.setDataSize(setMaxIfInvalid(newDataSize)); jop.setStatistics(wcStats); - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString()); } } @@ -1195,7 +1415,7 @@ public class StatsRulesProcFactory { } lop.setStatistics(stats); - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug("[0] STATS-" + lop.toString() + ": " + stats.extendedToString()); } } else { @@ -1213,7 +1433,7 @@ public class StatsRulesProcFactory { } lop.setStatistics(wcStats); - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug("[1] STATS-" + lop.toString() + ": " + wcStats.extendedToString()); } } @@ -1281,7 +1501,7 @@ public class StatsRulesProcFactory { outStats.setColumnStats(colStats); } rop.setStatistics(outStats); - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug("[0] STATS-" + rop.toString() + ": " + outStats.extendedToString()); } } catch (CloneNotSupportedException e) { @@ -1322,7 +1542,7 @@ public class StatsRulesProcFactory { stats.addToColumnStats(parentStats.getColumnStats()); op.getConf().setStatistics(stats); - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug("[0] STATS-" + op.toString() + ": " + stats.extendedToString()); } } @@ -1378,6 +1598,7 @@ public class StatsRulesProcFactory { return new DefaultStatsRule(); } + /** * Update the basic statistics of the statistics object based on the row number * @param stats @@ -1389,6 +1610,12 @@ public class StatsRulesProcFactory { */ static void updateStats(Statistics stats, long newNumRows, boolean useColStats, Operator<? extends OperatorDesc> op) { + updateStats(stats, newNumRows, useColStats, op, true); + } + + static void updateStats(Statistics stats, long newNumRows, + boolean useColStats, Operator<? extends OperatorDesc> op, + boolean updateNDV) { if (newNumRows <= 0) { LOG.info("STATS-" + op.toString() + ": Overflow in number of rows." @@ -1406,17 +1633,19 @@ public class StatsRulesProcFactory { long oldNumNulls = cs.getNumNulls(); long oldDV = cs.getCountDistint(); long newNumNulls = Math.round(ratio * oldNumNulls); - long newDV = oldDV; + cs.setNumNulls(newNumNulls); + if (updateNDV) { + long newDV = oldDV; - // if ratio is greater than 1, then number of rows increases. This can happen - // when some operators like GROUPBY duplicates the input rows in which case - // number of distincts should not change. Update the distinct count only when - // the output number of rows is less than input number of rows. - if (ratio <= 1.0) { - newDV = (long) Math.ceil(ratio * oldDV); + // if ratio is greater than 1, then number of rows increases. This can happen + // when some operators like GROUPBY duplicates the input rows in which case + // number of distincts should not change. 
Update the distinct count only when + // the output number of rows is less than input number of rows. + if (ratio <= 1.0) { + newDV = (long) Math.ceil(ratio * oldDV); + } + cs.setCountDistint(newDV); } - cs.setNumNulls(newNumNulls); - cs.setCountDistint(newDV); } stats.setColumnStats(colStats); long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Tue Oct 14 19:06:45 2014 @@ -115,6 +115,10 @@ public abstract class BaseSemanticAnalyz protected LineageInfo linfo; protected TableAccessInfo tableAccessInfo; protected ColumnAccessInfo columnAccessInfo; + /** + * Columns accessed by updates + */ + protected ColumnAccessInfo updateColumnAccessInfo; public boolean skipAuthorization() { @@ -203,7 +207,7 @@ public abstract class BaseSemanticAnalyz } public abstract void analyzeInternal(ASTNode ast) throws SemanticException; - public void init() { + public void init(boolean clearPartsCache) { //no-op } @@ -213,7 +217,7 @@ public abstract class BaseSemanticAnalyz public void analyze(ASTNode ast, Context ctx) throws SemanticException { initCtx(ctx); - init(); + init(true); analyzeInternal(ast); } @@ -240,7 +244,7 @@ public abstract class BaseSemanticAnalyz this.fetchTask = fetchTask; } - protected void reset() { + protected void reset(boolean clearPartsCache) { rootTasks = new ArrayList<Task<? extends Serializable>>(); } @@ -402,7 +406,6 @@ public abstract class BaseSemanticAnalyz @SuppressWarnings("nls") public static String unescapeSQLString(String b) { - Character enclosure = null; // Some of the strings can be passed in as unicode. For example, the @@ -483,7 +486,7 @@ public abstract class BaseSemanticAnalyz case '\\': sb.append("\\"); break; - // The following 2 lines are exactly what MySQL does + // The following 2 lines are exactly what MySQL does TODO: why do we do this? case '%': sb.append("\\%"); break; @@ -501,6 +504,58 @@ public abstract class BaseSemanticAnalyz return sb.toString(); } + /** + * Escapes the string for AST; doesn't enclose it in quotes, however. + */ + public static String escapeSQLString(String b) { + // There's usually nothing to escape so we will be optimistic. + String result = b; + for (int i = 0; i < result.length(); ++i) { + char currentChar = result.charAt(i); + if (currentChar == '\\' && ((i + 1) < result.length())) { + // TODO: do we need to handle the "this is what MySQL does" here? 
+ char nextChar = result.charAt(i + 1); + if (nextChar == '%' || nextChar == '_') { + ++i; + continue; + } + } + switch (currentChar) { + case '\0': result = spliceString(result, i, "\\0"); ++i; break; + case '\'': result = spliceString(result, i, "\\'"); ++i; break; + case '\"': result = spliceString(result, i, "\\\""); ++i; break; + case '\b': result = spliceString(result, i, "\\b"); ++i; break; + case '\n': result = spliceString(result, i, "\\n"); ++i; break; + case '\r': result = spliceString(result, i, "\\r"); ++i; break; + case '\t': result = spliceString(result, i, "\\t"); ++i; break; + case '\\': result = spliceString(result, i, "\\\\"); ++i; break; + case '\u001A': result = spliceString(result, i, "\\Z"); ++i; break; + default: { + if (currentChar < ' ') { + String hex = Integer.toHexString(currentChar); + String unicode = "\\u"; + for (int j = 4; j > hex.length(); --j) { + unicode += '0'; + } + unicode += hex; + result = spliceString(result, i, unicode); + i += (unicode.length() - 1); + } + break; // if not a control character, do nothing + } + } + } + return result; + } + + private static String spliceString(String str, int i, String replacement) { + return spliceString(str, i, 1, replacement); + } + + private static String spliceString(String str, int i, int length, String replacement) { + return str.substring(0, i) + replacement + str.substring(i + length); + } + public HashSet<ReadEntity> getInputs() { return inputs; } @@ -866,6 +921,14 @@ public abstract class BaseSemanticAnalyz this.columnAccessInfo = columnAccessInfo; } + public ColumnAccessInfo getUpdateColumnAccessInfo() { + return updateColumnAccessInfo; + } + + public void setUpdateColumnAccessInfo(ColumnAccessInfo updateColumnAccessInfo) { + this.updateColumnAccessInfo = updateColumnAccessInfo; + } + protected LinkedHashMap<String, String> extractPartitionSpecs(Tree partspec) throws SemanticException { LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>(); @@ -1222,7 +1285,7 @@ public abstract class BaseSemanticAnalyz try { database = db.getDatabase(dbName); } catch (Exception e) { - throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(dbName), e); + throw new SemanticException(e.getMessage(), e); } if (database == null && throwException) { throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(dbName)); @@ -1252,9 +1315,13 @@ public abstract class BaseSemanticAnalyz try { tab = database == null ? 
db.getTable(tblName, false) : db.getTable(database, tblName, false); - } catch (Exception e) { + } + catch (InvalidTableException e) { throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tblName), e); } + catch (Exception e) { + throw new SemanticException(e.getMessage(), e); + } if (tab == null && throwException) { throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tblName)); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessInfo.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessInfo.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessInfo.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessInfo.java Tue Oct 14 19:06:45 2014 @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -54,4 +56,21 @@ public class ColumnAccessInfo { } return mapping; } + + /** + * Strip a virtual column out of the set of columns. This is useful in cases where we do not + * want to be checking against the user reading virtual columns, namely update and delete. + * @param vc + */ + public void stripVirtualColumn(VirtualColumn vc) { + for (Map.Entry<String, Set<String>> e : tableToColumnAccessMap.entrySet()) { + for (String columnName : e.getValue()) { + if (vc.getName().equalsIgnoreCase(columnName)) { + e.getValue().remove(columnName); + break; + } + } + } + + } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java Tue Oct 14 19:06:45 2014 @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.Context import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; /** @@ -58,7 +59,7 @@ public class ColumnStatsSemanticAnalyzer private Table tbl; public ColumnStatsSemanticAnalyzer(HiveConf conf) throws SemanticException { - super(conf); + super(conf, false); } private boolean shouldRewrite(ASTNode tree) { @@ -95,8 +96,10 @@ public class ColumnStatsSemanticAnalyzer String tableName = getUnescapedName((ASTNode) tree.getChild(0).getChild(0)); try { return db.getTable(tableName); + } catch (InvalidTableException e) { + throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName), e); } catch (HiveException e) { - throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName)); + throw new SemanticException(e.getMessage(), e); } } @@ -377,7 +380,7 @@ public class ColumnStatsSemanticAnalyzer QBParseInfo qbp; // initialize QB - init(); + init(true); // check if it is no scan. 
grammar prevents coexit noscan/columns super.processNoScanCommand(ast); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Tue Oct 14 19:06:45 2014 @@ -79,6 +79,7 @@ import org.apache.hadoop.hive.ql.lockmgr import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.authorization.AuthorizationParseUtils; @@ -267,11 +268,11 @@ public class DDLSemanticAnalyzer extends } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_UNARCHIVE) { analyzeAlterTableArchive(qualified, ast, true); } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_ADDCOLS) { - analyzeAlterTableModifyCols(qualified, ast, AlterTableTypes.ADDCOLS); + analyzeAlterTableModifyCols(qualified, ast, partSpec, AlterTableTypes.ADDCOLS); } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_REPLACECOLS) { - analyzeAlterTableModifyCols(qualified, ast, AlterTableTypes.REPLACECOLS); + analyzeAlterTableModifyCols(qualified, ast, partSpec, AlterTableTypes.REPLACECOLS); } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_RENAMECOL) { - analyzeAlterTableRenameCol(qualified, ast); + analyzeAlterTableRenameCol(qualified, ast, partSpec); } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_ADDPARTS) { analyzeAlterTableAddParts(qualified, ast, false); } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_DROPPARTS) { @@ -847,7 +848,8 @@ public class DDLSemanticAnalyzer extends outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_EXCLUSIVE)); } - DropTableDesc dropTblDesc = new DropTableDesc(tableName, expectView, ifExists); + boolean ifPurge = (ast.getFirstChildWithType(HiveParser.KW_PURGE) != null); + DropTableDesc dropTblDesc = new DropTableDesc(tableName, expectView, ifExists, ifPurge); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), dropTblDesc), conf)); } @@ -1145,7 +1147,10 @@ public class DDLSemanticAnalyzer extends } } - inputs.add(new ReadEntity(getTable(tableName))); + Table tbl = getTable(tableName, false); + if (tbl != null) { + inputs.add(new ReadEntity(getTable(tableName))); + } DropIndexDesc dropIdxDesc = new DropIndexDesc(indexName, tableName); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), @@ -1714,7 +1719,8 @@ public class DDLSemanticAnalyzer extends // assume the first component of DOT delimited name is tableName // get the attemptTableName - static public String getAttemptTableName(Hive db, String qualifiedName, boolean isColumn) { + static public String getAttemptTableName(Hive db, String qualifiedName, boolean isColumn) + throws SemanticException { // check whether the name starts with table // DESCRIBE table // DESCRIBE table.column @@ -1735,11 +1741,13 @@ public class DDLSemanticAnalyzer extends return tableName; } } - } catch (HiveException e) { + } catch (InvalidTableException e) { // assume 
the first DOT delimited component is tableName // OK if it is not // do nothing when having exception return null; + } catch (HiveException e) { + throw new SemanticException(e.getMessage(), e); } return null; } @@ -1820,7 +1828,7 @@ public class DDLSemanticAnalyzer extends ASTNode parentAst, ASTNode ast, String tableName, - Map<String, String> partSpec) { + Map<String, String> partSpec) throws SemanticException { // if parent has two children // it could be DESCRIBE table key @@ -1876,11 +1884,13 @@ public class DDLSemanticAnalyzer extends Table tab = null; try { tab = db.getTable(tableName); - } catch (HiveException e) { - // if table not valid - // throw semantic exception + } + catch (InvalidTableException e) { throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName), e); } + catch (HiveException e) { + throw new SemanticException(e.getMessage(), e); + } if (partSpec != null) { Partition part = null; @@ -2230,6 +2240,10 @@ public class DDLSemanticAnalyzer extends if (ast.getChildCount() == 1) { String funcNames = stripQuotes(ast.getChild(0).getText()); showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile(), funcNames); + } else if (ast.getChildCount() == 2) { + assert (ast.getChild(0).getType() == HiveParser.KW_LIKE); + String funcNames = stripQuotes(ast.getChild(1).getText()); + showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile(), funcNames, true); } else { showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile()); } @@ -2477,7 +2491,8 @@ public class DDLSemanticAnalyzer extends alterTblDesc), conf)); } - private void analyzeAlterTableRenameCol(String[] qualified, ASTNode ast) throws SemanticException { + private void analyzeAlterTableRenameCol(String[] qualified, ASTNode ast, + HashMap<String, String> partSpec) throws SemanticException { String newComment = null; String newType = null; newType = getTypeStringFromAST((ASTNode) ast.getChild(2)); @@ -2518,10 +2533,10 @@ public class DDLSemanticAnalyzer extends } String tblName = getDotName(qualified); - AlterTableDesc alterTblDesc = new AlterTableDesc(tblName, + AlterTableDesc alterTblDesc = new AlterTableDesc(tblName, partSpec, unescapeIdentifier(oldColName), unescapeIdentifier(newColName), newType, newComment, first, flagCol); - addInputsOutputsAlterTable(tblName, null, alterTblDesc); + addInputsOutputsAlterTable(tblName, partSpec, alterTblDesc); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc), conf)); @@ -2565,14 +2580,14 @@ public class DDLSemanticAnalyzer extends } private void analyzeAlterTableModifyCols(String[] qualified, ASTNode ast, - AlterTableTypes alterType) throws SemanticException { + HashMap<String, String> partSpec, AlterTableTypes alterType) throws SemanticException { String tblName = getDotName(qualified); List<FieldSchema> newCols = getColumns((ASTNode) ast.getChild(0)); - AlterTableDesc alterTblDesc = new AlterTableDesc(tblName, newCols, + AlterTableDesc alterTblDesc = new AlterTableDesc(tblName, partSpec, newCols, alterType); - addInputsOutputsAlterTable(tblName, null, alterTblDesc); + addInputsOutputsAlterTable(tblName, partSpec, alterTblDesc); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc), conf)); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g Tue Oct 14 19:06:45 2014 @@ -263,7 +263,7 @@ searchCondition // INSERT INTO <table> (col1,col2,...) SELECT * FROM (VALUES(1,2,3),(4,5,6),...) as Foo(a,b,c) valueRowConstructor : - LPAREN atomExpression (COMMA atomExpression)* RPAREN -> ^(TOK_VALUE_ROW atomExpression+) + LPAREN precedenceUnaryPrefixExpression (COMMA precedenceUnaryPrefixExpression)* RPAREN -> ^(TOK_VALUE_ROW precedenceUnaryPrefixExpression+) ; valuesTableConstructor Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java Tue Oct 14 19:06:45 2014 @@ -22,6 +22,8 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.Database; @@ -81,7 +83,7 @@ public class FunctionSemanticAnalyzer ex new CreateFunctionDesc(functionName, isTemporaryFunction, className, resources); rootTasks.add(TaskFactory.get(new FunctionWork(desc), conf)); - addEntities(functionName, isTemporaryFunction); + addEntities(functionName, isTemporaryFunction, resources); } private void analyzeDropFunction(ASTNode ast) throws SemanticException { @@ -106,7 +108,7 @@ public class FunctionSemanticAnalyzer ex DropFunctionDesc desc = new DropFunctionDesc(functionName, isTemporaryFunction); rootTasks.add(TaskFactory.get(new FunctionWork(desc), conf)); - addEntities(functionName, isTemporaryFunction); + addEntities(functionName, isTemporaryFunction, null); } private ResourceType getResourceType(ASTNode token) throws SemanticException { @@ -152,8 +154,8 @@ public class FunctionSemanticAnalyzer ex /** * Add write entities to the semantic analyzer to restrict function creation to privileged users. */ - private void addEntities(String functionName, boolean isTemporaryFunction) - throws SemanticException { + private void addEntities(String functionName, boolean isTemporaryFunction, + List<ResourceUri> resources) throws SemanticException { // If the function is being added under a database 'namespace', then add an entity representing // the database (only applicable to permanent/metastore functions). // We also add a second entity representing the function name. 
@@ -183,5 +185,13 @@ public class FunctionSemanticAnalyzer ex // Add the function name as a WriteEntity outputs.add(new WriteEntity(database, functionName, Type.FUNCTION, WriteEntity.WriteType.DDL_NO_LOCK)); + + if (resources != null) { + for (ResourceUri resource : resources) { + String uriPath = resource.getUri(); + outputs.add(new WriteEntity(new Path(uriPath), + FileUtils.isLocalFile(conf, uriPath))); + } + } } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Tue Oct 14 19:06:45 2014 @@ -29,6 +29,7 @@ import java.util.Set; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; @@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.lib.Nod import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; @@ -132,6 +134,8 @@ public class GenTezProcContext implement // remember which reducesinks we've already connected public final Set<ReduceSinkOperator> connectedReduceSinks; + public final Map<Operator<?>, MergeJoinWork> opMergeJoinWorkMap; + public CommonMergeJoinOperator currentMergeJoinOperator; // remember the event operators we've seen public final Set<AppMasterEventOperator> eventOperatorSet; @@ -176,6 +180,8 @@ public class GenTezProcContext implement this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>(); this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>(); this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>(); + this.opMergeJoinWorkMap = new LinkedHashMap<Operator<?>, MergeJoinWork>(); + this.currentMergeJoinOperator = null; rootTasks.add(currentTask); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Tue Oct 14 19:06:45 2014 @@ -167,7 +167,8 @@ public class GenTezUtils { GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink); // remember which parent belongs to which tag - reduceWork.getTagToInput().put(reduceSink.getConf().getTag(), + int tag = reduceSink.getConf().getTag(); + reduceWork.getTagToInput().put(tag == -1 ? 
0 : tag, context.preceedingWork.getName()); // remember the output name of the reduce sink Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Tue Oct 14 19:06:45 2014 @@ -28,6 +28,8 @@ import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -38,11 +40,14 @@ import org.apache.hadoop.hive.ql.lib.Nod import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.plan.UnionWork; /** @@ -126,6 +131,48 @@ public class GenTezWork implements NodeP context.childToWorkMap.get(operator).add(work); } + // this transformation needs to be first because it changes the work item itself. + // which can affect the working of all downstream transformations. + if (context.currentMergeJoinOperator != null) { + // we are currently walking the big table side of the merge join. we need to create or hook up + // merge join work. + MergeJoinWork mergeJoinWork = null; + if (context.opMergeJoinWorkMap.containsKey(operator)) { + // we have found a merge work corresponding to this closing operator. Hook up this work. + mergeJoinWork = context.opMergeJoinWorkMap.get(operator); + } else { + // we need to create the merge join work + mergeJoinWork = new MergeJoinWork(); + mergeJoinWork.setMergeJoinOperator(context.currentMergeJoinOperator); + tezWork.add(mergeJoinWork); + context.opMergeJoinWorkMap.put(operator, mergeJoinWork); + } + // connect the work correctly. + mergeJoinWork.addMergedWork(work, null); + Operator<? 
extends OperatorDesc> parentOp = + getParentFromStack(context.currentMergeJoinOperator, stack); + int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp); + work.setTag(pos); + tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); + for (BaseWork parentWork : tezWork.getParents(work)) { + TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work); + tezWork.disconnect(parentWork, work); + tezWork.connect(parentWork, mergeJoinWork, edgeProp); + } + + for (BaseWork childWork : tezWork.getChildren(work)) { + TezEdgeProperty edgeProp = tezWork.getEdgeProperty(work, childWork); + tezWork.disconnect(work, childWork); + tezWork.connect(mergeJoinWork, childWork, edgeProp); + } + tezWork.remove(work); + context.rootToWorkMap.put(root, mergeJoinWork); + context.childToWorkMap.get(operator).remove(work); + context.childToWorkMap.get(operator).add(mergeJoinWork); + work = mergeJoinWork; + context.currentMergeJoinOperator = null; + } + // remember which mapjoin operator links with which work if (!context.currentMapJoinOperators.isEmpty()) { for (MapJoinOperator mj: context.currentMapJoinOperators) { @@ -169,6 +216,9 @@ public class GenTezWork implements NodeP LOG.debug("connecting "+parentWork.getName()+" with "+work.getName()); TezEdgeProperty edgeProp = parentWorkMap.getValue(); tezWork.connect(parentWork, work, edgeProp); + if (edgeProp.getEdgeType() == EdgeType.CUSTOM_EDGE) { + tezWork.setVertexType(work, VertexType.INITIALIZED_EDGES); + } // need to set up output name for reduce sink now that we know the name // of the downstream work @@ -192,14 +242,6 @@ public class GenTezWork implements NodeP context.currentMapJoinOperators.clear(); } - // This is where we cut the tree as described above. We also remember that - // we might have to connect parent work with this work later. - for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) { - context.leafOperatorToFollowingWork.put(parent, work); - LOG.debug("Removing " + parent + " as parent from " + root); - root.removeParent(parent); - } - if (!context.currentUnionOperators.isEmpty()) { // if there are union all operators we need to add the work to the set // of union operators. @@ -229,6 +271,21 @@ public class GenTezWork implements NodeP work = unionWork; } + + // This is where we cut the tree as described above. We also remember that + // we might have to connect parent work with this work later. + boolean removeParents = false; + for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) { + removeParents = true; + context.leafOperatorToFollowingWork.put(parent, work); + LOG.debug("Removing " + parent + " as parent from " + root); + } + if (removeParents) { + for (Operator<?> parent : new ArrayList<Operator<?>>(root.getParentOperators())) { + root.removeParent(parent); + } + } + // We're scanning a tree from roots to leaf (this is not technically // correct, demux and mux operators might form a diamond shape, but // we will only scan one path and ignore the others, because the @@ -248,31 +305,64 @@ public class GenTezWork implements NodeP LOG.debug("Second pass. 
Leaf operator: "+operator +" has common downstream work:"+followingWork); - // need to add this branch to the key + value info - assert operator instanceof ReduceSinkOperator - && followingWork instanceof ReduceWork; - ReduceSinkOperator rs = (ReduceSinkOperator) operator; - ReduceWork rWork = (ReduceWork) followingWork; - GenMapRedUtils.setKeyAndValueDesc(rWork, rs); - - // remember which parent belongs to which tag - rWork.getTagToInput().put(rs.getConf().getTag(), work.getName()); - - // remember the output name of the reduce sink - rs.getConf().setOutputName(rWork.getName()); - - if (!context.connectedReduceSinks.contains(rs)) { - // add dependency between the two work items - TezEdgeProperty edgeProp; - if (rWork.isAutoReduceParallelism()) { - edgeProp = - new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, - rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer); + if (operator instanceof DummyStoreOperator) { + // this is the small table side. + assert (followingWork instanceof MergeJoinWork); + MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork; + CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator(); + work.setTag(mergeJoinOp.getTagForOperator(operator)); + mergeJoinWork.addMergedWork(null, work); + tezWork.setVertexType(mergeJoinWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); + for (BaseWork parentWork : tezWork.getParents(work)) { + TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work); + tezWork.disconnect(parentWork, work); + tezWork.connect(parentWork, mergeJoinWork, edgeProp); + } + work = mergeJoinWork; + } else { + // need to add this branch to the key + value info + assert operator instanceof ReduceSinkOperator + && ((followingWork instanceof ReduceWork) || (followingWork instanceof MergeJoinWork) + || followingWork instanceof UnionWork); + ReduceSinkOperator rs = (ReduceSinkOperator) operator; + ReduceWork rWork = null; + if (followingWork instanceof MergeJoinWork) { + MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork; + rWork = (ReduceWork) mergeJoinWork.getMainWork(); + } else if (followingWork instanceof UnionWork) { + // this can only be possible if there is merge work followed by the union + UnionWork unionWork = (UnionWork) followingWork; + int index = getMergeIndex(tezWork, unionWork, rs); + // guaranteed to be instance of MergeJoinWork if index is valid + MergeJoinWork mergeJoinWork = (MergeJoinWork) tezWork.getChildren(unionWork).get(index); + // disconnect the connection to union work and connect to merge work + followingWork = mergeJoinWork; + rWork = (ReduceWork) mergeJoinWork.getMainWork(); } else { - edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + rWork = (ReduceWork) followingWork; + } + GenMapRedUtils.setKeyAndValueDesc(rWork, rs); + + // remember which parent belongs to which tag + int tag = rs.getConf().getTag(); + rWork.getTagToInput().put(tag == -1 ? 
0 : tag, work.getName()); + + // remember the output name of the reduce sink + rs.getConf().setOutputName(rWork.getName()); + + if (!context.connectedReduceSinks.contains(rs)) { + // add dependency between the two work items + TezEdgeProperty edgeProp; + if (rWork.isAutoReduceParallelism()) { + edgeProp = + new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, + rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer); + } else { + edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + } + tezWork.connect(work, followingWork, edgeProp); + context.connectedReduceSinks.add(rs); } - tezWork.connect(work, rWork, edgeProp); - context.connectedReduceSinks.add(rs); } } else { LOG.debug("First pass. Leaf operator: "+operator); @@ -289,4 +379,28 @@ public class GenTezWork implements NodeP return null; } + + private int getMergeIndex(TezWork tezWork, UnionWork unionWork, ReduceSinkOperator rs) { + int index = 0; + for (BaseWork baseWork : tezWork.getChildren(unionWork)) { + if (baseWork instanceof MergeJoinWork) { + MergeJoinWork mergeJoinWork = (MergeJoinWork) baseWork; + int tag = mergeJoinWork.getMergeJoinOperator().getTagForOperator(rs); + if (tag != -1) { + return index; + } else { + index++; + } + } + } + + return -1; + } + + @SuppressWarnings("unchecked") + private Operator<? extends OperatorDesc> getParentFromStack(Node currentMergeJoinOperator, + Stack<Node> stack) { + int pos = stack.indexOf(currentMergeJoinOperator); + return (Operator<? extends OperatorDesc>) stack.get(pos - 1); + } }
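
For reference, the vertex-rewiring pattern used in the GenTezWork hunk above (disconnect every edge of the original work item and re-connect it to the new MergeJoinWork with the same edge property, then drop the old vertex) can be sketched in isolation as follows. This is a simplified illustration only: the Graph class and the String vertex and edge-property names stand in for TezWork, BaseWork and TezEdgeProperty and are not Hive API.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the TezWork DAG: vertices and edge properties are
// plain Strings instead of BaseWork/TezEdgeProperty objects.
public class MergeJoinRewireSketch {

  static class Graph {
    // parent vertex -> (child vertex -> edge property)
    private final Map<String, Map<String, String>> edges = new LinkedHashMap<>();

    void connect(String parent, String child, String edgeProp) {
      edges.computeIfAbsent(parent, k -> new LinkedHashMap<>()).put(child, edgeProp);
    }

    void disconnect(String parent, String child) {
      Map<String, String> children = edges.get(parent);
      if (children != null) {
        children.remove(child);
      }
    }

    List<String> getParents(String vertex) {
      List<String> parents = new ArrayList<>();
      for (Map.Entry<String, Map<String, String>> e : edges.entrySet()) {
        if (e.getValue().containsKey(vertex)) {
          parents.add(e.getKey());
        }
      }
      return parents;
    }

    String getEdgeProperty(String parent, String child) {
      return edges.get(parent).get(child);
    }
  }

  // Move every incoming edge of 'work' onto 'mergeJoinWork', keeping its edge
  // property -- the same loop shape the patch applies to both the parent and
  // child edges before removing the original vertex from the DAG.
  static void rewireParents(Graph g, String work, String mergeJoinWork) {
    for (String parent : g.getParents(work)) {
      String edgeProp = g.getEdgeProperty(parent, work);
      g.disconnect(parent, work);
      g.connect(parent, mergeJoinWork, edgeProp);
    }
  }

  public static void main(String[] args) {
    Graph dag = new Graph();
    dag.connect("Map 1", "Reducer 2", "SIMPLE_EDGE");

    rewireParents(dag, "Reducer 2", "Merge Join 2");

    System.out.println(dag.getParents("Merge Join 2")); // prints [Map 1]
  }
}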

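Similarly, the tag bookkeeping guard that appears in both the GenTezUtils and GenTezWork hunks (tag == -1 ? 0 : tag) can be read as: a ReduceSink that never had a tag assigned reports -1, and that single-input case is recorded under slot 0 so the tagToInput map stays consistent. A minimal sketch of that pattern, using a plain map rather than Hive's ReduceWork and ReduceSinkDesc:

import java.util.LinkedHashMap;
import java.util.Map;

public class TagToInputSketch {

  // Mirrors rWork.getTagToInput().put(tag == -1 ? 0 : tag, name): an unassigned
  // tag (-1) is normalized to input slot 0 before it is used as the map key.
  static void recordInput(Map<Integer, String> tagToInput, int tag, String inputName) {
    tagToInput.put(tag == -1 ? 0 : tag, inputName);
  }

  public static void main(String[] args) {
    Map<Integer, String> tagToInput = new LinkedHashMap<>();

    recordInput(tagToInput, -1, "Map 1");   // single-input reducer, no tag assigned
    recordInput(tagToInput, 1, "Map 2");    // tagged input from a join branch

    System.out.println(tagToInput); // prints {0=Map 1, 1=Map 2}
  }
}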