Repository: hive Updated Branches: refs/heads/master be7329582 -> a7cf25a5a
HIVE-13936: Add streaming support for row_number (Yongzhi Chen, reviewed by Chaoyu Tang) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a7cf25a5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a7cf25a5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a7cf25a5 Branch: refs/heads/master Commit: a7cf25a5a84ee3f60c6123f40cf7aa9bb90987ee Parents: be73295 Author: Yongzhi Chen <[email protected]> Authored: Tue Aug 9 10:39:27 2016 -0400 Committer: Yongzhi Chen <[email protected]> Committed: Thu Aug 18 11:04:00 2016 -0400 ---------------------------------------------------------------------- .../ql/udf/generic/GenericUDAFRowNumber.java | 48 ++++++++++++++++++-- .../clientpositive/windowing_streaming.q | 4 ++ .../clientpositive/windowing_streaming.q.out | 33 ++++++++++++++ 3 files changed, 80 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a7cf25a5/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java index 8e672e6..e56aeea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRowNumber.java @@ -27,7 +27,10 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRank.GenericUDAFAbstractRankEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRank.RankBuffer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; @@ -59,22 +62,36 @@ public class GenericUDAFRowNumber extends AbstractGenericUDAFResolver { ArrayList<IntWritable> rowNums; int nextRow; + boolean supportsStreaming; void init() { rowNums = new ArrayList<IntWritable>(); + nextRow = 1; + if (supportsStreaming) { + rowNums.add(null); + } } - RowNumberBuffer() { + RowNumberBuffer(boolean supportsStreaming) { + this.supportsStreaming = supportsStreaming; init(); - nextRow = 1; } void incr() { - rowNums.add(new IntWritable(nextRow++)); + if (supportsStreaming) { + rowNums.set(0,new IntWritable(nextRow++)); + } else { + rowNums.add(new IntWritable(nextRow++)); + } } } - public static class GenericUDAFRowNumberEvaluator extends GenericUDAFEvaluator { + public static class GenericUDAFAbstractRowNumberEvaluator extends GenericUDAFEvaluator { + boolean isStreamingMode = false; + + protected boolean isStreaming() { + return isStreamingMode; + } @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { @@ -89,7 +106,7 @@ public class GenericUDAFRowNumber extends AbstractGenericUDAFResolver { @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new RowNumberBuffer(); + return new RowNumberBuffer(isStreamingMode); } @Override @@ -118,5 +135,26 @@ public class GenericUDAFRowNumber extends AbstractGenericUDAFResolver { } } + + public static class GenericUDAFRowNumberEvaluator extends GenericUDAFAbstractRowNumberEvaluator + implements ISupportStreamingModeForWindowing { + + @Override + public Object getNextResult(AggregationBuffer agg) throws HiveException { + return ((RowNumberBuffer) agg).rowNums.get(0); + } + + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + isStreamingMode = true; + return this; + } + + @Override + public int getRowsRemainingAfterTerminate() throws HiveException { + return 0; + } + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/a7cf25a5/ql/src/test/queries/clientpositive/windowing_streaming.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/windowing_streaming.q b/ql/src/test/queries/clientpositive/windowing_streaming.q index 294fe09..b8442e6 100644 --- a/ql/src/test/queries/clientpositive/windowing_streaming.q +++ b/ql/src/test/queries/clientpositive/windowing_streaming.q @@ -43,6 +43,10 @@ select * from (select t, f, rank() over(partition by t order by f) r from over10k) a where r < 6 and t < 5; +select * +from (select t, f, row_number() over(partition by t order by f) r from over10k) a +where r < 8 and t < 0; + set hive.vectorized.execution.enabled=false; set hive.limit.pushdown.memory.usage=0.8; http://git-wip-us.apache.org/repos/asf/hive/blob/a7cf25a5/ql/src/test/results/clientpositive/windowing_streaming.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/windowing_streaming.q.out b/ql/src/test/results/clientpositive/windowing_streaming.q.out index a74ddb3..701ae40 100644 --- a/ql/src/test/results/clientpositive/windowing_streaming.q.out +++ b/ql/src/test/results/clientpositive/windowing_streaming.q.out @@ -287,6 +287,39 @@ POSTHOOK: Input: default@over10k 4 5.53 3 4 5.76 4 4 7.26 5 +PREHOOK: query: select * +from (select t, f, row_number() over(partition by t order by f) r from over10k) a +where r < 8 and t < 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@over10k +#### A masked pattern was here #### +POSTHOOK: query: select * +from (select t, f, row_number() over(partition by t order by f) r from over10k) a +where r < 8 and t < 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@over10k +#### A masked pattern was here #### +-3 0.56 1 +-3 0.83 2 +-3 2.26 3 +-3 2.48 4 +-3 3.82 5 +-3 6.8 6 +-3 6.83 7 +-2 1.55 1 +-2 1.65 2 +-2 1.79 3 +-2 4.06 4 +-2 4.4 5 +-2 5.43 6 +-2 5.59 7 +-1 0.79 1 +-1 0.95 2 +-1 1.27 3 +-1 1.49 4 +-1 2.8 5 +-1 4.08 6 +-1 4.31 7 PREHOOK: query: explain select * from (select ctinyint, cdouble, rank() over(partition by ctinyint order by cdouble) r from alltypesorc) a where r < 5 PREHOOK: type: QUERY
