Repository: hive Updated Branches: refs/heads/branch-2.0 bfc25f42f -> c9221c806
HIVE-12574: windowing function returns incorrect result when the window size is larger than the partition size (reviewed by Jimmy Xiang) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c9221c80 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c9221c80 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c9221c80 Branch: refs/heads/branch-2.0 Commit: c9221c8068133608fc3502c009b56d4d3512e6e1 Parents: bfc25f4 Author: Aihua Xu <aihu...@apache.org> Authored: Wed Dec 2 18:04:46 2015 -0500 Committer: Aihua Xu <aihu...@apache.org> Committed: Mon Dec 7 20:33:38 2015 -0500 ---------------------------------------------------------------------- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 4 +- .../ql/udf/generic/GenericUDAFFirstValue.java | 12 ++-- .../ql/udf/generic/GenericUDAFLastValue.java | 9 ++- .../hive/ql/udf/generic/GenericUDAFMax.java | 14 +++-- .../generic/GenericUDAFStreamingEvaluator.java | 28 ++++++---- .../hive/ql/udf/ptf/WindowingTableFunction.java | 18 +++--- .../clientpositive/windowing_windowspec4.q | 19 +++++++ .../clientpositive/windowing_windowspec4.q.out | 59 ++++++++++++++++++++ 8 files changed, 129 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index dca3081..5803a9c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -11771,9 +11771,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { else { int amt = Integer.parseInt(amtNode.getText()); - if ( amt < 0 ) { + if ( amt <= 0 ) { throw new SemanticException( - "Window Frame Boundary Amount must be a +ve integer, amount provide is: " + amt); + "Window Frame Boundary Amount must be a positive integer, provided amount is: " + amt); } bs.setAmt(amt); } http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java index 160ce91..7ad4eb9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java @@ -262,7 +262,7 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver { } } - if (s.numRows >= wFrameDef.getEnd().getRelativeOffset()) { + if (s.hasResultReady()) { /* * if skipNulls is true and there are no rows in valueChain => all rows * in partition are null so far; so add null in o/p @@ -293,12 +293,14 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver { // For the case: X following and Y following, process first Y-X results and then insert X nulls. // For the case X preceding and Y following, process Y results. for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) { - s.results.add(r == null ? null : r.val); + if (s.hasResultReady()) { + s.results.add(r == null ? null : r.val); + } s.numRows++; if (r != null) { int fIdx = (Integer) r.idx; if (!wFrameDef.isStartUnbounded() - && s.numRows + i >= fIdx + wFrameDef.getWindowSize() + && s.numRows >= fIdx + wFrameDef.getWindowSize() && !s.valueChain.isEmpty()) { s.valueChain.removeFirst(); r = !s.valueChain.isEmpty() ? s.valueChain.getFirst() : r; @@ -307,7 +309,9 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver { } for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) { - s.results.add(null); + if (s.hasResultReady()) { + s.results.add(null); + } s.numRows++; } http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java index f917621..4989a0b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java @@ -251,10 +251,15 @@ public class GenericUDAFLastValue extends AbstractGenericUDAFResolver { // For the case: X following and Y following, process first Y-X results and then insert X nulls. // For the case X preceding and Y following, process Y results. for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) { - s.results.add(s.lastValue); + if (s.hasResultReady()) { + s.results.add(s.lastValue); + } + s.numRows++; } for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) { - s.results.add(null); + if (s.hasResultReady()) { + s.results.add(null); + } s.numRows++; } http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java index 98abd5c..43b23fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java @@ -251,7 +251,7 @@ public class GenericUDAFMax extends AbstractGenericUDAFResolver { s.maxChain.addLast(new Object[] { o, s.numRows }); } - if (s.numRows >= wFrameDef.getEnd().getRelativeOffset()) { + if (s.hasResultReady()) { s.results.add(s.maxChain.getFirst()[0]); } s.numRows++; @@ -287,20 +287,24 @@ public class GenericUDAFMax extends AbstractGenericUDAFResolver { // For the case: X following and Y following, process first Y-X results and then insert X nulls. // For the case X preceding and Y following, process Y results. for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) { - s.results.add(r == null ? null : r[0]); + if (s.hasResultReady()) { + s.results.add(r == null ? null : r[0]); + } s.numRows++; if (r != null) { int fIdx = (Integer) r[1]; if (!wFrameDef.isStartUnbounded() - && s.numRows + i >= fIdx + wFrameDef.getWindowSize() + && s.numRows >= fIdx + wFrameDef.getWindowSize() && !s.maxChain.isEmpty()) { s.maxChain.removeFirst(); - r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : r; + r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : null; } } } for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) { - s.results.add(null); + if (s.hasResultReady()) { + s.results.add(null); + } s.numRows++; } http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java index 3c76404..d2e1b26 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java @@ -56,6 +56,16 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends results.clear(); numRows = 0; } + + /** + * For the cases "X preceding and Y preceding" or the number of processed rows + * is more than the size of FOLLOWING window, we are able to generate a PTF result + * for a previous row. + * @return + */ + public boolean hasResultReady() { + return this.numRows >= wFrameDef.getEnd().getRelativeOffset(); + } } @Override @@ -141,16 +151,6 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends } /** - * For the cases "X preceding and Y preceding" or the number of processed rows - * is more than the size of FOLLOWING window, we are able to generate a PTF result - * for a previous row. - * @return - */ - public boolean hasResultReady() { - return this.numRows >= wFrameDef.getEnd().getRelativeOffset(); - } - - /** * Retrieve the next stored intermediate result, i.e., * Get S[x-1] in the computation of S[x..y] = S[y] - S[x-1]. */ @@ -206,11 +206,15 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends // For the case: X following and Y following, process first Y-X results and then insert X nulls. // For the case X preceding and Y following, process Y results. for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) { - ss.results.add(getNextResult(ss)); + if (ss.hasResultReady()) { + ss.results.add(getNextResult(ss)); + } ss.numRows++; } for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) { - ss.results.add(null); + if (ss.hasResultReady()) { + ss.results.add(null); + } ss.numRows++; } http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java index 9d21103..2c076f50 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java @@ -496,18 +496,18 @@ public class WindowingTableFunction extends TableFunctionEvaluator { } } else { while (numRowsRemaining > 0) { - int rowToProcess = streamingState.rollingPart.size() - - numRowsRemaining; - Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart, - streamingState.order); - PTFPartitionIterator<Object> rItr = rng.iterator(); - PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr); - Object out = evaluateWindowFunction(wFn, rItr); - streamingState.fnOutputs[i].add(out); + int rowToProcess = streamingState.rollingPart.size() - numRowsRemaining; + if (rowToProcess >= 0) { + Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart, + streamingState.order); + PTFPartitionIterator<Object> rItr = rng.iterator(); + PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr); + Object out = evaluateWindowFunction(wFn, rItr); + streamingState.fnOutputs[i].add(out); + } numRowsRemaining--; } } - } List<Object> oRows = new ArrayList<Object>(); http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/test/queries/clientpositive/windowing_windowspec4.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/windowing_windowspec4.q b/ql/src/test/queries/clientpositive/windowing_windowspec4.q new file mode 100644 index 0000000..fcf0f25 --- /dev/null +++ b/ql/src/test/queries/clientpositive/windowing_windowspec4.q @@ -0,0 +1,19 @@ +--Test small dataset with larger windowing + +drop table if exists smalltable_windowing; + +create table smalltable_windowing( + i int, + type string); +insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a'); + +select type, i, +max(i) over (partition by type order by i rows between 1 preceding and 7 following), +min(i) over (partition by type order by i rows between 1 preceding and 7 following), +first_value(i) over (partition by type order by i rows between 1 preceding and 7 following), +last_value(i) over (partition by type order by i rows between 1 preceding and 7 following), +avg(i) over (partition by type order by i rows between 1 preceding and 7 following), +sum(i) over (partition by type order by i rows between 1 preceding and 7 following), +collect_set(i) over (partition by type order by i rows between 1 preceding and 7 following), +count(i) over (partition by type order by i rows between 1 preceding and 7 following) +from smalltable_windowing; http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/test/results/clientpositive/windowing_windowspec4.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/windowing_windowspec4.q.out b/ql/src/test/results/clientpositive/windowing_windowspec4.q.out new file mode 100644 index 0000000..60e20ef --- /dev/null +++ b/ql/src/test/results/clientpositive/windowing_windowspec4.q.out @@ -0,0 +1,59 @@ +PREHOOK: query: --Test small dataset with larger windowing + +drop table if exists smalltable_windowing +PREHOOK: type: DROPTABLE +POSTHOOK: query: --Test small dataset with larger windowing + +drop table if exists smalltable_windowing +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table smalltable_windowing( + i int, + type string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@smalltable_windowing +POSTHOOK: query: create table smalltable_windowing( + i int, + type string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@smalltable_windowing +PREHOOK: query: insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a') +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@smalltable_windowing +POSTHOOK: query: insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@smalltable_windowing +POSTHOOK: Lineage: smalltable_windowing.i EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: smalltable_windowing.type SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select type, i, +max(i) over (partition by type order by i rows between 1 preceding and 7 following), +min(i) over (partition by type order by i rows between 1 preceding and 7 following), +first_value(i) over (partition by type order by i rows between 1 preceding and 7 following), +last_value(i) over (partition by type order by i rows between 1 preceding and 7 following), +avg(i) over (partition by type order by i rows between 1 preceding and 7 following), +sum(i) over (partition by type order by i rows between 1 preceding and 7 following), +collect_set(i) over (partition by type order by i rows between 1 preceding and 7 following), +count(i) over (partition by type order by i rows between 1 preceding and 7 following) +from smalltable_windowing +PREHOOK: type: QUERY +PREHOOK: Input: default@smalltable_windowing +#### A masked pattern was here #### +POSTHOOK: query: select type, i, +max(i) over (partition by type order by i rows between 1 preceding and 7 following), +min(i) over (partition by type order by i rows between 1 preceding and 7 following), +first_value(i) over (partition by type order by i rows between 1 preceding and 7 following), +last_value(i) over (partition by type order by i rows between 1 preceding and 7 following), +avg(i) over (partition by type order by i rows between 1 preceding and 7 following), +sum(i) over (partition by type order by i rows between 1 preceding and 7 following), +collect_set(i) over (partition by type order by i rows between 1 preceding and 7 following), +count(i) over (partition by type order by i rows between 1 preceding and 7 following) +from smalltable_windowing +POSTHOOK: type: QUERY +POSTHOOK: Input: default@smalltable_windowing +#### A masked pattern was here #### +a 1 3 1 1 3 2.0 6 [1,2,3] 3 +a 2 3 1 1 3 2.0 6 [1,2,3] 3 +a 3 3 2 2 3 2.5 5 [2,3] 2