Author: hashutosh
Date: Thu Aug 29 15:44:42 2013
New Revision: 1518680

URL: http://svn.apache.org/r1518680
Log:
HIVE-4964 : Cleanup PTF code: remove code dealing with non-standard SQL behavior we had originally introduced (Harish Butani via Ashutosh Chauhan)
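For context on the cleanup: the non-standard behavior removed here is the in-operator evaluation of expressions over window-function results, and of a HAVING over them, documented in the executeWindowExprs javadoc deleted below (its example is "select rank(), lead(rank(),1) ... having rank() > 1"). The comment removed from PTFDesc records the intended replacement: window-expression processing moves to a separate Select operator after the PTF. A minimal standalone sketch of that two-stage shape, using hypothetical names and types rather than Hive's:

    import java.util.List;
    import java.util.function.IntFunction;

    // Sketch only, not Hive code: the windowing PTF emits rank() as an ordinary
    // output column; lead(rank(),1) and the having-filter are then evaluated
    // over the PTF's output by a downstream operator.
    public class WindowExprAfterPtfSketch {
      public static void main(String[] args) {
        // Stage 1: rows as they leave the windowing PTF, each carrying its rank.
        List<long[]> ptfOut = List.of(new long[]{1}, new long[]{2}, new long[]{3});

        // Stage 2: lead(rank(),1) as a plain expression over the PTF output.
        IntFunction<Long> leadRank =
            i -> (i + 1 < ptfOut.size()) ? ptfOut.get(i + 1)[0] : null;

        for (int i = 0; i < ptfOut.size(); i++) {
          long rank = ptfOut.get(i)[0];
          if (rank > 1) {                        // the former "having rank() > 1"
            System.out.println(rank + "\t" + leadRank.apply(i));
          }
        }
      }
    }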
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Thu Aug 29 15:44:42 2013
@@ -34,8 +34,6 @@ import org.apache.hadoop.hive.ql.plan.PT
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
@@ -44,11 +42,9 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

-public class PTFOperator extends Operator<PTFDesc> implements Serializable
-{
+public class PTFOperator extends Operator<PTFDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
   PTFPartition inputPart;
@@ -67,8 +63,7 @@ public class PTFOperato
    * 4. Create input partition to store rows coming from previous operator
    */
   @Override
-  protected void initializeOp(Configuration jobConf) throws HiveException
-  {
+  protected void initializeOp(Configuration jobConf) throws HiveException {
     hiveConf = new HiveConf(jobConf, PTFOperator.class);
     // if the parent is ExtractOperator, this invocation is from reduce-side
     Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
@@ -78,13 +73,10 @@ public class PTFOperato
     inputPart = createFirstPartitionForChain(
         inputObjInspectors[0], hiveConf, isMapOperator);

-    if (isMapOperator)
-    {
+    if (isMapOperator) {
       PartitionedTableFunctionDef tDef = conf.getStartOfChain();
       outputObjInspector = tDef.getRawInputShape().getOI();
-    }
-    else
-    {
+    } else {
       outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
     }

@@ -94,16 +86,12 @@ public class PTFOperato
   }

   @Override
-  protected void closeOp(boolean abort) throws HiveException
-  {
+  protected void closeOp(boolean abort) throws HiveException {
     super.closeOp(abort);
     if(inputPart.size() != 0){
-      if (isMapOperator)
-      {
+      if (isMapOperator) {
         processMapFunction();
-      }
-      else
-      {
+      } else {
         processInputPartition();
       }
     }
@@ -113,8 +101,7 @@ public class PTFOperato

   @Override
   public void processOp(Object row, int tag) throws HiveException {
-    if (!isMapOperator )
-    {
+    if (!isMapOperator ) {
       /*
       * check if current row belongs to the current accumulated Partition:
       * - If not: process the current Partition, reset the input Partition,
       *   and update currentKeys.
       */
      newKeys.getNewKey(row, inputPart.getOI());
      boolean keysAreEqual = (currentKeys != null && newKeys != null) ?
          newKeys.equals(currentKeys) : false;

-      if (currentKeys != null && !keysAreEqual)
-      {
+      if (currentKeys != null && !keysAreEqual) {
        processInputPartition();
        inputPart.reset();
      }

-      if (currentKeys == null || !keysAreEqual)
-      {
-        if (currentKeys == null)
-        {
+      if (currentKeys == null || !keysAreEqual) {
+        if (currentKeys == null) {
          currentKeys = newKeys.copyKey();
-        }
-        else
-        {
+        } else {
          currentKeys.copyKey(newKeys);
        }
      }
@@ -156,16 +138,14 @@ public class PTFOperato
    * @param hiveConf
    * @throws HiveException
    */
-  protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException
-  {
+  protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException {
     PTFDeserializer dS =
         new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
     dS.initializePTFChain(conf.getFuncDef());
   }

-  protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException
-  {
+  protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
     PartitionDef pDef = conf.getStartOfChain().getPartition();
     ArrayList<PTFExpressionDef> exprs = pDef.getExpressions();
     int numExprs = exprs.size();
@@ -173,8 +153,7 @@ public class PTFOperato
     ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
     ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];

-    for(int i=0; i<numExprs; i++)
-    {
+    for(int i=0; i<numExprs; i++) {
       PTFExpressionDef exprDef = exprs.get(i);
       /*
        * Why cannot we just use the ExprNodeEvaluator on the column?
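The processOp body above is untouched in substance; it is the standard streaming group-detection pattern, easier to see after the brace cleanup: buffer rows while the partition key is unchanged, flush the buffered partition when the key changes, and let closeOp flush the final one. A self-contained sketch of the same pattern, with plain stand-ins for Hive's KeyWrapper and PTFPartition:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Objects;

    // Sketch only: the buffer/flush-on-key-change pattern behind
    // PTFOperator.processOp and closeOp. The String key and List buffer are
    // simplified stand-ins for Hive's KeyWrapper and PTFPartition.
    public class PartitionBoundarySketch {
      private final List<Object> inputPart = new ArrayList<>(); // buffered rows
      private String currentKeys;                               // null until the first row

      void processOp(Object row, String newKeys) {
        boolean keysAreEqual = Objects.equals(currentKeys, newKeys);
        if (currentKeys != null && !keysAreEqual) {
          processInputPartition(); // a partition just ended: process it
          inputPart.clear();       // inputPart.reset() in the operator
        }
        currentKeys = newKeys;     // copyKey() / copyKey(newKeys) in the operator
        inputPart.add(row);
      }

      void closeOp() {             // the last partition has no key change to flush it
        if (!inputPart.isEmpty()) {
          processInputPartition();
        }
      }

      private void processInputPartition() {
        System.out.println(inputPart.size() + " rows for key " + currentKeys);
      }

      public static void main(String[] args) {
        PartitionBoundarySketch op = new PartitionBoundarySketch();
        op.processOp("r1", "a");
        op.processOp("r2", "a");
        op.processOp("r3", "b"); // prints: 2 rows for key a
        op.closeOp();            // prints: 1 rows for key b
      }
    }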
@@ -192,29 +171,20 @@ public class PTFOperato
     newKeys = keyWrapperFactory.getKeyWrapper();
   }

-  protected void processInputPartition() throws HiveException
-  {
+  protected void processInputPartition() throws HiveException {
     PTFPartition outPart = executeChain(inputPart);
-    if ( conf.forWindowing() ) {
-      executeWindowExprs(outPart);
-    }
-    else {
-      PTFPartitionIterator<Object> pItr = outPart.iterator();
-      while (pItr.hasNext())
-      {
-        Object oRow = pItr.next();
-        forward(oRow, outputObjInspector);
-      }
-    }
+    PTFPartitionIterator<Object> pItr = outPart.iterator();
+    while (pItr.hasNext()) {
+      Object oRow = pItr.next();
+      forward(oRow, outputObjInspector);
+    }
   }

-  protected void processMapFunction() throws HiveException
-  {
+  protected void processMapFunction() throws HiveException {
     PartitionedTableFunctionDef tDef = conf.getStartOfChain();
     PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
     PTFPartitionIterator<Object> pItr = outPart.iterator();
-    while (pItr.hasNext())
-    {
+    while (pItr.hasNext()) {
       Object oRow = pItr.next();
       forward(oRow, outputObjInspector);
     }
@@ -234,8 +204,7 @@ public class PTFOperato

   @Override
-  public OperatorType getType()
-  {
+  public OperatorType getType() {
     return OperatorType.PTF;
   }

@@ -250,124 +219,23 @@ public class PTFOperato
    * @throws HiveException
    */
   private PTFPartition executeChain(PTFPartition part)
-      throws HiveException
-  {
+      throws HiveException {
     Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
     PTFInputDef iDef = conf.getFuncDef();
-    while (true)
-    {
-      if (iDef instanceof PartitionedTableFunctionDef)
-      {
-        fnDefs.push((PartitionedTableFunctionDef) iDef);
-        iDef = ((PartitionedTableFunctionDef) iDef).getInput();
-      }
-      else
-      {
-        break;
-      }
+
+    while (iDef instanceof PartitionedTableFunctionDef) {
+      fnDefs.push((PartitionedTableFunctionDef) iDef);
+      iDef = ((PartitionedTableFunctionDef) iDef).getInput();
     }

     PartitionedTableFunctionDef currFnDef;
-    while (!fnDefs.isEmpty())
-    {
+    while (!fnDefs.isEmpty()) {
       currFnDef = fnDefs.pop();
       part = currFnDef.getTFunction().execute(part);
     }
     return part;
   }

-  /**
-   * If WindowingSpec contains any Windowing Expressions or has a
-   * Having condition then these are processed
-   * and forwarded on. Whereas when there is no having or WdwExprs
-   * just forward the rows in the output Partition.
-   *
-   * For e.g. Consider the following query:
-   * <pre>
-   * {@code
-   *  select rank(), lead(rank(),1),...
-   *  from xyz
-   *  ...
-   *  having rank() > 1
-   * }
-   * </pre>
-   * rank() gets processed as a WdwFn; Its in the oPart(output partition)
-   * argument to executeWindowExprs. Here we first evaluate the having expression.
-   * So the first row of oPart gets filtered out.
-   * Next we evaluate lead(rank()) which is held as a WindowExpression and add it to the output.
-   *
-   * @param ptfDesc
-   * @param oPart output partition after Window Fns are processed.
-   * @param op
-   * @throws HiveException
-   */
-  private void executeWindowExprs(PTFPartition oPart)
-      throws HiveException
-  {
-    WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) conf.getFuncDef();
-    /*
-     * inputOI represents the row with WindowFn results present.
-     * So in the e.g. above it will have a column for 'rank()'
-     */
-    StructObjectInspector inputOI = wTFnDef.getOutputFromWdwFnProcessing().getOI();
-    /*
-     * outputOI represents the final row with the Windowing Expressions added.
-     * So in the e.g. above it will have a column for 'lead(rank())' in addition to
-     * all columns in inputOI.
-     */
-    StructObjectInspector outputOI = wTFnDef.getOutputShape().getOI();
-    int numCols = outputOI.getAllStructFieldRefs().size();
-    ArrayList<WindowExpressionDef> wdwExprs = wTFnDef.getWindowExpressions();
-    int numWdwExprs = wdwExprs == null ? 0 : wdwExprs.size();
-    Object[] output = new Object[numCols];
-
-    /*
-     * If this Windowing invocation has no Window Expressions and doesn't need to be filtered,
-     * we can just forward the row in the oPart partition.
-     */
-    boolean forwardRowsUntouched = (wdwExprs == null || wdwExprs.size() == 0 );
-
-    PTFPartitionIterator<Object> pItr = oPart.iterator();
-    PTFOperator.connectLeadLagFunctionsToPartition(conf, pItr);
-    while (pItr.hasNext())
-    {
-      int colCnt = 0;
-      Object oRow = pItr.next();
-
-      /*
-       * when there is no Windowing expressions or having;
-       * just forward the Object coming out of the Partition.
-       */
-      if ( forwardRowsUntouched ) {
-        forward(oRow, outputObjInspector);
-        continue;
-      }
-
-      /*
-       * Setup the output row columns in the following order
-       * - the columns in the SelectList processed by the PTF
-       * (ie the Select Exprs that have navigation expressions)
-       * - the columns from the final PTF.
-       */
-
-      if ( wdwExprs != null ) {
-        for (WindowExpressionDef wdwExpr : wdwExprs)
-        {
-          Object newCol = wdwExpr.getExprEvaluator().evaluate(oRow);
-          output[colCnt++] = newCol;
-        }
-      }
-
-      for(; colCnt < numCols; ) {
-        StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numWdwExprs);
-        output[colCnt++] =
-            ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
-                field.getFieldObjectInspector());
-      }
-
-      forward(output, outputObjInspector);
-    }
-  }

   /**
    * Create a new Partition.
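One note on the executeChain rewrite above, since the while(true)/instanceof/break form it replaces obscured the intent: the def chain is linked from the outermost PTF down to the table input, but evaluation must start innermost, so the first loop pushes defs onto a Stack and the second pops them to execute in reverse. A standalone sketch of that traversal, with a hypothetical FnDef record standing in for PartitionedTableFunctionDef (a null input plays the role of the non-PTF input def that ends the walk):

    import java.util.Stack;
    import java.util.function.UnaryOperator;

    // Sketch only: why executeChain needs a Stack. Defs link outermost -> input,
    // but execution must run innermost -> outermost.
    public class ChainUnwindSketch {
      record FnDef(String name, FnDef input, UnaryOperator<String> fn) {}

      static String executeChain(FnDef funcDef, String part) {
        Stack<FnDef> fnDefs = new Stack<>();
        FnDef iDef = funcDef;
        while (iDef != null) {                  // "instanceof PartitionedTableFunctionDef" in Hive
          fnDefs.push(iDef);
          iDef = iDef.input();
        }
        while (!fnDefs.isEmpty()) {
          part = fnDefs.pop().fn().apply(part); // innermost function runs first
        }
        return part;
      }

      public static void main(String[] args) {
        FnDef inner = new FnDef("noop", null, s -> s + " -> noop");
        FnDef outer = new FnDef("windowing", inner, s -> s + " -> windowing");
        System.out.println(executeChain(outer, "rows")); // rows -> noop -> windowing
      }
    }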
@@ -390,8 +258,7 @@ public class PTFOperato
    * @throws HiveException
    */
   public PTFPartition createFirstPartitionForChain(ObjectInspector oi,
-      HiveConf hiveConf, boolean isMapSide) throws HiveException
-  {
+      HiveConf hiveConf, boolean isMapSide) throws HiveException {
     PartitionedTableFunctionDef tabDef = conf.getStartOfChain();
     TableFunctionEvaluator tEval = tabDef.getTFunction();
@@ -410,14 +277,12 @@ public class PTFOperato
   }

   public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
-      PTFPartitionIterator<Object> pItr) throws HiveException
-  {
+      PTFPartitionIterator<Object> pItr) throws HiveException {
     List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
     if (llFnDescs == null) {
       return;
     }
-    for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs)
-    {
+    for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs) {
       GenericUDFLeadLag llFn = (GenericUDFLeadLag) llFnDesc
           .getGenericUDF();
       llFn.setpItr(pItr);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Thu Aug 29 15:44:42 2013
@@ -68,7 +68,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -223,12 +222,6 @@ public final class ColumnPrunerProcFacto
         }
       }
     }
-    if ( tDef.getWindowExpressions() != null ) {
-      for(WindowExpressionDef expr : tDef.getWindowExpressions()) {
-        ExprNodeDesc exprNode = expr.getExprNode();
-        Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
-      }
-    }
     if(tDef.getPartition() != null){
       for(PTFExpressionDef col : tDef.getPartition().getExpressions()){
         ExprNodeDesc exprNode = col.getExprNode();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Thu Aug 29 15:44:42 2013
@@ -72,7 +72,6 @@ import org.apache.hadoop.hive.ql.plan.PT
 import org.apache.hadoop.hive.ql.plan.PTFDesc.RangeBoundaryDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.ValueBoundaryDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFrameDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
@@ -200,83 +199,26 @@ public class PTFTranslator {
     /*
      * set outputFromWdwFnProcessing
      */
-    if (windowFunctions.size() > 0) {
-      ArrayList<String> aliases = new ArrayList<String>();
-      ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-      for (WindowFunctionDef wFnDef : windowFunctions) {
-        aliases.add(wFnDef.getAlias());
-        if (wFnDef.isPivotResult()) {
-          fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
-        } else {
-          fieldOIs.add(wFnDef.getOI());
-        }
-      }
-      PTFTranslator.addInputColumnsToList(inpShape, aliases, fieldOIs);
-      StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
-          aliases, fieldOIs);
-      tFn.setWdwProcessingOutputOI(wdwOutOI);
-      RowResolver wdwOutRR = buildRowResolverForWindowing(wdwTFnDef, false);
-      ShapeDetails wdwOutShape = setupShape(wdwOutOI, null, wdwOutRR);
-      wdwTFnDef.setOutputFromWdwFnProcessing(wdwOutShape);
-    }
-    else {
-      wdwTFnDef.setOutputFromWdwFnProcessing(inpShape);
-    }
-
-    /*
-     * process Wdw expressions
-     */
-    ShapeDetails wdwOutShape = wdwTFnDef.getOutputFromWdwFnProcessing();
-    ArrayList<WindowExpressionDef> windowExpressions = new ArrayList<WindowExpressionDef>();
-    if (wdwSpec.getWindowExpressions() != null) {
-      for (WindowExpressionSpec expr : wdwSpec.getWindowExpressions()) {
-        if (!(expr instanceof WindowFunctionSpec)) {
-          try {
-            PTFExpressionDef eDef = buildExpressionDef(wdwOutShape, expr.getExpression());
-            WindowExpressionDef wdwEDef = new WindowExpressionDef(eDef);
-            wdwEDef.setAlias(expr.getAlias());
-            windowExpressions.add(wdwEDef);
-          } catch (HiveException he) {
-            throw new SemanticException(he);
-          }
-        }
+    ArrayList<String> aliases = new ArrayList<String>();
+    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+    for (WindowFunctionDef wFnDef : windowFunctions) {
+      aliases.add(wFnDef.getAlias());
+      if (wFnDef.isPivotResult()) {
+        fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
+      } else {
+        fieldOIs.add(wFnDef.getOI());
       }
-      wdwTFnDef.setWindowExpressions(windowExpressions);
-    }
-
-    /*
-     * set outputOI
-     */
-    if (windowExpressions.size() > 0) {
-      ArrayList<String> aliases = new ArrayList<String>();
-      ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-      for (WindowExpressionDef wEDef : windowExpressions) {
-        aliases.add(wEDef.getAlias());
-        fieldOIs.add(wEDef.getOI());
-      }
-      PTFTranslator.addInputColumnsToList(wdwOutShape, aliases, fieldOIs);
-      StructObjectInspector outOI = ObjectInspectorFactory.getStandardStructObjectInspector(
-          aliases, fieldOIs);
-      RowResolver outRR = buildRowResolverForWindowing(wdwTFnDef, true);
-      ShapeDetails outShape = setupShape(outOI, null, outRR);
-      wdwTFnDef.setOutputShape(outShape);
-    }
-    else {
-      wdwTFnDef.setOutputShape(copyShape(wdwOutShape));
-    }
+    }
+    PTFTranslator.addInputColumnsToList(inpShape, aliases, fieldOIs);
+    StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+        aliases, fieldOIs);
+    tFn.setWdwProcessingOutputOI(wdwOutOI);
+    RowResolver wdwOutRR = buildRowResolverForWindowing(wdwTFnDef);
+    ShapeDetails wdwOutShape = setupShape(wdwOutOI, null, wdwOutRR);
+    wdwTFnDef.setOutputShape(wdwOutShape);

     tFn.setupOutputOI();

-    /*
-     * If we have windowExpressions then we convert to Std. Object to process;
-     * we just stream these rows; no need to put in an output Partition.
-     */
-    if (windowExpressions.size() > 0) {
-      StructObjectInspector oi = (StructObjectInspector)
-          ObjectInspectorUtils.getStandardObjectInspector(wdwTFnDef.getOutputShape().getOI());
-      wdwTFnDef.getOutputShape().setOI(oi);
-    }
-
     return ptfDesc;
   }

@@ -949,23 +891,10 @@ public class PTFTranslator {
     return rwsch;
   }

-  protected RowResolver buildRowResolverForWindowing(WindowTableFunctionDef def,
-      boolean addWdwExprs) throws SemanticException {
+  protected RowResolver buildRowResolverForWindowing(WindowTableFunctionDef def)
+      throws SemanticException {
     RowResolver rr = new RowResolver();
     HashMap<String, WindowExpressionSpec> aliasToExprMap = windowingSpec.getAliasToWdwExpr();
-    /*
-     * add Window Expressions
-     */
-    if (addWdwExprs) {
-      for (WindowExpressionDef wEDef : def.getWindowExpressions()) {
-        ASTNode ast = aliasToExprMap.get(wEDef.getAlias()).getExpression();
-        ColumnInfo cInfo = new ColumnInfo(wEDef.getAlias(),
-            TypeInfoUtils.getTypeInfoFromObjectInspector(wEDef.getOI()),
-            null,
-            false);
-        rr.putExpression(ast, cInfo);
-      }
-    }

     /*
      * add Window Functions

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java Thu Aug 29 15:44:42 2013
@@ -67,19 +67,6 @@ public class WindowingSpec {
     windowSpecs.put(name, wdwSpec);
   }

-  public void addExpression(ASTNode expr, String alias) {
-    windowExpressions = windowExpressions == null ?
-        new ArrayList<WindowExpressionSpec>() : windowExpressions;
-    aliasToWdwExpr = aliasToWdwExpr == null ?
-        new HashMap<String, WindowExpressionSpec>() : aliasToWdwExpr;
-    WindowExpressionSpec wExprSpec = new WindowExpressionSpec();
-    wExprSpec.setAlias(alias);
-    wExprSpec.setExpression(expr);
-
-    windowExpressions.add(wExprSpec);
-    aliasToWdwExpr.put(alias, wExprSpec);
-  }
-
   public void addWindowFunction(WindowFunctionSpec wFn) {
     windowExpressions = windowExpressions == null ?
         new ArrayList<WindowExpressionSpec>() : windowExpressions;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java Thu Aug 29 15:44:42 2013
@@ -239,28 +239,8 @@ public class PTFDesc extends AbstractOpe
   }

   public static class WindowTableFunctionDef extends PartitionedTableFunctionDef {
-    ArrayList<WindowExpressionDef> windowExpressions;
     ArrayList<WindowFunctionDef> windowFunctions;
-    /*
-     * this shape omits the non WdwFunction Expressions. Expr Evaluators for the Window Expressions is based on this
-     * shape, so they can refer to the Wdw Function values.
-     * @note: this will eventually be removed, as plan is to push Wdw expression processing to separate Select Op after
-     * PTF Op.
-     */
-    ShapeDetails outputFromWdwFnProcessing;
-
-    public ArrayList<WindowExpressionDef> getWindowExpressions() {
-      return windowExpressions;
-    }
-    public void setWindowExpressions(ArrayList<WindowExpressionDef> windowExpressions) {
-      this.windowExpressions = windowExpressions;
-    }
-    public ShapeDetails getOutputFromWdwFnProcessing() {
-      return outputFromWdwFnProcessing;
-    }
-    public void setOutputFromWdwFnProcessing(ShapeDetails outputFromWdwFnProcessing) {
-      this.outputFromWdwFnProcessing = outputFromWdwFnProcessing;
-    }
+
     public ArrayList<WindowFunctionDef> getWindowFunctions() {
       return windowFunctions;
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java Thu Aug 29 15:44:42 2013
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.plan.PT
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.ValueBoundaryDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFrameDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
@@ -53,7 +52,6 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
@@ -114,67 +112,37 @@ public class PTFDeserializer {
     /*
      * 2. initialize WFns.
      */
-    if (def.getWindowFunctions() != null) {
-      for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
-
-        if (wFnDef.getArgs() != null) {
-          for (PTFExpressionDef arg : wFnDef.getArgs()) {
-            initialize(arg, inpShape);
-          }
+    for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
+      if (wFnDef.getArgs() != null) {
+        for (PTFExpressionDef arg : wFnDef.getArgs()) {
+          initialize(arg, inpShape);
         }
-        if (wFnDef.getWindowFrame() != null) {
-          WindowFrameDef wFrmDef = wFnDef.getWindowFrame();
-          initialize(wFrmDef.getStart(), inpShape);
-          initialize(wFrmDef.getEnd(), inpShape);
-        }
-        setupWdwFnEvaluator(wFnDef);
       }
-      ArrayList<String> aliases = new ArrayList<String>();
-      ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-      for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
-        aliases.add(wFnDef.getAlias());
-        if (wFnDef.isPivotResult()) {
-          fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
-        } else {
-          fieldOIs.add(wFnDef.getOI());
-        }
+      if (wFnDef.getWindowFrame() != null) {
+        WindowFrameDef wFrmDef = wFnDef.getWindowFrame();
+        initialize(wFrmDef.getStart(), inpShape);
+        initialize(wFrmDef.getEnd(), inpShape);
       }
-      PTFDeserializer.addInputColumnsToList(inpShape, aliases, fieldOIs);
-      StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
-          aliases, fieldOIs);
-      tResolver.setWdwProcessingOutputOI(wdwOutOI);
-      initialize(def.getOutputFromWdwFnProcessing(), wdwOutOI);
-    } else {
-      def.setOutputFromWdwFnProcessing(inpShape);
+      setupWdwFnEvaluator(wFnDef);
     }
-
-    inpShape = def.getOutputFromWdwFnProcessing();
-
-    /*
-     * 3. initialize WExprs. + having clause
-     */
-    if (def.getWindowExpressions() != null) {
-      for (WindowExpressionDef wEDef : def.getWindowExpressions()) {
-        initialize(wEDef, inpShape);
+    ArrayList<String> aliases = new ArrayList<String>();
+    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+    for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
+      aliases.add(wFnDef.getAlias());
+      if (wFnDef.isPivotResult()) {
+        fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
+      } else {
+        fieldOIs.add(wFnDef.getOI());
       }
     }
-    /*
-     * 4. give Evaluator chance to setup for Output execution; setup Output shape.
-     */
+    PTFDeserializer.addInputColumnsToList(inpShape, aliases, fieldOIs);
+    StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+        aliases, fieldOIs);
+    tResolver.setWdwProcessingOutputOI(wdwOutOI);
+    initialize(def.getOutputShape(), wdwOutOI);
     tResolver.initializeOutputOI();
-    initialize(def.getOutputShape(), tEval.getOutputOI());
-
-    /*
-     * If we have windowExpressions then we convert to Std. Object to process;
-     * we just stream these rows; no need to put in an output Partition.
-     */
-    if (def.getWindowExpressions().size() > 0) {
-      StructObjectInspector oi = (StructObjectInspector)
-          ObjectInspectorUtils.getStandardObjectInspector(def.getOutputShape().getOI());
-      def.getOutputShape().setOI(oi);
-    }
   }

   protected void initialize(PTFQueryInputDef def, StructObjectInspector OI) throws HiveException {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java Thu Aug 29 15:44:42 2013
@@ -46,34 +46,12 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;

-public class WindowingTableFunction extends TableFunctionEvaluator
-{
-
-  @Override
-  public PTFPartition execute(PTFPartition iPart)
-      throws HiveException
-  {
-    WindowTableFunctionDef wFnDef = (WindowTableFunctionDef) getTableDef();
-    PTFPartitionIterator<Object> pItr = iPart.iterator();
-    PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, pItr);
-
-    if ( outputPartition == null ) {
-      outputPartition = PTFPartition.create(ptfDesc.getCfg(),
-          wFnDef.getOutputFromWdwFnProcessing().getSerde(),
-          OI, wFnDef.getOutputFromWdwFnProcessing().getOI());
-    }
-    else {
-      outputPartition.reset();
-    }
-
-    execute(pItr, outputPartition);
-    return outputPartition;
-  }
+@SuppressWarnings("deprecation")
+public class WindowingTableFunction extends TableFunctionEvaluator {

   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
-  public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException
-  {
+  public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException {
     ArrayList<List<?>> oColumns = new ArrayList<List<?>>();
     PTFPartition iPart = pItr.getPartition();
     StructObjectInspector inputOI;
@@ -82,36 +60,29 @@ public class WindowingTableFunction exte
     WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
     Order order = wTFnDef.getOrder().getExpressions().get(0).getOrder();

-    for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions())
-    {
+    for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
       boolean processWindow = processWindow(wFn);
       pItr.reset();
-      if ( !processWindow )
-      {
+      if ( !processWindow ) {
         GenericUDAFEvaluator fEval = wFn.getWFnEval();
         Object[] args = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
         AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
-        while(pItr.hasNext())
-        {
+        while(pItr.hasNext()) {
          Object row = pItr.next();
          int i =0;
          if ( wFn.getArgs() != null ) {
-            for(PTFExpressionDef arg : wFn.getArgs())
-            {
+            for(PTFExpressionDef arg : wFn.getArgs()) {
              args[i++] = arg.getExprEvaluator().evaluate(row);
            }
          }
          fEval.aggregate(aggBuffer, args);
        }
        Object out = fEval.evaluate(aggBuffer);
-        if ( !wFn.isPivotResult())
-        {
+        if ( !wFn.isPivotResult()) {
          out = new SameList(iPart.size(), out);
        }
        oColumns.add((List<?>)out);
-      }
-      else
-      {
+      } else {
        oColumns.add(executeFnwithWindow(getQueryDef(), wFn, iPart, order));
      }
    }
@@ -122,18 +93,15 @@ public class WindowingTableFunction exte
     * - the input Rows columns
     */

-    for(int i=0; i < iPart.size(); i++)
-    {
+    for(int i=0; i < iPart.size(); i++) {
      ArrayList oRow = new ArrayList();
      Object iRow = iPart.getAt(i);

-      for(int j=0; j < oColumns.size(); j++)
-      {
+      for(int j=0; j < oColumns.size(); j++) {
        oRow.add(oColumns.get(j).get(i));
      }

-      for(StructField f : inputOI.getAllStructFieldRefs())
-      {
+      for(StructField f : inputOI.getAllStructFieldRefs()) {
        oRow.add(inputOI.getStructFieldData(iRow, f));
      }

@@ -187,15 +155,13 @@ public class WindowingTableFunction exte
    * - the Window Functions.
    */
   @Override
-  public void initializeOutputOI() throws HiveException
-  {
+  public void initializeOutputOI() throws HiveException {
     setupOutputOI();
   }

   @Override
-  public boolean transformsRawInput()
-  {
+  public boolean transformsRawInput() {
     return false;
   }

@@ -225,26 +191,22 @@ public class WindowingTableFunction exte
       WindowFunctionDef wFnDef,
       PTFPartition iPart,
       Order order)
-      throws HiveException
-  {
+      throws HiveException {
     ArrayList<Object> vals = new ArrayList<Object>();
     GenericUDAFEvaluator fEval = wFnDef.getWFnEval();
     Object[] args = new Object[wFnDef.getArgs() == null ? 0 : wFnDef.getArgs().size()];
-    for(int i=0; i < iPart.size(); i++)
-    {
+    for(int i=0; i < iPart.size(); i++) {
      AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
      Range rng = getRange(wFnDef, i, iPart, order);
      PTFPartitionIterator<Object> rItr = rng.iterator();
      PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
-      while(rItr.hasNext())
-      {
+      while(rItr.hasNext()) {
        Object row = rItr.next();
        int j = 0;
        if ( wFnDef.getArgs() != null ) {
-          for(PTFExpressionDef arg : wFnDef.getArgs())
-          {
+          for(PTFExpressionDef arg : wFnDef.getArgs()) {
          args[j++] = arg.getExprEvaluator().evaluate(row);
        }
      }
@@ -701,8 +663,7 @@ public class WindowingTableFunction exte
    }
  }

-  public Object computeValue(Object row) throws HiveException
-  {
+  public Object computeValue(Object row) throws HiveException {
    Object o = expressionDef.getExprEvaluator().evaluate(row);
    return ObjectInspectorUtils.copyToStandardObject(o, expressionDef.getOI());
  }
@@ -713,11 +674,10 @@ public class WindowingTableFunction exte

   @SuppressWarnings("incomplete-switch")
-  public static ValueBoundaryScanner getScanner(ValueBoundaryDef vbDef, Order order) throws HiveException
-  {
+  public static ValueBoundaryScanner getScanner(ValueBoundaryDef vbDef, Order order)
+      throws HiveException {
    PrimitiveObjectInspector pOI = (PrimitiveObjectInspector) vbDef.getOI();
-    switch(pOI.getPrimitiveCategory())
-    {
+    switch(pOI.getPrimitiveCategory()) {
    case BYTE:
    case INT:
    case LONG:
@@ -736,16 +696,14 @@ public class WindowingTableFunction exte
    }
  }

-  public static class LongValueBoundaryScanner extends ValueBoundaryScanner
-  {
-    public LongValueBoundaryScanner(BoundaryDef bndDef, Order order, PTFExpressionDef expressionDef)
-    {
+  public static class LongValueBoundaryScanner extends ValueBoundaryScanner {
+    public LongValueBoundaryScanner(BoundaryDef bndDef, Order order,
+        PTFExpressionDef expressionDef) {
      super(bndDef,order,expressionDef);
    }

    @Override
-    public boolean isGreater(Object v1, Object v2, int amt)
-    {
+    public boolean isGreater(Object v1, Object v2, int amt) {
      long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
          (PrimitiveObjectInspector) expressionDef.getOI());
      long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
@@ -754,8 +712,7 @@ public class WindowingTableFunction exte
    }

    @Override
-    public boolean isEqual(Object v1, Object v2)
-    {
+    public boolean isEqual(Object v1, Object v2) {
      long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
          (PrimitiveObjectInspector) expressionDef.getOI());
      long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
@@ -764,16 +721,14 @@ public class WindowingTableFunction exte
    }
  }

-  public static class DoubleValueBoundaryScanner extends ValueBoundaryScanner
-  {
-    public DoubleValueBoundaryScanner(BoundaryDef bndDef, Order order, PTFExpressionDef expressionDef)
-    {
+  public static class DoubleValueBoundaryScanner extends ValueBoundaryScanner {
+    public DoubleValueBoundaryScanner(BoundaryDef bndDef, Order order,
+        PTFExpressionDef expressionDef) {
      super(bndDef,order,expressionDef);
    }

    @Override
-    public boolean isGreater(Object v1, Object v2, int amt)
-    {
+    public boolean isGreater(Object v1, Object v2, int amt) {
      double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
          (PrimitiveObjectInspector) expressionDef.getOI());
      double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
@@ -782,8 +737,7 @@ public class WindowingTableFunction exte
    }

    @Override
-    public boolean isEqual(Object v1, Object v2)
-    {
+    public boolean isEqual(Object v1, Object v2) {
      double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
          (PrimitiveObjectInspector) expressionDef.getOI());
      double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
@@ -792,16 +746,14 @@ public class WindowingTableFunction exte
    }
  }

-  public static class StringValueBoundaryScanner extends ValueBoundaryScanner
-  {
-    public StringValueBoundaryScanner(BoundaryDef bndDef, Order order, PTFExpressionDef expressionDef)
-    {
+  public static class StringValueBoundaryScanner extends ValueBoundaryScanner {
+    public StringValueBoundaryScanner(BoundaryDef bndDef, Order order,
+        PTFExpressionDef expressionDef) {
      super(bndDef,order,expressionDef);
    }

    @Override
-    public boolean isGreater(Object v1, Object v2, int amt)
-    {
+    public boolean isGreater(Object v1, Object v2, int amt) {
      String s1 = PrimitiveObjectInspectorUtils.getString(v1,
          (PrimitiveObjectInspector) expressionDef.getOI());
      String s2 = PrimitiveObjectInspectorUtils.getString(v2,
@@ -810,8 +762,7 @@ public class WindowingTableFunction exte
    }

    @Override
-    public boolean isEqual(Object v1, Object v2)
-    {
+    public boolean isEqual(Object v1, Object v2) {
      String s1 = PrimitiveObjectInspectorUtils.getString(v1,
          (PrimitiveObjectInspector) expressionDef.getOI());
      String s2 = PrimitiveObjectInspectorUtils.getString(v2,
@@ -820,26 +771,22 @@ public class WindowingTableFunction exte
    }
  }

-  public static class SameList<E> extends AbstractList<E>
-  {
+  public static class SameList<E> extends AbstractList<E> {
    int sz;
    E val;

-    public SameList(int sz, E val)
-    {
+    public SameList(int sz, E val) {
      this.sz = sz;
      this.val = val;
    }

    @Override
-    public E get(int index)
-    {
+    public E get(int index) {
      return val;
    }

    @Override
-    public int size()
-    {
+    public int size() {
      return sz;
    }
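A closing note on SameList, which survives the cleanup unchanged at the end of the diff above: when a window function is evaluated over the whole partition (the !processWindow path in execute) and does not pivot its result, the evaluator produces a single value, and SameList presents it as a constant List so the row-stitching loop can index every output column uniformly without materializing partition-size copies. A standalone usage sketch (the names around SameList are hypothetical):

    import java.util.AbstractList;
    import java.util.List;

    // Sketch only: broadcasting one per-partition aggregate across all rows in
    // O(1) memory, as WindowingTableFunction does via SameList.
    public class SameListSketch {
      static class SameList<E> extends AbstractList<E> {
        private final int sz;
        private final E val;
        SameList(int sz, E val) { this.sz = sz; this.val = val; }
        @Override public E get(int index) { return val; }
        @Override public int size() { return sz; }
      }

      public static void main(String[] args) {
        int partitionSize = 4;
        List<Long> perRow = List.of(1L, 2L, 3L, 4L);                  // e.g. rank() per row
        List<Long> perPartition = new SameList<>(partitionSize, 10L); // e.g. an unframed sum

        // The output loop treats both columns identically, as in execute():
        for (int i = 0; i < partitionSize; i++) {
          System.out.println(perRow.get(i) + "\t" + perPartition.get(i));
        }
      }
    }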