Author: hashutosh
Date: Wed Feb 20 02:27:17 2013
New Revision: 1447989

URL: http://svn.apache.org/r1447989
Log:
HIVE-3984 : Maintain a clear separation between Windowing & PTF at the specification level.
Added:
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
Removed:
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFSpec.java
Modified:
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFNTile.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NPath.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/Noop.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMap.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionResolver.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
    hive/branches/ptf-windowing/ql/src/test/queries/clientpositive/ptf_general_queries.q
    hive/branches/ptf-windowing/ql/src/test/queries/clientpositive/ptf_rcfile.q
    hive/branches/ptf-windowing/ql/src/test/queries/clientpositive/ptf_seqfile.q
    hive/branches/ptf-windowing/ql/src/test/results/clientpositive/ptf_general_queries.q.out
    hive/branches/ptf-windowing/ql/src/test/results/clientpositive/ptf_rcfile.q.out
    hive/branches/ptf-windowing/ql/src/test/results/clientpositive/ptf_seqfile.q.out

Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java?rev=1447989&r1=1447988&r2=1447989&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java Wed Feb 20 02:27:17 2013
@@ -38,6 +38,7 @@ public class QueryProperties {
   boolean hasSortBy = false;
   boolean hasJoinFollowedByGroupBy = false;
   boolean hasPTF = false;
+  boolean hasWindowing = false;

   // does the query have a using clause
   boolean usesScript = false;
@@ -116,4 +117,12 @@ public class QueryProperties {
   public void setHasPTF(boolean hasPTF) {
     this.hasPTF = hasPTF;
   }
+
+  public boolean hasWindowing() {
+    return hasWindowing;
+  }
+
+  public void setHasWindowing(boolean hasWindowing) {
+    this.hasWindowing = hasWindowing;
+  }
 }
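The new flag deliberately mirrors the existing hasPTF accessor pair, so windowing can be tracked independently of PTF invocations. A minimal illustration of the intended use (hypothetical calling code, not part of this commit):

    QueryProperties qp = new QueryProperties();
    qp.setHasPTF(true);        // the query invokes a partitioned table function
    qp.setHasWindowing(true);  // the query also contains OVER-clause windowing
    if (qp.hasWindowing()) {
      // windowing-specific planning can now be keyed off its own flag,
      // without overloading hasPTF
    }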
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1447989&r1=1447988&r2=1447989&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Wed Feb 20 02:27:17 2013
@@ -27,17 +27,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.PTFTranslator;
-import org.apache.hadoop.hive.ql.parse.PTFTranslator.PTFDefDeserializer;
-import org.apache.hadoop.hive.ql.parse.PTFTranslator.PTFTranslationInfo;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.ColumnDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.TableFuncDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WhereDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
 import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator;
@@ -50,7 +49,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.io.Writable;

 public class PTFOperator extends Operator<PTFDesc> implements Serializable
 {
@@ -86,19 +84,17 @@ public class PTFOperator extends Operato
     }

     reconstructQueryDef(hiveConf);
-    inputPart = PTFOperator.createFirstPartitionForChain(conf,
+    inputPart = createFirstPartitionForChain(
         inputObjInspectors[0], hiveConf, isMapOperator);

-    // OI for FileSinkOperator is taken from select-list (reduce-side)
-    // OI for ReduceSinkOperator is taken from TODO
     if (isMapOperator)
     {
-      TableFuncDef tDef = PTFTranslator.getFirstTableFunction(conf);
-      outputObjInspector = tDef.getRawInputOI();
+      PartitionedTableFunctionDef tDef = conf.getStartOfChain();
+      outputObjInspector = tDef.getRawInputShape().getOI();
     }
     else
     {
-      outputObjInspector = conf.getSelectList().getOI();
+      outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
     }

     setupKeysWrapper(inputObjInspectors[0]);
@@ -141,7 +137,7 @@ public class PTFOperator extends Operato
       if (currentKeys != null && !keysAreEqual)
       {
         processInputPartition();
-        inputPart = PTFOperator.createFirstPartitionForChain(conf, inputObjInspectors[0], hiveConf, isMapOperator);
+        inputPart = createFirstPartitionForChain(inputObjInspectors[0], hiveConf, isMapOperator);
       }

       if (currentKeys == null || !keysAreEqual)
@@ -171,49 +167,59 @@ public class PTFOperator extends Operato

   protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException
   {
-    PTFDefDeserializer qdd = new PTFDefDeserializer(hiveConf,
-        inputObjInspectors[0]);
-    PTFTranslator.PTFDefWalker qdw = new PTFTranslator.PTFDefWalker(qdd);
-    qdw.walk(conf);
+    PTFDeserializer dS =
+        new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
+    dS.initializePTFChain(conf.getFuncDef());
   }

   protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException
   {
-    PartitionDef pDef = PTFTranslator.getFirstTableFunction(conf).getWindow().getPartDef();
-    ArrayList<ColumnDef> cols = pDef.getColumns();
-    int numCols = cols.size();
-    ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numCols];
-    ObjectInspector[] keyOIs = new ObjectInspector[numCols];
-    ObjectInspector[] currentKeyOIs = new ObjectInspector[numCols];
+    PartitionDef pDef = conf.getStartOfChain().getPartition();
+    ArrayList<PTFExpressionDef> exprs = pDef.getExpressions();
+    int numExprs = exprs.size();
+    ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numExprs];
+    ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
+    ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];

-    for(int i=0; i<numCols; i++)
+    for(int i=0; i<numExprs; i++)
     {
-      ColumnDef cDef = cols.get(i);
+      PTFExpressionDef exprDef = exprs.get(i);
      /*
       * Why cannot we just use the ExprNodeEvaluator on the column?
       * - because on the reduce-side it is initialized based on the rowOI of the HiveTable
       *   and not the OI of the ExtractOp ( the parent of this Operator on the reduce-side)
       */
-      keyFields[i] = ExprNodeEvaluatorFactory.get(cDef.getExprNode());
+      keyFields[i] = ExprNodeEvaluatorFactory.get(exprDef.getExprNode());
       keyOIs[i] = keyFields[i].initialize(inputOI);
-      currentKeyOIs[i] = ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i], ObjectInspectorCopyOption.WRITABLE);
+      currentKeyOIs[i] =
+          ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i],
+              ObjectInspectorCopyOption.WRITABLE);
     }

     keyWrapperFactory = new KeyWrapperFactory(keyFields, keyOIs, currentKeyOIs);
-
-    newKeys = keyWrapperFactory.getKeyWrapper();
+    newKeys = keyWrapperFactory.getKeyWrapper();
   }

   protected void processInputPartition() throws HiveException
   {
-    PTFPartition outPart = PTFOperator.executeChain(conf, inputPart);
-    PTFOperator.executeSelectList(conf, outPart, this);
+    PTFPartition outPart = executeChain(inputPart);
+    if ( conf.forWindowing() ) {
+      executeWindowExprs(outPart);
+    }
+    else {
+      PTFPartitionIterator<Object> pItr = outPart.iterator();
+      while (pItr.hasNext())
+      {
+        Object oRow = pItr.next();
+        forward(oRow, outputObjInspector);
+      }
+    }
   }

   protected void processMapFunction() throws HiveException
   {
-    TableFuncDef tDef = PTFTranslator.getFirstTableFunction(conf);
-    PTFPartition outPart = tDef.getFunction().transformRawInput(inputPart);
+    PartitionedTableFunctionDef tDef = conf.getStartOfChain();
+    PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
     PTFPartitionIterator<Object> pItr = outPart.iterator();
     while (pItr.hasNext())
     {
@@ -247,22 +253,21 @@ public class PTFOperator extends Operato
    * For each table function popped out of the stack,
    * execute the function on the input partition
    * and return an output partition.
-   * @param ptfDesc
    * @param part
    * @return
    * @throws HiveException
    */
-  private static PTFPartition executeChain(PTFDesc ptfDesc, PTFPartition part)
+  private PTFPartition executeChain(PTFPartition part)
       throws HiveException
   {
-    Stack<TableFuncDef> fnDefs = new Stack<TableFuncDef>();
-    PTFInputDef iDef = ptfDesc.getInput();
+    Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
+    PTFInputDef iDef = conf.getFuncDef();
     while (true)
     {
-      if (iDef instanceof TableFuncDef)
+      if (iDef instanceof PartitionedTableFunctionDef)
       {
-        fnDefs.push((TableFuncDef) iDef);
-        iDef = ((TableFuncDef) iDef).getInput();
+        fnDefs.push((PartitionedTableFunctionDef) iDef);
+        iDef = ((PartitionedTableFunctionDef) iDef).getInput();
       }
       else
       {
@@ -270,64 +275,96 @@ public class PTFOperator extends Operato
       }
     }

-    TableFuncDef currFnDef;
+    PartitionedTableFunctionDef currFnDef;
     while (!fnDefs.isEmpty())
     {
       currFnDef = fnDefs.pop();
-      part = currFnDef.getFunction().execute(part);
+      part = currFnDef.getTFunction().execute(part);
     }
     return part;
   }

   /**
-   * For each row in the partition:
-   * 1. evaluate the where condition if applicable.
-   * 2. evaluate the value for each column retrieved
-   *    from the select list
-   * 3. Forward the writable value or object based on the
-   *    implementation of the ForwardSink
+   * If the WindowingSpec contains any Window Expressions or a
+   * Having condition, these are processed and the results forwarded on.
+   * When there are no Window Expressions and no Having condition,
+   * the rows in the output Partition are simply forwarded.
+   *
+   * For example, consider the following query:
+   * <pre>
+   * {@code
+   *   select rank(), lead(rank(),1),...
+   *   from xyz
+   *   ...
+   *   having rank() > 1
+   * }
+   * </pre>
+   * rank() gets processed as a WindowFunction; its result is in the oPart (output partition)
+   * argument passed to executeWindowExprs. Here we first evaluate the having expression,
+   * so the first row of oPart gets filtered out.
+   * Next we evaluate lead(rank()), which is held as a WindowExpression, and add it to the output.
+   *
    * @param ptfDesc
-   * @param oPart
-   * @param rS
+   * @param oPart output partition after Window Fns are processed.
+   * @param op
    * @throws HiveException
    */
-  @SuppressWarnings(
-      { "rawtypes", "unchecked" })
-  private static void executeSelectList(PTFDesc ptfDesc, PTFPartition oPart, PTFOperator op)
+  private void executeWindowExprs(PTFPartition oPart)
       throws HiveException
   {
-    StructObjectInspector selectOI = ptfDesc.getSelectList().getOI();
-    StructObjectInspector inputOI = ptfDesc.getInput().getOI();
-    int numCols = selectOI.getAllStructFieldRefs().size();
-    ArrayList<ColumnDef> cols = ptfDesc.getSelectList().getColumns();
-    int numSelCols = cols == null ? 0 : cols.size();
+    WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) conf.getFuncDef();
+    /*
+     * inputOI represents the row with the WindowFn results present.
+     * So in the example above it will have a column for 'rank()'.
+     */
+    StructObjectInspector inputOI = wTFnDef.getOutputFromWdwFnProcessing().getOI();
+    /*
+     * outputOI represents the final row with the Windowing Expressions added.
+     * So in the example above it will have a column for 'lead(rank())' in addition to
+     * all the columns in inputOI.
+     */
+    StructObjectInspector outputOI = wTFnDef.getOutputShape().getOI();
+    int numCols = outputOI.getAllStructFieldRefs().size();
+    ArrayList<WindowExpressionDef> wdwExprs = wTFnDef.getWindowExpressions();
+    int numWdwExprs = wdwExprs == null ? 0 : wdwExprs.size();
     Object[] output = new Object[numCols];

-    WhereDef whDef = ptfDesc.getWhere();
-    boolean applyWhere = whDef != null;
-    Converter whConverter = !applyWhere ? null
+    PTFExpressionDef havingExpr = wTFnDef.getHavingExpression();
+    boolean applyHaving = havingExpr != null;
+    Converter hvgConverter = !applyHaving ? null
         : ObjectInspectorConverters
             .getConverter(
-                whDef.getOI(),
+                havingExpr.getOI(),
                 PrimitiveObjectInspectorFactory.javaBooleanObjectInspector);
-    ExprNodeEvaluator whCondEval = !applyWhere ? null : whDef
-        .getExprEvaluator();
+    ExprNodeEvaluator havingCondEval = !applyHaving ? null : havingExpr.getExprEvaluator();

+    /*
+     * If this Windowing invocation has no Window Expressions and doesn't need to be filtered,
+     * we can just forward the rows in the oPart partition.
+     */
+    boolean forwardRowsUntouched = !applyHaving && (wdwExprs == null || wdwExprs.size() == 0);

-    Writable value = null;
     PTFPartitionIterator<Object> pItr = oPart.iterator();
-    PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, pItr);
+    PTFOperator.connectLeadLagFunctionsToPartition(conf, pItr);
     while (pItr.hasNext())
     {
       int colCnt = 0;
       Object oRow = pItr.next();

-      if (applyWhere)
+      /*
+       * when there are no Windowing expressions and no Having,
+       * just forward the Object coming out of the Partition.
+       */
+      if ( forwardRowsUntouched ) {
+        forward(oRow, outputObjInspector);
+        continue;
+      }
+
+      if (applyHaving)
       {
-        Object whCond = null;
-        whCond = whCondEval.evaluate(oRow);
-        whCond = whConverter.convert(whCond);
-        if (whCond == null || !((Boolean) whCond).booleanValue())
+        Object hvgCond = null;
+        hvgCond = havingCondEval.evaluate(oRow);
+        hvgCond = hvgConverter.convert(hvgCond);
+        if (!((Boolean) hvgCond).booleanValue())
         {
           continue;
         }
@@ -335,50 +372,61 @@ public class PTFOperator extends Operato
       }

       /*
        * Setup the output row columns in the following order
-       * - the columns in the SelectList processed by the PTF (ie the Select Exprs that have navigation expressions)
+       * - the columns in the SelectList processed by the PTF
+       *   (ie the Select Exprs that have navigation expressions)
        * - the columns from the final PTF.
        */
-      if ( cols != null ) {
-        for (ColumnDef cDef : cols)
+      if ( wdwExprs != null ) {
+        for (WindowExpressionDef wdwExpr : wdwExprs)
         {
-          Object newCol = cDef.getExprEvaluator().evaluate(oRow);
+          Object newCol = wdwExpr.getExprEvaluator().evaluate(oRow);
           output[colCnt++] = newCol;
         }
       }

       for(; colCnt < numCols; )
       {
-        StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numSelCols);
-        output[colCnt++] = ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
+        StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numWdwExprs);
+        output[colCnt++] =
+            ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
             field.getFieldObjectInspector());
       }

-      op.forward(output, op.outputObjInspector);
+      forward(output, outputObjInspector);
     }
   }

   /**
-   * Create a new partition.
-   * The input OI is used to evaluate rows appended to the partition.
-   * The serde is determined based on whether the query has a map-phase
-   * or not. The OI on the serde is used by PTFs to evaluate output of the
-   * partition.
-   * @param ptfDesc
+   * Create a new Partition.
+   * A Partition has 2 OIs: the OI for the rows being put in and the OI for the rows
+   * coming out. You specify the output OI by giving the SerDe to use for serialization.
+   * Typically these 2 OIs are the same, but not always. For the
+   * first PTF in a chain the OI of the incoming rows is dictated by the parent Op
+   * of this PTFOp. The output OI from the Partition is typically LazyBinaryStruct, but
+   * not always. In the case of Noop/NoopMap we keep the structure the same as
+   * what is given to us.
+   * <p>
+   * The Partition we want to create here is for feeding the first table function in the chain.
+   * So for map-side processing use the SerDe from the output Shape of its InputDef.
+   * For reduce-side processing use the SerDe from its RawInputShape (the shape
+   * after map-side processing).
    * @param oi
    * @param hiveConf
+   * @param isMapSide
    * @return
    * @throws HiveException
    */
-  public static PTFPartition createFirstPartitionForChain(PTFDesc ptfDesc, ObjectInspector oi,
+  public PTFPartition createFirstPartitionForChain(ObjectInspector oi,
       HiveConf hiveConf, boolean isMapSide) throws HiveException
   {
-    TableFuncDef tabDef = PTFTranslator.getFirstTableFunction(ptfDesc);
-    TableFunctionEvaluator tEval = tabDef.getFunction();
+    PartitionedTableFunctionDef tabDef = conf.getStartOfChain();
+    TableFunctionEvaluator tEval = tabDef.getTFunction();
     String partClassName = tEval.getPartitionClass();
     int partMemSize = tEval.getPartitionMemSize();

     PTFPartition part = null;
-    SerDe serde = tabDef.getInput().getSerde();
+    SerDe serde = isMapSide ?
+        tabDef.getInput().getOutputShape().getSerde() :
+        tabDef.getRawInputShape().getSerde();
     part = new PTFPartition(partClassName, partMemSize, serde,
         (StructObjectInspector) oi);
     return part;
@@ -388,9 +436,7 @@ public class PTFOperator extends Operato
   public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
       PTFPartitionIterator<Object> pItr) throws HiveException
   {
-    PTFTranslationInfo tInfo = ptfDesc.getTranslationInfo();
-    List<ExprNodeGenericFuncDesc> llFnDescs = tInfo.getLLInfo()
-        .getLeadLagExprs();
+    List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
     if (llFnDescs == null) {
       return;
     }
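The executeChain rewrite above keeps the same stack-based pattern: walk from the outermost function definition down to the query input, pushing as you go, then pop so the function nearest the input runs first. A self-contained sketch of that pattern, using simplified stand-in types rather than the Hive classes:

    import java.util.ArrayDeque;
    import java.util.Deque;

    interface Input {}

    class QueryInput implements Input {}

    class FunctionDef implements Input {
      final Input input;
      FunctionDef(Input input) { this.input = input; }
      String execute(String partition) { return "f(" + partition + ")"; }
    }

    public class ChainSketch {
      // Mirrors PTFOperator.executeChain: unwind the chain, then execute inside-out.
      static String executeChain(FunctionDef outermost, String part) {
        Deque<FunctionDef> fnDefs = new ArrayDeque<FunctionDef>();
        Input iDef = outermost;
        while (iDef instanceof FunctionDef) {   // push outermost first ...
          fnDefs.push((FunctionDef) iDef);
          iDef = ((FunctionDef) iDef).input;
        }
        while (!fnDefs.isEmpty()) {             // ... so pops run innermost first
          part = fnDefs.pop().execute(part);
        }
        return part;
      }

      public static void main(String[] args) {
        FunctionDef inner = new FunctionDef(new QueryInput());
        FunctionDef outer = new FunctionDef(inner);
        System.out.println(executeChain(outer, "p0"));  // prints f(f(p0))
      }
    }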
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java?rev=1447989&r1=1447988&r2=1447989&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java Wed Feb 20 02:27:17 2013
@@ -142,7 +142,7 @@ public class PTFPartition
   public PTFPartitionIterator<Object> range(int start, int end)
   {
     assert(start >= 0);
-    assert(end < size());
+    assert(end <= size());
     assert(start <= end);
     return new PItr(start, end);
   }

Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java?rev=1447989&r1=1447988&r2=1447989&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java Wed Feb 20 02:27:17 2013
@@ -43,7 +43,7 @@ import org.antlr.runtime.tree.BaseTree;
 import org.antlr.runtime.tree.CommonTree;
 import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
-import org.apache.hadoop.hive.ql.parse.PTFSpec.WindowFrameSpec.Direction;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java?rev=1447989&r1=1447988&r2=1447989&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java Wed Feb 20 02:27:17 2013
@@ -34,7 +34,13 @@ public @interface WindowFunctionDescript
 {
   Description description ();
   /**
-   * controls whether this function can be applied to a Window
+   * Controls whether this function can be applied to a Window.
+   * <p>
+   * Ranking functions (Rank, Dense_Rank, Percent_Rank and Cume_Dist) don't operate on Windows.
+   * Why? A window specification implies a row-specific range, i.e. every row gets its own set of rows to process the UDAF on.
+   * For ranking, defining a set of rows for every row makes no sense.
+   * <p>
+   * All other UDAFs can be computed for a Window.
    */
   boolean supportsWindow() default true;
   /**
@@ -45,3 +51,4 @@ public @interface WindowFunctionDescript
    */
   boolean pivotResult() default false;
 }
+
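For concreteness, this is roughly how a ranking UDAF opts out of windows via the annotation. The sketch below is illustrative only: the class name is made up, and the @Description values and pivotResult setting are assumptions (see the GenericUDAFRank changes in this commit for the real usage):

    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
    import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;

    @WindowFunctionDescription(
        description = @Description(
            name = "rank",
            value = "_FUNC_(x) - rank of the current row within the ordering of its partition"),
        supportsWindow = false,  // a row-specific window makes no sense for ranking
        pivotResult = true       // results for the whole partition are produced in one pass
    )
    public class IllustrativeRank extends AbstractGenericUDAFResolver {
      // evaluator wiring elided; illustrative only
    }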
Added: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java?rev=1447989&view=auto
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java (added)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java Wed Feb 20 02:27:17 2013
@@ -0,0 +1,515 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+
+public class PTFInvocationSpec {
+
+  PartitionedTableFunctionSpec function;
+
+  public PartitionedTableFunctionSpec getFunction() {
+    return function;
+  }
+
+  public void setFunction(PartitionedTableFunctionSpec function) {
+    this.function = function;
+  }
+
+  public PartitionedTableFunctionSpec getStartOfChain() {
+    return function == null ? null : function.getStartOfChain();
+  }
+
+  public String getQueryInputName() {
+    return function == null ? null : function.getQueryInputName();
+  }
+
+  public PTFQueryInputSpec getQueryInput() {
+    return function == null ? null : function.getQueryInput();
+  }
+
+  /*
+   * A PTF Input represents the input to a PTF Function. An Input can be a Hive SubQuery or Table,
+   * or another PTF Function. An Input instance captures the ASTNode that it was created from.
+   */
+  public abstract static class PTFInputSpec {
+    ASTNode astNode;
+
+    public ASTNode getAstNode() {
+      return astNode;
+    }
+
+    public void setAstNode(ASTNode astNode) {
+      this.astNode = astNode;
+    }
+
+    public abstract PTFInputSpec getInput();
+
+    public abstract String getQueryInputName();
+    public abstract PTFQueryInputSpec getQueryInput();
+  }
+
+  public static enum PTFQueryInputType {
+    TABLE,
+    SUBQUERY,
+    PTFCOMPONENT,
+    WINDOWING;
+  }
+
+  /*
+   * A PTF input that represents a source in the overall Query. This could be a Table or a SubQuery.
+   * If a PTF chain requires execution by multiple PTF Operators,
+   * then the original Invocation object is decomposed into a set of Component Invocations.
+   * Every Component Invocation but the first one ends in a PTFQueryInputSpec instance.
+   * During the construction of the Operator plan, a PTFQueryInputSpec object in the chain
+   * implies: connect the PTF Operator to the 'input' that has been generated so far.
+   */
+  public static class PTFQueryInputSpec extends PTFInputSpec {
+    String source;
+    PTFQueryInputType type;
+
+    public String getSource() {
+      return source;
+    }
+    public void setSource(String source) {
+      this.source = source;
+    }
+    public PTFQueryInputType getType() {
+      return type;
+    }
+    public void setType(PTFQueryInputType type) {
+      this.type = type;
+    }
+
+    @Override
+    public PTFInputSpec getInput() {
+      return null;
+    }
+
+    @Override
+    public String getQueryInputName() {
+      return getSource();
+    }
+    @Override
+    public PTFQueryInputSpec getQueryInput() {
+      return this;
+    }
+  }
+
+  /*
+   * Represents a PTF Invocation. Captures:
+   * - the function name and alias
+   * - the Partitioning details about its input
+   * - its arguments. The ASTNodes representing the arguments are captured here.
+   * - a reference to its Input
+   */
+  public static class PartitionedTableFunctionSpec extends PTFInputSpec {
+    String name;
+    String alias;
+    ArrayList<ASTNode> args;
+    PartitioningSpec partitioning;
+    PTFInputSpec input;
+    public String getName() {
+      return name;
+    }
+    public void setName(String name) {
+      this.name = name;
+    }
+    public String getAlias() {
+      return alias;
+    }
+    public void setAlias(String alias) {
+      this.alias = alias;
+    }
+    public ArrayList<ASTNode> getArgs() {
+      return args;
+    }
+    public void setArgs(ArrayList<ASTNode> args) {
+      this.args = args;
+    }
+    public PartitioningSpec getPartitioning() {
+      return partitioning;
+    }
+    public void setPartitioning(PartitioningSpec partitioning) {
+      this.partitioning = partitioning;
+    }
+    @Override
+    public PTFInputSpec getInput() {
+      return input;
+    }
+    public void setInput(PTFInputSpec input) {
+      this.input = input;
+    }
+    public PartitionSpec getPartition() {
+      return getPartitioning() == null ? null : getPartitioning().getPartSpec();
+    }
+    public void setPartition(PartitionSpec partSpec) {
+      partitioning = partitioning == null ? new PartitioningSpec() : partitioning;
+      partitioning.setPartSpec(partSpec);
+    }
+    public OrderSpec getOrder() {
+      return getPartitioning() == null ? null : getPartitioning().getOrderSpec();
+    }
+    public void setOrder(OrderSpec orderSpec) {
+      partitioning = partitioning == null ? new PartitioningSpec() : partitioning;
+      partitioning.setOrderSpec(orderSpec);
+    }
+    public void addArg(ASTNode arg)
+    {
+      args = args == null ? new ArrayList<ASTNode>() : args;
+      args.add(arg);
+    }
+
+    public PartitionedTableFunctionSpec getStartOfChain() {
+      if ( input instanceof PartitionedTableFunctionSpec ) {
+        return ((PartitionedTableFunctionSpec)input).getStartOfChain();
+      }
+      return this;
+    }
+    @Override
+    public String getQueryInputName() {
+      return input.getQueryInputName();
+    }
+    @Override
+    public PTFQueryInputSpec getQueryInput() {
+      return input.getQueryInput();
+    }
+  }
+
+  /*
+   * Captures how the Input to a PTF Function should be partitioned and
+   * ordered. Refers to a /Partition/ and /Order/ instance.
+   */
+  public static class PartitioningSpec {
+    PartitionSpec partSpec;
+    OrderSpec orderSpec;
+    public PartitionSpec getPartSpec() {
+      return partSpec;
+    }
+    public void setPartSpec(PartitionSpec partSpec) {
+      this.partSpec = partSpec;
+    }
+    public OrderSpec getOrderSpec() {
+      return orderSpec;
+    }
+    public void setOrderSpec(OrderSpec orderSpec) {
+      this.orderSpec = orderSpec;
+    }
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((orderSpec == null) ? 0 : orderSpec.hashCode());
+      result = prime * result + ((partSpec == null) ? 0 : partSpec.hashCode());
+      return result;
+    }
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      PartitioningSpec other = (PartitioningSpec) obj;
+      if (orderSpec == null) {
+        if (other.orderSpec != null) {
+          return false;
+        }
+      } else if (!orderSpec.equals(other.orderSpec)) {
+        return false;
+      }
+      if (partSpec == null) {
+        if (other.partSpec != null) {
+          return false;
+        }
+      } else if (!partSpec.equals(other.partSpec)) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  /*
+   * Captures how an Input should be Partitioned. This is captured as a
+   * list of ASTNodes that are the expressions in the Distribute/Cluster
+   * by clause specifying the partitioning applied for a PTF invocation.
+   */
+  public static class PartitionSpec {
+    ArrayList<PartitionExpression> expressions;
+
+    public ArrayList<PartitionExpression> getExpressions()
+    {
+      return expressions;
+    }
+
+    public void setExpressions(ArrayList<PartitionExpression> columns)
+    {
+      this.expressions = columns;
+    }
+
+    public void addExpression(PartitionExpression c)
+    {
+      expressions = expressions == null ? new ArrayList<PartitionExpression>() : expressions;
+      expressions.add(c);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((expressions == null) ? 0 : expressions.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      PartitionSpec other = (PartitionSpec) obj;
+      if (expressions == null)
+      {
+        if (other.expressions != null) {
+          return false;
+        }
+      }
+      else if (!expressions.equals(other.expressions)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+      return PTFUtils.sprintf("partitionColumns=%s",PTFUtils.toString(expressions));
+    }
+  }
+
+  public static class PartitionExpression
+  {
+    ASTNode expression;
+
+    public PartitionExpression() {}
+
+    public PartitionExpression(PartitionExpression peSpec)
+    {
+      expression = peSpec.getExpression();
+    }
+
+    public ASTNode getExpression() {
+      return expression;
+    }
+
+    public void setExpression(ASTNode expression) {
+      this.expression = expression;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((expression == null) ?
+          0 : expression.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      PartitionExpression other = (PartitionExpression) obj;
+      if (expression == null) {
+        if (other.expression != null) {
+          return false;
+        }
+      } else if (!expression.toStringTree().equals(other.expression.toStringTree())) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+      return expression.toStringTree();
+    }
+
+  }
+
+  /*
+   * Captures how the Input should be Ordered. This is captured as a list
+   * of ASTNodes that are the expressions in the Sort By clause in a
+   * PTF invocation.
+   */
+  public static class OrderSpec
+  {
+    ArrayList<OrderExpression> expressions;
+
+    public OrderSpec() {}
+
+    public OrderSpec(PartitionSpec pSpec)
+    {
+      for(PartitionExpression peSpec : pSpec.getExpressions())
+      {
+        addExpression(new OrderExpression(peSpec));
+      }
+    }
+
+    public ArrayList<OrderExpression> getExpressions()
+    {
+      return expressions;
+    }
+
+    public void setExpressions(ArrayList<OrderExpression> columns)
+    {
+      this.expressions = columns;
+    }
+
+    public void addExpression(OrderExpression c)
+    {
+      expressions = expressions == null ? new ArrayList<OrderExpression>() : expressions;
+      expressions.add(c);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((expressions == null) ? 0 : expressions.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      OrderSpec other = (OrderSpec) obj;
+      if (expressions == null)
+      {
+        if (other.expressions != null) {
+          return false;
+        }
+      }
+      else if (!expressions.equals(other.expressions)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+      return PTFUtils.sprintf("orderColumns=%s",PTFUtils.toString(expressions));
+    }
+  }
+
+  public static enum Order
+  {
+    ASC,
+    DESC;
+  }
+
+  public static class OrderExpression extends PartitionExpression
+  {
+    Order order;
+
+    public OrderExpression() {}
+
+    public OrderExpression(PartitionExpression peSpec)
+    {
+      super(peSpec);
+      order = Order.ASC;
+    }
+
+    public Order getOrder()
+    {
+      return order;
+    }
+
+    public void setOrder(Order order)
+    {
+      this.order = order;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + ((order == null) ? 0 : order.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (!super.equals(obj)) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      OrderExpression other = (OrderExpression) obj;
+      if (order != other.order) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+      return PTFUtils.sprintf("%s %s", super.toString(), order);
+    }
+  }
+
+}
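To see the new specification model end to end, here is a hedged sketch that assembles a PTFInvocationSpec by hand using only the API above, roughly what the translator might build for an invocation like noop(on t partition by p order by p). It assumes the inner classes (PTFQueryInputSpec, PartitionedTableFunctionSpec, etc.) are imported; the ASTNode expression, normally taken from the parse tree, is left null here:

    PTFQueryInputSpec qInput = new PTFQueryInputSpec();
    qInput.setSource("t");
    qInput.setType(PTFQueryInputType.TABLE);

    PartitionExpression pExpr = new PartitionExpression();
    pExpr.setExpression(null);  // ASTNode for column 'p' in a real invocation

    PartitionSpec pSpec = new PartitionSpec();
    pSpec.addExpression(pExpr);

    PartitionedTableFunctionSpec fnSpec = new PartitionedTableFunctionSpec();
    fnSpec.setName("noop");
    fnSpec.setInput(qInput);
    fnSpec.setPartition(pSpec);
    fnSpec.setOrder(new OrderSpec(pSpec));  // order defaults to the partition columns, ASC

    PTFInvocationSpec invocation = new PTFInvocationSpec();
    invocation.setFunction(fnSpec);

    // The chain starts at fnSpec, and the query input underneath it is table 't'.
    assert invocation.getStartOfChain() == fnSpec;
    assert "t".equals(invocation.getQueryInputName());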