[hotfix] [table] Fix initialization of accumulators for MIN and MAX aggregates.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d1721bb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d1721bb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d1721bb Branch: refs/heads/master Commit: 2d1721bb9b17333c3c06e3675a24d344aed3c87f Parents: 050f9a4 Author: Fabian Hueske <[email protected]> Authored: Thu Mar 2 22:57:47 2017 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Mar 3 14:27:08 2017 +0100 ---------------------------------------------------------------------- .../functions/aggfunctions/MaxAggFunction.scala | 21 +++++++++++++++----- .../functions/aggfunctions/MinAggFunction.scala | 21 +++++++++++++++----- 2 files changed, 32 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2d1721bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala index 62ff88c..33cfd65 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala @@ -26,10 +26,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.table.functions.{Accumulator, AggregateFunction} /** The initial accumulator for Max aggregate function */ -class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { - f0 = 0.asInstanceOf[T] //max - f1 = false -} +class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator /** * Base class for built-in Max aggregate function @@ -39,7 +36,10 @@ class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] { override def createAccumulator(): Accumulator = { - new MaxAccumulator[T] + val acc = new MaxAccumulator[T] + acc.f0 = getInitValue + acc.f1 = false + acc } override def accumulate(accumulator: Accumulator, value: Any): Unit = { @@ -82,6 +82,8 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun BasicTypeInfo.BOOLEAN_TYPE_INFO) } + def getInitValue: T + def getValueTypeInfo: TypeInformation[_] } @@ -89,6 +91,7 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun * Built-in Byte Max aggregate function */ class ByteMaxAggFunction extends MaxAggFunction[Byte] { + override def getInitValue: Byte = 0.toByte override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO } @@ -96,6 +99,7 @@ class ByteMaxAggFunction extends MaxAggFunction[Byte] { * Built-in Short Max aggregate function */ class ShortMaxAggFunction extends MaxAggFunction[Short] { + override def getInitValue: Short = 0.toShort override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO } @@ -103,6 +107,7 @@ class ShortMaxAggFunction extends MaxAggFunction[Short] { * Built-in Int Max aggregate function */ class IntMaxAggFunction extends MaxAggFunction[Int] { + override def getInitValue: Int = 0 override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO } @@ -110,6 +115,7 @@ class IntMaxAggFunction extends MaxAggFunction[Int] { * Built-in Long Max aggregate function */ class LongMaxAggFunction extends MaxAggFunction[Long] { + override def getInitValue: Long = 0L override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO } @@ -117,6 +123,7 @@ class LongMaxAggFunction extends MaxAggFunction[Long] { * Built-in Float Max aggregate function */ class FloatMaxAggFunction extends MaxAggFunction[Float] { + override def getInitValue: Float = 0.0f override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO } @@ -124,6 +131,7 @@ class FloatMaxAggFunction extends MaxAggFunction[Float] { * Built-in Double Max aggregate function */ class DoubleMaxAggFunction extends MaxAggFunction[Double] { + override def getInitValue: Double = 0.0d override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO } @@ -131,6 +139,7 @@ class DoubleMaxAggFunction extends MaxAggFunction[Double] { * Built-in Boolean Max aggregate function */ class BooleanMaxAggFunction extends MaxAggFunction[Boolean] { + override def getInitValue = false override def getValueTypeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO } @@ -150,5 +159,7 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] { } } + override def getInitValue = BigDecimal.ZERO + override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO } http://git-wip-us.apache.org/repos/asf/flink/blob/2d1721bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala index cddb873..1b2d6b0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala @@ -26,10 +26,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.table.functions.{Accumulator, AggregateFunction} /** The initial accumulator for Min aggregate function */ -class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { - f0 = 0.asInstanceOf[T] //min - f1 = false -} +class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator /** * Base class for built-in Min aggregate function @@ -39,7 +36,10 @@ class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] { override def createAccumulator(): Accumulator = { - new MinAccumulator[T] + val acc = new MinAccumulator[T] + acc.f0 = getInitValue + acc.f1 = false + acc } override def accumulate(accumulator: Accumulator, value: Any): Unit = { @@ -82,6 +82,8 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun BasicTypeInfo.BOOLEAN_TYPE_INFO) } + def getInitValue: T + def getValueTypeInfo: TypeInformation[_] } @@ -89,6 +91,7 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun * Built-in Byte Min aggregate function */ class ByteMinAggFunction extends MinAggFunction[Byte] { + override def getInitValue: Byte = 0.toByte override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO } @@ -96,6 +99,7 @@ class ByteMinAggFunction extends MinAggFunction[Byte] { * Built-in Short Min aggregate function */ class ShortMinAggFunction extends MinAggFunction[Short] { + override def getInitValue: Short = 0.toShort override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO } @@ -103,6 +107,7 @@ class ShortMinAggFunction extends MinAggFunction[Short] { * Built-in Int Min aggregate function */ class IntMinAggFunction extends MinAggFunction[Int] { + override def getInitValue: Int = 0 override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO } @@ -110,6 +115,7 @@ class IntMinAggFunction extends MinAggFunction[Int] { * Built-in Long Min aggregate function */ class LongMinAggFunction extends MinAggFunction[Long] { + override def getInitValue: Long = 0L override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO } @@ -117,6 +123,7 @@ class LongMinAggFunction extends MinAggFunction[Long] { * Built-in Float Min aggregate function */ class FloatMinAggFunction extends MinAggFunction[Float] { + override def getInitValue: Float = 0.0f override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO } @@ -124,6 +131,7 @@ class FloatMinAggFunction extends MinAggFunction[Float] { * Built-in Double Min aggregate function */ class DoubleMinAggFunction extends MinAggFunction[Double] { + override def getInitValue: Double = 0.0d override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO } @@ -131,6 +139,7 @@ class DoubleMinAggFunction extends MinAggFunction[Double] { * Built-in Boolean Min aggregate function */ class BooleanMinAggFunction extends MinAggFunction[Boolean] { + override def getInitValue: Boolean = false override def getValueTypeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO } @@ -150,5 +159,7 @@ class DecimalMinAggFunction extends MinAggFunction[BigDecimal] { } } + override def getInitValue: BigDecimal = BigDecimal.ZERO + override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO }
