Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Mon Oct 6 04:00:39 2014 @@ -28,6 +28,8 @@ import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -38,11 +40,14 @@ import org.apache.hadoop.hive.ql.lib.Nod import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.plan.UnionWork; /** @@ -126,6 +131,48 @@ public class GenTezWork implements NodeP context.childToWorkMap.get(operator).add(work); } + // this transformation needs to be first because it changes the work item itself. + // which can affect the working of all downstream transformations. + if (context.currentMergeJoinOperator != null) { + // we are currently walking the big table side of the merge join. we need to create or hook up + // merge join work. + MergeJoinWork mergeJoinWork = null; + if (context.opMergeJoinWorkMap.containsKey(operator)) { + // we have found a merge work corresponding to this closing operator. Hook up this work. + mergeJoinWork = context.opMergeJoinWorkMap.get(operator); + } else { + // we need to create the merge join work + mergeJoinWork = new MergeJoinWork(); + mergeJoinWork.setMergeJoinOperator(context.currentMergeJoinOperator); + tezWork.add(mergeJoinWork); + context.opMergeJoinWorkMap.put(operator, mergeJoinWork); + } + // connect the work correctly. + mergeJoinWork.addMergedWork(work, null); + Operator<? extends OperatorDesc> parentOp = + getParentFromStack(context.currentMergeJoinOperator, stack); + int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp); + work.setTag(pos); + tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); + for (BaseWork parentWork : tezWork.getParents(work)) { + TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work); + tezWork.disconnect(parentWork, work); + tezWork.connect(parentWork, mergeJoinWork, edgeProp); + } + + for (BaseWork childWork : tezWork.getChildren(work)) { + TezEdgeProperty edgeProp = tezWork.getEdgeProperty(work, childWork); + tezWork.disconnect(work, childWork); + tezWork.connect(mergeJoinWork, childWork, edgeProp); + } + tezWork.remove(work); + context.rootToWorkMap.put(root, mergeJoinWork); + context.childToWorkMap.get(operator).remove(work); + context.childToWorkMap.get(operator).add(mergeJoinWork); + work = mergeJoinWork; + context.currentMergeJoinOperator = null; + } + // remember which mapjoin operator links with which work if (!context.currentMapJoinOperators.isEmpty()) { for (MapJoinOperator mj: context.currentMapJoinOperators) { @@ -169,6 +216,9 @@ public class GenTezWork implements NodeP LOG.debug("connecting "+parentWork.getName()+" with "+work.getName()); TezEdgeProperty edgeProp = parentWorkMap.getValue(); tezWork.connect(parentWork, work, edgeProp); + if (edgeProp.getEdgeType() == EdgeType.CUSTOM_EDGE) { + tezWork.setVertexType(work, VertexType.INITIALIZED_EDGES); + } // need to set up output name for reduce sink now that we know the name // of the downstream work @@ -192,14 +242,6 @@ public class GenTezWork implements NodeP context.currentMapJoinOperators.clear(); } - // This is where we cut the tree as described above. We also remember that - // we might have to connect parent work with this work later. - for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) { - context.leafOperatorToFollowingWork.put(parent, work); - LOG.debug("Removing " + parent + " as parent from " + root); - root.removeParent(parent); - } - if (!context.currentUnionOperators.isEmpty()) { // if there are union all operators we need to add the work to the set // of union operators. @@ -229,6 +271,21 @@ public class GenTezWork implements NodeP work = unionWork; } + + // This is where we cut the tree as described above. We also remember that + // we might have to connect parent work with this work later. + boolean removeParents = false; + for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) { + removeParents = true; + context.leafOperatorToFollowingWork.put(parent, work); + LOG.debug("Removing " + parent + " as parent from " + root); + } + if (removeParents) { + for (Operator<?> parent : new ArrayList<Operator<?>>(root.getParentOperators())) { + root.removeParent(parent); + } + } + // We're scanning a tree from roots to leaf (this is not technically // correct, demux and mux operators might form a diamond shape, but // we will only scan one path and ignore the others, because the @@ -248,31 +305,64 @@ public class GenTezWork implements NodeP LOG.debug("Second pass. Leaf operator: "+operator +" has common downstream work:"+followingWork); - // need to add this branch to the key + value info - assert operator instanceof ReduceSinkOperator - && followingWork instanceof ReduceWork; - ReduceSinkOperator rs = (ReduceSinkOperator) operator; - ReduceWork rWork = (ReduceWork) followingWork; - GenMapRedUtils.setKeyAndValueDesc(rWork, rs); - - // remember which parent belongs to which tag - rWork.getTagToInput().put(rs.getConf().getTag(), work.getName()); - - // remember the output name of the reduce sink - rs.getConf().setOutputName(rWork.getName()); - - if (!context.connectedReduceSinks.contains(rs)) { - // add dependency between the two work items - TezEdgeProperty edgeProp; - if (rWork.isAutoReduceParallelism()) { - edgeProp = - new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, - rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer); + if (operator instanceof DummyStoreOperator) { + // this is the small table side. + assert (followingWork instanceof MergeJoinWork); + MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork; + CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator(); + work.setTag(mergeJoinOp.getTagForOperator(operator)); + mergeJoinWork.addMergedWork(null, work); + tezWork.setVertexType(mergeJoinWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); + for (BaseWork parentWork : tezWork.getParents(work)) { + TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work); + tezWork.disconnect(parentWork, work); + tezWork.connect(parentWork, mergeJoinWork, edgeProp); + } + work = mergeJoinWork; + } else { + // need to add this branch to the key + value info + assert operator instanceof ReduceSinkOperator + && ((followingWork instanceof ReduceWork) || (followingWork instanceof MergeJoinWork) + || followingWork instanceof UnionWork); + ReduceSinkOperator rs = (ReduceSinkOperator) operator; + ReduceWork rWork = null; + if (followingWork instanceof MergeJoinWork) { + MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork; + rWork = (ReduceWork) mergeJoinWork.getMainWork(); + } else if (followingWork instanceof UnionWork) { + // this can only be possible if there is merge work followed by the union + UnionWork unionWork = (UnionWork) followingWork; + int index = getMergeIndex(tezWork, unionWork, rs); + // guaranteed to be instance of MergeJoinWork if index is valid + MergeJoinWork mergeJoinWork = (MergeJoinWork) tezWork.getChildren(unionWork).get(index); + // disconnect the connection to union work and connect to merge work + followingWork = mergeJoinWork; + rWork = (ReduceWork) mergeJoinWork.getMainWork(); } else { - edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + rWork = (ReduceWork) followingWork; + } + GenMapRedUtils.setKeyAndValueDesc(rWork, rs); + + // remember which parent belongs to which tag + int tag = rs.getConf().getTag(); + rWork.getTagToInput().put(tag == -1 ? 0 : tag, work.getName()); + + // remember the output name of the reduce sink + rs.getConf().setOutputName(rWork.getName()); + + if (!context.connectedReduceSinks.contains(rs)) { + // add dependency between the two work items + TezEdgeProperty edgeProp; + if (rWork.isAutoReduceParallelism()) { + edgeProp = + new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, + rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer); + } else { + edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + } + tezWork.connect(work, followingWork, edgeProp); + context.connectedReduceSinks.add(rs); } - tezWork.connect(work, rWork, edgeProp); - context.connectedReduceSinks.add(rs); } } else { LOG.debug("First pass. Leaf operator: "+operator); @@ -289,4 +379,28 @@ public class GenTezWork implements NodeP return null; } + + private int getMergeIndex(TezWork tezWork, UnionWork unionWork, ReduceSinkOperator rs) { + int index = 0; + for (BaseWork baseWork : tezWork.getChildren(unionWork)) { + if (baseWork instanceof MergeJoinWork) { + MergeJoinWork mergeJoinWork = (MergeJoinWork) baseWork; + int tag = mergeJoinWork.getMergeJoinOperator().getTagForOperator(rs); + if (tag != -1) { + return index; + } else { + index++; + } + } + } + + return -1; + } + + @SuppressWarnings("unchecked") + private Operator<? extends OperatorDesc> getParentFromStack(Node currentMergeJoinOperator, + Stack<Node> stack) { + int pos = stack.indexOf(currentMergeJoinOperator); + return (Operator<? extends OperatorDesc>) stack.get(pos - 1); + } }
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Mon Oct 6 04:00:39 2014 @@ -479,8 +479,9 @@ import java.util.HashMap; xlateMap.put("KW_SUBQUERY", "SUBQUERY"); xlateMap.put("KW_REWRITE", "REWRITE"); xlateMap.put("KW_UPDATE", "UPDATE"); - xlateMap.put("KW_VALUES", "VALUES"); + xlateMap.put("KW_PURGE", "PURGE"); + // Operators xlateMap.put("DOT", "."); @@ -929,7 +930,7 @@ dropIndexStatement dropTableStatement @init { pushMsg("drop statement", state); } @after { popMsg(state); } - : KW_DROP KW_TABLE ifExists? tableName -> ^(TOK_DROPTABLE tableName ifExists?) + : KW_DROP KW_TABLE ifExists? tableName KW_PURGE? -> ^(TOK_DROPTABLE tableName ifExists? KW_PURGE?) ; alterStatement @@ -945,8 +946,6 @@ alterTableStatementSuffix @init { pushMsg("alter table statement", state); } @after { popMsg(state); } : alterStatementSuffixRename[true] - | alterStatementSuffixAddCol - | alterStatementSuffixRenameCol | alterStatementSuffixUpdateStatsCol | alterStatementSuffixDropPartitions[true] | alterStatementSuffixAddPartitions[true] @@ -974,6 +973,8 @@ alterTblPartitionStatementSuffix | alterStatementSuffixClusterbySortby | alterStatementSuffixCompact | alterStatementSuffixUpdateStatsCol + | alterStatementSuffixRenameCol + | alterStatementSuffixAddCol ; alterStatementPartitionKeyType @@ -2237,7 +2238,7 @@ deleteStatement /*SET <columName> = (3 + col2)*/ columnAssignmentClause : - tableOrColumn EQUAL^ atomExpression + tableOrColumn EQUAL^ precedencePlusExpression ; /*SET col1 = 5, col2 = (4 + col4), ...*/ Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java Mon Oct 6 04:00:39 2014 @@ -131,7 +131,7 @@ public class ParseDriver { * so that the graph walking algorithms and the rules framework defined in * ql.lib can be used with the AST Nodes. */ - static final TreeAdaptor adaptor = new CommonTreeAdaptor() { + public static final TreeAdaptor adaptor = new CommonTreeAdaptor() { /** * Creates an ASTNode for the given token. The ASTNode is a wrapper around * antlr's CommonTree class that implements the Node interface. Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java Mon Oct 6 04:00:39 2014 @@ -111,7 +111,7 @@ public final class ParseUtils { * @param tableFieldTypeInfo TypeInfo to convert to * @return Expression converting column to the type specified by tableFieldTypeInfo */ - static ExprNodeDesc createConversionCast(ExprNodeDesc column, PrimitiveTypeInfo tableFieldTypeInfo) + public static ExprNodeDesc createConversionCast(ExprNodeDesc column, PrimitiveTypeInfo tableFieldTypeInfo) throws SemanticException { // Get base type, since type string may be parameterized String baseType = TypeInfoUtils.getBaseName(tableFieldTypeInfo.getTypeName()); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java Mon Oct 6 04:00:39 2014 @@ -104,10 +104,18 @@ public class QBMetaData { return nameToDestTable.get(alias.toLowerCase()); } + public Map<String, Table> getNameToDestTable() { + return nameToDestTable; + } + public Partition getDestPartitionForAlias(String alias) { return nameToDestPartition.get(alias.toLowerCase()); } + public Map<String, Partition> getNameToDestPartition() { + return nameToDestPartition; + } + public String getDestFileForAlias(String alias) { return nameToDestFile.get(alias.toLowerCase()); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Mon Oct 6 04:00:39 2014 @@ -49,7 +49,7 @@ public class RowResolver implements Seri * The primary(first) mapping is still only held in * invRslvMap. */ - private Map<String, String[]> altInvRslvMap; + private final Map<String, String[]> altInvRslvMap; private Map<String, ASTNode> expressionMap; // TODO: Refactor this and do in a more object oriented manner @@ -351,4 +351,73 @@ public class RowResolver implements Seri this.expressionMap = expressionMap; } + + // TODO: 1) How to handle collisions? 2) Should we be cloning ColumnInfo or + // not? + public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom, + int outputColPos, int numColumns) throws SemanticException { + String tabAlias; + String colAlias; + String[] qualifiedColName; + int i = 0; + + for (ColumnInfo cInfoFrmInput : rrToAddFrom.getRowSchema().getSignature()) { + if ( numColumns >= 0 && i == numColumns ) { + break; + } + ColumnInfo newCI = null; + qualifiedColName = rrToAddFrom.getInvRslvMap().get( + cInfoFrmInput.getInternalName()); + tabAlias = qualifiedColName[0]; + colAlias = qualifiedColName[1]; + + newCI = new ColumnInfo(cInfoFrmInput); + newCI.setInternalName(SemanticAnalyzer + .getColumnInternalName(outputColPos)); + + outputColPos++; + + if (rrToAddTo.get(tabAlias, colAlias) != null) { + LOG.debug("Found duplicate column alias in RR: " + rrToAddTo.get(tabAlias, colAlias)); + } else { + rrToAddTo.put(tabAlias, colAlias, newCI); + } + + qualifiedColName = rrToAddFrom.getAlternateMappings(cInfoFrmInput + .getInternalName()); + if (qualifiedColName != null) { + tabAlias = qualifiedColName[0]; + colAlias = qualifiedColName[1]; + rrToAddTo.put(tabAlias, colAlias, newCI); + } + i++; + } + + return outputColPos; + } + + public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom, + int outputColPos) throws SemanticException { + return add(rrToAddTo, rrToAddFrom, outputColPos, -1); + } + + /** + * Return a new row resolver that is combination of left RR and right RR. + * The schema will be schema of left, schema of right + * + * @param leftRR + * @param rightRR + * @return + * @throws SemanticException + */ + public static RowResolver getCombinedRR(RowResolver leftRR, + RowResolver rightRR) throws SemanticException { + int outputColPos = 0; + + RowResolver combinedRR = new RowResolver(); + outputColPos = add(combinedRR, leftRR, outputColPos); + outputColPos = add(combinedRR, rightRR, outputColPos); + + return combinedRR; + } }
