Repository: spark
Updated Branches:
refs/heads/branch-1.3 cba684268 -> 93975a378
[SPARK-5680][SQL] Sum function on all null values, should return zero
The query
SELECT sum('a'), avg('a'), variance('a'), std('a') FROM src;
should produce the following output:
0.0 NULL NULL NULL
This fixes the Hive compatibility test udaf_number_format.q.
Author: Venkata Ramana G <ramana.gollamudihuawei.com>
Author: Venkata Ramana Gollamudi <[email protected]>
Closes #4466 from gvramana/sum_fix and squashes the following commits:
42e14d1 [Venkata Ramana Gollamudi] Added comments
39415c0 [Venkata Ramana Gollamudi] Handled the partitioned Sum expression
scenario
df66515 [Venkata Ramana Gollamudi] code style fix
4be2606 [Venkata Ramana Gollamudi] Add udaf_number_format to whitelist and
golden answer
330fd64 [Venkata Ramana Gollamudi] fix sum function for all null data
(cherry picked from commit ee569a0c7171d149eee52877def902378eaf695e)
Signed-off-by: Michael Armbrust <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93975a37
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93975a37
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93975a37
Branch: refs/heads/branch-1.3
Commit: 93975a3786fbf4581553b347fa56fb2b7da6f861
Parents: cba6842
Author: Venkata Ramana Gollamudi <[email protected]>
Authored: Sat Mar 21 13:24:24 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Sat Mar 21 13:24:53 2015 -0700
----------------------------------------------------------------------
.../sql/catalyst/expressions/aggregates.scala | 68 +++++++++++++++++++-
.../hive/execution/HiveCompatibilitySuite.scala | 1 +
...er_format-0-eff4ef3c207d14d5121368f294697964 | 0
...er_format-1-4a03c4328565c60ca99689239f07fb16 | 1 +
4 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/93975a37/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 735b748..5297d1e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -346,13 +346,13 @@ case class Sum(child: Expression) extends
PartialAggregate with trees.UnaryNode[
case DecimalType.Fixed(_, _) =>
val partialSum = Alias(Sum(Cast(child, DecimalType.Unlimited)),
"PartialSum")()
SplitEvaluation(
- Cast(Sum(partialSum.toAttribute), dataType),
+ Cast(CombineSum(partialSum.toAttribute), dataType),
partialSum :: Nil)
case _ =>
val partialSum = Alias(Sum(child), "PartialSum")()
SplitEvaluation(
- Sum(partialSum.toAttribute),
+ CombineSum(partialSum.toAttribute),
partialSum :: Nil)
}
}
@@ -360,6 +360,30 @@ case class Sum(child: Expression) extends PartialAggregate
with trees.UnaryNode[
override def newInstance() = new SumFunction(child, this)
}
+/**
+ * Sum should satisfy 3 cases:
+ * 1) sum of all null values = zero
+ * 2) sum for table column with no data = null
+ * 3) sum of column with null and not null values = sum of not null values
+ * A separate CombineSum expression and function are required because they must
+ * distinguish the "no data" case from the "data equals null" case, both while
+ * aggregating results and at each partial expression, i.e.:
+ * Combining       PartitionLevel       InputData
+ *                                  <-- null
+ * Zero        <-- Zero             <-- null
+ *
+ *             <-- null             <-- no data
+ * null        <-- null             <-- no data
+ */
+case class CombineSum(child: Expression) extends AggregateExpression {
+ def this() = this(null)
+
+ override def children = child :: Nil
+ override def nullable = true
+ override def dataType = child.dataType
+ override def toString = s"CombineSum($child)"
+ override def newInstance() = new CombineSumFunction(child, this)
+}
+
case class SumDistinct(child: Expression)
extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -565,7 +589,8 @@ case class SumFunction(expr: Expression, base:
AggregateExpression) extends Aggr
private val sum = MutableLiteral(null, calcType)
- private val addFunction = Coalesce(Seq(Add(Coalesce(Seq(sum, zero)),
Cast(expr, calcType)), sum))
+ private val addFunction =
+ Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum,
zero))
override def update(input: Row): Unit = {
sum.update(addFunction, input)
@@ -580,6 +605,43 @@ case class SumFunction(expr: Expression, base:
AggregateExpression) extends Aggr
}
}
+case class CombineSumFunction(expr: Expression, base: AggregateExpression)
+ extends AggregateFunction {
+
+ def this() = this(null, null) // Required for serialization.
+
+ private val calcType =
+ expr.dataType match {
+ case DecimalType.Fixed(_, _) =>
+ DecimalType.Unlimited
+ case _ =>
+ expr.dataType
+ }
+
+ private val zero = Cast(Literal(0), calcType)
+
+ private val sum = MutableLiteral(null, calcType)
+
+ private val addFunction =
+ Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum,
zero))
+
+ override def update(input: Row): Unit = {
+ val result = expr.eval(input)
+ // A partial sum result can be null only when no input rows were present; skip it so `sum` is left unchanged.
+ if(result != null) {
+ sum.update(addFunction, input)
+ }
+ }
+
+ override def eval(input: Row): Any = {
+ expr.dataType match {
+ case DecimalType.Fixed(_, _) =>
+ Cast(sum, dataType).eval(null)
+ case _ => sum.eval(null)
+ }
+ }
+}
+
case class SumDistinctFunction(expr: Expression, base: AggregateExpression)
extends AggregateFunction {
http://git-wip-us.apache.org/repos/asf/spark/blob/93975a37/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 6126ce7..122b6f2 100644
---
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -795,6 +795,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with
BeforeAndAfter {
"udaf_covar_pop",
"udaf_covar_samp",
"udaf_histogram_numeric",
+ "udaf_number_format",
"udf2",
"udf5",
"udf6",
http://git-wip-us.apache.org/repos/asf/spark/blob/93975a37/sql/hive/src/test/resources/golden/udaf_number_format-0-eff4ef3c207d14d5121368f294697964
----------------------------------------------------------------------
diff --git
a/sql/hive/src/test/resources/golden/udaf_number_format-0-eff4ef3c207d14d5121368f294697964
b/sql/hive/src/test/resources/golden/udaf_number_format-0-eff4ef3c207d14d5121368f294697964
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/spark/blob/93975a37/sql/hive/src/test/resources/golden/udaf_number_format-1-4a03c4328565c60ca99689239f07fb16
----------------------------------------------------------------------
diff --git
a/sql/hive/src/test/resources/golden/udaf_number_format-1-4a03c4328565c60ca99689239f07fb16
b/sql/hive/src/test/resources/golden/udaf_number_format-1-4a03c4328565c60ca99689239f07fb16
new file mode 100644
index 0000000..c6f275a
--- /dev/null
+++
b/sql/hive/src/test/resources/golden/udaf_number_format-1-4a03c4328565c60ca99689239f07fb16
@@ -0,0 +1 @@
+0.0 NULL NULL NULL
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]