Repository: hive
Updated Branches:
  refs/heads/master 0ec6e8893 -> 11f1e47eb


HIVE-12944: Support SUM(DISTINCT) for partitioning query. (Aihua Xu, reviewed by Szehon Ho)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/11f1e47e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/11f1e47e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/11f1e47e

Branch: refs/heads/master
Commit: 11f1e47ebadc7cba24e1fb9f0dfbfaf7f786d2cb
Parents: 0ec6e88
Author: Aihua Xu <aihu...@apache.org>
Authored: Wed Jan 27 11:25:00 2016 -0500
Committer: Aihua Xu <aihu...@apache.org>
Committed: Mon Feb 1 10:15:01 2016 -0500

----------------------------------------------------------------------
 .../functions/HiveSqlSumAggFunction.java        |   3 +
 .../translator/SqlFunctionConverter.java        |   6 +
 .../hive/ql/udf/generic/GenericUDAFCount.java   |   3 -
 .../hive/ql/udf/generic/GenericUDAFSum.java     | 110 ++++++++++++++-----
 .../queries/clientpositive/windowing_distinct.q |   8 ++
 .../clientnegative/invalid_sum_syntax.q.out     |   2 +-
 .../clientpositive/windowing_distinct.q.out     |  26 +++++
 7 files changed, 125 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
index 056eaeb..498cd0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
@@ -71,6 +71,9 @@ public class HiveSqlSumAggFunction extends SqlAggFunction {
 
   //~ Methods ----------------------------------------------------------------
 
+  public boolean isDistinct() {
+    return isDistinct; // SqlFunctionConverter emits TOK_FUNCTIONDI when this is true
+  }
 
   @Override
   public <T> T unwrap(Class<T> clazz) {

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
index 75c38fa..19aa414 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
@@ -229,6 +229,12 @@ public class SqlFunctionConverter {
                     "TOK_FUNCTIONDI");
               }
             }
+          } else if (op instanceof HiveSqlSumAggFunction) { // case 
SUM(DISTINCT)
+            HiveSqlSumAggFunction sumFunction = (HiveSqlSumAggFunction) op;
+            if (sumFunction.isDistinct()) {
+              node = (ASTNode) 
ParseDriver.adaptor.create(HiveParser.TOK_FUNCTIONDI,
+                  "TOK_FUNCTIONDI");
+            }
           }
           node.addChild((ASTNode) 
ParseDriver.adaptor.create(HiveParser.Identifier, op.getName()));
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
index f526c43..2825045 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
@@ -17,13 +17,11 @@
  */
 package org.apache.hadoop.hive.ql.udf.generic;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hadoop.hive.serde2.lazy.LazyString;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -31,7 +29,6 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 
 /**
  * This class implements the COUNT aggregation function as in SQL.

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
index 0968008..7b1d6e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
@@ -24,14 +24,14 @@ import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
-import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
 import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -87,6 +87,17 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     }
   }
 
+  @Override
+  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
+      throws SemanticException {
+    TypeInfo[] parameters = info.getParameters();
+    // Resolve by argument types; the cast assumes a GenericUDAFSumEvaluator subclass comes back.
+    GenericUDAFSumEvaluator eval = (GenericUDAFSumEvaluator) getEvaluator(parameters);
+    eval.setSumDistinct(info.isDistinct());
+    // With sumDistinct set, iterate() skips inputs that checkDistinct() rejects.
+    return eval;
+  }
+
   public static PrimitiveObjectInspector.PrimitiveCategory 
getReturnType(TypeInfo type) {
     if (type.getCategory() != ObjectInspector.Category.PRIMITIVE) {
       return null;
@@ -111,12 +122,54 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
   }
 
   /**
+   * The base type for sum operator evaluator
+   *
+   */
+  public static abstract class GenericUDAFSumEvaluator<ResultType> extends GenericUDAFEvaluator {
+    static abstract class SumAgg<T> extends AbstractAggregationBuffer { // base of the Sum*Agg buffers
+      boolean empty;
+      T sum;
+      Object previousValue = null; // JAVA copy of the last input checkDistinct() accepted
+    }
+
+    protected PrimitiveObjectInspector inputOI;
+    protected ObjectInspector outputOI; // set by subclasses to inputOI's JAVA standard OI
+    protected ResultType result;
+    protected boolean sumDistinct; // true when invoked as SUM(DISTINCT ...)
+
+    public boolean sumDistinct() {
+      return sumDistinct;
+    }
+
+    public void setSumDistinct(boolean sumDistinct) {
+      this.sumDistinct = sumDistinct;
+    }
+
+    /**
+     * Returns false (skip) when SUM(DISTINCT) sees an input equal to {@code agg.previousValue}.
+     * @param agg buffer holding the last accepted value; updated whenever true is returned
+     * @param input the input object
+     * @return true if sumDistinct is false or the input differs from agg.previousValue
+     */
+    protected boolean checkDistinct(SumAgg agg, Object input) {
+      if (this.sumDistinct &&
+          ObjectInspectorUtils.compare(input, inputOI, agg.previousValue, outputOI) == 0) {
+        return false;
+      }
+      // Copied on every accepted call, including when sumDistinct is false.
+      agg.previousValue = ObjectInspectorUtils.copyToStandardObject(
+          input, inputOI, ObjectInspectorCopyOption.JAVA);
+      return true;
+    }
+    // NOTE(review): only the last accepted value is compared, so only adjacent duplicates are
+    // skipped; subclasses' reset() leaves previousValue set (stale if a buffer is reused).
+  }
+
+  /**
    * GenericUDAFSumHiveDecimal.
    *
    */
-  public static class GenericUDAFSumHiveDecimal extends GenericUDAFEvaluator {
-    private PrimitiveObjectInspector inputOI;
-    private HiveDecimalWritable result;
+  public static class GenericUDAFSumHiveDecimal extends 
GenericUDAFSumEvaluator<HiveDecimalWritable> {
 
     @Override
     public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws 
HiveException {
@@ -124,6 +177,8 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
       super.init(m, parameters);
       result = new HiveDecimalWritable(HiveDecimal.ZERO);
       inputOI = (PrimitiveObjectInspector) parameters[0];
+      outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+          ObjectInspectorCopyOption.JAVA);
       // The output precision is 10 greater than the input which should cover 
at least
       // 10b rows. The scale is the same as the input.
       DecimalTypeInfo outputTypeInfo = null;
@@ -138,9 +193,7 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
 
     /** class for storing decimal sum value. */
     @AggregationType(estimable = false) // hard to know exactly for decimals
-    static class SumHiveDecimalAgg extends AbstractAggregationBuffer {
-      boolean empty;
-      HiveDecimal sum;
+    static class SumHiveDecimalAgg extends SumAgg<HiveDecimal> {
     }
 
     @Override
@@ -152,7 +205,7 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
 
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
-      SumHiveDecimalAgg bdAgg = (SumHiveDecimalAgg) agg;
+      SumAgg<HiveDecimal> bdAgg = (SumAgg<HiveDecimal>) agg;
       bdAgg.empty = true;
       bdAgg.sum = HiveDecimal.ZERO;
     }
@@ -163,7 +216,9 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     public void iterate(AggregationBuffer agg, Object[] parameters) throws 
HiveException {
       assert (parameters.length == 1);
       try {
-        merge(agg, parameters[0]);
+        if (checkDistinct((SumAgg) agg, parameters[0])) {
+          merge(agg, parameters[0]);
+        }
       } catch (NumberFormatException e) {
         if (!warned) {
           warned = true;
@@ -239,24 +294,21 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
    * GenericUDAFSumDouble.
    *
    */
-  public static class GenericUDAFSumDouble extends GenericUDAFEvaluator {
-    private PrimitiveObjectInspector inputOI;
-    private DoubleWritable result;
-
+  public static class GenericUDAFSumDouble extends 
GenericUDAFSumEvaluator<DoubleWritable> {
     @Override
     public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws 
HiveException {
       assert (parameters.length == 1);
       super.init(m, parameters);
       result = new DoubleWritable(0);
       inputOI = (PrimitiveObjectInspector) parameters[0];
+      outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+          ObjectInspectorCopyOption.JAVA);
       return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
     }
 
     /** class for storing double sum value. */
     @AggregationType(estimable = true)
-    static class SumDoubleAgg extends AbstractAggregationBuffer {
-      boolean empty;
-      double sum;
+    static class SumDoubleAgg extends SumAgg<Double> {
       @Override
       public int estimate() { return JavaDataModel.PRIMITIVES1 + 
JavaDataModel.PRIMITIVES2; }
     }
@@ -272,7 +324,7 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     public void reset(AggregationBuffer agg) throws HiveException {
       SumDoubleAgg myagg = (SumDoubleAgg) agg;
       myagg.empty = true;
-      myagg.sum = 0;
+      myagg.sum = 0.0;
     }
 
     boolean warned = false;
@@ -281,7 +333,9 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     public void iterate(AggregationBuffer agg, Object[] parameters) throws 
HiveException {
       assert (parameters.length == 1);
       try {
-        merge(agg, parameters[0]);
+        if (checkDistinct((SumAgg) agg, parameters[0])) {
+          merge(agg, parameters[0]);
+        }
       } catch (NumberFormatException e) {
         if (!warned) {
           warned = true;
@@ -354,24 +408,21 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
    * GenericUDAFSumLong.
    *
    */
-  public static class GenericUDAFSumLong extends GenericUDAFEvaluator {
-    private PrimitiveObjectInspector inputOI;
-    protected LongWritable result;
-
+  public static class GenericUDAFSumLong extends 
GenericUDAFSumEvaluator<LongWritable> {
     @Override
     public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws 
HiveException {
       assert (parameters.length == 1);
       super.init(m, parameters);
       result = new LongWritable(0);
       inputOI = (PrimitiveObjectInspector) parameters[0];
+      outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+          ObjectInspectorCopyOption.JAVA);
       return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
     }
 
     /** class for storing double sum value. */
     @AggregationType(estimable = true)
-    static class SumLongAgg extends AbstractAggregationBuffer {
-      boolean empty;
-      long sum;
+    static class SumLongAgg extends SumAgg<Long> {
       @Override
       public int estimate() { return JavaDataModel.PRIMITIVES1 + 
JavaDataModel.PRIMITIVES2; }
     }
@@ -387,7 +438,7 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     public void reset(AggregationBuffer agg) throws HiveException {
       SumLongAgg myagg = (SumLongAgg) agg;
       myagg.empty = true;
-      myagg.sum = 0;
+      myagg.sum = 0L;
     }
 
     private boolean warned = false;
@@ -396,7 +447,9 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     public void iterate(AggregationBuffer agg, Object[] parameters) throws 
HiveException {
       assert (parameters.length == 1);
       try {
-        merge(agg, parameters[0]);
+        if (checkDistinct((SumAgg) agg, parameters[0])) {
+          merge(agg, parameters[0]);
+        }
       } catch (NumberFormatException e) {
         if (!warned) {
           warned = true;
@@ -460,5 +513,4 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
       };
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/test/queries/clientpositive/windowing_distinct.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/windowing_distinct.q b/ql/src/test/queries/clientpositive/windowing_distinct.q
index 94f4044..9f6ddfd 100644
--- a/ql/src/test/queries/clientpositive/windowing_distinct.q
+++ b/ql/src/test/queries/clientpositive/windowing_distinct.q
@@ -28,3 +28,11 @@ SELECT COUNT(DISTINCT t) OVER (PARTITION BY index),
        COUNT(DISTINCT dec) OVER (PARTITION BY index),
        COUNT(DISTINCT bin) OVER (PARTITION BY index)
 FROM windowing_distinct;
+
+SELECT SUM(DISTINCT t) OVER (PARTITION BY index),
+       SUM(DISTINCT d) OVER (PARTITION BY index),
+       SUM(DISTINCT s) OVER (PARTITION BY index),
+       SUM(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index),
+       SUM(DISTINCT ts) OVER (PARTITION BY index),
+       SUM(DISTINCT dec) OVER (PARTITION BY index)
+FROM windowing_distinct;

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/test/results/clientnegative/invalid_sum_syntax.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/invalid_sum_syntax.q.out b/ql/src/test/results/clientnegative/invalid_sum_syntax.q.out
index 28d65d7..346bca1 100644
--- a/ql/src/test/results/clientnegative/invalid_sum_syntax.q.out
+++ b/ql/src/test/results/clientnegative/invalid_sum_syntax.q.out
@@ -1 +1 @@
-FAILED: SemanticException The specified syntax for UDAF invocation is invalid.
+FAILED: UDFArgumentTypeException Exactly one argument is expected.

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/test/results/clientpositive/windowing_distinct.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/windowing_distinct.q.out b/ql/src/test/results/clientpositive/windowing_distinct.q.out
index 50f8ff8..0858f0f 100644
--- a/ql/src/test/results/clientpositive/windowing_distinct.q.out
+++ b/ql/src/test/results/clientpositive/windowing_distinct.q.out
@@ -76,3 +76,29 @@ POSTHOOK: Input: default@windowing_distinct
 2      2       2       2       2       2       2       2
 2      2       2       2       2       2       2       2
 2      2       2       2       2       2       2       2
+PREHOOK: query: SELECT SUM(DISTINCT t) OVER (PARTITION BY index),
+       SUM(DISTINCT d) OVER (PARTITION BY index),
+       SUM(DISTINCT s) OVER (PARTITION BY index),
+       SUM(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index),
+       SUM(DISTINCT ts) OVER (PARTITION BY index),
+       SUM(DISTINCT dec) OVER (PARTITION BY index)
+FROM windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT SUM(DISTINCT t) OVER (PARTITION BY index),
+       SUM(DISTINCT d) OVER (PARTITION BY index),
+       SUM(DISTINCT s) OVER (PARTITION BY index),
+       SUM(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index),
+       SUM(DISTINCT ts) OVER (PARTITION BY index),
+       SUM(DISTINCT dec) OVER (PARTITION BY index)
+FROM windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+54     56.63   0.0     0.0     2.724315837406296E9     57
+54     56.63   0.0     0.0     2.724315837406296E9     57
+54     56.63   0.0     0.0     2.724315837406296E9     57
+235    77.42   0.0     0.0     2.724315837406612E9     69
+235    77.42   0.0     0.0     2.724315837406612E9     69
+235    77.42   0.0     0.0     2.724315837406612E9     69

Reply via email to