Author: hashutosh Date: Sat Feb 23 16:33:44 2013 New Revision: 1449363 URL: http://svn.apache.org/r1449363 Log: HIVE-4035 : Column Pruner for PTF Op (Prajakta Kalmegh via Ashutosh Chauhan)
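In brief: ColumnPruner gains a rule (R10 below) that fires on PTFOperator, and the new ColumnPrunerPTFProc prunes columns only when the PTF chain implements windowing; general PTFs are treated as black boxes that may need every column. The pruning itself rewrites the column names/types serde properties on each shape in the PTF chain, which suffices because the ObjectInspectors are rebuilt from those properties at runtime. The following standalone sketch illustrates that serde-property rewrite. It is not the patch code: the property keys are the values behind serdeConstants.LIST_COLUMNS and LIST_COLUMN_TYPES, the column data is made up, and splitting the type string on commas assumes primitive types only (the real setSerdePropsOfShape uses TypeInfoUtils.getTypeInfosFromTypeString, which also handles nested types such as map<string,string>).

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SerdePropsPruneSketch {
  // Keep only the columns named in 'keep'. Comparison is lower-cased because
  // ObjectInspector field names are lower-cased (see the patch's own comment).
  static void prune(Map<String, String> serdeProps, Set<String> keep) {
    String[] names = serdeProps.get("columns").split(",");
    String[] types = serdeProps.get("columns.types").split(","); // primitive types only
    StringBuilder cNames = new StringBuilder();
    StringBuilder cTypes = new StringBuilder();
    for (int i = 0; i < names.length; i++) {
      if (keep.contains(names[i].toLowerCase())) {
        if (cNames.length() > 0) {
          cNames.append(',');
          cTypes.append(',');
        }
        cNames.append(names[i]);
        cTypes.append(types[i]);
      }
    }
    // Rewriting these two properties is enough: the OIs are rebuilt from them.
    serdeProps.put("columns", cNames.toString());
    serdeProps.put("columns.types", cTypes.toString());
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<String, String>();
    props.put("columns", "p_mfgr,p_name,p_size,p_retailprice");
    props.put("columns.types", "string,string,int,double");
    prune(props, new HashSet<String>(Arrays.asList("p_mfgr", "p_name", "p_size")));
    // columns -> p_mfgr,p_name,p_size ; columns.types -> string,string,int
    System.out.println(props);
  }
}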
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java hive/branches/ptf-windowing/ql/src/test/queries/clientpositive/ptf_general_queries.q hive/branches/ptf-windowing/ql/src/test/results/clientpositive/ptf_general_queries.q.out Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java?rev=1449363&r1=1449362&r2=1449363&view=diff ============================================================================== --- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java (original) +++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java Sat Feb 23 16:33:44 2013 @@ -23,19 +23,19 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; -import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.exec.SelectOperator; -import org.apache.hadoop.hive.ql.exec.ScriptOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator; +import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.exec.UnionOperator; -import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.PTFOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator; -import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.ScriptOperator; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -114,6 +114,9 @@ public class ColumnPruner implements Tra opRules.put(new RuleRegExp("R9", LateralViewForwardOperator.getOperatorName() + "%"), ColumnPrunerProcFactory.getLateralViewForwardProc()); + opRules.put(new RuleRegExp("R10", + PTFOperator.getOperatorName() + "%"), + ColumnPrunerProcFactory.getPTFProc()); // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along Dispatcher disp = new DefaultRuleDispatcher(ColumnPrunerProcFactory Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1449363&r1=1449362&r2=1449363&view=diff ============================================================================== --- 
hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original) +++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Sat Feb 23 16:33:44 2013 @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.optimizer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -31,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; +import org.apache.hadoop.hive.ql.exec.ExtractOperator; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; @@ -40,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.La import org.apache.hadoop.hive.ql.exec.LimitOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.PTFOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.ScriptOperator; @@ -62,11 +65,21 @@ import org.apache.hadoop.hive.ql.plan.Gr import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PTFDesc; +import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef; +import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails; +import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef; +import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef; +import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; /** * Factory for generating the different node processors used by ColumnPruner. @@ -148,6 +161,158 @@ public final class ColumnPrunerProcFacto } /** + * - Pruning can only be done for Windowing. PTFs are black boxes, + * we assume all columns are needed. + * - add column names referenced in WindowFn args and in WindowFn expressions + * to the pruned list of the child Select Op. + * - Prune the Column names & types serde properties in each of the Shapes in the PTF Chain: + * - the InputDef's output shape + * - Window Table Functions: window output shape & output shape. + * - Why is pruning the Column names & types in the serde properties enough? + * - because during runtime we rebuild the OIs using these properties. + * - finally we set the prunedColList on the ColumnPrunerProcCtx; + * and update the RR & signature on the PTFOp. + */ + public static class ColumnPrunerPTFProc implements NodeProcessor { + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, + Object...
nodeOutputs) throws SemanticException { + + PTFOperator op = (PTFOperator) nd; + PTFDesc conf = op.getConf(); + //Since we cannot know what columns will be needed by a PTF chain, + //we do not prune columns on PTFOperator for PTF chains. + if (!conf.forWindowing()) { + return getDefaultProc().process(nd, stack, ctx, nodeOutputs); + } + + ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx; + WindowTableFunctionDef def = (WindowTableFunctionDef) conf.getFuncDef(); + ArrayList<ColumnInfo> sig = new ArrayList<ColumnInfo>(); + + List<String> prunedCols = cppCtx.getPrunedColList(op.getChildOperators().get(0)); + //we create a copy of prunedCols to build the list of pruned columns for the PTFOperator + prunedCols = new ArrayList<String>(prunedCols); + prunedColumnsList(prunedCols, def); + setSerdePropsOfShape(def.getInput().getOutputShape(), prunedCols); + setSerdePropsOfShape(def.getOutputFromWdwFnProcessing(), prunedCols); + setSerdePropsOfShape(def.getOutputShape(), prunedCols); + + RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(op).getRowResolver(); + RowResolver newRR = buildPrunedRR(prunedCols, oldRR, sig); + cppCtx.getPrunedColLists().put(op, prunedInputList(prunedCols, def)); + cppCtx.getOpToParseCtxMap().get(op).setRowResolver(newRR); + op.getSchema().setSignature(sig); + return null; + } + + private static RowResolver buildPrunedRR(List<String> prunedCols, + RowResolver oldRR, ArrayList<ColumnInfo> sig) throws SemanticException{ + RowResolver newRR = new RowResolver(); + for (String col : prunedCols) { + String[] nm = oldRR.reverseLookup(col); + ColumnInfo colInfo = oldRR.get(nm[0], nm[1]); + if (colInfo != null) { + newRR.put(nm[0], nm[1], colInfo); + sig.add(colInfo); + } + } + return newRR; + } + + /* + * add any input columns referenced in WindowFn args or expressions. + */ + private void prunedColumnsList(List<String> prunedCols, WindowTableFunctionDef tDef) { + if ( tDef.getWindowFunctions() != null ) { + for(WindowFunctionDef wDef : tDef.getWindowFunctions() ) { + if ( wDef.getArgs() == null) { + continue; + } + for(PTFExpressionDef arg : wDef.getArgs()) { + ExprNodeDesc exprNode = arg.getExprNode(); + Utilities.mergeUniqElems(prunedCols, exprNode.getCols()); + } + } + } + if ( tDef.getWindowExpressions() != null ) { + for(WindowExpressionDef expr : tDef.getWindowExpressions()) { + ExprNodeDesc exprNode = expr.getExprNode(); + Utilities.mergeUniqElems(prunedCols, exprNode.getCols()); + } + } + } + + private List<String> getLowerCasePrunedCols(List<String> prunedCols){ + List<String> lowerCasePrunedCols = new ArrayList<String>(); + for (String col : prunedCols) { + lowerCasePrunedCols.add(col.toLowerCase()); + } + return lowerCasePrunedCols; + } + + /* + * reconstruct Column names & types list based on the prunedCols list. + */ + private void setSerdePropsOfShape(ShapeDetails shp, List<String> prunedCols) { + List<String> columnNames = Arrays.asList(shp.getSerdeProps().get( + org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS).split(",")); + List<TypeInfo> columnTypes = TypeInfoUtils + .getTypeInfosFromTypeString(shp.getSerdeProps().get( + org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES)); + /* + * fieldNames in OI are lower-cased. So we compare lower-cased names for now.
+ */ + prunedCols = getLowerCasePrunedCols(prunedCols); + + StringBuilder cNames = new StringBuilder(); + StringBuilder cTypes = new StringBuilder(); + + boolean addComma = false; + for(int i=0; i < columnNames.size(); i++) { + if ( prunedCols.contains(columnNames.get(i)) ) { + cNames.append(addComma ? "," : ""); + cTypes.append(addComma ? "," : ""); + cNames.append(columnNames.get(i)); + cTypes.append(columnTypes.get(i)); + addComma = true; + } + } + shp.getSerdeProps().put( + org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, cNames.toString()); + shp.getSerdeProps().put( + org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES, cTypes.toString()); + } + + /* + * From the prunedCols list, filter out columns that refer to WindowFns or WindowExprs; + * the returned list is set as the prunedList needed by the PTFOp. + */ + private ArrayList<String> prunedInputList(List<String> prunedCols, + WindowTableFunctionDef tDef) { + ArrayList<String> prunedInputCols = new ArrayList<String>(); + + StructObjectInspector OI = tDef.getInput().getOutputShape().getOI(); + for(StructField f : OI.getAllStructFieldRefs()) { + String fName = f.getFieldName(); + if ( prunedCols.contains(fName)) { + prunedInputCols.add(fName); + } + } + + return prunedInputCols; + } + } + + /** + * Factory method to get the ColumnPrunerPTFProc class. + * + * @return ColumnPrunerPTFProc + */ + public static ColumnPrunerPTFProc getPTFProc() { + return new ColumnPrunerPTFProc(); + } + + /** * The Default Node Processor for Column Pruning. */ public static class ColumnPrunerDefaultProc implements NodeProcessor { @@ -285,6 +450,39 @@ public final class ColumnPrunerProcFacto } Collections.sort(colLists); pruneReduceSinkOperator(flags, op, cppCtx); + } else if ((childOperators.size() == 1) + && (childOperators.get(0) instanceof ExtractOperator ) + && (childOperators.get(0).getChildOperators().size() == 1) + && (childOperators.get(0).getChildOperators().get(0) instanceof PTFOperator ) + && ((PTFOperator)childOperators.get(0). + getChildOperators().get(0)).getConf().forWindowing() ) { + + /* + * For RS that are followed by Extract & PTFOp for windowing + * - do the same thing as above. Reconstruct ValueColumn list based on what is required + * by the PTFOp.
+ */ + + assert parentOperators.size() == 1; + + PTFOperator ptfOp = (PTFOperator) childOperators.get(0).getChildOperators().get(0); + List<String> childCols = cppCtx.getPrunedColList(ptfOp); + boolean[] flags = new boolean[conf.getValueCols().size()]; + for (int i = 0; i < flags.length; i++) { + flags[i] = false; + } + if (childCols != null && childCols.size() > 0) { + ArrayList<String> outColNames = op.getConf().getOutputValueColumnNames(); + for(int i=0; i < outColNames.size(); i++ ) { + if ( childCols.contains(outColNames.get(i))) { + ExprNodeDesc exprNode = op.getConf().getValueCols().get(i); + flags[i] = true; + Utilities.mergeUniqElems(colLists, exprNode.getCols()); + } + } + } + Collections.sort(colLists); + pruneReduceSinkOperator(flags, op, cppCtx); } else { // Reduce Sink contains the columns needed - no need to aggregate from // children @@ -831,4 +1029,4 @@ public final class ColumnPrunerProcFacto return new ColumnPrunerMapJoinProc(); } -} +} \ No newline at end of file Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1449363&r1=1449362&r2=1449363&view=diff ============================================================================== --- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original) +++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Sat Feb 23 16:33:44 2013 @@ -10825,11 +10825,6 @@ public class SemanticAnalyzer extends Ba Operator genWindowingPlan(WindowingSpec wSpec, Operator input) throws SemanticException { RowResolver rr = opParseCtx.get(input).getRowResolver(); - input = putOpInsertMap(OperatorFactory.getAndMakeChild( - new SelectDesc(true), new RowSchema(rr.getColumnInfos()), - input), rr); - - rr = opParseCtx.get(input).getRowResolver(); input = genReduceSinkPlanForWindowing(wSpec, rr, input); rr = opParseCtx.get(input).getRowResolver(); @@ -10881,11 +10876,8 @@ public class SemanticAnalyzer extends Ba orderCols.add(orderExpr); } - /* - * We add the column to value columns or output column names - * only if it is not a virtual column - */ ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos(); + RowResolver rsNewRR = new RowResolver(); int pos = 0; for (ColumnInfo colInfo : colInfoList) { ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo @@ -10895,13 +10887,20 @@ public class SemanticAnalyzer extends Ba colExprMap.put(colInfo.getInternalName(), valueColExpr); String outColName = SemanticAnalyzer.getColumnInternalName(pos++); outputColumnNames.add(outColName); + + String[] alias = inputRR.reverseLookup(colInfo.getInternalName()); + ColumnInfo newColInfo = new ColumnInfo( + outColName, colInfo.getType(), alias[0], + colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol()); + rsNewRR.put(alias[0], alias[1], newColInfo); + } input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils .getReduceSinkDesc(orderCols, valueCols, outputColumnNames, false, -1, partCols, orderString.toString(), -1), - new RowSchema(inputRR.getColumnInfos()), input), inputRR); + new RowSchema(inputRR.getColumnInfos()), input), rsNewRR); input.setColumnExprMap(colExprMap); Modified: hive/branches/ptf-windowing/ql/src/test/queries/clientpositive/ptf_general_queries.q URL: 
http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/test/queries/clientpositive/ptf_general_queries.q?rev=1449363&r1=1449362&r2=1449363&view=diff ============================================================================== --- hive/branches/ptf-windowing/ql/src/test/queries/clientpositive/ptf_general_queries.q (original) +++ hive/branches/ptf-windowing/ql/src/test/queries/clientpositive/ptf_general_queries.q Sat Feb 23 16:33:44 2013 @@ -16,6 +16,8 @@ CREATE TABLE part( LOAD DATA LOCAL INPATH '../data/files/part_tiny.txt' overwrite into table part; +set hive.ptf.partition.persistence.memsize=10485760; + create table flights_tiny ( ORIGIN_CITY_NAME string, DEST_CITY_NAME string, @@ -74,6 +76,13 @@ from part distribute by p_mfgr sort by p_name ; +-- 6. testJoinWithLeadLag +select p1.p_mfgr, p1.p_name, +p1.p_size, p1.p_size - lag(p1.p_size,1) as deltaSz +from part p1 join part p2 on p1.p_partkey = p2.p_partkey +distribute by p1.p_mfgr +sort by p1.p_name ; + -- 7. testJoinWithNoop select p_mfgr, p_name, p_size, p_size - lag(p_size,1) as deltaSz @@ -129,7 +138,7 @@ from noop(part partition by p_mfgr order by p_name ) -where p_size > 0 +having p_size > 0 distribute by p_mfgr sort by p_name; Modified: hive/branches/ptf-windowing/ql/src/test/results/clientpositive/ptf_general_queries.q.out URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/test/results/clientpositive/ptf_general_queries.q.out?rev=1449363&r1=1449362&r2=1449363&view=diff ============================================================================== --- hive/branches/ptf-windowing/ql/src/test/results/clientpositive/ptf_general_queries.q.out (original) +++ hive/branches/ptf-windowing/ql/src/test/results/clientpositive/ptf_general_queries.q.out Sat Feb 23 16:33:44 2013 @@ -308,6 +308,52 @@ Manufacturer#2 almond aquamarine sandy c Manufacturer#4 almond aquamarine yellow dodger mint 7 Manufacturer#4 almond azure aquamarine papaya violet 12 Manufacturer#5 almond azure blanched chiffon midnight 23 +PREHOOK: query: -- 6. testJoinWithLeadLag +select p1.p_mfgr, p1.p_name, +p1.p_size, p1.p_size - lag(p1.p_size,1) as deltaSz +from part p1 join part p2 on p1.p_partkey = p2.p_partkey +distribute by p1.p_mfgr +sort by p1.p_name +PREHOOK: type: QUERY +PREHOOK: Input: default@part +#### A masked pattern was here #### +POSTHOOK: query: -- 6. 
testJoinWithLeadLag +select p1.p_mfgr, p1.p_name, +p1.p_size, p1.p_size - lag(p1.p_size,1) as deltaSz +from part p1 join part p2 on p1.p_partkey = p2.p_partkey +distribute by p1.p_mfgr +sort by p1.p_name +POSTHOOK: type: QUERY +POSTHOOK: Input: default@part +#### A masked pattern was here #### +Manufacturer#1 almond antique burnished rose metallic 2 0 +Manufacturer#1 almond antique burnished rose metallic 2 0 +Manufacturer#1 almond antique burnished rose metallic 2 0 +Manufacturer#1 almond antique burnished rose metallic 2 0 +Manufacturer#1 almond antique chartreuse lavender yellow 34 32 +Manufacturer#1 almond antique salmon chartreuse burlywood 6 -28 +Manufacturer#1 almond aquamarine burnished black steel 28 22 +Manufacturer#1 almond aquamarine pink moccasin thistle 42 14 +Manufacturer#2 almond antique violet chocolate turquoise 14 0 +Manufacturer#2 almond antique violet turquoise frosted 40 26 +Manufacturer#2 almond aquamarine midnight light salmon 2 -38 +Manufacturer#2 almond aquamarine rose maroon antique 25 23 +Manufacturer#2 almond aquamarine sandy cyan gainsboro 18 -7 +Manufacturer#3 almond antique chartreuse khaki white 17 0 +Manufacturer#3 almond antique forest lavender goldenrod 14 -3 +Manufacturer#3 almond antique metallic orange dim 19 5 +Manufacturer#3 almond antique misty red olive 1 -18 +Manufacturer#3 almond antique olive coral navajo 45 44 +Manufacturer#4 almond antique gainsboro frosted violet 10 0 +Manufacturer#4 almond antique violet mint lemon 39 29 +Manufacturer#4 almond aquamarine floral ivory bisque 27 -12 +Manufacturer#4 almond aquamarine yellow dodger mint 7 -20 +Manufacturer#4 almond azure aquamarine papaya violet 12 5 +Manufacturer#5 almond antique blue firebrick mint 31 0 +Manufacturer#5 almond antique medium spring khaki 6 -25 +Manufacturer#5 almond antique sky peru orange 2 -4 +Manufacturer#5 almond aquamarine dodger light gainsboro 46 44 +Manufacturer#5 almond azure blanched chiffon midnight 23 -23 PREHOOK: query: -- 7. testJoinWithNoop select p_mfgr, p_name, p_size, p_size - lag(p_size,1) as deltaSz @@ -559,7 +605,7 @@ from noop(part partition by p_mfgr order by p_name ) -where p_size > 0 +having p_size > 0 distribute by p_mfgr sort by p_name PREHOOK: type: QUERY @@ -574,7 +620,7 @@ from noop(part partition by p_mfgr order by p_name ) -where p_size > 0 +having p_size > 0 distribute by p_mfgr sort by p_name POSTHOOK: type: QUERY
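Postscript on the new testJoinWithLeadLag case above: p_size feeds lag(p_size,1), and prunedColumnsList is what keeps window-function inputs alive in the pruned-column list even when the downstream consumer does not ask for them. A minimal sketch of that idea follows; the concrete column sets are hypothetical stand-ins for WindowFunctionDef args and ExprNodeDesc.getCols(), and mergeUniqElems mirrors the contract of Utilities.mergeUniqElems.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowArgColumnsSketch {
  // Same contract as Utilities.mergeUniqElems: append src elements not already present.
  static void mergeUniqElems(List<String> dest, List<String> src) {
    for (String col : src) {
      if (!dest.contains(col)) {
        dest.add(col);
      }
    }
  }

  public static void main(String[] args) {
    // Hypothetical: columns the child operator actually asks for.
    List<String> prunedCols = new ArrayList<String>(Arrays.asList("p_mfgr", "p_name"));
    // Hypothetical stand-in for window function args: lag(p_size, 1)
    // references p_size via its ExprNodeDesc.getCols().
    List<List<String>> windowFnArgCols = Arrays.asList(Arrays.asList("p_size"));
    for (List<String> argCols : windowFnArgCols) {
      mergeUniqElems(prunedCols, argCols);
    }
    // p_size survives pruning because a window function needs it.
    System.out.println(prunedCols); // [p_mfgr, p_name, p_size]
  }
}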