Author: hashutosh
Date: Wed Feb 20 02:27:17 2013
New Revision: 1447989

URL: http://svn.apache.org/r1447989
Log:
HIVE-3984 : Maintain a clear separation between Windowing & PTF at the 
specification level.
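
For illustration, the separation shows up in the query language itself: Windowing
is expressed through an OVER clause and is captured by WindowingSpec, while a PTF
is a table function invoked in the FROM clause and is captured by
PTFInvocationSpec. A sketch using the part table from the PTF tests (the exact
grammar on this branch may differ from what is shown):

    -- Windowing invocation: described by a WindowingSpec
    select p_mfgr, p_name,
           rank() over (partition by p_mfgr order by p_name) as r
    from part;

    -- PTF invocation: described by a PTFInvocationSpec
    select p_mfgr, p_name
    from noop(on part partition by p_mfgr order by p_name);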

Added:
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
Removed:
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFSpec.java
Modified:
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFNTile.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NPath.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/Noop.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMap.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionResolver.java
    hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
    hive/branches/ptf-windowing/ql/src/test/queries/clientpositive/ptf_general_queries.q
    hive/branches/ptf-windowing/ql/src/test/queries/clientpositive/ptf_rcfile.q
    hive/branches/ptf-windowing/ql/src/test/queries/clientpositive/ptf_seqfile.q
    hive/branches/ptf-windowing/ql/src/test/results/clientpositive/ptf_general_queries.q.out
    hive/branches/ptf-windowing/ql/src/test/results/clientpositive/ptf_rcfile.q.out
    hive/branches/ptf-windowing/ql/src/test/results/clientpositive/ptf_seqfile.q.out

Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java?rev=1447989&r1=1447988&r2=1447989&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java Wed Feb 20 02:27:17 2013
@@ -38,6 +38,7 @@ public class QueryProperties {
   boolean hasSortBy = false;
   boolean hasJoinFollowedByGroupBy = false;
   boolean hasPTF = false;
+  boolean hasWindowing = false;
 
   // does the query have a using clause
   boolean usesScript = false;
@@ -116,4 +117,12 @@ public class QueryProperties {
   public void setHasPTF(boolean hasPTF) {
     this.hasPTF = hasPTF;
   }
+
+  public boolean hasWindowing() {
+    return hasWindowing;
+  }
+
+  public void setHasWindowing(boolean hasWindowing) {
+    this.hasWindowing = hasWindowing;
+  }
 }

Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1447989&r1=1447988&r2=1447989&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Wed Feb 20 02:27:17 2013
@@ -27,17 +27,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.PTFTranslator;
-import org.apache.hadoop.hive.ql.parse.PTFTranslator.PTFDefDeserializer;
-import org.apache.hadoop.hive.ql.parse.PTFTranslator.PTFTranslationInfo;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.ColumnDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.TableFuncDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WhereDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
 import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator;
@@ -50,7 +49,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.io.Writable;
 
 public class PTFOperator extends Operator<PTFDesc> implements Serializable
 {
@@ -86,19 +84,17 @@ public class PTFOperator extends Operato
                }
 
                reconstructQueryDef(hiveConf);
-    inputPart = PTFOperator.createFirstPartitionForChain(conf,
+    inputPart = createFirstPartitionForChain(
         inputObjInspectors[0], hiveConf, isMapOperator);
 
-               // OI for FileSinkOperator is taken from select-list (reduce-side)
-               // OI for ReduceSinkOperator is taken from TODO
                if (isMapOperator)
                {
-                       TableFuncDef tDef = PTFTranslator.getFirstTableFunction(conf);
-                       outputObjInspector = tDef.getRawInputOI();
+                       PartitionedTableFunctionDef tDef = conf.getStartOfChain();
+                       outputObjInspector = tDef.getRawInputShape().getOI();
                }
                else
                {
-                       outputObjInspector = conf.getSelectList().getOI();
+                       outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
                }
 
                setupKeysWrapper(inputObjInspectors[0]);
@@ -141,7 +137,7 @@ public class PTFOperator extends Operato
       if (currentKeys != null && !keysAreEqual)
       {
         processInputPartition();
-        inputPart = PTFOperator.createFirstPartitionForChain(conf, inputObjInspectors[0], hiveConf, isMapOperator);
+        inputPart = createFirstPartitionForChain(inputObjInspectors[0], hiveConf, isMapOperator);
       }
 
       if (currentKeys == null || !keysAreEqual)
@@ -171,49 +167,59 @@ public class PTFOperator extends Operato
        protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException
        {
 
-         PTFDefDeserializer qdd = new PTFDefDeserializer(hiveConf,
-                               inputObjInspectors[0]);
-         PTFTranslator.PTFDefWalker qdw = new PTFTranslator.PTFDefWalker(qdd);
-               qdw.walk(conf);
+         PTFDeserializer dS =
+             new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
+         dS.initializePTFChain(conf.getFuncDef());
        }
 
        protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException
        {
-               PartitionDef pDef = PTFTranslator.getFirstTableFunction(conf).getWindow().getPartDef();
-               ArrayList<ColumnDef> cols = pDef.getColumns();
-               int numCols = cols.size();
-               ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numCols];
-               ObjectInspector[] keyOIs = new ObjectInspector[numCols];
-               ObjectInspector[] currentKeyOIs = new ObjectInspector[numCols];
+               PartitionDef pDef = conf.getStartOfChain().getPartition();
+               ArrayList<PTFExpressionDef> exprs = pDef.getExpressions();
+               int numExprs = exprs.size();
+               ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numExprs];
+               ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
+               ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
 
-               for(int i=0; i<numCols; i++)
+               for(int i=0; i<numExprs; i++)
                {
-                       ColumnDef cDef = cols.get(i);
+                 PTFExpressionDef exprDef = exprs.get(i);
                        /*
                         * Why cannot we just use the ExprNodeEvaluator on the column?
                         * - because on the reduce-side it is initialized based on the rowOI of the HiveTable
                         *   and not the OI of the ExtractOp ( the parent of this Operator on the reduce-side)
                         */
-                       keyFields[i] = ExprNodeEvaluatorFactory.get(cDef.getExprNode());
+                       keyFields[i] = ExprNodeEvaluatorFactory.get(exprDef.getExprNode());
                        keyOIs[i] = keyFields[i].initialize(inputOI);
-                       currentKeyOIs[i] = ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i], ObjectInspectorCopyOption.WRITABLE);
+                       currentKeyOIs[i] =
+                           ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i],
+                               ObjectInspectorCopyOption.WRITABLE);
                }
 
                keyWrapperFactory = new KeyWrapperFactory(keyFields, keyOIs, currentKeyOIs);
-
-           newKeys = keyWrapperFactory.getKeyWrapper();
+         newKeys = keyWrapperFactory.getKeyWrapper();
        }
 
        protected void processInputPartition() throws HiveException
        {
-         PTFPartition outPart = PTFOperator.executeChain(conf, inputPart);
-    PTFOperator.executeSelectList(conf, outPart, this);
+         PTFPartition outPart = executeChain(inputPart);
+         if ( conf.forWindowing() ) {
+           executeWindowExprs(outPart);
+         }
+         else {
+           PTFPartitionIterator<Object> pItr = outPart.iterator();
+           while (pItr.hasNext())
+           {
+             Object oRow = pItr.next();
+             forward(oRow, outputObjInspector);
+           }
+         }
        }
 
        protected void processMapFunction() throws HiveException
        {
-         TableFuncDef tDef = PTFTranslator.getFirstTableFunction(conf);
-    PTFPartition outPart = tDef.getFunction().transformRawInput(inputPart);
+         PartitionedTableFunctionDef tDef = conf.getStartOfChain();
+    PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
     PTFPartitionIterator<Object> pItr = outPart.iterator();
     while (pItr.hasNext())
     {
@@ -247,22 +253,21 @@ public class PTFOperator extends Operato
    * For each table function popped out of the stack,
    * execute the function on the input partition
    * and return an output partition.
-   * @param ptfDesc
    * @param part
    * @return
    * @throws HiveException
    */
-  private static PTFPartition executeChain(PTFDesc ptfDesc, PTFPartition part)
+  private PTFPartition executeChain(PTFPartition part)
       throws HiveException
   {
-    Stack<TableFuncDef> fnDefs = new Stack<TableFuncDef>();
-    PTFInputDef iDef = ptfDesc.getInput();
+    Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
+    PTFInputDef iDef = conf.getFuncDef();
     while (true)
     {
-      if (iDef instanceof TableFuncDef)
+      if (iDef instanceof PartitionedTableFunctionDef)
       {
-        fnDefs.push((TableFuncDef) iDef);
-        iDef = ((TableFuncDef) iDef).getInput();
+        fnDefs.push((PartitionedTableFunctionDef) iDef);
+        iDef = ((PartitionedTableFunctionDef) iDef).getInput();
       }
       else
       {
@@ -270,64 +275,96 @@ public class PTFOperator extends Operato
       }
     }
 
-    TableFuncDef currFnDef;
+    PartitionedTableFunctionDef currFnDef;
     while (!fnDefs.isEmpty())
     {
       currFnDef = fnDefs.pop();
-      part = currFnDef.getFunction().execute(part);
+      part = currFnDef.getTFunction().execute(part);
     }
     return part;
   }
 
   /**
-   * For each row in the partition:
-   * 1. evaluate the where condition if applicable.
-   * 2. evaluate the value for each column retrieved
-   *    from the select list
-   * 3. Forward the writable value or object based on the
-   *    implementation of the ForwardSink
+   * If WindowingSpec contains any Windowing Expressions or has a
+   * Having condition then these are processed
+   * and forwarded on. Whereas when there is no having or WdwExprs
+   * just forward the rows in the output Partition.
+   *
+   * For example, consider the following query:
+   * <pre>
+   * {@code
+   *  select rank(), lead(rank(),1),...
+   *  from xyz
+   *  ...
+   *  having rank() > 1
+   *  }
+   * </pre>
+   * rank() gets processed as a WdwFn; it's in the oPart (output partition)
+   * argument to executeWindowExprs. Here we first evaluate the having expression.
+   * So the first row of oPart gets filtered out.
+   * Next we evaluate lead(rank()) which is held as a WindowExpression and add it to the output.
+   *
    * @param ptfDesc
-   * @param oPart
-   * @param rS
+   * @param oPart output partition after Window Fns are processed.
+   * @param op
    * @throws HiveException
    */
-  @SuppressWarnings(
-  { "rawtypes", "unchecked" })
-  private static void executeSelectList(PTFDesc ptfDesc, PTFPartition oPart, PTFOperator op)
+  private void executeWindowExprs(PTFPartition oPart)
       throws HiveException
   {
-    StructObjectInspector selectOI = ptfDesc.getSelectList().getOI();
-    StructObjectInspector inputOI = ptfDesc.getInput().getOI();
-    int numCols = selectOI.getAllStructFieldRefs().size();
-    ArrayList<ColumnDef> cols = ptfDesc.getSelectList().getColumns();
-    int numSelCols = cols == null ? 0 : cols.size();
+    WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) conf.getFuncDef();
+    /*
+     * inputOI represents the row with WindowFn results present.
+     * So in the e.g. above it will have a column for 'rank()'
+     */
+    StructObjectInspector inputOI = wTFnDef.getOutputFromWdwFnProcessing().getOI();
+    /*
+     * outputOI represents the final row with the Windowing Expressions added.
+     * So in the e.g. above it will have a column for 'lead(rank())' in addition to
+     * all columns in inputOI.
+     */
+    StructObjectInspector outputOI = wTFnDef.getOutputShape().getOI();
+    int numCols = outputOI.getAllStructFieldRefs().size();
+    ArrayList<WindowExpressionDef> wdwExprs = wTFnDef.getWindowExpressions();
+    int numWdwExprs = wdwExprs == null ? 0 : wdwExprs.size();
     Object[] output = new Object[numCols];
 
-
-    WhereDef whDef = ptfDesc.getWhere();
-    boolean applyWhere = whDef != null;
-    Converter whConverter = !applyWhere ? null
+    PTFExpressionDef havingExpr = wTFnDef.getHavingExpression();
+    boolean applyHaving = havingExpr != null;
+    Converter hvgConverter = !applyHaving ? null
         : ObjectInspectorConverters
             .getConverter(
-                whDef.getOI(),
+                havingExpr.getOI(),
                 PrimitiveObjectInspectorFactory.javaBooleanObjectInspector);
-    ExprNodeEvaluator whCondEval = !applyWhere ? null : whDef
-        .getExprEvaluator();
+    ExprNodeEvaluator havingCondEval = !applyHaving ? null : havingExpr.getExprEvaluator();
+    /*
+     * If this Windowing invocation has no Window Expressions and doesn't need to be filtered,
+     * we can just forward the row in the oPart partition.
+     */
+    boolean forwardRowsUntouched = !applyHaving && (wdwExprs == null || wdwExprs.size() == 0 );
 
-    Writable value = null;
     PTFPartitionIterator<Object> pItr = oPart.iterator();
-    PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, pItr);
+    PTFOperator.connectLeadLagFunctionsToPartition(conf, pItr);
     while (pItr.hasNext())
     {
       int colCnt = 0;
       Object oRow = pItr.next();
 
-      if (applyWhere)
+      /*
+       * when there is no Windowing expressions or having;
+       * just forward the Object coming out of the Partition.
+       */
+      if ( forwardRowsUntouched ) {
+        forward(oRow, outputObjInspector);
+        continue;
+      }
+
+      if (applyHaving)
       {
-        Object whCond = null;
-        whCond = whCondEval.evaluate(oRow);
-        whCond = whConverter.convert(whCond);
-        if (whCond == null || !((Boolean) whCond).booleanValue())
+        Object hvgCond = null;
+        hvgCond = havingCondEval.evaluate(oRow);
+        hvgCond = hvgConverter.convert(hvgCond);
+        if (!((Boolean) hvgCond).booleanValue())
         {
           continue;
         }
@@ -335,50 +372,61 @@ public class PTFOperator extends Operato
 
       /*
        * Setup the output row columns in the following order
-       * - the columns in the SelectList processed by the PTF (ie the Select Exprs that have navigation expressions)
+       * - the columns in the SelectList processed by the PTF
+       * (ie the Select Exprs that have navigation expressions)
        * - the columns from the final PTF.
        */
 
-      if ( cols != null ) {
-        for (ColumnDef cDef : cols)
+      if ( wdwExprs != null ) {
+        for (WindowExpressionDef wdwExpr : wdwExprs)
         {
-          Object newCol = cDef.getExprEvaluator().evaluate(oRow);
+          Object newCol = wdwExpr.getExprEvaluator().evaluate(oRow);
           output[colCnt++] = newCol;
         }
       }
 
       for(; colCnt < numCols; ) {
-        StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numSelCols);
-        output[colCnt++] = ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
+        StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numWdwExprs);
+        output[colCnt++] =
+            ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
             field.getFieldObjectInspector());
       }
 
-      op.forward(output, op.outputObjInspector);
+      forward(output, outputObjInspector);
     }
   }
 
   /**
-   * Create a new partition.
-   * The input OI is used to evaluate rows appended to the partition.
-   * The serde is determined based on whether the query has a map-phase
-   * or not. The OI on the serde is used by PTFs to evaluate output of the
-   * partition.
-   * @param ptfDesc
+   * Create a new Partition.
+   * A partition has 2 OIs: the OI for the rows being put in and the OI for the rows
+   * coming out. You specify the output OI by giving the Serde to use to Serialize.
+   * Typically these 2 OIs are the same; but not always. For the
+   * first PTF in a chain the OI of the incoming rows is dictated by the Parent Op
+   * to this PTFOp. The output OI from the Partition is typically LazyBinaryStruct, but
+   * not always. In the case of Noop/NoopMap we keep the Structure the same as
+   * what is given to us.
+   * <p>
+   * The Partition we want to create here is for feeding the First table function in the chain.
+   * So for map-side processing use the Serde from the output Shape of its InputDef.
+   * For reduce-side processing use the Serde from its RawInputShape (the shape
+   * after map-side processing).
    * @param oi
    * @param hiveConf
+   * @param isMapSide
    * @return
    * @throws HiveException
    */
-  public static PTFPartition createFirstPartitionForChain(PTFDesc ptfDesc, ObjectInspector oi,
+  public PTFPartition createFirstPartitionForChain(ObjectInspector oi,
       HiveConf hiveConf, boolean isMapSide) throws HiveException
   {
-    TableFuncDef tabDef = PTFTranslator.getFirstTableFunction(ptfDesc);
-    TableFunctionEvaluator tEval = tabDef.getFunction();
+    PartitionedTableFunctionDef tabDef = conf.getStartOfChain();
+    TableFunctionEvaluator tEval = tabDef.getTFunction();
     String partClassName = tEval.getPartitionClass();
     int partMemSize = tEval.getPartitionMemSize();
 
     PTFPartition part = null;
-    SerDe serde = tabDef.getInput().getSerde();
+    SerDe serde = isMapSide ? tabDef.getInput().getOutputShape().getSerde() :
+      tabDef.getRawInputShape().getSerde();
     part = new PTFPartition(partClassName, partMemSize, serde,
         (StructObjectInspector) oi);
     return part;
@@ -388,9 +436,7 @@ public class PTFOperator extends Operato
   public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
       PTFPartitionIterator<Object> pItr) throws HiveException
   {
-    PTFTranslationInfo tInfo = ptfDesc.getTranslationInfo();
-    List<ExprNodeGenericFuncDesc> llFnDescs = tInfo.getLLInfo()
-        .getLeadLagExprs();
+    List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
     if (llFnDescs == null) {
       return;
     }

Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java?rev=1447989&r1=1447988&r2=1447989&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java Wed Feb 20 02:27:17 2013
@@ -142,7 +142,7 @@ public class PTFPartition
   public PTFPartitionIterator<Object> range(int start, int end)
   {
     assert(start >= 0);
-    assert(end < size());
+    assert(end <= size());
     assert(start <= end);
     return new PItr(start, end);
   }

Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java?rev=1447989&r1=1447988&r2=1447989&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java Wed Feb 20 02:27:17 2013
@@ -43,7 +43,7 @@ import org.antlr.runtime.tree.BaseTree;
 import org.antlr.runtime.tree.CommonTree;
 import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
-import org.apache.hadoop.hive.ql.parse.PTFSpec.WindowFrameSpec.Direction;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 

Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java?rev=1447989&r1=1447988&r2=1447989&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java Wed Feb 20 02:27:17 2013
@@ -34,7 +34,13 @@ public @interface WindowFunctionDescript
 {
        Description description ();
        /**
-        * controls whether this function can be applied to a Window
+        * controls whether this function can be applied to a Window.
+        * <p>
+        * Ranking functions: Rank, Dense_Rank, Percent_Rank and Cume_Dist don't operate on Windows.
+        * Why? A window specification implies a row-specific range, i.e. every row gets its own set of rows to process the UDAF on.
+        * For ranking, defining a set of rows for every row makes no sense.
+        * <p>
+        * All other UDAFs can be computed for a Window.
         */
        boolean supportsWindow() default true;
        /**
@@ -45,3 +51,4 @@ public @interface WindowFunctionDescript
         */
        boolean pivotResult() default false;
 }
+
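
To make the supportsWindow distinction above concrete, a hedged HiveQL sketch
(assuming the part test table; the grammar on this branch may differ): a ranking
function takes no window frame, while an ordinary UDAF such as sum() can be
computed over one.

    select p_mfgr, p_name,
           -- ranking function: no window frame applies
           rank() over (partition by p_mfgr order by p_name) as r,
           -- regular UDAF: a row-specific window frame makes sense
           sum(p_size) over (partition by p_mfgr order by p_name
                             rows between 2 preceding and current row) as s
    from part;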

Added: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java?rev=1447989&view=auto
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java (added)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java Wed Feb 20 02:27:17 2013
@@ -0,0 +1,515 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+
+public class PTFInvocationSpec {
+
+  PartitionedTableFunctionSpec function;
+
+  public PartitionedTableFunctionSpec getFunction() {
+    return function;
+  }
+
+  public void setFunction(PartitionedTableFunctionSpec function) {
+    this.function = function;
+  }
+
+  public PartitionedTableFunctionSpec getStartOfChain() {
+    return function == null ? null : function.getStartOfChain();
+  }
+
+  public String getQueryInputName() {
+    return function == null ? null : function.getQueryInputName();
+  }
+
+  public PTFQueryInputSpec getQueryInput() {
+    return function == null ? null : function.getQueryInput();
+  }
+
+  /*
+   * A PTF Input represents the input to a PTF Function. An Input can be a Hive SubQuery or Table
+   * or another PTF Function. An Input instance captures the ASTNode that this instance was created from.
+   */
+  public abstract static class PTFInputSpec {
+    ASTNode astNode;
+
+    public ASTNode getAstNode() {
+      return astNode;
+    }
+
+    public void setAstNode(ASTNode astNode) {
+      this.astNode = astNode;
+    }
+
+    public abstract PTFInputSpec getInput();
+
+    public abstract String getQueryInputName();
+    public abstract PTFQueryInputSpec getQueryInput();
+  }
+
+  public static enum PTFQueryInputType {
+    TABLE,
+    SUBQUERY,
+    PTFCOMPONENT,
+    WINDOWING;
+  }
+
+  /*
+   * A PTF input that represents a source in the overall Query. This could be a Table or a SubQuery.
+   * If a PTF chain requires execution by multiple PTF Operators,
+   * then the original Invocation object is decomposed into a set of Component Invocations.
+   * Every component Invocation but the first one ends in a PTFQueryInputSpec instance.
+   * During the construction of the Operator plan, a PTFQueryInputSpec object in the chain
+   * implies connecting the PTF Operator to the 'input', i.e. to what has been generated so far.
+   */
+  public static class PTFQueryInputSpec extends PTFInputSpec {
+    String source;
+    PTFQueryInputType type;
+
+    public String getSource() {
+      return source;
+    }
+    public void setSource(String source) {
+      this.source = source;
+    }
+    public PTFQueryInputType getType() {
+      return type;
+    }
+    public void setType(PTFQueryInputType type) {
+      this.type = type;
+    }
+
+    @Override
+    public PTFInputSpec getInput() {
+      return null;
+    }
+
+    @Override
+    public String getQueryInputName() {
+      return getSource();
+    }
+    @Override
+    public PTFQueryInputSpec getQueryInput() {
+      return this;
+    }
+  }
+
+  /*
+   * Represents a PTF Invocation. Captures:
+   * - function name and alias
+   * - the Partitioning details about its input
+   * - its arguments. The ASTNodes representing the arguments are captured here.
+   * - a reference to its Input
+   */
+  public static class PartitionedTableFunctionSpec  extends PTFInputSpec {
+    String name;
+    String alias;
+    ArrayList<ASTNode> args;
+    PartitioningSpec partitioning;
+    PTFInputSpec input;
+    public String getName() {
+      return name;
+    }
+    public void setName(String name) {
+      this.name = name;
+    }
+    public String getAlias() {
+      return alias;
+    }
+    public void setAlias(String alias) {
+      this.alias = alias;
+    }
+    public ArrayList<ASTNode> getArgs() {
+      return args;
+    }
+    public void setArgs(ArrayList<ASTNode> args) {
+      this.args = args;
+    }
+    public PartitioningSpec getPartitioning() {
+      return partitioning;
+    }
+    public void setPartitioning(PartitioningSpec partitioning) {
+      this.partitioning = partitioning;
+    }
+    @Override
+    public PTFInputSpec getInput() {
+      return input;
+    }
+    public void setInput(PTFInputSpec input) {
+      this.input = input;
+    }
+    public PartitionSpec getPartition() {
+      return getPartitioning() == null ? null : getPartitioning().getPartSpec();
+    }
+    public void setPartition(PartitionSpec partSpec) {
+      partitioning = partitioning == null ? new PartitioningSpec() : partitioning;
+      partitioning.setPartSpec(partSpec);
+    }
+    public OrderSpec getOrder() {
+      return getPartitioning() == null ? null : getPartitioning().getOrderSpec();
+    }
+    public void setOrder(OrderSpec orderSpec) {
+      partitioning = partitioning == null ? new PartitioningSpec() : partitioning;
+      partitioning.setOrderSpec(orderSpec);
+    }
+    public void addArg(ASTNode arg)
+    {
+      args = args == null ? new ArrayList<ASTNode>() : args;
+      args.add(arg);
+    }
+
+    public PartitionedTableFunctionSpec getStartOfChain() {
+      if ( input instanceof PartitionedTableFunctionSpec ) {
+        return ((PartitionedTableFunctionSpec)input).getStartOfChain();
+      }
+      return this;
+    }
+    @Override
+    public String getQueryInputName() {
+      return input.getQueryInputName();
+    }
+    @Override
+    public PTFQueryInputSpec getQueryInput() {
+      return input.getQueryInput();
+    }
+  }
+
+  /*
+   * Captures how the Input to a PTF Function should be partitioned and
+   * ordered. Refers to a /Partition/ and /Order/ instance.
+   */
+  public static class PartitioningSpec {
+    PartitionSpec partSpec;
+    OrderSpec orderSpec;
+    public PartitionSpec getPartSpec() {
+      return partSpec;
+    }
+    public void setPartSpec(PartitionSpec partSpec) {
+      this.partSpec = partSpec;
+    }
+    public OrderSpec getOrderSpec() {
+      return orderSpec;
+    }
+    public void setOrderSpec(OrderSpec orderSpec) {
+      this.orderSpec = orderSpec;
+    }
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((orderSpec == null) ? 0 : orderSpec.hashCode());
+      result = prime * result + ((partSpec == null) ? 0 : partSpec.hashCode());
+      return result;
+    }
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      PartitioningSpec other = (PartitioningSpec) obj;
+      if (orderSpec == null) {
+        if (other.orderSpec != null) {
+          return false;
+        }
+      } else if (!orderSpec.equals(other.orderSpec)) {
+        return false;
+      }
+      if (partSpec == null) {
+        if (other.partSpec != null) {
+          return false;
+        }
+      } else if (!partSpec.equals(other.partSpec)) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  /*
+   * Captures how an Input should be Partitioned. This is captured as a
+   * list of ASTNodes that are the expressions in the Distribute/Cluster
+   * by clause specifying the partitioning applied for a PTF invocation.
+   */
+  public static class PartitionSpec {
+    ArrayList<PartitionExpression> expressions;
+
+    public ArrayList<PartitionExpression> getExpressions()
+    {
+      return expressions;
+    }
+
+    public void setExpressions(ArrayList<PartitionExpression> columns)
+    {
+      this.expressions = columns;
+    }
+
+    public void addExpression(PartitionExpression c)
+    {
+      expressions = expressions == null ? new ArrayList<PartitionExpression>() : expressions;
+      expressions.add(c);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((expressions == null) ? 0 : expressions.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      PartitionSpec other = (PartitionSpec) obj;
+      if (expressions == null)
+      {
+        if (other.expressions != null) {
+          return false;
+        }
+      }
+      else if (!expressions.equals(other.expressions)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+      return PTFUtils.sprintf("partitionColumns=%s",PTFUtils.toString(expressions));
+    }
+  }
+
+  public static class PartitionExpression
+  {
+    ASTNode expression;
+
+    public PartitionExpression() {}
+
+    public PartitionExpression(PartitionExpression peSpec)
+    {
+      expression = peSpec.getExpression();
+    }
+
+    public ASTNode getExpression() {
+      return expression;
+    }
+
+    public void setExpression(ASTNode expression) {
+      this.expression = expression;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((expression == null) ? 0 : expression.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      PartitionExpression other = (PartitionExpression) obj;
+      if (expression == null) {
+        if (other.expression != null) {
+          return false;
+        }
+      } else if (!expression.toStringTree().equals(other.expression.toStringTree())) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+    return expression.toStringTree();
+    }
+
+  }
+
+  /*
+   * Captures how the Input should be Ordered. This is captured as a list
+   * of ASTNodes that are the expressions in the Sort By clause in a
+   * PTF invocation.
+   */
+  public static class OrderSpec
+  {
+    ArrayList<OrderExpression> expressions;
+
+    public OrderSpec() {}
+
+    public OrderSpec(PartitionSpec pSpec)
+    {
+      for(PartitionExpression peSpec : pSpec.getExpressions())
+      {
+        addExpression(new OrderExpression(peSpec));
+      }
+    }
+
+    public ArrayList<OrderExpression> getExpressions()
+    {
+      return expressions;
+    }
+
+    public void setExpressions(ArrayList<OrderExpression> columns)
+    {
+      this.expressions = columns;
+    }
+
+    public void addExpression(OrderExpression c)
+    {
+      expressions = expressions == null ? new ArrayList<OrderExpression>() : expressions;
+      expressions.add(c);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((expressions == null) ? 0 : expressions.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      OrderSpec other = (OrderSpec) obj;
+      if (expressions == null)
+      {
+        if (other.expressions != null) {
+          return false;
+        }
+      }
+      else if (!expressions.equals(other.expressions)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+      return PTFUtils.sprintf("orderColumns=%s",PTFUtils.toString(expressions));
+    }
+  }
+
+  public static enum Order
+  {
+    ASC,
+    DESC;
+  }
+
+  public static class OrderExpression extends PartitionExpression
+  {
+    Order order;
+
+    public OrderExpression() {}
+
+    public OrderExpression(PartitionExpression peSpec)
+    {
+      super(peSpec);
+      order = Order.ASC;
+    }
+
+    public Order getOrder()
+    {
+      return order;
+    }
+
+    public void setOrder(Order order)
+    {
+      this.order = order;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + ((order == null) ? 0 : order.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (!super.equals(obj)) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      OrderExpression other = (OrderExpression) obj;
+      if (order != other.order) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+      return PTFUtils.sprintf("%s %s", super.toString(), order);
+    }
+  }
+
+}
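
As a usage sketch, the pieces of a PTF invocation line up with the classes above
roughly as follows (noop is the pass-through test function; the mapping comments
are illustrative):

    select p_mfgr, p_name
    from noop(on part                  -- PTFQueryInputSpec (type TABLE)
              partition by p_mfgr      -- PartitionSpec / PartitionExpression
              order by p_name);        -- OrderSpec / OrderExpression (Order.ASC)
    -- the noop(...) call as a whole is the PartitionedTableFunctionSpec
    -- held as 'function' on the PTFInvocationSpec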

