HIVE-20367: Vectorization: Support streaming for PTF AVG, MAX, MIN, SUM (Matt 
McCline, reviewed by Teddy Choi)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cc38bcc5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cc38bcc5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cc38bcc5

Branch: refs/heads/master
Commit: cc38bcc5a993304898ba37b8496f13a15d62bf16
Parents: 6a28265
Author: Matt McCline <[email protected]>
Authored: Fri Aug 24 09:30:42 2018 -0700
Committer: Matt McCline <[email protected]>
Committed: Fri Aug 24 09:30:42 2018 -0700

----------------------------------------------------------------------
 .../exec/vector/ptf/VectorPTFEvaluatorBase.java |  17 +-
 .../vector/ptf/VectorPTFEvaluatorCount.java     |   9 +-
 .../vector/ptf/VectorPTFEvaluatorCountStar.java |   9 +-
 .../ptf/VectorPTFEvaluatorDecimalAvg.java       |  22 +-
 .../VectorPTFEvaluatorDecimalFirstValue.java    |   4 +-
 .../ptf/VectorPTFEvaluatorDecimalLastValue.java |  12 +-
 .../ptf/VectorPTFEvaluatorDecimalMax.java       |  25 +-
 .../ptf/VectorPTFEvaluatorDecimalMin.java       |  23 +-
 .../ptf/VectorPTFEvaluatorDecimalSum.java       |   9 +-
 .../vector/ptf/VectorPTFEvaluatorDenseRank.java |  11 +-
 .../vector/ptf/VectorPTFEvaluatorDoubleAvg.java |  18 +-
 .../ptf/VectorPTFEvaluatorDoubleFirstValue.java |   4 +-
 .../ptf/VectorPTFEvaluatorDoubleLastValue.java  |  12 +-
 .../vector/ptf/VectorPTFEvaluatorDoubleMax.java |  13 +-
 .../vector/ptf/VectorPTFEvaluatorDoubleMin.java |  11 +-
 .../vector/ptf/VectorPTFEvaluatorDoubleSum.java |   9 +-
 .../vector/ptf/VectorPTFEvaluatorLongAvg.java   |  18 +-
 .../ptf/VectorPTFEvaluatorLongFirstValue.java   |   4 +-
 .../ptf/VectorPTFEvaluatorLongLastValue.java    |  12 +-
 .../vector/ptf/VectorPTFEvaluatorLongMax.java   |   9 +-
 .../vector/ptf/VectorPTFEvaluatorLongMin.java   |   9 +-
 .../vector/ptf/VectorPTFEvaluatorLongSum.java   |   9 +-
 .../exec/vector/ptf/VectorPTFEvaluatorRank.java |  13 +-
 .../vector/ptf/VectorPTFEvaluatorRowNumber.java |   5 +-
 .../VectorPTFEvaluatorStreamingDecimalAvg.java  | 185 +++++++++++++
 .../VectorPTFEvaluatorStreamingDecimalMax.java  | 163 +++++++++++
 .../VectorPTFEvaluatorStreamingDecimalMin.java  | 163 +++++++++++
 .../VectorPTFEvaluatorStreamingDecimalSum.java  | 154 +++++++++++
 .../VectorPTFEvaluatorStreamingDoubleAvg.java   | 174 ++++++++++++
 .../VectorPTFEvaluatorStreamingDoubleMax.java   | 164 ++++++++++++
 .../VectorPTFEvaluatorStreamingDoubleMin.java   | 166 ++++++++++++
 .../VectorPTFEvaluatorStreamingDoubleSum.java   | 152 +++++++++++
 .../ptf/VectorPTFEvaluatorStreamingLongAvg.java | 168 ++++++++++++
 .../ptf/VectorPTFEvaluatorStreamingLongMax.java | 164 ++++++++++++
 .../ptf/VectorPTFEvaluatorStreamingLongMin.java | 166 ++++++++++++
 .../ptf/VectorPTFEvaluatorStreamingLongSum.java | 154 +++++++++++
 .../exec/vector/ptf/VectorPTFGroupBatches.java  |  10 +-
 .../hive/ql/optimizer/physical/Vectorizer.java  |  31 ++-
 .../hadoop/hive/ql/plan/VectorPTFDesc.java      |  98 +++++--
 .../test/results/clientpositive/llap/ptf.q.out  |  12 +-
 .../llap/vector_ptf_part_simple.q.out           | 119 ++++++++-
 .../clientpositive/llap/vector_windowing.q.out  | 244 +++++++++++++++--
 .../llap/vector_windowing_expressions.q.out     |  77 +++++-
 .../llap/vector_windowing_order_null.q.out      |  82 +++++-
 .../llap/vector_windowing_windowspec.q.out      |  82 +++++-
 .../clientpositive/llap/vectorized_ptf.q.out    | 237 ++++++++++++++--
 .../clientpositive/perf/spark/query51.q.out     |   1 +
 .../clientpositive/perf/tez/query51.q.out       |  18 +-
 .../test/results/clientpositive/spark/ptf.q.out |   7 +
 .../clientpositive/spark/vectorized_ptf.q.out   | 267 +++++++++++++++++--
 50 files changed, 3342 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java
index 785725c..437c319 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java
@@ -73,14 +73,19 @@ public abstract class VectorPTFEvaluatorBase {
   }
 
   // Evaluate the aggregation over one of the group's batches.
-  public abstract void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch) throws HiveException;
+  public abstract void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException;
 
-  // Returns true if the aggregation result will be streamed.
-  public boolean streamsResult() {
-    // Assume it is not streamjng by default.
-    return false;
+  // Do any work necessary after the last batch for a group has been 
processed.  Necessary
+  // for both streaming and non-streaming evaluators.
+  public void doLastBatchWork() {
+    // By default, do nothing.
   }
 
+  // Returns true if the aggregation result will be streamed.
+  // Otherwise, we must evaluate whole group before producing a result.
+  public abstract boolean streamsResult();
+
   public int getOutputColumnNum() {
     return outputColumnNum;
   }
@@ -88,7 +93,7 @@ public abstract class VectorPTFEvaluatorBase {
   // After processing all the group's batches with evaluateGroupBatch, is the 
non-streaming
   // aggregation result null?
   public boolean isGroupResultNull() {
-    return false;
+    throw new RuntimeException("Not implemented");
   }
 
   // What is the ColumnVector type of the aggregation result?

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java
index 9409c80..77b9892 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java
@@ -42,7 +42,8 @@ public class VectorPTFEvaluatorCount extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -83,6 +84,12 @@ public class VectorPTFEvaluatorCount extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java
index 9f9c04a..e44b614 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java
@@ -38,7 +38,8 @@ public class VectorPTFEvaluatorCountStar extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch) {
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch) {
     // No input expression for COUNT(*).
     // evaluateInputExpr(batch);
 
@@ -48,6 +49,12 @@ public class VectorPTFEvaluatorCountStar extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java
index 4541843..85281c2 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java
@@ -51,7 +51,8 @@ public class VectorPTFEvaluatorDecimalAvg extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -124,17 +125,24 @@ public class VectorPTFEvaluatorDecimalAvg extends 
VectorPTFEvaluatorBase {
         }
       }
     }
+  }
 
-    if (isLastGroupBatch) {
-      if (!isGroupResultNull) {
-        avg.set(sum);
-        temp.setFromLong(nonNullGroupCount);
-        avg.mutateDivide(temp);
-      }
+  @Override
+  public void doLastBatchWork() {
+    if (!isGroupResultNull) {
+      avg.set(sum);
+      temp.setFromLong(nonNullGroupCount);
+      avg.mutateDivide(temp);
     }
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java
index c36fb77..078e56a 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java
@@ -48,7 +48,8 @@ public class VectorPTFEvaluatorDecimalFirstValue extends 
VectorPTFEvaluatorBase
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -98,6 +99,7 @@ public class VectorPTFEvaluatorDecimalFirstValue extends 
VectorPTFEvaluatorBase
     }
   }
 
+  @Override
   public boolean streamsResult() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java
index 380ce60..6f97111 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java
@@ -47,7 +47,8 @@ public class VectorPTFEvaluatorDecimalLastValue extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -57,9 +58,6 @@ public class VectorPTFEvaluatorDecimalLastValue extends 
VectorPTFEvaluatorBase {
     // We do not filter when PTF is in reducer.
     Preconditions.checkState(!batch.selectedInUse);
 
-    if (!isLastGroupBatch) {
-      return;
-    }
     final int size = batch.size;
     if (size == 0) {
       return;
@@ -88,6 +86,12 @@ public class VectorPTFEvaluatorDecimalLastValue extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java
index 46ee261..f66deb6 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java
@@ -44,7 +44,8 @@ public class VectorPTFEvaluatorDecimalMax extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -97,15 +98,15 @@ public class VectorPTFEvaluatorDecimalMax extends 
VectorPTFEvaluatorBase {
           return;
         }
       }
+
       HiveDecimalWritable[] vector = decimalColVector.vector;
+
+      final HiveDecimalWritable firstValue = vector[i++];
       if (isGroupResultNull) {
-        max.set(vector[i++]);
+        max.set(firstValue);
         isGroupResultNull = false;
-      } else {
-        final HiveDecimalWritable dec = vector[i++];
-        if (dec.compareTo(max) == 1) {
-          max.set(dec);
-        }
+      } else if (firstValue.compareTo(max) == 1) {
+        max.set(firstValue);
       }
       for (; i < size; i++) {
         if (!batchIsNull[i]) {
@@ -119,6 +120,12 @@ public class VectorPTFEvaluatorDecimalMax extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }
@@ -133,11 +140,9 @@ public class VectorPTFEvaluatorDecimalMax extends 
VectorPTFEvaluatorBase {
     return max;
   }
 
-  private static HiveDecimal MIN_VALUE = 
HiveDecimal.create("-99999999999999999999999999999999999999");
-
   @Override
   public void resetEvaluator() {
     isGroupResultNull = true;
-    max.set(MIN_VALUE);
+    max.setFromLong(0);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java
index f881356..9f5a89a 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java
@@ -44,7 +44,7 @@ public class VectorPTFEvaluatorDecimalMin extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -98,14 +98,13 @@ public class VectorPTFEvaluatorDecimalMin extends 
VectorPTFEvaluatorBase {
         }
       }
       HiveDecimalWritable[] vector = decimalColVector.vector;
+
+      final HiveDecimalWritable firstValue = vector[i++];
       if (isGroupResultNull) {
-        min.set(vector[i++]);
+        min.set(firstValue);
         isGroupResultNull = false;
-      } else {
-        final HiveDecimalWritable dec = vector[i++];
-        if (dec.compareTo(min) == -1) {
-          min.set(dec);
-        }
+      } else if (firstValue.compareTo(min) == -1) {
+        min.set(firstValue);
       }
       for (; i < size; i++) {
         if (!batchIsNull[i]) {
@@ -119,6 +118,12 @@ public class VectorPTFEvaluatorDecimalMin extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }
@@ -133,11 +138,9 @@ public class VectorPTFEvaluatorDecimalMin extends 
VectorPTFEvaluatorBase {
     return min;
   }
 
-  private static HiveDecimal MAX_VALUE = 
HiveDecimal.create("99999999999999999999999999999999999999");
-
   @Override
   public void resetEvaluator() {
     isGroupResultNull = true;
-    min.set(MAX_VALUE);
+    min.setFromLong(0);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java
index 4b31dc4..93d8ed5 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java
@@ -46,7 +46,8 @@ public class VectorPTFEvaluatorDecimalSum extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -115,6 +116,12 @@ public class VectorPTFEvaluatorDecimalSum extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java
index 5025171..cb6b586 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java
@@ -41,7 +41,8 @@ public class VectorPTFEvaluatorDenseRank extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -50,12 +51,14 @@ public class VectorPTFEvaluatorDenseRank extends 
VectorPTFEvaluatorBase {
     longColVector.isRepeating = true;
     longColVector.isNull[0] = false;
     longColVector.vector[0] = denseRank;
+  }
 
-    if (isLastGroupBatch) {
-      denseRank++;
-    }
+  @Override
+  public void doLastBatchWork() {
+    denseRank++;
   }
 
+  @Override
   public boolean streamsResult() {
     // No group value.
     return true;

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java
index 224177a..e20a562 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java
@@ -45,7 +45,8 @@ public class VectorPTFEvaluatorDoubleAvg extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -117,15 +118,22 @@ public class VectorPTFEvaluatorDoubleAvg extends 
VectorPTFEvaluatorBase {
         sum += varSum;
       }
     }
+  }
 
-    if (isLastGroupBatch) {
-      if (!isGroupResultNull) {
-        avg = sum / nonNullGroupCount;
-      }
+  @Override
+  public void doLastBatchWork() {
+    if (!isGroupResultNull) {
+      avg = sum / nonNullGroupCount;
     }
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleFirstValue.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleFirstValue.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleFirstValue.java
index d20d10c..26bd083 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleFirstValue.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleFirstValue.java
@@ -45,7 +45,8 @@ public class VectorPTFEvaluatorDoubleFirstValue extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -95,6 +96,7 @@ public class VectorPTFEvaluatorDoubleFirstValue extends 
VectorPTFEvaluatorBase {
     }
   }
 
+  @Override
   public boolean streamsResult() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleLastValue.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleLastValue.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleLastValue.java
index 83a8e33..9986e9a 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleLastValue.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleLastValue.java
@@ -43,7 +43,8 @@ public class VectorPTFEvaluatorDoubleLastValue extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
     throws HiveException {
 
     evaluateInputExpr(batch);
@@ -53,9 +54,6 @@ public class VectorPTFEvaluatorDoubleLastValue extends 
VectorPTFEvaluatorBase {
     // We do not filter when PTF is in reducer.
     Preconditions.checkState(!batch.selectedInUse);
 
-    if (!isLastGroupBatch) {
-      return;
-    }
     final int size = batch.size;
     if (size == 0) {
       return;
@@ -84,6 +82,12 @@ public class VectorPTFEvaluatorDoubleLastValue extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMax.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMax.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMax.java
index 50280d9..8c8e8ad 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMax.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMax.java
@@ -41,7 +41,8 @@ public class VectorPTFEvaluatorDoubleMax extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -64,7 +65,7 @@ public class VectorPTFEvaluatorDoubleMax extends 
VectorPTFEvaluatorBase {
           isGroupResultNull = false;
         } else {
           final double repeatedMax = doubleColVector.vector[0];
-          if (repeatedMax < max) {
+          if (repeatedMax > max) {
             max = repeatedMax;
           }
         }
@@ -112,6 +113,12 @@ public class VectorPTFEvaluatorDoubleMax extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }
@@ -129,6 +136,6 @@ public class VectorPTFEvaluatorDoubleMax extends 
VectorPTFEvaluatorBase {
   @Override
   public void resetEvaluator() {
     isGroupResultNull = true;
-    max = Double.MIN_VALUE;
+    max = 0.0;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMin.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMin.java
index 24788af..87d8757 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMin.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMin.java
@@ -41,7 +41,8 @@ public class VectorPTFEvaluatorDoubleMin extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -112,6 +113,12 @@ public class VectorPTFEvaluatorDoubleMin extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }
@@ -129,6 +136,6 @@ public class VectorPTFEvaluatorDoubleMin extends 
VectorPTFEvaluatorBase {
   @Override
   public void resetEvaluator() {
     isGroupResultNull = true;
-    min = Double.MAX_VALUE;
+    min = 0.0;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleSum.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleSum.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleSum.java
index 902d81e..85a77c2 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleSum.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleSum.java
@@ -41,7 +41,8 @@ public class VectorPTFEvaluatorDoubleSum extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -109,6 +110,12 @@ public class VectorPTFEvaluatorDoubleSum extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongAvg.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongAvg.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongAvg.java
index e2d1768..4b525bf 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongAvg.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongAvg.java
@@ -45,7 +45,8 @@ public class VectorPTFEvaluatorLongAvg extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -117,15 +118,22 @@ public class VectorPTFEvaluatorLongAvg extends 
VectorPTFEvaluatorBase {
         sum += varSum;
       }
     }
+  }
 
-    if (isLastGroupBatch) {
-      if (!isGroupResultNull) {
-        avg = ((double) sum) / nonNullGroupCount;
-      }
+  @Override
+  public void doLastBatchWork() {
+    if (!isGroupResultNull) {
+      avg = ((double) sum) / nonNullGroupCount;
     }
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongFirstValue.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongFirstValue.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongFirstValue.java
index 37323fe..fa497ee 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongFirstValue.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongFirstValue.java
@@ -45,7 +45,8 @@ public class VectorPTFEvaluatorLongFirstValue extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -95,6 +96,7 @@ public class VectorPTFEvaluatorLongFirstValue extends 
VectorPTFEvaluatorBase {
     }
   }
 
+  @Override
   public boolean streamsResult() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongLastValue.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongLastValue.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongLastValue.java
index 925841b..fe768cc 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongLastValue.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongLastValue.java
@@ -44,7 +44,8 @@ public class VectorPTFEvaluatorLongLastValue extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -54,9 +55,6 @@ public class VectorPTFEvaluatorLongLastValue extends 
VectorPTFEvaluatorBase {
     // We do not filter when PTF is in reducer.
     Preconditions.checkState(!batch.selectedInUse);
 
-    if (!isLastGroupBatch) {
-      return;
-    }
     final int size = batch.size;
     if (size == 0) {
       return;
@@ -85,6 +83,12 @@ public class VectorPTFEvaluatorLongLastValue extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMax.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMax.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMax.java
index 638f1b7..87a6431 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMax.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMax.java
@@ -41,7 +41,8 @@ public class VectorPTFEvaluatorLongMax extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -112,6 +113,12 @@ public class VectorPTFEvaluatorLongMax extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMin.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMin.java
index 6238a03..9192b5b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMin.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMin.java
@@ -41,7 +41,8 @@ public class VectorPTFEvaluatorLongMin extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -112,6 +113,12 @@ public class VectorPTFEvaluatorLongMin extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongSum.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongSum.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongSum.java
index afd3952..8c67d24 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongSum.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongSum.java
@@ -41,7 +41,8 @@ public class VectorPTFEvaluatorLongSum extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -109,6 +110,12 @@ public class VectorPTFEvaluatorLongSum extends 
VectorPTFEvaluatorBase {
   }
 
   @Override
+  public boolean streamsResult() {
+    // We must evaluate whole group before producing a result.
+    return false;
+  }
+
+  @Override
   public boolean isGroupResultNull() {
     return isGroupResultNull;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRank.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRank.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRank.java
index 9cbc816..d20c60c 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRank.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRank.java
@@ -42,7 +42,8 @@ public class VectorPTFEvaluatorRank extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -56,13 +57,15 @@ public class VectorPTFEvaluatorRank extends 
VectorPTFEvaluatorBase {
     longColVector.isNull[0] = false;
     longColVector.vector[0] = rank;
     groupCount += batch.size;
+  }
 
-    if (isLastGroupBatch) {
-      rank += groupCount;
-      groupCount = 0;
-    }
+  @Override
+  public void doLastBatchWork() {
+    rank += groupCount;
+    groupCount = 0;
   }
 
+  @Override
   public boolean streamsResult() {
     // No group value.
     return true;

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRowNumber.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRowNumber.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRowNumber.java
index 94de1d7..384541c 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRowNumber.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRowNumber.java
@@ -40,7 +40,8 @@ public class VectorPTFEvaluatorRowNumber extends 
VectorPTFEvaluatorBase {
     resetEvaluator();
   }
 
-  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch)
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
       throws HiveException {
 
     evaluateInputExpr(batch);
@@ -53,11 +54,13 @@ public class VectorPTFEvaluatorRowNumber extends 
VectorPTFEvaluatorBase {
     }
   }
 
+  @Override
   public boolean streamsResult() {
     // No group value.
     return true;
   }
 
+  @Override
   public boolean isGroupResultNull() {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalAvg.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalAvg.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalAvg.java
new file mode 100644
index 0000000..e51d1fc
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalAvg.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates streaming HiveDecimal avg() for a PTF group.
+ *
+ * Non-null column values are accumulated into a running sum and count, and each row outputs the
+ * running sum / non-null count as of that row.  Rows before the first non-null value output
+ * NULL; a NULL row after that repeats the previous average.
+ */
+public class VectorPTFEvaluatorStreamingDecimalAvg extends VectorPTFEvaluatorBase {
+
+  // True until a non-null input value has been seen since the last resetEvaluator().
+  protected boolean isNull;
+
+  // Running sum of the non-null input values.
+  protected HiveDecimalWritable sum;
+
+  // How many non-null input values have been added to sum.
+  private int nonNullGroupCount;
+
+  // Scratch writable that carries nonNullGroupCount as the divisor.
+  private HiveDecimalWritable temp;
+
+  // The most recently computed running average.
+  private HiveDecimalWritable avg;
+
+  public VectorPTFEvaluatorStreamingDecimalAvg(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    sum = new HiveDecimalWritable();
+    temp = new HiveDecimalWritable();
+    avg = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  /**
+   * Evaluates the input expression for the batch and writes the running average of every row
+   * into the output column.
+   *
+   * @param batch the batch to process; must not have selectedInUse set
+   * @throws HiveException presumably raised by evaluateInputExpr; nothing else in this method
+   *     throws it explicitly
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // Accumulate all non-null decimal column values and output the running avg for each row;
+    // maintain isNull.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DecimalColumnVector decimalColVector =
+        ((DecimalColumnVector) batch.cols[inputColumnNum]);
+    DecimalColumnVector outputColVector =
+        (DecimalColumnVector) batch.cols[outputColumnNum];
+
+    if (decimalColVector.isRepeating) {
+
+      if (decimalColVector.noNulls || !decimalColVector.isNull[0]) {
+
+        // We have a repeated value.  Every row still adds it, so each row gets its own AVG.
+        isNull = false;
+        HiveDecimalWritable repeatedValue = decimalColVector.vector[0];
+
+        for (int i = 0; i < size; i++) {
+          accumulate(repeatedValue);
+
+          // Output row i AVG.
+          outputColVector.set(i, avg);
+        }
+      } else {
+
+        // Repeated NULL: one carried-forward entry stands for the whole batch.
+        outputCarriedForward(outputColVector, 0);
+        outputColVector.isRepeating = true;
+      }
+    } else if (decimalColVector.noNulls) {
+      isNull = false;
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      for (int i = 0; i < size; i++) {
+        accumulate(vector[i]);
+
+        // Output row i AVG.
+        outputColVector.set(i, avg);
+      }
+    } else {
+      boolean[] batchIsNull = decimalColVector.isNull;
+
+      // Leading NULL rows: nothing to add, so carry forward whatever we had before this batch.
+      int i = 0;
+      while (batchIsNull[i]) {
+        outputCarriedForward(outputColVector, i);
+        if (++i >= size) {
+          return;
+        }
+      }
+
+      // Row i is the first non-null row of this batch, so there is an AVG from here on.
+      isNull = false;
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+
+      for (; i < size; i++) {
+        if (!batchIsNull[i]) {
+          accumulate(vector[i]);
+        }
+
+        // Output row i AVG; a NULL row continues the previous AVG.
+        outputColVector.set(i, avg);
+      }
+    }
+  }
+
+  /**
+   * Adds one non-null value to the running sum and count and recomputes avg from them.
+   */
+  private void accumulate(HiveDecimalWritable value) {
+    sum.mutateAdd(value);
+    nonNullGroupCount++;
+
+    avg.set(sum);
+    temp.setFromLong(nonNullGroupCount);
+    avg.mutateDivide(temp);
+  }
+
+  /**
+   * Writes the output for a NULL input row: NULL when no non-null value has been seen yet,
+   * otherwise the previous AVG.
+   */
+  private void outputCarriedForward(DecimalColumnVector outputColVector, int index) {
+    if (isNull) {
+      outputColVector.isNull[index] = true;
+      outputColVector.noNulls = false;
+    } else {
+
+      // Continue previous AVG.
+      outputColVector.set(index, avg);
+    }
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // No group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  /**
+   * Returns the evaluator to its initial state: no value seen, zero sum, count and avg.
+   */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    sum.set(HiveDecimal.ZERO);
+    nonNullGroupCount = 0;
+    avg.set(HiveDecimal.ZERO);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMax.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMax.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMax.java
new file mode 100644
index 0000000..9357242
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMax.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates streaming HiveDecimal max() for a PTF group.
+ *
+ * Each row outputs the maximum of the non-null column values seen up to and including that
+ * row.  Rows before the first non-null value output NULL; a NULL row after that repeats the
+ * previous maximum.
+ */
+public class VectorPTFEvaluatorStreamingDecimalMax extends VectorPTFEvaluatorBase {
+
+  // True until a non-null input value has been seen since the last resetEvaluator().
+  protected boolean isNull;
+
+  // Running maximum; only meaningful while isNull is false.
+  protected HiveDecimalWritable max;
+
+  public VectorPTFEvaluatorStreamingDecimalMax(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    max = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  /**
+   * Evaluates the input expression for the batch and writes the running maximum of every row
+   * into the output column.
+   *
+   * @param batch the batch to process; must not have selectedInUse set
+   * @throws HiveException presumably raised by evaluateInputExpr; nothing else in this method
+   *     throws it explicitly
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // Determine running maximum of all non-null decimal column values; maintain isNull.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DecimalColumnVector decimalColVector =
+        ((DecimalColumnVector) batch.cols[inputColumnNum]);
+    DecimalColumnVector outputColVector =
+        (DecimalColumnVector) batch.cols[outputColumnNum];
+
+    if (decimalColVector.isRepeating) {
+
+      if (decimalColVector.noNulls || !decimalColVector.isNull[0]) {
+
+        // A repeated value changes the MAX at most once, so one entry covers the batch.
+        updateMax(decimalColVector.vector[0]);
+        outputColVector.set(0, max);
+      } else {
+        outputCarriedForward(outputColVector, 0);
+      }
+      outputColVector.isRepeating = true;
+    } else if (decimalColVector.noNulls) {
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      for (int i = 0; i < size; i++) {
+        updateMax(vector[i]);
+        outputColVector.set(i, max);
+      }
+    } else {
+      boolean[] batchIsNull = decimalColVector.isNull;
+
+      // Leading NULL rows: carry forward whatever we had before this batch.
+      int i = 0;
+      while (batchIsNull[i]) {
+        outputCarriedForward(outputColVector, i);
+        if (++i >= size) {
+          return;
+        }
+      }
+
+      // Row i is non-null, so updateMax clears isNull there and every later row has a MAX.
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      for (; i < size; i++) {
+        if (!batchIsNull[i]) {
+          updateMax(vector[i]);
+        }
+
+        // Output row i MAX; a NULL row continues the previous MAX.
+        outputColVector.set(i, max);
+      }
+    }
+  }
+
+  /**
+   * Folds one non-null value into the running maximum.
+   */
+  private void updateMax(HiveDecimalWritable value) {
+    if (isNull) {
+      max.set(value);
+      isNull = false;
+    } else if (value.compareTo(max) > 0) {
+
+      // Test the sign only; compareTo is not required to return exactly 1.
+      max.set(value);
+    }
+  }
+
+  /**
+   * Writes the output for a NULL input row: NULL when no non-null value has been seen yet,
+   * otherwise the previous MAX.
+   */
+  private void outputCarriedForward(DecimalColumnVector outputColVector, int index) {
+    if (isNull) {
+      outputColVector.isNull[index] = true;
+      outputColVector.noNulls = false;
+    } else {
+
+      // Continue previous MAX.
+      outputColVector.set(index, max);
+    }
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // No group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  /**
+   * Returns the evaluator to its initial state: no value seen and max set to zero.
+   */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    max.set(HiveDecimal.ZERO);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMin.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMin.java
new file mode 100644
index 0000000..51b43d7
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMin.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates streaming HiveDecimal min() for a PTF group.
+ *
+ * Each row outputs the minimum of the non-null column values seen up to and including that
+ * row.  Rows before the first non-null value output NULL; a NULL row after that repeats the
+ * previous minimum.
+ */
+public class VectorPTFEvaluatorStreamingDecimalMin extends VectorPTFEvaluatorBase {
+
+  // True until a non-null input value has been seen since the last resetEvaluator().
+  protected boolean isNull;
+
+  // Running minimum; only meaningful while isNull is false.
+  protected HiveDecimalWritable min;
+
+  public VectorPTFEvaluatorStreamingDecimalMin(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    min = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  /**
+   * Evaluates the input expression for the batch and writes the running minimum of every row
+   * into the output column.
+   *
+   * @param batch the batch to process; must not have selectedInUse set
+   * @throws HiveException presumably raised by evaluateInputExpr; nothing else in this method
+   *     throws it explicitly
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // Determine running minimum of all non-null decimal column values; maintain isNull.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DecimalColumnVector decimalColVector =
+        ((DecimalColumnVector) batch.cols[inputColumnNum]);
+    DecimalColumnVector outputColVector =
+        (DecimalColumnVector) batch.cols[outputColumnNum];
+
+    if (decimalColVector.isRepeating) {
+
+      if (decimalColVector.noNulls || !decimalColVector.isNull[0]) {
+
+        // A repeated value changes the MIN at most once, so one entry covers the batch.
+        updateMin(decimalColVector.vector[0]);
+        outputColVector.set(0, min);
+      } else {
+        outputCarriedForward(outputColVector, 0);
+      }
+      outputColVector.isRepeating = true;
+    } else if (decimalColVector.noNulls) {
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      for (int i = 0; i < size; i++) {
+        updateMin(vector[i]);
+        outputColVector.set(i, min);
+      }
+    } else {
+      boolean[] batchIsNull = decimalColVector.isNull;
+
+      // Leading NULL rows: carry forward whatever we had before this batch.
+      int i = 0;
+      while (batchIsNull[i]) {
+        outputCarriedForward(outputColVector, i);
+        if (++i >= size) {
+          return;
+        }
+      }
+
+      // Row i is non-null, so updateMin clears isNull there and every later row has a MIN.
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      for (; i < size; i++) {
+        if (!batchIsNull[i]) {
+          updateMin(vector[i]);
+        }
+
+        // Output row i MIN; a NULL row continues the previous MIN.
+        outputColVector.set(i, min);
+      }
+    }
+  }
+
+  /**
+   * Folds one non-null value into the running minimum.
+   */
+  private void updateMin(HiveDecimalWritable value) {
+    if (isNull) {
+      min.set(value);
+      isNull = false;
+    } else if (value.compareTo(min) < 0) {
+
+      // Test the sign only; compareTo is not required to return exactly -1.
+      min.set(value);
+    }
+  }
+
+  /**
+   * Writes the output for a NULL input row: NULL when no non-null value has been seen yet,
+   * otherwise the previous MIN.
+   */
+  private void outputCarriedForward(DecimalColumnVector outputColVector, int index) {
+    if (isNull) {
+      outputColVector.isNull[index] = true;
+      outputColVector.noNulls = false;
+    } else {
+
+      // Continue previous MIN.
+      outputColVector.set(index, min);
+    }
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // No group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  /**
+   * Returns the evaluator to its initial state: no value seen and min set to zero.
+   */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    min.set(HiveDecimal.ZERO);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalSum.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalSum.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalSum.java
new file mode 100644
index 0000000..bc8620a
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalSum.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates streaming HiveDecimal sum() for a PTF group.
+ *
+ * Each row outputs the sum of the non-null column values seen up to and including that row.
+ * Rows before the first non-null value output NULL; a NULL row after that repeats the
+ * previous sum.
+ */
+public class VectorPTFEvaluatorStreamingDecimalSum extends VectorPTFEvaluatorBase {
+
+  // True until a non-null input value has been seen since the last resetEvaluator().
+  protected boolean isNull;
+
+  // Running sum of the non-null input values.
+  protected HiveDecimalWritable sum;
+
+  public VectorPTFEvaluatorStreamingDecimalSum(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    sum = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  /**
+   * Evaluates the input expression for the batch and writes the running sum of every row into
+   * the output column.
+   *
+   * @param batch the batch to process; must not have selectedInUse set
+   * @throws HiveException presumably raised by evaluateInputExpr; nothing else in this method
+   *     throws it explicitly
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // Sum all non-null decimal column values and output the running sum for each row;
+    // maintain isNull.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DecimalColumnVector decimalColVector =
+        ((DecimalColumnVector) batch.cols[inputColumnNum]);
+    DecimalColumnVector outputColVector =
+        (DecimalColumnVector) batch.cols[outputColumnNum];
+
+    if (decimalColVector.isRepeating) {
+
+      if (decimalColVector.noNulls || !decimalColVector.isNull[0]) {
+
+        // We have a repeated value.  Every row still adds it, so each row gets its own SUM.
+        isNull = false;
+        HiveDecimalWritable repeatedValue = decimalColVector.vector[0];
+
+        for (int i = 0; i < size; i++) {
+          sum.mutateAdd(repeatedValue);
+
+          // Output row i SUM.
+          outputColVector.set(i, sum);
+        }
+      } else {
+
+        // Repeated NULL: one carried-forward entry stands for the whole batch.
+        outputCarriedForward(outputColVector, 0);
+        outputColVector.isRepeating = true;
+      }
+    } else if (decimalColVector.noNulls) {
+      isNull = false;
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      for (int i = 0; i < size; i++) {
+        sum.mutateAdd(vector[i]);
+
+        // Output row i SUM.
+        outputColVector.set(i, sum);
+      }
+    } else {
+      boolean[] batchIsNull = decimalColVector.isNull;
+
+      // Leading NULL rows: nothing to add, so carry forward whatever we had before this batch.
+      int i = 0;
+      while (batchIsNull[i]) {
+        outputCarriedForward(outputColVector, i);
+        if (++i >= size) {
+          return;
+        }
+      }
+
+      // Row i is the first non-null row of this batch, so there is a SUM from here on.
+      isNull = false;
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+
+      // The first non-null row goes through the same loop as the rest so that its SUM is
+      // written at its own index rather than at the index of the following row.
+      for (; i < size; i++) {
+        if (!batchIsNull[i]) {
+          sum.mutateAdd(vector[i]);
+        }
+
+        // Output row i SUM; a NULL row continues the previous SUM.
+        outputColVector.set(i, sum);
+      }
+    }
+  }
+
+  /**
+   * Writes the output for a NULL input row: NULL when no non-null value has been seen yet,
+   * otherwise the previous SUM.
+   */
+  private void outputCarriedForward(DecimalColumnVector outputColVector, int index) {
+    if (isNull) {
+      outputColVector.isNull[index] = true;
+      outputColVector.noNulls = false;
+    } else {
+
+      // Continue previous SUM.
+      outputColVector.set(index, sum);
+    }
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // No group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  /**
+   * Returns the evaluator to its initial state: no value seen and a zero sum.
+   */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    sum.set(HiveDecimal.ZERO);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleAvg.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleAvg.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleAvg.java
new file mode 100644
index 0000000..f6c5942
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleAvg.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates a streaming double avg() for a PTF group.
+ *
+ * Each call to evaluateGroupBatch writes one result per row: the running sum of the non-null
+ * column values seen since the last resetEvaluator() divided by their count. Rows that come
+ * before the first non-null value are output as NULL; NULL rows after it repeat the previous
+ * average.
+ */
+public class VectorPTFEvaluatorStreamingDoubleAvg extends VectorPTFEvaluatorBase {
+
+  // True until the first non-null value has been seen after a reset.
+  protected boolean isNull;
+
+  // Running sum of the non-null values.
+  protected double sum;
+
+  // Number of non-null values folded into sum.
+  private int nonNullGroupCount;
+
+  // Most recently computed sum / nonNullGroupCount.
+  protected double avg;
+
+  public VectorPTFEvaluatorStreamingDoubleAvg(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  /**
+   * Folds the batch's input column into the running average and writes the average as of
+   * each row into the output column.
+   *
+   * @param batch the batch to evaluate; must not have selectedInUse set
+   * @throws HiveException declared by the overridden method
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int rowCount = batch.size;
+    if (rowCount == 0) {
+      return;
+    }
+
+    final DoubleColumnVector inputColVector =
+        (DoubleColumnVector) batch.cols[inputColumnNum];
+    final DoubleColumnVector outputColVector =
+        (DoubleColumnVector) batch.cols[outputColumnNum];
+    final double[] outputValues = outputColVector.vector;
+
+    if (inputColVector.isRepeating) {
+      final boolean repeatedIsNull = !inputColVector.noNulls && inputColVector.isNull[0];
+      if (repeatedIsNull) {
+
+        // A batch of NULLs: a single repeated output entry describes every row.
+        if (isNull) {
+          outputColVector.isNull[0] = true;
+          outputColVector.noNulls = false;
+        } else {
+
+          // Continue previous AVG.
+          outputValues[0] = avg;
+        }
+        outputColVector.isRepeating = true;
+        return;
+      }
+
+      // The same non-null value on every row still moves the average row by row.
+      isNull = false;
+      final double repeatedValue = inputColVector.vector[0];
+      for (int row = 0; row < rowCount; row++) {
+        outputValues[row] = accumulate(repeatedValue);
+      }
+      return;
+    }
+
+    final double[] inputValues = inputColVector.vector;
+
+    if (inputColVector.noNulls) {
+      isNull = false;
+      for (int row = 0; row < rowCount; row++) {
+        outputValues[row] = accumulate(inputValues[row]);
+      }
+      return;
+    }
+
+    // Mixed NULL / non-null rows.
+    final boolean[] inputIsNull = inputColVector.isNull;
+    for (int row = 0; row < rowCount; row++) {
+      if (!inputIsNull[row]) {
+        isNull = false;
+        outputValues[row] = accumulate(inputValues[row]);
+      } else if (isNull) {
+
+        // Nothing averaged yet, so the result for this row is NULL.
+        outputColVector.isNull[row] = true;
+        outputColVector.noNulls = false;
+      } else {
+
+        // Continue previous AVG.
+        outputValues[row] = avg;
+      }
+    }
+  }
+
+  /**
+   * Adds one non-null value to the running sum and count and recomputes the average.
+   *
+   * @param value the non-null input value
+   * @return the updated average
+   */
+  private double accumulate(double value) {
+    sum += value;
+    nonNullGroupCount++;
+    avg = sum / nonNullGroupCount;
+    return avg;
+  }
+
+  /**
+   * Always true: results are written per row by evaluateGroupBatch, so there is no
+   * separate group value.
+   */
+  @Override
+  public boolean streamsResult() {
+    return true;
+  }
+
+  /**
+   * The streamed average is produced as a DOUBLE column vector.
+   */
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DOUBLE;
+  }
+
+  /**
+   * Restores the initial state: no non-null value seen yet, zero sum, zero count and
+   * zero average.
+   */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    sum = 0.0;
+    nonNullGroupCount = 0;
+    avg = 0.0;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMax.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMax.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMax.java
new file mode 100644
index 0000000..1d61cc5
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMax.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates a streaming double max() for a PTF group.
+ *
+ * Each call to evaluateGroupBatch writes one result per row: the largest non-null column
+ * value seen since the last resetEvaluator(). Rows that come before the first non-null
+ * value are output as NULL; NULL rows after it repeat the previous maximum.
+ */
+public class VectorPTFEvaluatorStreamingDoubleMax extends VectorPTFEvaluatorBase {
+
+  // True until the first non-null value has been seen after a reset.
+  protected boolean isNull;
+
+  // Largest non-null value seen so far; only meaningful while isNull is false.
+  protected double max;
+
+  public VectorPTFEvaluatorStreamingDoubleMax(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  /**
+   * Folds the batch's input column into the running maximum and writes the maximum as of
+   * each row into the output column.
+   *
+   * @param batch the batch to evaluate; must not have selectedInUse set
+   * @throws HiveException declared by the overridden method
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int rowCount = batch.size;
+    if (rowCount == 0) {
+      return;
+    }
+
+    final DoubleColumnVector inputColVector =
+        (DoubleColumnVector) batch.cols[inputColumnNum];
+    final DoubleColumnVector outputColVector =
+        (DoubleColumnVector) batch.cols[outputColumnNum];
+    final double[] outputValues = outputColVector.vector;
+
+    if (inputColVector.isRepeating) {
+      final boolean repeatedIsNull = !inputColVector.noNulls && inputColVector.isNull[0];
+      if (!repeatedIsNull) {
+
+        // We have a repeated value but we only need to evaluate once for MIN/MAX.
+        outputValues[0] = accumulate(inputColVector.vector[0]);
+      } else if (isNull) {
+        outputColVector.isNull[0] = true;
+        outputColVector.noNulls = false;
+      } else {
+
+        // Continue previous MAX.
+        outputValues[0] = max;
+      }
+
+      // Either way a single repeated output entry describes every row of the batch.
+      outputColVector.isRepeating = true;
+      return;
+    }
+
+    final double[] inputValues = inputColVector.vector;
+
+    if (inputColVector.noNulls) {
+      for (int row = 0; row < rowCount; row++) {
+        outputValues[row] = accumulate(inputValues[row]);
+      }
+      return;
+    }
+
+    // Mixed NULL / non-null rows.
+    final boolean[] inputIsNull = inputColVector.isNull;
+    for (int row = 0; row < rowCount; row++) {
+      if (!inputIsNull[row]) {
+        outputValues[row] = accumulate(inputValues[row]);
+      } else if (isNull) {
+
+        // No maximum yet, so the result for this row is NULL.
+        outputColVector.isNull[row] = true;
+        outputColVector.noNulls = false;
+      } else {
+
+        // Continue previous MAX.
+        outputValues[row] = max;
+      }
+    }
+  }
+
+  /**
+   * Folds one non-null value into the running maximum: the first value seen is taken as
+   * is, a later value replaces the maximum only when it compares greater.
+   *
+   * @param value the non-null input value
+   * @return the updated maximum
+   */
+  private double accumulate(double value) {
+    if (isNull) {
+      max = value;
+      isNull = false;
+    } else if (value > max) {
+      max = value;
+    }
+    return max;
+  }
+
+  /**
+   * Always true: results are written per row by evaluateGroupBatch, so there is no
+   * separate group value.
+   */
+  @Override
+  public boolean streamsResult() {
+    return true;
+  }
+
+  /**
+   * The streamed maximum is produced as a DOUBLE column vector.
+   */
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DOUBLE;
+  }
+
+  /**
+   * Restores the initial state: no non-null value seen yet; max is cleared to zero.
+   */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    max = 0.0;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMin.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMin.java
new file mode 100644
index 0000000..9ac197d
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMin.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates a streaming double min() for a PTF group.
+ *
+ * Each call to evaluateGroupBatch writes one result per row: the smallest non-null column
+ * value seen since the last resetEvaluator(). Rows that come before the first non-null
+ * value are output as NULL; NULL rows after it repeat the previous minimum.
+ */
+public class VectorPTFEvaluatorStreamingDoubleMin extends VectorPTFEvaluatorBase {
+
+  // True until the first non-null value has been seen after a reset.
+  protected boolean isNull;
+
+  // Smallest non-null value seen so far; only meaningful while isNull is false.
+  protected double min;
+
+  public VectorPTFEvaluatorStreamingDoubleMin(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  /**
+   * Folds the batch's input column into the running minimum and writes the minimum as of
+   * each row into the output column.
+   *
+   * @param batch the batch to evaluate; must not have selectedInUse set
+   * @throws HiveException declared by the overridden method
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int rowCount = batch.size;
+    if (rowCount == 0) {
+      return;
+    }
+
+    final DoubleColumnVector inputColVector =
+        (DoubleColumnVector) batch.cols[inputColumnNum];
+    final DoubleColumnVector outputColVector =
+        (DoubleColumnVector) batch.cols[outputColumnNum];
+    final double[] outputValues = outputColVector.vector;
+
+    if (inputColVector.isRepeating) {
+      final boolean repeatedIsNull = !inputColVector.noNulls && inputColVector.isNull[0];
+      if (!repeatedIsNull) {
+
+        // We have a repeated value but we only need to evaluate once for MIN/MAX.
+        outputValues[0] = accumulate(inputColVector.vector[0]);
+      } else if (isNull) {
+        outputColVector.isNull[0] = true;
+        outputColVector.noNulls = false;
+      } else {
+
+        // Continue previous MIN.
+        outputValues[0] = min;
+      }
+
+      // Either way a single repeated output entry describes every row of the batch.
+      outputColVector.isRepeating = true;
+      return;
+    }
+
+    final double[] inputValues = inputColVector.vector;
+
+    if (inputColVector.noNulls) {
+      for (int row = 0; row < rowCount; row++) {
+        outputValues[row] = accumulate(inputValues[row]);
+      }
+      return;
+    }
+
+    // Mixed NULL / non-null rows.
+    final boolean[] inputIsNull = inputColVector.isNull;
+    for (int row = 0; row < rowCount; row++) {
+      if (!inputIsNull[row]) {
+        outputValues[row] = accumulate(inputValues[row]);
+      } else if (isNull) {
+
+        // No minimum yet, so the result for this row is NULL.
+        outputColVector.isNull[row] = true;
+        outputColVector.noNulls = false;
+      } else {
+
+        // Continue previous MIN.
+        outputValues[row] = min;
+      }
+    }
+  }
+
+  /**
+   * Folds one non-null value into the running minimum: the first value seen is taken as
+   * is, a later value replaces the minimum only when it compares smaller.
+   *
+   * @param value the non-null input value
+   * @return the updated minimum
+   */
+  private double accumulate(double value) {
+    if (isNull) {
+      min = value;
+      isNull = false;
+    } else if (value < min) {
+      min = value;
+    }
+    return min;
+  }
+
+  /**
+   * Always true: results are written per row by evaluateGroupBatch, so there is no
+   * separate group value.
+   */
+  @Override
+  public boolean streamsResult() {
+    return true;
+  }
+
+  /**
+   * The streamed minimum is produced as a DOUBLE column vector.
+   */
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DOUBLE;
+  }
+
+  /**
+   * Restores the initial state: no non-null value seen yet; min is cleared to zero.
+   */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    min = 0.0;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleSum.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleSum.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleSum.java
new file mode 100644
index 0000000..8f17663
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleSum.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates a streaming double sum() for a PTF group.
+ *
+ * Each call to evaluateGroupBatch writes one result per row: the sum of the non-null
+ * column values seen since the last resetEvaluator(). Rows that come before the first
+ * non-null value are output as NULL; NULL rows after it repeat the previous sum.
+ */
+public class VectorPTFEvaluatorStreamingDoubleSum extends VectorPTFEvaluatorBase {
+
+  // True until the first non-null value has been seen after a reset.
+  protected boolean isNull;
+
+  // Running sum of the non-null values.
+  protected double sum;
+
+  public VectorPTFEvaluatorStreamingDoubleSum(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  /**
+   * Folds the batch's input column into the running sum and writes the sum as of each
+   * row into the output column.
+   *
+   * @param batch the batch to evaluate; must not have selectedInUse set
+   * @throws HiveException declared by the overridden method
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int rowCount = batch.size;
+    if (rowCount == 0) {
+      return;
+    }
+
+    final DoubleColumnVector inputColVector =
+        (DoubleColumnVector) batch.cols[inputColumnNum];
+    final DoubleColumnVector outputColVector =
+        (DoubleColumnVector) batch.cols[outputColumnNum];
+    final double[] outputValues = outputColVector.vector;
+
+    if (inputColVector.isRepeating) {
+      final boolean repeatedIsNull = !inputColVector.noNulls && inputColVector.isNull[0];
+      if (repeatedIsNull) {
+
+        // A batch of NULLs: a single repeated output entry describes every row.
+        if (isNull) {
+          outputColVector.isNull[0] = true;
+          outputColVector.noNulls = false;
+        } else {
+
+          // Continue previous SUM.
+          outputValues[0] = sum;
+        }
+        outputColVector.isRepeating = true;
+        return;
+      }
+
+      // The same non-null value on every row still grows the sum row by row.
+      isNull = false;
+      final double repeatedValue = inputColVector.vector[0];
+      for (int row = 0; row < rowCount; row++) {
+        sum += repeatedValue;
+        outputValues[row] = sum;
+      }
+      return;
+    }
+
+    final double[] inputValues = inputColVector.vector;
+
+    if (inputColVector.noNulls) {
+      isNull = false;
+      for (int row = 0; row < rowCount; row++) {
+        sum += inputValues[row];
+        outputValues[row] = sum;
+      }
+      return;
+    }
+
+    // Mixed NULL / non-null rows.
+    final boolean[] inputIsNull = inputColVector.isNull;
+    for (int row = 0; row < rowCount; row++) {
+      if (!inputIsNull[row]) {
+        isNull = false;
+        sum += inputValues[row];
+        outputValues[row] = sum;
+      } else if (isNull) {
+
+        // Nothing summed yet, so the result for this row is NULL.
+        outputColVector.isNull[row] = true;
+        outputColVector.noNulls = false;
+      } else {
+
+        // Continue previous SUM.
+        outputValues[row] = sum;
+      }
+    }
+  }
+
+  /**
+   * Always true: results are written per row by evaluateGroupBatch, so there is no
+   * separate group value.
+   */
+  @Override
+  public boolean streamsResult() {
+    return true;
+  }
+
+  /**
+   * The streamed sum is produced as a DOUBLE column vector.
+   */
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DOUBLE;
+  }
+
+  /**
+   * Restores the initial state: no non-null value seen yet and a running sum of zero.
+   */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    sum = 0.0;
+  }
+}
\ No newline at end of file

Reply via email to