Repository: hive
Updated Branches:
  refs/heads/branch-2.0 bfc25f42f -> c9221c806


HIVE-12574: windowing function returns incorrect result when the window size is 
larger than the partition size (reviewed by Jimmy Xiang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c9221c80
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c9221c80
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c9221c80

Branch: refs/heads/branch-2.0
Commit: c9221c8068133608fc3502c009b56d4d3512e6e1
Parents: bfc25f4
Author: Aihua Xu <aihu...@apache.org>
Authored: Wed Dec 2 18:04:46 2015 -0500
Committer: Aihua Xu <aihu...@apache.org>
Committed: Mon Dec 7 20:33:38 2015 -0500

----------------------------------------------------------------------
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  4 +-
 .../ql/udf/generic/GenericUDAFFirstValue.java   | 12 ++--
 .../ql/udf/generic/GenericUDAFLastValue.java    |  9 ++-
 .../hive/ql/udf/generic/GenericUDAFMax.java     | 14 +++--
 .../generic/GenericUDAFStreamingEvaluator.java  | 28 ++++++----
 .../hive/ql/udf/ptf/WindowingTableFunction.java | 18 +++---
 .../clientpositive/windowing_windowspec4.q      | 19 +++++++
 .../clientpositive/windowing_windowspec4.q.out  | 59 ++++++++++++++++++++
 8 files changed, 129 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index dca3081..5803a9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -11771,9 +11771,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       else
       {
         int amt = Integer.parseInt(amtNode.getText());
-        if ( amt < 0 ) {
+        if ( amt <= 0 ) {
           throw new SemanticException(
-              "Window Frame Boundary Amount must be a +ve integer, amount provide is: " + amt);
+              "Window Frame Boundary Amount must be a positive integer, provided amount is: " + amt);
         }
         bs.setAmt(amt);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
index 160ce91..7ad4eb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
@@ -262,7 +262,7 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver {
         }
       }
 
-      if (s.numRows >= wFrameDef.getEnd().getRelativeOffset()) {
+      if (s.hasResultReady()) {
         /*
          * if skipNulls is true and there are no rows in valueChain => all rows
          * in partition are null so far; so add null in o/p
@@ -293,12 +293,14 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver {
       // For the case: X following and Y following, process first Y-X results and then insert X nulls.
       // For the case X preceding and Y following, process Y results.
       for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
-        s.results.add(r == null ? null : r.val);
+        if (s.hasResultReady()) {
+          s.results.add(r == null ? null : r.val);
+        }
         s.numRows++;
         if (r != null) {
           int fIdx = (Integer) r.idx;
           if (!wFrameDef.isStartUnbounded()
-              && s.numRows + i >= fIdx + wFrameDef.getWindowSize()
+              && s.numRows >= fIdx + wFrameDef.getWindowSize()
               && !s.valueChain.isEmpty()) {
             s.valueChain.removeFirst();
             r = !s.valueChain.isEmpty() ? s.valueChain.getFirst() : r;
@@ -307,7 +309,9 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver {
       }
 
       for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
-        s.results.add(null);
+        if (s.hasResultReady()) {
+          s.results.add(null);
+        }
         s.numRows++;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
index f917621..4989a0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
@@ -251,10 +251,15 @@ public class GenericUDAFLastValue extends AbstractGenericUDAFResolver {
       // For the case: X following and Y following, process first Y-X results and then insert X nulls.
       // For the case X preceding and Y following, process Y results.
       for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
-        s.results.add(s.lastValue);
+        if (s.hasResultReady()) {
+          s.results.add(s.lastValue);
+        }
+        s.numRows++;
       }
       for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
-        s.results.add(null);
+        if (s.hasResultReady()) {
+          s.results.add(null);
+        }
         s.numRows++;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
index 98abd5c..43b23fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
@@ -251,7 +251,7 @@ public class GenericUDAFMax extends AbstractGenericUDAFResolver {
         s.maxChain.addLast(new Object[] { o, s.numRows });
       }
 
-      if (s.numRows >= wFrameDef.getEnd().getRelativeOffset()) {
+      if (s.hasResultReady()) {
         s.results.add(s.maxChain.getFirst()[0]);
       }
       s.numRows++;
@@ -287,20 +287,24 @@ public class GenericUDAFMax extends AbstractGenericUDAFResolver {
       // For the case: X following and Y following, process first Y-X results and then insert X nulls.
       // For the case X preceding and Y following, process Y results.
       for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
-        s.results.add(r == null ? null : r[0]);
+        if (s.hasResultReady()) {
+          s.results.add(r == null ? null : r[0]);
+        }
         s.numRows++;
         if (r != null) {
           int fIdx = (Integer) r[1];
           if (!wFrameDef.isStartUnbounded()
-              && s.numRows + i >= fIdx + wFrameDef.getWindowSize()
+              && s.numRows >= fIdx + wFrameDef.getWindowSize()
               && !s.maxChain.isEmpty()) {
             s.maxChain.removeFirst();
-            r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : r;
+            r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : null;
           }
         }
       }
       for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
-        s.results.add(null);
+        if (s.hasResultReady()) {
+          s.results.add(null);
+        }
         s.numRows++;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
index 3c76404..d2e1b26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
@@ -56,6 +56,16 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends
       results.clear();
       numRows = 0;
     }
+
+    /**
+     * For the cases "X preceding and Y preceding" or the number of processed rows
+     * is more than the size of FOLLOWING window, we are able to generate a PTF result
+     * for a previous row.
+     * @return
+     */
+    public boolean hasResultReady() {
+      return this.numRows >= wFrameDef.getEnd().getRelativeOffset();
+    }
   }
 
   @Override
@@ -141,16 +151,6 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends
       }
 
       /**
-       * For the cases "X preceding and Y preceding" or the number of processed rows
-       * is more than the size of FOLLOWING window, we are able to generate a PTF result
-       * for a previous row.
-       * @return
-       */
-      public boolean hasResultReady() {
-        return this.numRows >= wFrameDef.getEnd().getRelativeOffset();
-      }
-
-      /**
        * Retrieve the next stored intermediate result, i.e.,
        * Get S[x-1] in the computation of S[x..y] = S[y] - S[x-1].
        */
@@ -206,11 +206,15 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends
       // For the case: X following and Y following, process first Y-X results and then insert X nulls.
       // For the case X preceding and Y following, process Y results.
       for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
-        ss.results.add(getNextResult(ss));
+        if (ss.hasResultReady()) {
+          ss.results.add(getNextResult(ss));
+        }
         ss.numRows++;
       }
       for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
-        ss.results.add(null);
+        if (ss.hasResultReady()) {
+          ss.results.add(null);
+        }
         ss.numRows++;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
index 9d21103..2c076f50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
@@ -496,18 +496,18 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
         }
       } else {
         while (numRowsRemaining > 0) {
-          int rowToProcess = streamingState.rollingPart.size()
-              - numRowsRemaining;
-          Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart,
-              streamingState.order);
-          PTFPartitionIterator<Object> rItr = rng.iterator();
-          PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
-          Object out = evaluateWindowFunction(wFn, rItr);
-          streamingState.fnOutputs[i].add(out);
+          int rowToProcess = streamingState.rollingPart.size() - numRowsRemaining;
+          if (rowToProcess >= 0) {
+            Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart,
+                streamingState.order);
+            PTFPartitionIterator<Object> rItr = rng.iterator();
+            PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
+            Object out = evaluateWindowFunction(wFn, rItr);
+            streamingState.fnOutputs[i].add(out);
+          }
           numRowsRemaining--;
         }
       }
-
     }
 
     List<Object> oRows = new ArrayList<Object>();

http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/test/queries/clientpositive/windowing_windowspec4.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/windowing_windowspec4.q b/ql/src/test/queries/clientpositive/windowing_windowspec4.q
new file mode 100644
index 0000000..fcf0f25
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/windowing_windowspec4.q
@@ -0,0 +1,19 @@
+--Test small dataset with larger windowing
+
+drop table if exists smalltable_windowing;
+
+create table smalltable_windowing(
+      i int,
+      type string);
+insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a');
+
+select type, i,
+max(i) over (partition by type order by i rows between 1 preceding and 7 following),
+min(i) over (partition by type order by i rows between 1 preceding and 7 following),
+first_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+last_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+avg(i) over (partition by type order by i rows between 1 preceding and 7 following),
+sum(i) over (partition by type order by i rows between 1 preceding and 7 following),
+collect_set(i) over (partition by type order by i rows between 1 preceding and 7 following),
+count(i) over (partition by type order by i rows between 1 preceding and 7 following)
+from smalltable_windowing;

http://git-wip-us.apache.org/repos/asf/hive/blob/c9221c80/ql/src/test/results/clientpositive/windowing_windowspec4.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/windowing_windowspec4.q.out b/ql/src/test/results/clientpositive/windowing_windowspec4.q.out
new file mode 100644
index 0000000..60e20ef
--- /dev/null
+++ b/ql/src/test/results/clientpositive/windowing_windowspec4.q.out
@@ -0,0 +1,59 @@
+PREHOOK: query: --Test small dataset with larger windowing
+
+drop table if exists smalltable_windowing
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: --Test small dataset with larger windowing
+
+drop table if exists smalltable_windowing
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table smalltable_windowing(
+      i int,
+      type string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@smalltable_windowing
+POSTHOOK: query: create table smalltable_windowing(
+      i int,
+      type string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@smalltable_windowing
+PREHOOK: query: insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@smalltable_windowing
+POSTHOOK: query: insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@smalltable_windowing
+POSTHOOK: Lineage: smalltable_windowing.i EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: smalltable_windowing.type SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: select type, i,
+max(i) over (partition by type order by i rows between 1 preceding and 7 following),
+min(i) over (partition by type order by i rows between 1 preceding and 7 following),
+first_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+last_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+avg(i) over (partition by type order by i rows between 1 preceding and 7 following),
+sum(i) over (partition by type order by i rows between 1 preceding and 7 following),
+collect_set(i) over (partition by type order by i rows between 1 preceding and 7 following),
+count(i) over (partition by type order by i rows between 1 preceding and 7 following)
+from smalltable_windowing
+PREHOOK: type: QUERY
+PREHOOK: Input: default@smalltable_windowing
+#### A masked pattern was here ####
+POSTHOOK: query: select type, i,
+max(i) over (partition by type order by i rows between 1 preceding and 7 following),
+min(i) over (partition by type order by i rows between 1 preceding and 7 following),
+first_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+last_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+avg(i) over (partition by type order by i rows between 1 preceding and 7 following),
+sum(i) over (partition by type order by i rows between 1 preceding and 7 following),
+collect_set(i) over (partition by type order by i rows between 1 preceding and 7 following),
+count(i) over (partition by type order by i rows between 1 preceding and 7 following)
+from smalltable_windowing
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@smalltable_windowing
+#### A masked pattern was here ####
+a      1       3       1       1       3       2.0     6       [1,2,3] 3
+a      2       3       1       1       3       2.0     6       [1,2,3] 3
+a      3       3       2       2       3       2.5     5       [2,3]   2

Reply via email to