HIVE-20367: Vectorization: Support streaming for PTF AVG, MAX, MIN, SUM (Matt McCline, reviewed by Teddy Choi)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cc38bcc5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cc38bcc5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cc38bcc5 Branch: refs/heads/master Commit: cc38bcc5a993304898ba37b8496f13a15d62bf16 Parents: 6a28265 Author: Matt McCline <[email protected]> Authored: Fri Aug 24 09:30:42 2018 -0700 Committer: Matt McCline <[email protected]> Committed: Fri Aug 24 09:30:42 2018 -0700 ---------------------------------------------------------------------- .../exec/vector/ptf/VectorPTFEvaluatorBase.java | 17 +- .../vector/ptf/VectorPTFEvaluatorCount.java | 9 +- .../vector/ptf/VectorPTFEvaluatorCountStar.java | 9 +- .../ptf/VectorPTFEvaluatorDecimalAvg.java | 22 +- .../VectorPTFEvaluatorDecimalFirstValue.java | 4 +- .../ptf/VectorPTFEvaluatorDecimalLastValue.java | 12 +- .../ptf/VectorPTFEvaluatorDecimalMax.java | 25 +- .../ptf/VectorPTFEvaluatorDecimalMin.java | 23 +- .../ptf/VectorPTFEvaluatorDecimalSum.java | 9 +- .../vector/ptf/VectorPTFEvaluatorDenseRank.java | 11 +- .../vector/ptf/VectorPTFEvaluatorDoubleAvg.java | 18 +- .../ptf/VectorPTFEvaluatorDoubleFirstValue.java | 4 +- .../ptf/VectorPTFEvaluatorDoubleLastValue.java | 12 +- .../vector/ptf/VectorPTFEvaluatorDoubleMax.java | 13 +- .../vector/ptf/VectorPTFEvaluatorDoubleMin.java | 11 +- .../vector/ptf/VectorPTFEvaluatorDoubleSum.java | 9 +- .../vector/ptf/VectorPTFEvaluatorLongAvg.java | 18 +- .../ptf/VectorPTFEvaluatorLongFirstValue.java | 4 +- .../ptf/VectorPTFEvaluatorLongLastValue.java | 12 +- .../vector/ptf/VectorPTFEvaluatorLongMax.java | 9 +- .../vector/ptf/VectorPTFEvaluatorLongMin.java | 9 +- .../vector/ptf/VectorPTFEvaluatorLongSum.java | 9 +- .../exec/vector/ptf/VectorPTFEvaluatorRank.java | 13 +- .../vector/ptf/VectorPTFEvaluatorRowNumber.java | 5 +- .../VectorPTFEvaluatorStreamingDecimalAvg.java | 185 +++++++++++++ .../VectorPTFEvaluatorStreamingDecimalMax.java | 163 
+++++++++++ .../VectorPTFEvaluatorStreamingDecimalMin.java | 163 +++++++++++ .../VectorPTFEvaluatorStreamingDecimalSum.java | 154 +++++++++++ .../VectorPTFEvaluatorStreamingDoubleAvg.java | 174 ++++++++++++ .../VectorPTFEvaluatorStreamingDoubleMax.java | 164 ++++++++++++ .../VectorPTFEvaluatorStreamingDoubleMin.java | 166 ++++++++++++ .../VectorPTFEvaluatorStreamingDoubleSum.java | 152 +++++++++++ .../ptf/VectorPTFEvaluatorStreamingLongAvg.java | 168 ++++++++++++ .../ptf/VectorPTFEvaluatorStreamingLongMax.java | 164 ++++++++++++ .../ptf/VectorPTFEvaluatorStreamingLongMin.java | 166 ++++++++++++ .../ptf/VectorPTFEvaluatorStreamingLongSum.java | 154 +++++++++++ .../exec/vector/ptf/VectorPTFGroupBatches.java | 10 +- .../hive/ql/optimizer/physical/Vectorizer.java | 31 ++- .../hadoop/hive/ql/plan/VectorPTFDesc.java | 98 +++++-- .../test/results/clientpositive/llap/ptf.q.out | 12 +- .../llap/vector_ptf_part_simple.q.out | 119 ++++++++- .../clientpositive/llap/vector_windowing.q.out | 244 +++++++++++++++-- .../llap/vector_windowing_expressions.q.out | 77 +++++- .../llap/vector_windowing_order_null.q.out | 82 +++++- .../llap/vector_windowing_windowspec.q.out | 82 +++++- .../clientpositive/llap/vectorized_ptf.q.out | 237 ++++++++++++++-- .../clientpositive/perf/spark/query51.q.out | 1 + .../clientpositive/perf/tez/query51.q.out | 18 +- .../test/results/clientpositive/spark/ptf.q.out | 7 + .../clientpositive/spark/vectorized_ptf.q.out | 267 +++++++++++++++++-- 50 files changed, 3342 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java index 
785725c..437c319 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java @@ -73,14 +73,19 @@ public abstract class VectorPTFEvaluatorBase { } // Evaluate the aggregation over one of the group's batches. - public abstract void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) throws HiveException; + public abstract void evaluateGroupBatch(VectorizedRowBatch batch) + throws HiveException; - // Returns true if the aggregation result will be streamed. - public boolean streamsResult() { - // Assume it is not streamjng by default. - return false; + // Do any work necessary after the last batch for a group has been processed. Necessary + // for both streaming and non-streaming evaluators.. + public void doLastBatchWork() { + // By default, do nothing. } + // Returns true if the aggregation result will be streamed. + // Otherwise, we must evaluate whole group before producing a result. + public abstract boolean streamsResult(); + public int getOutputColumnNum() { return outputColumnNum; } @@ -88,7 +93,7 @@ public abstract class VectorPTFEvaluatorBase { // After processing all the group's batches with evaluateGroupBatch, is the non-streaming // aggregation result null? public boolean isGroupResultNull() { - return false; + throw new RuntimeException("Not implemented"); } // What is the ColumnVector type of the aggregation result? 
http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java index 9409c80..77b9892 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java @@ -42,7 +42,8 @@ public class VectorPTFEvaluatorCount extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -83,6 +84,12 @@ public class VectorPTFEvaluatorCount extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. 
+ return false; + } + + @Override public boolean isGroupResultNull() { return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java index 9f9c04a..e44b614 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java @@ -38,7 +38,8 @@ public class VectorPTFEvaluatorCountStar extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) { + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) { // No input expression for COUNT(*). // evaluateInputExpr(batch); @@ -48,6 +49,12 @@ public class VectorPTFEvaluatorCountStar extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. 
+ return false; + } + + @Override public boolean isGroupResultNull() { return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java index 4541843..85281c2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java @@ -51,7 +51,8 @@ public class VectorPTFEvaluatorDecimalAvg extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -124,17 +125,24 @@ public class VectorPTFEvaluatorDecimalAvg extends VectorPTFEvaluatorBase { } } } + } - if (isLastGroupBatch) { - if (!isGroupResultNull) { - avg.set(sum); - temp.setFromLong(nonNullGroupCount); - avg.mutateDivide(temp); - } + @Override + public void doLastBatchWork() { + if (!isGroupResultNull) { + avg.set(sum); + temp.setFromLong(nonNullGroupCount); + avg.mutateDivide(temp); } } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. 
+ return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java index c36fb77..078e56a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java @@ -48,7 +48,8 @@ public class VectorPTFEvaluatorDecimalFirstValue extends VectorPTFEvaluatorBase resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -98,6 +99,7 @@ public class VectorPTFEvaluatorDecimalFirstValue extends VectorPTFEvaluatorBase } } + @Override public boolean streamsResult() { return true; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java index 380ce60..6f97111 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java @@ -47,7 +47,8 @@ public class VectorPTFEvaluatorDecimalLastValue extends VectorPTFEvaluatorBase { 
resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -57,9 +58,6 @@ public class VectorPTFEvaluatorDecimalLastValue extends VectorPTFEvaluatorBase { // We do not filter when PTF is in reducer. Preconditions.checkState(!batch.selectedInUse); - if (!isLastGroupBatch) { - return; - } final int size = batch.size; if (size == 0) { return; @@ -88,6 +86,12 @@ public class VectorPTFEvaluatorDecimalLastValue extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. + return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java index 46ee261..f66deb6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java @@ -44,7 +44,8 @@ public class VectorPTFEvaluatorDecimalMax extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -97,15 +98,15 @@ public class VectorPTFEvaluatorDecimalMax extends VectorPTFEvaluatorBase { return; } } + HiveDecimalWritable[] vector = decimalColVector.vector; + + final HiveDecimalWritable firstValue = vector[i++]; if 
(isGroupResultNull) { - max.set(vector[i++]); + max.set(firstValue); isGroupResultNull = false; - } else { - final HiveDecimalWritable dec = vector[i++]; - if (dec.compareTo(max) == 1) { - max.set(dec); - } + } else if (firstValue.compareTo(max) == 1) { + max.set(firstValue); } for (; i < size; i++) { if (!batchIsNull[i]) { @@ -119,6 +120,12 @@ public class VectorPTFEvaluatorDecimalMax extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. + return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } @@ -133,11 +140,9 @@ public class VectorPTFEvaluatorDecimalMax extends VectorPTFEvaluatorBase { return max; } - private static HiveDecimal MIN_VALUE = HiveDecimal.create("-99999999999999999999999999999999999999"); - @Override public void resetEvaluator() { isGroupResultNull = true; - max.set(MIN_VALUE); + max.setFromLong(0); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java index f881356..9f5a89a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java @@ -44,7 +44,7 @@ public class VectorPTFEvaluatorDecimalMin extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -98,14 +98,13 @@ public class VectorPTFEvaluatorDecimalMin extends 
VectorPTFEvaluatorBase { } } HiveDecimalWritable[] vector = decimalColVector.vector; + + final HiveDecimalWritable firstValue = vector[i++]; if (isGroupResultNull) { - min.set(vector[i++]); + min.set(firstValue); isGroupResultNull = false; - } else { - final HiveDecimalWritable dec = vector[i++]; - if (dec.compareTo(min) == -1) { - min.set(dec); - } + } else if (firstValue.compareTo(min) == -1) { + min.set(firstValue); } for (; i < size; i++) { if (!batchIsNull[i]) { @@ -119,6 +118,12 @@ public class VectorPTFEvaluatorDecimalMin extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. + return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } @@ -133,11 +138,9 @@ public class VectorPTFEvaluatorDecimalMin extends VectorPTFEvaluatorBase { return min; } - private static HiveDecimal MAX_VALUE = HiveDecimal.create("99999999999999999999999999999999999999"); - @Override public void resetEvaluator() { isGroupResultNull = true; - min.set(MAX_VALUE); + min.setFromLong(0); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java index 4b31dc4..93d8ed5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java @@ -46,7 +46,8 @@ public class VectorPTFEvaluatorDecimalSum extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void 
evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -115,6 +116,12 @@ public class VectorPTFEvaluatorDecimalSum extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. + return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java index 5025171..cb6b586 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java @@ -41,7 +41,8 @@ public class VectorPTFEvaluatorDenseRank extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -50,12 +51,14 @@ public class VectorPTFEvaluatorDenseRank extends VectorPTFEvaluatorBase { longColVector.isRepeating = true; longColVector.isNull[0] = false; longColVector.vector[0] = denseRank; + } - if (isLastGroupBatch) { - denseRank++; - } + @Override + public void doLastBatchWork() { + denseRank++; } + @Override public boolean streamsResult() { // No group value. 
return true; http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java index 224177a..e20a562 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java @@ -45,7 +45,8 @@ public class VectorPTFEvaluatorDoubleAvg extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -117,15 +118,22 @@ public class VectorPTFEvaluatorDoubleAvg extends VectorPTFEvaluatorBase { sum += varSum; } } + } - if (isLastGroupBatch) { - if (!isGroupResultNull) { - avg = sum / nonNullGroupCount; - } + @Override + public void doLastBatchWork() { + if (!isGroupResultNull) { + avg = sum / nonNullGroupCount; } } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. 
+ return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleFirstValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleFirstValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleFirstValue.java index d20d10c..26bd083 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleFirstValue.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleFirstValue.java @@ -45,7 +45,8 @@ public class VectorPTFEvaluatorDoubleFirstValue extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -95,6 +96,7 @@ public class VectorPTFEvaluatorDoubleFirstValue extends VectorPTFEvaluatorBase { } } + @Override public boolean streamsResult() { return true; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleLastValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleLastValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleLastValue.java index 83a8e33..9986e9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleLastValue.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleLastValue.java @@ -43,7 +43,8 @@ public class VectorPTFEvaluatorDoubleLastValue extends VectorPTFEvaluatorBase { 
resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -53,9 +54,6 @@ public class VectorPTFEvaluatorDoubleLastValue extends VectorPTFEvaluatorBase { // We do not filter when PTF is in reducer. Preconditions.checkState(!batch.selectedInUse); - if (!isLastGroupBatch) { - return; - } final int size = batch.size; if (size == 0) { return; @@ -84,6 +82,12 @@ public class VectorPTFEvaluatorDoubleLastValue extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. + return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMax.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMax.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMax.java index 50280d9..8c8e8ad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMax.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMax.java @@ -41,7 +41,8 @@ public class VectorPTFEvaluatorDoubleMax extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -64,7 +65,7 @@ public class VectorPTFEvaluatorDoubleMax extends VectorPTFEvaluatorBase { isGroupResultNull = false; } else { final double repeatedMax = doubleColVector.vector[0]; - if (repeatedMax < max) { + if (repeatedMax > max) { max = 
repeatedMax; } } @@ -112,6 +113,12 @@ public class VectorPTFEvaluatorDoubleMax extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. + return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } @@ -129,6 +136,6 @@ public class VectorPTFEvaluatorDoubleMax extends VectorPTFEvaluatorBase { @Override public void resetEvaluator() { isGroupResultNull = true; - max = Double.MIN_VALUE; + max = 0.0; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMin.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMin.java index 24788af..87d8757 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleMin.java @@ -41,7 +41,8 @@ public class VectorPTFEvaluatorDoubleMin extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -112,6 +113,12 @@ public class VectorPTFEvaluatorDoubleMin extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. 
+ return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } @@ -129,6 +136,6 @@ public class VectorPTFEvaluatorDoubleMin extends VectorPTFEvaluatorBase { @Override public void resetEvaluator() { isGroupResultNull = true; - min = Double.MAX_VALUE; + min = 0.0; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleSum.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleSum.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleSum.java index 902d81e..85a77c2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleSum.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleSum.java @@ -41,7 +41,8 @@ public class VectorPTFEvaluatorDoubleSum extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -109,6 +110,12 @@ public class VectorPTFEvaluatorDoubleSum extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. 
+ return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongAvg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongAvg.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongAvg.java index e2d1768..4b525bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongAvg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongAvg.java @@ -45,7 +45,8 @@ public class VectorPTFEvaluatorLongAvg extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -117,15 +118,22 @@ public class VectorPTFEvaluatorLongAvg extends VectorPTFEvaluatorBase { sum += varSum; } } + } - if (isLastGroupBatch) { - if (!isGroupResultNull) { - avg = ((double) sum) / nonNullGroupCount; - } + @Override + public void doLastBatchWork() { + if (!isGroupResultNull) { + avg = ((double) sum) / nonNullGroupCount; } } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. 
+ return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongFirstValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongFirstValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongFirstValue.java index 37323fe..fa497ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongFirstValue.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongFirstValue.java @@ -45,7 +45,8 @@ public class VectorPTFEvaluatorLongFirstValue extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -95,6 +96,7 @@ public class VectorPTFEvaluatorLongFirstValue extends VectorPTFEvaluatorBase { } } + @Override public boolean streamsResult() { return true; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongLastValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongLastValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongLastValue.java index 925841b..fe768cc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongLastValue.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongLastValue.java @@ -44,7 +44,8 @@ public class VectorPTFEvaluatorLongLastValue extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void 
evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -54,9 +55,6 @@ public class VectorPTFEvaluatorLongLastValue extends VectorPTFEvaluatorBase { // We do not filter when PTF is in reducer. Preconditions.checkState(!batch.selectedInUse); - if (!isLastGroupBatch) { - return; - } final int size = batch.size; if (size == 0) { return; @@ -85,6 +83,12 @@ public class VectorPTFEvaluatorLongLastValue extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. + return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMax.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMax.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMax.java index 638f1b7..87a6431 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMax.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMax.java @@ -41,7 +41,8 @@ public class VectorPTFEvaluatorLongMax extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -112,6 +113,12 @@ public class VectorPTFEvaluatorLongMax extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. 
+ return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMin.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMin.java index 6238a03..9192b5b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongMin.java @@ -41,7 +41,8 @@ public class VectorPTFEvaluatorLongMin extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -112,6 +113,12 @@ public class VectorPTFEvaluatorLongMin extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. 
+ return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongSum.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongSum.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongSum.java index afd3952..8c67d24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongSum.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongSum.java @@ -41,7 +41,8 @@ public class VectorPTFEvaluatorLongSum extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -109,6 +110,12 @@ public class VectorPTFEvaluatorLongSum extends VectorPTFEvaluatorBase { } @Override + public boolean streamsResult() { + // We must evaluate whole group before producing a result. 
+ return false; + } + + @Override public boolean isGroupResultNull() { return isGroupResultNull; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRank.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRank.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRank.java index 9cbc816..d20c60c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRank.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRank.java @@ -42,7 +42,8 @@ public class VectorPTFEvaluatorRank extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -56,13 +57,15 @@ public class VectorPTFEvaluatorRank extends VectorPTFEvaluatorBase { longColVector.isNull[0] = false; longColVector.vector[0] = rank; groupCount += batch.size; + } - if (isLastGroupBatch) { - rank += groupCount; - groupCount = 0; - } + @Override + public void doLastBatchWork() { + rank += groupCount; + groupCount = 0; } + @Override public boolean streamsResult() { // No group value. 
return true; http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRowNumber.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRowNumber.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRowNumber.java index 94de1d7..384541c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRowNumber.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorRowNumber.java @@ -40,7 +40,8 @@ public class VectorPTFEvaluatorRowNumber extends VectorPTFEvaluatorBase { resetEvaluator(); } - public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) throws HiveException { evaluateInputExpr(batch); @@ -53,11 +54,13 @@ public class VectorPTFEvaluatorRowNumber extends VectorPTFEvaluatorBase { } } + @Override public boolean streamsResult() { // No group value. return true; } + @Override public boolean isGroupResultNull() { return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalAvg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalAvg.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalAvg.java new file mode 100644 index 0000000..e51d1fc --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalAvg.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Streaming HiveDecimal avg() evaluator for a PTF group.
+ *
+ * Every row is given the running average: the sum of the non-null input values seen since
+ * the last {@link #resetEvaluator()} divided by how many of them there were. Rows that come
+ * before the first non-null value are output as NULL; a null input after that repeats the
+ * most recent average.
+ */
+public class VectorPTFEvaluatorStreamingDecimalAvg extends VectorPTFEvaluatorBase {
+
+  /** True until the first non-null input value has been seen. */
+  protected boolean isNull;
+  /** Running sum of the non-null input values. */
+  protected HiveDecimalWritable sum;
+  /** Number of non-null input values added to {@link #sum}. */
+  private int nonNullGroupCount;
+  /** Scratch writable holding the count as a decimal divisor. */
+  private HiveDecimalWritable temp;
+  /** Most recently computed running average. */
+  private HiveDecimalWritable avg;
+
+  public VectorPTFEvaluatorStreamingDecimalAvg(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    sum = new HiveDecimalWritable();
+    temp = new HiveDecimalWritable();
+    avg = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  /**
+   * Folds one batch into the running sum/count and writes the running average of every row
+   * into the output column.
+   *
+   * @param batch the next batch of the current group; must not use the selected array
+   * @throws HiveException declared by the evaluator contract
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int rowCount = batch.size;
+    if (rowCount == 0) {
+      return;
+    }
+    final DecimalColumnVector inputColVector =
+        (DecimalColumnVector) batch.cols[inputColumnNum];
+    final DecimalColumnVector outputColVector =
+        (DecimalColumnVector) batch.cols[outputColumnNum];
+
+    if (inputColVector.isRepeating) {
+      if (!inputColVector.noNulls && inputColVector.isNull[0]) {
+
+        // The whole batch is NULL: a single repeating entry carries the current state.
+        writeCarriedResult(outputColVector, 0);
+        outputColVector.isRepeating = true;
+        return;
+      }
+
+      // The same non-null value on every row; it still counts once per row.
+      isNull = false;
+      final HiveDecimalWritable repeatedValue = inputColVector.vector[0];
+      for (int row = 0; row < rowCount; row++) {
+        includeValue(repeatedValue);
+        outputColVector.set(row, avg);
+      }
+      return;
+    }
+
+    final HiveDecimalWritable[] values = inputColVector.vector;
+    if (inputColVector.noNulls) {
+      isNull = false;
+      for (int row = 0; row < rowCount; row++) {
+        includeValue(values[row]);
+        outputColVector.set(row, avg);
+      }
+      return;
+    }
+
+    final boolean[] inputIsNull = inputColVector.isNull;
+    for (int row = 0; row < rowCount; row++) {
+      if (inputIsNull[row]) {
+        writeCarriedResult(outputColVector, row);
+      } else {
+        isNull = false;
+        includeValue(values[row]);
+        outputColVector.set(row, avg);
+      }
+    }
+  }
+
+  /** Adds one non-null value to the running sum and recomputes avg = sum / count. */
+  private void includeValue(HiveDecimalWritable value) {
+    sum.mutateAdd(value);
+    nonNullGroupCount++;
+    avg.set(sum);
+    temp.setFromLong(nonNullGroupCount);
+    avg.mutateDivide(temp);
+  }
+
+  /** Output for a null input row: NULL while nothing was seen yet, else the previous AVG. */
+  private void writeCarriedResult(DecimalColumnVector outputColVector, int row) {
+    if (isNull) {
+      outputColVector.isNull[row] = true;
+      outputColVector.noNulls = false;
+    } else {
+      outputColVector.set(row, avg);
+    }
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // Results are written per row by evaluateGroupBatch; no group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  /** Returns the evaluator to its initial state: nothing seen, zero sum, count and avg. */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    sum.set(HiveDecimal.ZERO);
+    nonNullGroupCount = 0;
+    avg.set(HiveDecimal.ZERO);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMax.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMax.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMax.java
new file mode 100644
index 0000000..9357242
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMax.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates streaming HiveDecimal max() for a PTF group.
+ *
+ * Each row is given the maximum of the non-null input values seen since the last
+ * {@link #resetEvaluator()}. Rows that come before the first non-null value are output as
+ * NULL; a null input after that repeats the current maximum.
+ */
+public class VectorPTFEvaluatorStreamingDecimalMax extends VectorPTFEvaluatorBase {
+
+  /** True until the first non-null input value has been seen. */
+  protected boolean isNull;
+  /** Running maximum; only meaningful while {@link #isNull} is false. */
+  protected HiveDecimalWritable max;
+
+  public VectorPTFEvaluatorStreamingDecimalMax(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    max = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  /**
+   * Folds one batch into the running maximum and writes the running maximum of every row
+   * into the output column.
+   *
+   * @param batch the next batch of the current group; must not use the selected array
+   * @throws HiveException declared by the evaluator contract
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // Determine maximum of all non-null decimal column values; maintain isNull.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DecimalColumnVector decimalColVector = ((DecimalColumnVector) batch.cols[inputColumnNum]);
+
+    DecimalColumnVector outputColVector = (DecimalColumnVector) batch.cols[outputColumnNum];
+
+    if (decimalColVector.isRepeating) {
+
+      if (decimalColVector.noNulls || !decimalColVector.isNull[0]) {
+
+        // A repeated value only needs to be folded in once for MAX.
+        includeValue(decimalColVector.vector[0]);
+        outputColVector.set(0, max);
+      } else {
+        writeCarriedResult(outputColVector, 0);
+      }
+
+      // Every row of the batch has the same result, so one repeating entry is enough.
+      outputColVector.isRepeating = true;
+    } else if (decimalColVector.noNulls) {
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      for (int i = 0; i < size; i++) {
+        includeValue(vector[i]);
+        outputColVector.set(i, max);
+      }
+    } else {
+      boolean[] batchIsNull = decimalColVector.isNull;
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      for (int i = 0; i < size; i++) {
+        if (batchIsNull[i]) {
+          writeCarriedResult(outputColVector, i);
+        } else {
+          includeValue(vector[i]);
+          outputColVector.set(i, max);
+        }
+      }
+    }
+  }
+
+  /**
+   * Folds one non-null value into the running maximum. The comparison tests the sign of
+   * compareTo() rather than an exact return value of 1.
+   */
+  private void includeValue(HiveDecimalWritable value) {
+    if (isNull) {
+      max.set(value);
+      isNull = false;
+    } else if (value.compareTo(max) > 0) {
+      max.set(value);
+    }
+  }
+
+  /** Output for a null input row: NULL while nothing was seen yet, else the previous MAX. */
+  private void writeCarriedResult(DecimalColumnVector outputColVector, int row) {
+    if (isNull) {
+      outputColVector.isNull[row] = true;
+      outputColVector.noNulls = false;
+    } else {
+      outputColVector.set(row, max);
+    }
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // Results are written per row by evaluateGroupBatch; no group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  /** Returns the evaluator to its initial "nothing seen yet" state. */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    max.set(HiveDecimal.ZERO);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMin.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMin.java
new file mode 100644
index 0000000..51b43d7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalMin.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates streaming HiveDecimal min() for a PTF group.
+ *
+ * Each row is given the minimum of the non-null input values seen since the last
+ * {@link #resetEvaluator()}. Rows that come before the first non-null value are output as
+ * NULL; a null input after that repeats the current minimum.
+ */
+public class VectorPTFEvaluatorStreamingDecimalMin extends VectorPTFEvaluatorBase {
+
+  /** True until the first non-null input value has been seen. */
+  protected boolean isNull;
+  /** Running minimum; only meaningful while {@link #isNull} is false. */
+  protected HiveDecimalWritable min;
+
+  public VectorPTFEvaluatorStreamingDecimalMin(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    min = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  /**
+   * Folds one batch into the running minimum and writes the running minimum of every row
+   * into the output column.
+   *
+   * @param batch the next batch of the current group; must not use the selected array
+   * @throws HiveException declared by the evaluator contract
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // Determine minimum of all non-null decimal column values; maintain isNull.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DecimalColumnVector decimalColVector = ((DecimalColumnVector) batch.cols[inputColumnNum]);
+
+    DecimalColumnVector outputColVector = (DecimalColumnVector) batch.cols[outputColumnNum];
+
+    if (decimalColVector.isRepeating) {
+
+      if (decimalColVector.noNulls || !decimalColVector.isNull[0]) {
+
+        // A repeated value only needs to be folded in once for MIN.
+        includeValue(decimalColVector.vector[0]);
+        outputColVector.set(0, min);
+      } else {
+        writeCarriedResult(outputColVector, 0);
+      }
+
+      // Every row of the batch has the same result, so one repeating entry is enough.
+      outputColVector.isRepeating = true;
+    } else if (decimalColVector.noNulls) {
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      for (int i = 0; i < size; i++) {
+        includeValue(vector[i]);
+        outputColVector.set(i, min);
+      }
+    } else {
+      boolean[] batchIsNull = decimalColVector.isNull;
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      for (int i = 0; i < size; i++) {
+        if (batchIsNull[i]) {
+          writeCarriedResult(outputColVector, i);
+        } else {
+          includeValue(vector[i]);
+          outputColVector.set(i, min);
+        }
+      }
+    }
+  }
+
+  /**
+   * Folds one non-null value into the running minimum. The comparison tests the sign of
+   * compareTo() rather than an exact return value of -1.
+   */
+  private void includeValue(HiveDecimalWritable value) {
+    if (isNull) {
+      min.set(value);
+      isNull = false;
+    } else if (value.compareTo(min) < 0) {
+      min.set(value);
+    }
+  }
+
+  /** Output for a null input row: NULL while nothing was seen yet, else the previous MIN. */
+  private void writeCarriedResult(DecimalColumnVector outputColVector, int row) {
+    if (isNull) {
+      outputColVector.isNull[row] = true;
+      outputColVector.noNulls = false;
+    } else {
+      outputColVector.set(row, min);
+    }
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // Results are written per row by evaluateGroupBatch; no group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  /** Returns the evaluator to its initial "nothing seen yet" state. */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    min.set(HiveDecimal.ZERO);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalSum.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalSum.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalSum.java
new file mode 100644
index 0000000..bc8620a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDecimalSum.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates streaming HiveDecimal sum() for a PTF group.
+ *
+ * Each row is given the running sum of the non-null input values seen since the last
+ * {@link #resetEvaluator()}. Rows that come before the first non-null value are output as
+ * NULL; a null input after that repeats the current sum.
+ */
+public class VectorPTFEvaluatorStreamingDecimalSum extends VectorPTFEvaluatorBase {
+
+  /** True until the first non-null input value has been seen. */
+  protected boolean isNull;
+  /** Running sum of the non-null input values. */
+  protected HiveDecimalWritable sum;
+
+  public VectorPTFEvaluatorStreamingDecimalSum(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    sum = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  /**
+   * Folds one batch into the running sum and writes the running sum of every row into the
+   * output column.
+   *
+   * @param batch the next batch of the current group; must not use the selected array
+   * @throws HiveException declared by the evaluator contract
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // Sum all non-null decimal column values; maintain isNull.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DecimalColumnVector decimalColVector = ((DecimalColumnVector) batch.cols[inputColumnNum]);
+
+    DecimalColumnVector outputColVector = (DecimalColumnVector) batch.cols[outputColumnNum];
+
+    if (decimalColVector.isRepeating) {
+
+      if (decimalColVector.noNulls || !decimalColVector.isNull[0]) {
+
+        // We have a repeated value; it still contributes once per row.
+        isNull = false;
+        HiveDecimalWritable repeatedValue = decimalColVector.vector[0];
+
+        for (int i = 0; i < size; i++) {
+          sum.mutateAdd(repeatedValue);
+
+          // Output row i SUM.
+          outputColVector.set(i, sum);
+        }
+      } else {
+
+        // The whole batch is NULL: a single repeating entry carries the current state.
+        writeCarriedResult(outputColVector, 0);
+        outputColVector.isRepeating = true;
+      }
+    } else if (decimalColVector.noNulls) {
+      isNull = false;
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      for (int i = 0; i < size; i++) {
+        sum.mutateAdd(vector[i]);
+
+        // Output row i SUM.
+        outputColVector.set(i, sum);
+      }
+    } else {
+      boolean[] batchIsNull = decimalColVector.isNull;
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+
+      // One pass over the batch. The value is added and its result written at the SAME
+      // row index; advancing the index between the add and the write would leave the
+      // first non-null row without a result and write one row too far.
+      for (int i = 0; i < size; i++) {
+        if (batchIsNull[i]) {
+          writeCarriedResult(outputColVector, i);
+        } else {
+          isNull = false;
+          sum.mutateAdd(vector[i]);
+
+          // Output row i SUM.
+          outputColVector.set(i, sum);
+        }
+      }
+    }
+  }
+
+  /** Output for a null input row: NULL while nothing was seen yet, else the previous SUM. */
+  private void writeCarriedResult(DecimalColumnVector outputColVector, int row) {
+    if (isNull) {
+      outputColVector.isNull[row] = true;
+      outputColVector.noNulls = false;
+    } else {
+      outputColVector.set(row, sum);
+    }
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // Results are written per row by evaluateGroupBatch; no group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  /** Returns the evaluator to its initial state: nothing seen yet and a zero sum. */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    sum.set(HiveDecimal.ZERO);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleAvg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleAvg.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleAvg.java
new file mode 100644
index 0000000..f6c5942
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleAvg.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Streaming double avg() evaluator for a PTF group.
+ *
+ * Every row is given the running average: the sum of the non-null input values seen since
+ * the last {@link #resetEvaluator()} divided by how many of them there were. Rows that come
+ * before the first non-null value are output as NULL; a null input after that repeats the
+ * most recent average.
+ */
+public class VectorPTFEvaluatorStreamingDoubleAvg extends VectorPTFEvaluatorBase {
+
+  /** True until the first non-null input value has been seen. */
+  protected boolean isNull;
+  /** Running sum of the non-null input values. */
+  protected double sum;
+  /** Number of non-null input values added to {@link #sum}. */
+  private int nonNullGroupCount;
+  /** Most recently computed running average. */
+  protected double avg;
+
+  public VectorPTFEvaluatorStreamingDoubleAvg(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  /**
+   * Folds one batch into the running sum/count and writes the running average of every row
+   * into the output column.
+   *
+   * @param batch the next batch of the current group; must not use the selected array
+   * @throws HiveException declared by the evaluator contract
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int rowCount = batch.size;
+    if (rowCount == 0) {
+      return;
+    }
+    final DoubleColumnVector inputColVector = (DoubleColumnVector) batch.cols[inputColumnNum];
+    final DoubleColumnVector outputColVector =
+        (DoubleColumnVector) batch.cols[outputColumnNum];
+    final double[] results = outputColVector.vector;
+
+    if (inputColVector.isRepeating) {
+      if (!inputColVector.noNulls && inputColVector.isNull[0]) {
+
+        // The whole batch is NULL: a single repeating entry carries the current state.
+        writeCarriedResult(outputColVector, 0);
+        outputColVector.isRepeating = true;
+        return;
+      }
+
+      // The same non-null value on every row; it still counts once per row.
+      isNull = false;
+      final double repeatedValue = inputColVector.vector[0];
+      for (int row = 0; row < rowCount; row++) {
+        includeValue(repeatedValue);
+        results[row] = avg;
+      }
+      return;
+    }
+
+    final double[] values = inputColVector.vector;
+    if (inputColVector.noNulls) {
+      isNull = false;
+      for (int row = 0; row < rowCount; row++) {
+        includeValue(values[row]);
+        results[row] = avg;
+      }
+      return;
+    }
+
+    final boolean[] inputIsNull = inputColVector.isNull;
+    for (int row = 0; row < rowCount; row++) {
+      if (inputIsNull[row]) {
+        writeCarriedResult(outputColVector, row);
+      } else {
+        isNull = false;
+        includeValue(values[row]);
+        results[row] = avg;
+      }
+    }
+  }
+
+  /** Adds one non-null value to the running sum and recomputes avg = sum / count. */
+  private void includeValue(double value) {
+    sum += value;
+    nonNullGroupCount++;
+    avg = sum / nonNullGroupCount;
+  }
+
+  /** Output for a null input row: NULL while nothing was seen yet, else the previous AVG. */
+  private void writeCarriedResult(DoubleColumnVector outputColVector, int row) {
+    if (isNull) {
+      outputColVector.isNull[row] = true;
+      outputColVector.noNulls = false;
+    } else {
+      outputColVector.vector[row] = avg;
+    }
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // Results are written per row by evaluateGroupBatch; no group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DOUBLE;
+  }
+
+  /** Returns the evaluator to its initial state: nothing seen, zero sum, count and avg. */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    sum = 0.0;
+    nonNullGroupCount = 0;
+    avg = 0.0;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMax.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMax.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMax.java
new file mode 100644
index 0000000..1d61cc5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMax.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates double max() for a PTF group. + */ +public class VectorPTFEvaluatorStreamingDoubleMax extends VectorPTFEvaluatorBase { + + protected boolean isNull; + protected double max; + + public VectorPTFEvaluatorStreamingDoubleMax(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + resetEvaluator(); + } + + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) + throws HiveException { + + evaluateInputExpr(batch); + + // Determine maximum of all non-null double column values; maintain isNull. + + // We do not filter when PTF is in reducer. + Preconditions.checkState(!batch.selectedInUse); + + final int size = batch.size; + if (size == 0) { + return; + } + DoubleColumnVector doubleColVector = ((DoubleColumnVector) batch.cols[inputColumnNum]); + + DoubleColumnVector outputColVector = (DoubleColumnVector) batch.cols[outputColumnNum]; + double[] outputVector = outputColVector.vector; + + if (doubleColVector.isRepeating) { + + if (doubleColVector.noNulls || !doubleColVector.isNull[0]) { + + // We have a repeated value but we only need to evaluate once for MIN/MAX. 
+ final double repeatedMax = doubleColVector.vector[0]; + + if (isNull) { + max = repeatedMax; + isNull = false; + } else if (repeatedMax > max) { + max = repeatedMax; + } + outputVector[0] = max; + } else if (isNull) { + outputColVector.isNull[0] = true; + outputColVector.noNulls = false; + } else { + + // Continue previous MAX. + outputVector[0] = max; + } + outputColVector.isRepeating = true; + } else if (doubleColVector.noNulls) { + double[] vector = doubleColVector.vector; + for (int i = 0; i < size; i++) { + final double value = vector[i]; + if (isNull) { + max = value; + isNull = false; + } else if (value > max) { + max = value; + } + outputVector[i] = max; + } + } else { + boolean[] batchIsNull = doubleColVector.isNull; + int i = 0; + while (batchIsNull[i]) { + if (isNull) { + outputColVector.isNull[i] = true; + outputColVector.noNulls = false; + } else { + + // Continue previous MAX. + outputVector[i] = max; + } + if (++i >= size) { + return; + } + } + + double[] vector = doubleColVector.vector; + + final double firstValue = vector[i]; + if (isNull) { + max = firstValue; + isNull = false; + } else if (firstValue > max) { + max = firstValue; + } + + // Output row i max. + outputVector[i++] = max; + + for (; i < size; i++) { + if (!batchIsNull[i]) { + final double value = vector[i]; + if (isNull) { + max = value; + isNull = false; + } else if (value > max) { + max = value; + } + outputVector[i] = max; + } else { + + // Continue previous MAX. + outputVector[i] = max; + } + } + } + } + + @Override + public boolean streamsResult() { + // No group value. 
+ return true; + } + + @Override + public Type getResultColumnVectorType() { + return Type.DOUBLE; + } + + @Override + public void resetEvaluator() { + isNull = true; + max = 0.0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMin.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMin.java new file mode 100644 index 0000000..9ac197d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleMin.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates double min() for a PTF group. + */ +public class VectorPTFEvaluatorStreamingDoubleMin extends VectorPTFEvaluatorBase { + + protected boolean isNull; + protected double min; + + public VectorPTFEvaluatorStreamingDoubleMin(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + resetEvaluator(); + } + + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) + throws HiveException { + + evaluateInputExpr(batch); + + // Determine minimum of all non-null double column values; maintain isNull. + + // We do not filter when PTF is in reducer. + Preconditions.checkState(!batch.selectedInUse); + + final int size = batch.size; + if (size == 0) { + return; + } + DoubleColumnVector doubleColVector = ((DoubleColumnVector) batch.cols[inputColumnNum]); + + DoubleColumnVector outputColVector = (DoubleColumnVector) batch.cols[outputColumnNum]; + double[] outputVector = outputColVector.vector; + + if (doubleColVector.isRepeating) { + + if (doubleColVector.noNulls || !doubleColVector.isNull[0]) { + + // We have a repeated value but we only need to evaluate once for MIN/MAX. 
+ final double repeatedMin = doubleColVector.vector[0]; + + if (isNull) { + min = repeatedMin; + isNull = false; + } else if (repeatedMin < min) { + min = repeatedMin; + } + outputVector[0] = min; + } else if (isNull) { + outputColVector.isNull[0] = true; + outputColVector.noNulls = false; + } else { + + // Continue previous MIN. + outputVector[0] = min; + } + outputColVector.isRepeating = true; + } else if (doubleColVector.noNulls) { + double[] vector = doubleColVector.vector; + for (int i = 0; i < size; i++) { + final double value = vector[i]; + if (isNull) { + min = value; + isNull = false; + } else if (value < min) { + min = value; + } + outputVector[i] = min; + } + } else { + boolean[] batchIsNull = doubleColVector.isNull; + int i = 0; + while (batchIsNull[i]) { + if (isNull) { + outputColVector.isNull[i] = true; + outputColVector.noNulls = false; + } else { + + // Continue previous MIN. + outputVector[i] = min; + } + if (++i >= size) { + return; + } + } + + double[] vector = doubleColVector.vector; + + final double firstValue = vector[i]; + if (isNull) { + min = firstValue; + isNull = false; + } else if (firstValue < min) { + min = firstValue; + } + + // Output row i min. + outputVector[i++] = min; + + for (; i < size; i++) { + if (!batchIsNull[i]) { + final double value = vector[i]; + if (isNull) { + min = value; + isNull = false; + } else if (value < min) { + min = value; + } + + // Output row i min. + outputVector[i] = min; + } else { + + // Continue previous MIN. + outputVector[i] = min; + } + } + } + } + + @Override + public boolean streamsResult() { + // No group value. 
+ return true; + } + + @Override + public Type getResultColumnVectorType() { + return Type.DOUBLE; + } + + @Override + public void resetEvaluator() { + isNull = true; + min = 0.0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleSum.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleSum.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleSum.java new file mode 100644 index 0000000..8f17663 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingDoubleSum.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates double sum() for a PTF group. + */ +public class VectorPTFEvaluatorStreamingDoubleSum extends VectorPTFEvaluatorBase { + + protected boolean isNull; + protected double sum; + + public VectorPTFEvaluatorStreamingDoubleSum(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + resetEvaluator(); + } + + @Override + public void evaluateGroupBatch(VectorizedRowBatch batch) + throws HiveException { + + evaluateInputExpr(batch); + + // We do not filter when PTF is in reducer. + Preconditions.checkState(!batch.selectedInUse); + + final int size = batch.size; + if (size == 0) { + return; + } + DoubleColumnVector doubleColVector = ((DoubleColumnVector) batch.cols[inputColumnNum]); + + DoubleColumnVector outputColVector = (DoubleColumnVector) batch.cols[outputColumnNum]; + double[] outputVector = outputColVector.vector; + + if (doubleColVector.isRepeating) { + + if (doubleColVector.noNulls || !doubleColVector.isNull[0]) { + + // We have a repeated value. + isNull = false; + final double repeatedValue = doubleColVector.vector[0]; + + for (int i = 0; i < size; i++) { + sum += repeatedValue; + + // Output row i SUM. + outputVector[i] = sum; + } + } else { + if (isNull) { + outputColVector.isNull[0] = true; + outputColVector.noNulls = false; + } else { + + // Continue previous SUM. 
+ outputVector[0] = sum; + } + outputColVector.isRepeating = true; + } + } else if (doubleColVector.noNulls) { + isNull = false; + double[] vector = doubleColVector.vector; + for (int i = 0; i < size; i++) { + sum += vector[i]; + + // Output row i SUM. + outputVector[i] = sum; + } + } else { + boolean[] batchIsNull = doubleColVector.isNull; + int i = 0; + while (batchIsNull[i]) { + if (isNull) { + outputColVector.isNull[i] = true; + outputColVector.noNulls = false; + } else { + + // Continue previous SUM. + outputVector[i] = sum; + } + if (++i >= size) { + return; + } + } + + isNull = false; + double[] vector = doubleColVector.vector; + + sum += vector[i]; + + // Output row i sum. + outputVector[i++] = sum; + + for (; i < size; i++) { + if (!batchIsNull[i]) { + sum += vector[i]; + + // Output row i sum. + outputVector[i] = sum; + } else { + + // Continue previous SUM. + outputVector[i] = sum; + } + } + } + } + + @Override + public boolean streamsResult() { + // No group value. + return true; + } + + @Override + public Type getResultColumnVectorType() { + return Type.DOUBLE; + } + + @Override + public void resetEvaluator() { + isNull = true; + sum = 0.0; + } +} \ No newline at end of file
