Repository: hive
Updated Branches:
  refs/heads/master be7329582 -> a7cf25a5a


HIVE-13936: Add streaming support for row_number (Yongzhi Chen, reviewed by Chaoyu Tang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a7cf25a5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a7cf25a5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a7cf25a5

Branch: refs/heads/master
Commit: a7cf25a5a84ee3f60c6123f40cf7aa9bb90987ee
Parents: be73295
Author: Yongzhi Chen <[email protected]>
Authored: Tue Aug 9 10:39:27 2016 -0400
Committer: Yongzhi Chen <[email protected]>
Committed: Thu Aug 18 11:04:00 2016 -0400

----------------------------------------------------------------------
 .../ql/udf/generic/GenericUDAFRowNumber.java    | 48 ++++++++++++++++++--
 .../clientpositive/windowing_streaming.q        |  4 ++
 .../clientpositive/windowing_streaming.q.out    | 33 ++++++++++++++
 3 files changed, 80 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a7cf25a5/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java
index 8e672e6..e56aeea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java
@@ -27,7 +27,10 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRank.GenericUDAFAbstractRankEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRank.RankBuffer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -59,22 +62,36 @@ public class GenericUDAFRowNumber extends AbstractGenericUDAFResolver {
 
     ArrayList<IntWritable> rowNums;
     int nextRow;
+    boolean supportsStreaming;
 
     void init() {
       rowNums = new ArrayList<IntWritable>();
+      nextRow = 1;
+      if (supportsStreaming) {
+        rowNums.add(null);
+      }
     }
 
-    RowNumberBuffer() {
+    RowNumberBuffer(boolean supportsStreaming) {
+      this.supportsStreaming = supportsStreaming;
       init();
-      nextRow = 1;
     }
 
     void incr() {
-      rowNums.add(new IntWritable(nextRow++));
+      if (supportsStreaming) {
+        rowNums.set(0,new IntWritable(nextRow++));
+      } else {
+        rowNums.add(new IntWritable(nextRow++));
+      }
     }
   }
 
-  public static class GenericUDAFRowNumberEvaluator extends GenericUDAFEvaluator {
+  public static class GenericUDAFAbstractRowNumberEvaluator extends GenericUDAFEvaluator {
+    boolean isStreamingMode = false;
+
+    protected boolean isStreaming() {
+      return isStreamingMode;
+    }
 
     @Override
     public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
@@ -89,7 +106,7 @@ public class GenericUDAFRowNumber extends AbstractGenericUDAFResolver {
 
     @Override
     public AggregationBuffer getNewAggregationBuffer() throws HiveException {
-      return new RowNumberBuffer();
+      return new RowNumberBuffer(isStreamingMode);
     }
 
     @Override
@@ -118,5 +135,26 @@ public class GenericUDAFRowNumber extends AbstractGenericUDAFResolver {
     }
 
   }
+
+  public static class GenericUDAFRowNumberEvaluator extends GenericUDAFAbstractRowNumberEvaluator
+  implements ISupportStreamingModeForWindowing {
+
+    @Override
+    public Object getNextResult(AggregationBuffer agg) throws HiveException {
+      return ((RowNumberBuffer) agg).rowNums.get(0);
+    }
+
+    @Override
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+      isStreamingMode = true;
+      return this;
+    }
+
+    @Override
+    public int getRowsRemainingAfterTerminate() throws HiveException {
+      return 0;
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a7cf25a5/ql/src/test/queries/clientpositive/windowing_streaming.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/windowing_streaming.q b/ql/src/test/queries/clientpositive/windowing_streaming.q
index 294fe09..b8442e6 100644
--- a/ql/src/test/queries/clientpositive/windowing_streaming.q
+++ b/ql/src/test/queries/clientpositive/windowing_streaming.q
@@ -43,6 +43,10 @@ select *
 from (select t, f, rank() over(partition by t order by f) r from over10k) a 
 where r < 6 and t < 5;
 
+select *
+from (select t, f, row_number() over(partition by t order by f) r from over10k) a
+where r < 8 and t < 0;
+
 set hive.vectorized.execution.enabled=false;
 set hive.limit.pushdown.memory.usage=0.8;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a7cf25a5/ql/src/test/results/clientpositive/windowing_streaming.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/windowing_streaming.q.out b/ql/src/test/results/clientpositive/windowing_streaming.q.out
index a74ddb3..701ae40 100644
--- a/ql/src/test/results/clientpositive/windowing_streaming.q.out
+++ b/ql/src/test/results/clientpositive/windowing_streaming.q.out
@@ -287,6 +287,39 @@ POSTHOOK: Input: default@over10k
 4      5.53    3
 4      5.76    4
 4      7.26    5
+PREHOOK: query: select *
+from (select t, f, row_number() over(partition by t order by f) r from over10k) a
+where r < 8 and t < 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@over10k
+#### A masked pattern was here ####
+POSTHOOK: query: select *
+from (select t, f, row_number() over(partition by t order by f) r from over10k) a
+where r < 8 and t < 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@over10k
+#### A masked pattern was here ####
+-3     0.56    1
+-3     0.83    2
+-3     2.26    3
+-3     2.48    4
+-3     3.82    5
+-3     6.8     6
+-3     6.83    7
+-2     1.55    1
+-2     1.65    2
+-2     1.79    3
+-2     4.06    4
+-2     4.4     5
+-2     5.43    6
+-2     5.59    7
+-1     0.79    1
+-1     0.95    2
+-1     1.27    3
+-1     1.49    4
+-1     2.8     5
+-1     4.08    6
+-1     4.31    7
 PREHOOK: query: explain
 select * from (select ctinyint, cdouble, rank() over(partition by ctinyint order by cdouble) r from  alltypesorc) a where r < 5
 PREHOOK: type: QUERY

Reply via email to