[FLINK-3971] [tableAPI] Fix handling of null values in aggregations. This closes #2049
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdf43609 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdf43609 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdf43609 Branch: refs/heads/master Commit: fdf43609977c807deb3bb81bf1095efb721fe688 Parents: e0b9e8d Author: gallenvara <[email protected]> Authored: Mon May 30 18:30:07 2016 +0800 Committer: Fabian Hueske <[email protected]> Committed: Tue Jun 14 15:05:32 2016 +0200 ---------------------------------------------------------------------- .../table/runtime/aggregate/AvgAggregate.scala | 60 ++++++++++++++++---- .../table/runtime/aggregate/MaxAggregate.scala | 42 ++++++-------- .../table/runtime/aggregate/MinAggregate.scala | 44 +++++++------- .../table/runtime/aggregate/SumAggregate.scala | 12 +++- .../runtime/aggregate/AvgAggregateTest.scala | 11 +++- .../runtime/aggregate/CountAggregateTest.scala | 5 +- .../runtime/aggregate/MaxAggregateTest.scala | 25 +++++++- .../runtime/aggregate/MinAggregateTest.scala | 25 +++++++- .../runtime/aggregate/SumAggregateTest.scala | 14 ++++- 9 files changed, 165 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala index 8cf181a..e724648 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala @@ -59,11 +59,17 @@ abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] { buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount)) } + override def evaluate(buffer : Row): T = { + doEvaluate(buffer).asInstanceOf[T] + } + override def intermediateDataType = Array( BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO) def doPrepare(value: Any, partial: Row): Unit + + def doEvaluate(buffer: Row): Any } class ByteAvgAggregate extends IntegralAvgAggregate[Byte] { @@ -73,10 +79,14 @@ class ByteAvgAggregate extends IntegralAvgAggregate[Byte] { partial.setField(partialCountIndex, 1L) } - override def evaluate(buffer: Row): Byte = { + override def doEvaluate(buffer: Row): Any = { val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long] val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] - (bufferSum / bufferCount).toByte + if (bufferCount == 0L) { + null + } else { + (bufferSum / bufferCount).toByte + } } } @@ -88,10 +98,14 @@ class ShortAvgAggregate extends IntegralAvgAggregate[Short] { partial.setField(partialCountIndex, 1L) } - override def evaluate(buffer: Row): Short = { + override def doEvaluate(buffer: Row): Any = { val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long] val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] - (bufferSum / bufferCount).toShort + if (bufferCount == 0L) { + null + } else { + (bufferSum / bufferCount).toShort + } } } @@ -103,10 +117,14 @@ class IntAvgAggregate extends IntegralAvgAggregate[Int] { partial.setField(partialCountIndex, 1L) } - override def evaluate(buffer: Row): Int = { + override def doEvaluate(buffer: Row): Any = { val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long] val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] - (bufferSum / bufferCount).toInt + if (bufferCount == 0L) { + null + } else { + (bufferSum / bufferCount).toInt + } } } @@ -145,10 +163,14 @@ class LongAvgAggregate extends IntegralAvgAggregate[Long] { buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount)) } - override def evaluate(buffer: Row): Long = { + override def doEvaluate(buffer: Row): Any = { val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigInteger] val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] - bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue() + if (bufferCount == 0L) { + null + } else { + bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue() + } } } @@ -178,11 +200,17 @@ abstract class FloatingAvgAggregate[T: Numeric] extends AvgAggregate[T] { buffer.setField(partialCountIndex, partialCount + bufferCount) } + override def evaluate(buffer : Row): T = { + doEvaluate(buffer).asInstanceOf[T] + } + override def intermediateDataType = Array( BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO) def doPrepare(value: Any, partial: Row): Unit + + def doEvaluate(buffer: Row): Any } class FloatAvgAggregate extends FloatingAvgAggregate[Float] { @@ -194,10 +222,14 @@ class FloatAvgAggregate extends FloatingAvgAggregate[Float] { } - override def evaluate(buffer: Row): Float = { + override def doEvaluate(buffer: Row): Any = { val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double] val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] - (bufferSum / bufferCount).toFloat + if (bufferCount == 0L) { + null + } else { + (bufferSum / bufferCount).toFloat + } } } @@ -209,9 +241,13 @@ class DoubleAvgAggregate extends FloatingAvgAggregate[Double] { partial.setField(partialCountIndex, 1L) } - override def evaluate(buffer: Row): Double = { + override def doEvaluate(buffer: Row): Any = { val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double] val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] - (bufferSum / bufferCount) + if (bufferCount == 0L) { + null + } else { + (bufferSum / bufferCount) + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala index 9ad0468..b9b86d1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala @@ -25,6 +25,15 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] { protected var maxIndex = -1 /** + * Initiate the intermediate aggregate value in Row. + * + * @param intermediate The intermediate aggregate row to initiate. + */ + override def initiate(intermediate: Row): Unit = { + intermediate.setField(maxIndex, null) + } + + /** * Accessed in MapFunction, prepare the input of partial aggregate. * * @param value @@ -47,9 +56,15 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] { */ override def merge(intermediate: Row, buffer: Row): Unit = { val partialValue = intermediate.productElement(maxIndex).asInstanceOf[T] - val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T] - val max: T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue - buffer.setField(maxIndex, max) + if (partialValue != null) { + val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T] + if (bufferValue != null) { + val max : T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue + buffer.setField(maxIndex, max) + } else { + buffer.setField(maxIndex, partialValue) + } + } } /** @@ -73,61 +88,40 @@ class ByteMaxAggregate extends MaxAggregate[Byte] { override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(maxIndex, Byte.MinValue) - } } class ShortMaxAggregate extends MaxAggregate[Short] { override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(maxIndex, Short.MinValue) - } } class IntMaxAggregate extends MaxAggregate[Int] { override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(maxIndex, Int.MinValue) - } } class LongMaxAggregate extends MaxAggregate[Long] { override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(maxIndex, Long.MinValue) - } } class FloatMaxAggregate extends MaxAggregate[Float] { override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(maxIndex, Float.MinValue) - } } class DoubleMaxAggregate extends MaxAggregate[Double] { override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(maxIndex, Double.MinValue) - } } class BooleanMaxAggregate extends MaxAggregate[Boolean] { override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(maxIndex, false) - } } http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala index b607e6b..5d656f4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala @@ -20,11 +20,20 @@ package org.apache.flink.api.table.runtime.aggregate import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.table.Row -abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T]{ +abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] { protected var minIndex: Int = _ /** + * Initiate the intermediate aggregate value in Row. + * + * @param intermediate The intermediate aggregate row to initiate. + */ + override def initiate(intermediate: Row): Unit = { + intermediate.setField(minIndex, null) + } + + /** * Accessed in MapFunction, prepare the input of partial aggregate. * * @param value @@ -47,9 +56,15 @@ abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T]{ */ override def merge(partial: Row, buffer: Row): Unit = { val partialValue = partial.productElement(minIndex).asInstanceOf[T] - val bufferValue = buffer.productElement(minIndex).asInstanceOf[T] - val min: T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue - buffer.setField(minIndex, min) + if (partialValue != null) { + val bufferValue = buffer.productElement(minIndex).asInstanceOf[T] + if (bufferValue != null) { + val min : T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue + buffer.setField(minIndex, min) + } else { + buffer.setField(minIndex, partialValue) + } + } } /** @@ -73,61 +88,40 @@ class ByteMinAggregate extends MinAggregate[Byte] { override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(minIndex, Byte.MaxValue) - } } class ShortMinAggregate extends MinAggregate[Short] { override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(minIndex, Short.MaxValue) - } } class IntMinAggregate extends MinAggregate[Int] { override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(minIndex, Int.MaxValue) - } } class LongMinAggregate extends MinAggregate[Long] { override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(minIndex, Long.MaxValue) - } } class FloatMinAggregate extends MinAggregate[Float] { override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(minIndex, Float.MaxValue) - } } class DoubleMinAggregate extends MinAggregate[Double] { override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(minIndex, Double.MaxValue) - } } class BooleanMinAggregate extends MinAggregate[Boolean] { override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO) - override def initiate(intermediate: Row): Unit = { - intermediate.setField(minIndex, true) - } } http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala index b4c56fe..6db6632 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala @@ -27,13 +27,19 @@ abstract class SumAggregate[T: Numeric] protected var sumIndex: Int = _ override def initiate(partial: Row): Unit = { - partial.setField(sumIndex, numeric.zero) + partial.setField(sumIndex, null) } override def merge(partial1: Row, buffer: Row): Unit = { val partialValue = partial1.productElement(sumIndex).asInstanceOf[T] - val bufferValue = buffer.productElement(sumIndex).asInstanceOf[T] - buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue)) + if (partialValue != null) { + val bufferValue = buffer.productElement(sumIndex).asInstanceOf[T] + if (bufferValue != null) { + buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue)) + } else { + buffer.setField(sumIndex, partialValue) + } + } } override def evaluate(buffer: Row): T = { http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala index 2575fa2..48dc313 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala @@ -56,13 +56,22 @@ abstract class AvgAggregateTestBase[T: Numeric] extends AggregateTestBase[T] { numeric.negate(maxVal), numeric.negate(minVal), null.asInstanceOf[T] + ), + Seq( + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T] ) ) override def expectedResults: Seq[T] = Seq( minVal, maxVal, - numeric.fromInt(0) + numeric.fromInt(0), + null.asInstanceOf[T] ) } http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala index ce27d7c..4389a3a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala @@ -21,10 +21,11 @@ package org.apache.flink.api.table.runtime.aggregate class CountAggregateTest extends AggregateTestBase[Long] { override def inputValueSets: Seq[Seq[_]] = Seq( - Seq("a", "b", null, "c", null, "d", "e", null, "f") + Seq("a", "b", null, "c", null, "d", "e", null, "f"), + Seq(null, null, null, null, null, null) ) - override def expectedResults: Seq[Long] = Seq(6L) + override def expectedResults: Seq[Long] = Seq(6L, 0L) override def aggregator: Aggregate[Long] = new CountAggregate() } http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala index e049e49..97385ae 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala @@ -38,10 +38,21 @@ abstract class MaxAggregateTestBase[T: Numeric] extends AggregateTestBase[T] { numeric.fromInt(-20), numeric.fromInt(17), null.asInstanceOf[T] + ), + Seq( + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T] ) ) - override def expectedResults: Seq[T] = Seq(maxVal) + override def expectedResults: Seq[T] = Seq( + maxVal, + null.asInstanceOf[T] + ) } class ByteMaxAggregateTest extends MaxAggregateTestBase[Byte] { @@ -113,10 +124,20 @@ class BooleanMaxAggregateTest extends AggregateTestBase[Boolean] { false, true, null.asInstanceOf[Boolean] + ), + Seq( + null.asInstanceOf[Boolean], + null.asInstanceOf[Boolean], + null.asInstanceOf[Boolean] ) ) - override def expectedResults: Seq[Boolean] = Seq(false, true, true) + override def expectedResults: Seq[Boolean] = Seq( + false, + true, + true, + null.asInstanceOf[Boolean] + ) override def aggregator: Aggregate[Boolean] = new BooleanMaxAggregate() } http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala index 7cf7bb1..cd77c10 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala @@ -38,10 +38,21 @@ abstract class MinAggregateTestBase[T: Numeric] extends AggregateTestBase[T] { numeric.fromInt(-20), numeric.fromInt(17), null.asInstanceOf[T] + ), + Seq( + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T] ) ) - override def expectedResults: Seq[T] = Seq(minVal) + override def expectedResults: Seq[T] = Seq( + minVal, + null.asInstanceOf[T] + ) } class ByteMinAggregateTest extends MinAggregateTestBase[Byte] { @@ -113,10 +124,20 @@ class BooleanMinAggregateTest extends AggregateTestBase[Boolean] { false, true, null.asInstanceOf[Boolean] + ), + Seq( + null.asInstanceOf[Boolean], + null.asInstanceOf[Boolean], + null.asInstanceOf[Boolean] ) ) - override def expectedResults: Seq[Boolean] = Seq(false, true, false) + override def expectedResults: Seq[Boolean] = Seq( + false, + true, + false, + null.asInstanceOf[Boolean] + ) override def aggregator: Aggregate[Boolean] = new BooleanMinAggregate() } http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala index f5de3fc..fb6fc39 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala @@ -39,11 +39,21 @@ abstract class SumAggregateTestBase[T: Numeric] extends AggregateTestBase[T] { numeric.fromInt(17), null.asInstanceOf[T], maxVal + ), + Seq( + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T], + null.asInstanceOf[T] ) ) - override def expectedResults: Seq[T] = Seq(numeric.fromInt(2)) - + override def expectedResults: Seq[T] = Seq( + numeric.fromInt(2), + null.asInstanceOf[T] + ) } class ByteSumAggregateTest extends SumAggregateTestBase[Byte] {
