Repository: flink
Updated Branches:
  refs/heads/master 9e3439c01 -> 44c603d2b
[FLINK-8097] [table] Add built-in support for min/max aggregation for Date/Time

This closes #5027.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44c603d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44c603d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44c603d2

Branch: refs/heads/master
Commit: 44c603d2b62fff20a7213ad55512dc38f43a50bc
Parents: 9e3439c
Author: Dian Fu <[email protected]>
Authored: Fri Nov 17 13:22:47 2017 +0800
Committer: twalthr <[email protected]>
Committed: Tue Nov 21 17:31:27 2017 +0100

----------------------------------------------------------------------
 .../functions/aggfunctions/MaxAggFunction.scala | 20 +++++-
 .../MaxAggFunctionWithRetract.scala | 20 +++++-
 .../functions/aggfunctions/MinAggFunction.scala | 20 +++++-
 .../MinAggFunctionWithRetract.scala | 20 +++++-
 .../table/functions/aggfunctions/Ordering.scala | 10 ++-
 .../table/runtime/aggregate/AggregateUtil.scala | 16 +++++
 .../aggfunctions/MaxAggFunctionTest.scala | 54 ++++++++++++++++-
 .../MaxWithRetractAggFunctionTest.scala | 60 +++++++++++++++++-
 .../aggfunctions/MinAggFunctionTest.scala | 54 ++++++++++++++++-
 .../MinWithRetractAggFunctionTest.scala | 64 +++++++++++++++++++-
 10 files changed, 325 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
index 9097eba..eeb6190 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala @@ -19,13 +19,13 @@ package org.apache.flink.table.functions.aggfunctions import java.math.BigDecimal import java.lang.{Iterable => JIterable} -import java.sql.Timestamp +import java.sql.{Date, Time, Timestamp} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.table.api.Types -import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering +import org.apache.flink.table.functions.aggfunctions.Ordering._ import org.apache.flink.table.functions.AggregateFunction /** The initial accumulator for Max aggregate function */ @@ -170,3 +170,19 @@ class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] { override def getInitValue: Timestamp = new Timestamp(0) override def getValueTypeInfo = Types.SQL_TIMESTAMP } + +/** + * Built-in Date Max aggregate function + */ +class DateMaxAggFunction extends MaxAggFunction[Date] { + override def getInitValue: Date = new Date(0) + override def getValueTypeInfo = Types.SQL_DATE +} + +/** + * Built-in Time Max aggregate function + */ +class TimeMaxAggFunction extends MaxAggFunction[Time] { + override def getInitValue: Time = new Time(0) + override def getValueTypeInfo = Types.SQL_TIME +} http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala index fdbfef3..cbb395e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala @@ -20,13 +20,13 @@ package org.apache.flink.table.functions.aggfunctions import java.math.BigDecimal import java.util.{HashMap => JHashMap} import java.lang.{Iterable => JIterable} -import java.sql.Timestamp +import java.sql.{Date, Time, Timestamp} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo} import org.apache.flink.table.api.Types -import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering +import org.apache.flink.table.functions.aggfunctions.Ordering._ import org.apache.flink.table.functions.AggregateFunction /** The initial accumulator for Max with retraction aggregate function */ @@ -227,3 +227,19 @@ class TimestampMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Times override def getInitValue: Timestamp = new Timestamp(0) override def getValueTypeInfo = Types.SQL_TIMESTAMP } + +/** + * Built-in Date Max with retraction aggregate function + */ +class DateMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Date] { + override def getInitValue: Date = new Date(0) + override def getValueTypeInfo = Types.SQL_DATE +} + +/** + * Built-in Time Max with retraction aggregate function + */ +class TimeMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Time] { + override def getInitValue: Time = new Time(0) + override def getValueTypeInfo = Types.SQL_TIME +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala index 1cb1ab0..915da7a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala @@ -19,13 +19,13 @@ package org.apache.flink.table.functions.aggfunctions import java.math.BigDecimal import java.lang.{Iterable => JIterable} -import java.sql.Timestamp +import java.sql.{Date, Time, Timestamp} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.table.api.Types -import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering +import org.apache.flink.table.functions.aggfunctions.Ordering._ import org.apache.flink.table.functions.AggregateFunction /** The initial accumulator for Min aggregate function */ @@ -170,3 +170,19 @@ class TimestampMinAggFunction extends MinAggFunction[Timestamp] { override def getInitValue: Timestamp = new Timestamp(0) override def getValueTypeInfo = Types.SQL_TIMESTAMP } + +/** + * Built-in Date Min aggregate function + */ +class DateMinAggFunction extends MinAggFunction[Date] { + override def getInitValue: Date = new Date(0) + override def getValueTypeInfo = Types.SQL_DATE +} + +/** + * Built-in Time Min aggregate function + */ +class TimeMinAggFunction extends MinAggFunction[Time] { + override def getInitValue: Time = new 
Time(0) + override def getValueTypeInfo = Types.SQL_TIME +} http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala index 44fa37f..480f836 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala @@ -20,13 +20,13 @@ package org.apache.flink.table.functions.aggfunctions import java.math.BigDecimal import java.util.{HashMap => JHashMap} import java.lang.{Iterable => JIterable} -import java.sql.Timestamp +import java.sql.{Date, Time, Timestamp} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo} import org.apache.flink.table.api.Types -import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering +import org.apache.flink.table.functions.aggfunctions.Ordering._ import org.apache.flink.table.functions.AggregateFunction /** The initial accumulator for Min with retraction aggregate function */ @@ -227,3 +227,19 @@ class TimestampMinWithRetractAggFunction extends MinWithRetractAggFunction[Times override def getInitValue: Timestamp = new Timestamp(0) override def getValueTypeInfo = Types.SQL_TIMESTAMP } + +/** + * Built-in Date Min with retraction aggregate function + */ +class DateMinWithRetractAggFunction extends MinWithRetractAggFunction[Date] { + override 
def getInitValue: Date = new Date(0) + override def getValueTypeInfo = Types.SQL_DATE +} + +/** + * Built-in Time Min with retraction aggregate function + */ +class TimeMinWithRetractAggFunction extends MinWithRetractAggFunction[Time] { + override def getInitValue: Time = new Time(0) + override def getValueTypeInfo = Types.SQL_TIME +} http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala index 15ea2e3..4ed33b5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala @@ -18,10 +18,18 @@ package org.apache.flink.table.functions.aggfunctions -import java.sql.Timestamp +import java.sql.{Date, Time, Timestamp} object Ordering { implicit object TimestampOrdering extends Ordering[Timestamp] { override def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y) } + + implicit object DateOrdering extends Ordering[Date] { + override def compare(x: Date, y: Date): Int = x.compareTo(y) + } + + implicit object TimeOrdering extends Ordering[Time] { + override def compare(x: Time, y: Time): Int = x.compareTo(y) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 79cc258..532bec6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -1269,6 +1269,10 @@ object AggregateUtil { new StringMinWithRetractAggFunction case TIMESTAMP => new TimestampMinWithRetractAggFunction + case DATE => + new DateMinWithRetractAggFunction + case TIME => + new TimeMinWithRetractAggFunction case sqlType: SqlTypeName => throw new TableException( s"Min with retract aggregate does no support type: '$sqlType'") @@ -1295,6 +1299,10 @@ object AggregateUtil { new StringMinAggFunction case TIMESTAMP => new TimestampMinAggFunction + case DATE => + new DateMinAggFunction + case TIME => + new TimeMinAggFunction case sqlType: SqlTypeName => throw new TableException(s"Min aggregate does no support type: '$sqlType'") } @@ -1322,6 +1330,10 @@ object AggregateUtil { new StringMaxWithRetractAggFunction case TIMESTAMP => new TimestampMaxWithRetractAggFunction + case DATE => + new DateMaxWithRetractAggFunction + case TIME => + new TimeMaxWithRetractAggFunction case sqlType: SqlTypeName => throw new TableException( s"Max with retract aggregate does no support type: '$sqlType'") @@ -1348,6 +1360,10 @@ object AggregateUtil { new StringMaxAggFunction case TIMESTAMP => new TimestampMaxAggFunction + case DATE => + new DateMaxAggFunction + case TIME => + new TimeMaxAggFunction case sqlType: SqlTypeName => throw new TableException(s"Max aggregate does no support type: '$sqlType'") } http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala index 03faa80..8412cb3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.runtime.aggfunctions import java.math.BigDecimal -import java.sql.Timestamp +import java.sql.{Date, Time, Timestamp} import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.functions.aggfunctions._ @@ -257,3 +257,55 @@ class TimestampMaxAggFunctionTest override def aggregator: AggregateFunction[Timestamp, MaxAccumulator[Timestamp]] = new TimestampMaxAggFunction() } + +class DateMaxAggFunctionTest + extends AggFunctionTestBase[Date, MaxAccumulator[Date]] { + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new Date(0), + new Date(1000), + new Date(100), + null.asInstanceOf[Date], + new Date(10) + ), + Seq( + null.asInstanceOf[Date], + null.asInstanceOf[Date], + null.asInstanceOf[Date] + ) + ) + + override def expectedResults: Seq[Date] = Seq( + new Date(1000), + null.asInstanceOf[Date] + ) + + override def aggregator: AggregateFunction[Date, MaxAccumulator[Date]] = + new DateMaxAggFunction() +} + +class TimeMaxAggFunctionTest + extends AggFunctionTestBase[Time, MaxAccumulator[Time]] { + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new Time(0), + new Time(1000), + new Time(100), + null.asInstanceOf[Time], + new Time(10) + ), + Seq( + null.asInstanceOf[Time], + null.asInstanceOf[Time], + null.asInstanceOf[Time] + ) + ) + + override def expectedResults: Seq[Time] = Seq( + new Time(1000), + null.asInstanceOf[Time] + ) + + override def aggregator: AggregateFunction[Time, 
MaxAccumulator[Time]] = + new TimeMaxAggFunction() +} http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala index eb620b4..e883e6d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.runtime.aggfunctions import java.math.BigDecimal -import java.sql.Timestamp +import java.sql.{Date, Time, Timestamp} import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.functions.aggfunctions._ @@ -272,3 +272,61 @@ class TimestampMaxWithRetractAggFunctionTest override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } + +class DateMaxWithRetractAggFunctionTest + extends AggFunctionTestBase[Date, MaxWithRetractAccumulator[Date]] { + + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new Date(0), + new Date(1000), + new Date(100), + null.asInstanceOf[Date], + new Date(10) + ), + Seq( + null.asInstanceOf[Date], + null.asInstanceOf[Date], + null.asInstanceOf[Date] + ) + ) + + override def expectedResults: Seq[Date] = Seq( + new Date(1000), + null.asInstanceOf[Date] + ) + + override def aggregator: AggregateFunction[Date, MaxWithRetractAccumulator[Date]] = + new DateMaxWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) +} + +class 
TimeMaxWithRetractAggFunctionTest + extends AggFunctionTestBase[Time, MaxWithRetractAccumulator[Time]] { + + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new Time(0), + new Time(1000), + new Time(100), + null.asInstanceOf[Time], + new Time(10) + ), + Seq( + null.asInstanceOf[Time], + null.asInstanceOf[Time], + null.asInstanceOf[Time] + ) + ) + + override def expectedResults: Seq[Time] = Seq( + new Time(1000), + null.asInstanceOf[Time] + ) + + override def aggregator: AggregateFunction[Time, MaxWithRetractAccumulator[Time]] = + new TimeMaxWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) +} http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala index 992d1fc..caded56 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.runtime.aggfunctions import java.math.BigDecimal -import java.sql.Timestamp +import java.sql.{Date, Time, Timestamp} import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.functions.aggfunctions._ @@ -258,3 +258,55 @@ class TimestampMinAggFunctionTest override def aggregator: AggregateFunction[Timestamp, MinAccumulator[Timestamp]] = new TimestampMinAggFunction() } + +class DateMinAggFunctionTest + extends AggFunctionTestBase[Date, MinAccumulator[Date]] { + override def inputValueSets: 
Seq[Seq[_]] = Seq( + Seq( + new Date(0), + new Date(1000), + new Date(100), + null.asInstanceOf[Date], + new Date(10) + ), + Seq( + null.asInstanceOf[Date], + null.asInstanceOf[Date], + null.asInstanceOf[Date] + ) + ) + + override def expectedResults: Seq[Date] = Seq( + new Date(0), + null.asInstanceOf[Date] + ) + + override def aggregator: AggregateFunction[Date, MinAccumulator[Date]] = + new DateMinAggFunction() +} + +class TimeMinAggFunctionTest + extends AggFunctionTestBase[Time, MinAccumulator[Time]] { + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new Time(0), + new Time(1000), + new Time(100), + null.asInstanceOf[Time], + new Time(10) + ), + Seq( + null.asInstanceOf[Time], + null.asInstanceOf[Time], + null.asInstanceOf[Time] + ) + ) + + override def expectedResults: Seq[Time] = Seq( + new Time(0), + null.asInstanceOf[Time] + ) + + override def aggregator: AggregateFunction[Time, MinAccumulator[Time]] = + new TimeMinAggFunction() +} http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala index 323a651..2d6ff53 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.runtime.aggfunctions import java.math.BigDecimal -import java.sql.Timestamp +import java.sql.{Date, Time, Timestamp} import org.apache.flink.table.functions.AggregateFunction import 
org.apache.flink.table.functions.aggfunctions._ @@ -274,3 +274,65 @@ class TimestampMinWithRetractAggFunctionTest override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } + +class DateMinWithRetractAggFunctionTest + extends AggFunctionTestBase[Date, MinWithRetractAccumulator[Date]] { + + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new Date(0), + new Date(1000), + new Date(100), + null.asInstanceOf[Date], + new Date(10) + ), + Seq( + null, + null, + null, + null, + null + ) + ) + + override def expectedResults: Seq[Date] = Seq( + new Date(0), + null + ) + + override def aggregator: AggregateFunction[Date, MinWithRetractAccumulator[Date]] = + new DateMinWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) +} + +class TimeMinWithRetractAggFunctionTest + extends AggFunctionTestBase[Time, MinWithRetractAccumulator[Time]] { + + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new Time(0), + new Time(1000), + new Time(100), + null.asInstanceOf[Time], + new Time(10) + ), + Seq( + null, + null, + null, + null, + null + ) + ) + + override def expectedResults: Seq[Time] = Seq( + new Time(0), + null + ) + + override def aggregator: AggregateFunction[Time, MinWithRetractAccumulator[Time]] = + new TimeMinWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) +}
