Repository: flink
Updated Branches:
  refs/heads/master 9e3439c01 -> 44c603d2b


[FLINK-8097] [table] Add built-in support for min/max aggregation for Date/Time

This closes #5027.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44c603d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44c603d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44c603d2

Branch: refs/heads/master
Commit: 44c603d2b62fff20a7213ad55512dc38f43a50bc
Parents: 9e3439c
Author: Dian Fu <[email protected]>
Authored: Fri Nov 17 13:22:47 2017 +0800
Committer: twalthr <[email protected]>
Committed: Tue Nov 21 17:31:27 2017 +0100

----------------------------------------------------------------------
 .../functions/aggfunctions/MaxAggFunction.scala | 20 +++++-
 .../MaxAggFunctionWithRetract.scala             | 20 +++++-
 .../functions/aggfunctions/MinAggFunction.scala | 20 +++++-
 .../MinAggFunctionWithRetract.scala             | 20 +++++-
 .../table/functions/aggfunctions/Ordering.scala | 10 ++-
 .../table/runtime/aggregate/AggregateUtil.scala | 16 +++++
 .../aggfunctions/MaxAggFunctionTest.scala       | 54 ++++++++++++++++-
 .../MaxWithRetractAggFunctionTest.scala         | 60 +++++++++++++++++-
 .../aggfunctions/MinAggFunctionTest.scala       | 54 ++++++++++++++++-
 .../MinWithRetractAggFunctionTest.scala         | 64 +++++++++++++++++++-
 10 files changed, 325 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
index 9097eba..eeb6190 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
@@ -19,13 +19,13 @@ package org.apache.flink.table.functions.aggfunctions
 
 import java.math.BigDecimal
 import java.lang.{Iterable => JIterable}
-import java.sql.Timestamp
+import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.table.api.Types
-import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
+import org.apache.flink.table.functions.aggfunctions.Ordering._
 import org.apache.flink.table.functions.AggregateFunction
 
 /** The initial accumulator for Max aggregate function */
@@ -170,3 +170,19 @@ class TimestampMaxAggFunction extends 
MaxAggFunction[Timestamp] {
   override def getInitValue: Timestamp = new Timestamp(0)
   override def getValueTypeInfo = Types.SQL_TIMESTAMP
 }
+
+/**
+  * Built-in Date Max aggregate function
+  */
+class DateMaxAggFunction extends MaxAggFunction[Date] {
+  override def getInitValue: Date = new Date(0)
+  override def getValueTypeInfo = Types.SQL_DATE
+}
+
+/**
+  * Built-in Time Max aggregate function
+  */
+class TimeMaxAggFunction extends MaxAggFunction[Time] {
+  override def getInitValue: Time = new Time(0)
+  override def getValueTypeInfo = Types.SQL_TIME
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
index fdbfef3..cbb395e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
@@ -20,13 +20,13 @@ package org.apache.flink.table.functions.aggfunctions
 import java.math.BigDecimal
 import java.util.{HashMap => JHashMap}
 import java.lang.{Iterable => JIterable}
-import java.sql.Timestamp
+import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
 import org.apache.flink.table.api.Types
-import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
+import org.apache.flink.table.functions.aggfunctions.Ordering._
 import org.apache.flink.table.functions.AggregateFunction
 
 /** The initial accumulator for Max with retraction aggregate function */
@@ -227,3 +227,19 @@ class TimestampMaxWithRetractAggFunction extends 
MaxWithRetractAggFunction[Times
   override def getInitValue: Timestamp = new Timestamp(0)
   override def getValueTypeInfo = Types.SQL_TIMESTAMP
 }
+
+/**
+  * Built-in Date Max with retraction aggregate function
+  */
+class DateMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Date] {
+  override def getInitValue: Date = new Date(0)
+  override def getValueTypeInfo = Types.SQL_DATE
+}
+
+/**
+  * Built-in Time Max with retraction aggregate function
+  */
+class TimeMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Time] {
+  override def getInitValue: Time = new Time(0)
+  override def getValueTypeInfo = Types.SQL_TIME
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
index 1cb1ab0..915da7a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
@@ -19,13 +19,13 @@ package org.apache.flink.table.functions.aggfunctions
 
 import java.math.BigDecimal
 import java.lang.{Iterable => JIterable}
-import java.sql.Timestamp
+import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.table.api.Types
-import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
+import org.apache.flink.table.functions.aggfunctions.Ordering._
 import org.apache.flink.table.functions.AggregateFunction
 
 /** The initial accumulator for Min aggregate function */
@@ -170,3 +170,19 @@ class TimestampMinAggFunction extends 
MinAggFunction[Timestamp] {
   override def getInitValue: Timestamp = new Timestamp(0)
   override def getValueTypeInfo = Types.SQL_TIMESTAMP
 }
+
+/**
+  * Built-in Date Min aggregate function
+  */
+class DateMinAggFunction extends MinAggFunction[Date] {
+  override def getInitValue: Date = new Date(0)
+  override def getValueTypeInfo = Types.SQL_DATE
+}
+
+/**
+  * Built-in Time Min aggregate function
+  */
+class TimeMinAggFunction extends MinAggFunction[Time] {
+  override def getInitValue: Time = new Time(0)
+  override def getValueTypeInfo = Types.SQL_TIME
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
index 44fa37f..480f836 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
@@ -20,13 +20,13 @@ package org.apache.flink.table.functions.aggfunctions
 import java.math.BigDecimal
 import java.util.{HashMap => JHashMap}
 import java.lang.{Iterable => JIterable}
-import java.sql.Timestamp
+import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
 import org.apache.flink.table.api.Types
-import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
+import org.apache.flink.table.functions.aggfunctions.Ordering._
 import org.apache.flink.table.functions.AggregateFunction
 
 /** The initial accumulator for Min with retraction aggregate function */
@@ -227,3 +227,19 @@ class TimestampMinWithRetractAggFunction extends 
MinWithRetractAggFunction[Times
   override def getInitValue: Timestamp = new Timestamp(0)
   override def getValueTypeInfo = Types.SQL_TIMESTAMP
 }
+
+/**
+  * Built-in Date Min with retraction aggregate function
+  */
+class DateMinWithRetractAggFunction extends MinWithRetractAggFunction[Date] {
+  override def getInitValue: Date = new Date(0)
+  override def getValueTypeInfo = Types.SQL_DATE
+}
+
+/**
+  * Built-in Time Min with retraction aggregate function
+  */
+class TimeMinWithRetractAggFunction extends MinWithRetractAggFunction[Time] {
+  override def getInitValue: Time = new Time(0)
+  override def getValueTypeInfo = Types.SQL_TIME
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala
index 15ea2e3..4ed33b5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala
@@ -18,10 +18,18 @@
 
 package org.apache.flink.table.functions.aggfunctions
 
-import java.sql.Timestamp
+import java.sql.{Date, Time, Timestamp}
 
 object Ordering {
   implicit object TimestampOrdering extends Ordering[Timestamp] {
     override def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
   }
+
+  implicit object DateOrdering extends Ordering[Date] {
+    override def compare(x: Date, y: Date): Int = x.compareTo(y)
+  }
+
+  implicit object TimeOrdering extends Ordering[Time] {
+    override def compare(x: Time, y: Time): Int = x.compareTo(y)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 79cc258..532bec6 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1269,6 +1269,10 @@ object AggregateUtil {
                   new StringMinWithRetractAggFunction
                 case TIMESTAMP =>
                   new TimestampMinWithRetractAggFunction
+                case DATE =>
+                  new DateMinWithRetractAggFunction
+                case TIME =>
+                  new TimeMinWithRetractAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException(
                     s"Min with retract aggregate does no support type: 
'$sqlType'")
@@ -1295,6 +1299,10 @@ object AggregateUtil {
                   new StringMinAggFunction
                 case TIMESTAMP =>
                   new TimestampMinAggFunction
+                case DATE =>
+                  new DateMinAggFunction
+                case TIME =>
+                  new TimeMinAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException(s"Min aggregate does no support 
type: '$sqlType'")
               }
@@ -1322,6 +1330,10 @@ object AggregateUtil {
                   new StringMaxWithRetractAggFunction
                 case TIMESTAMP =>
                   new TimestampMaxWithRetractAggFunction
+                case DATE =>
+                  new DateMaxWithRetractAggFunction
+                case TIME =>
+                  new TimeMaxWithRetractAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException(
                     s"Max with retract aggregate does no support type: 
'$sqlType'")
@@ -1348,6 +1360,10 @@ object AggregateUtil {
                   new StringMaxAggFunction
                 case TIMESTAMP =>
                   new TimestampMaxAggFunction
+                case DATE =>
+                  new DateMaxAggFunction
+                case TIME =>
+                  new TimeMaxAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException(s"Max aggregate does no support 
type: '$sqlType'")
               }

http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
index 03faa80..8412cb3 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.aggfunctions
 
 import java.math.BigDecimal
-import java.sql.Timestamp
+import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.aggfunctions._
@@ -257,3 +257,55 @@ class TimestampMaxAggFunctionTest
   override def aggregator: AggregateFunction[Timestamp, 
MaxAccumulator[Timestamp]] =
     new TimestampMaxAggFunction()
 }
+
+class DateMaxAggFunctionTest
+  extends AggFunctionTestBase[Date, MaxAccumulator[Date]] {
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new Date(0),
+      new Date(1000),
+      new Date(100),
+      null.asInstanceOf[Date],
+      new Date(10)
+    ),
+    Seq(
+      null.asInstanceOf[Date],
+      null.asInstanceOf[Date],
+      null.asInstanceOf[Date]
+    )
+  )
+
+  override def expectedResults: Seq[Date] = Seq(
+    new Date(1000),
+    null.asInstanceOf[Date]
+  )
+
+  override def aggregator: AggregateFunction[Date, MaxAccumulator[Date]] =
+    new DateMaxAggFunction()
+}
+
+class TimeMaxAggFunctionTest
+  extends AggFunctionTestBase[Time, MaxAccumulator[Time]] {
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new Time(0),
+      new Time(1000),
+      new Time(100),
+      null.asInstanceOf[Time],
+      new Time(10)
+    ),
+    Seq(
+      null.asInstanceOf[Time],
+      null.asInstanceOf[Time],
+      null.asInstanceOf[Time]
+    )
+  )
+
+  override def expectedResults: Seq[Time] = Seq(
+    new Time(1000),
+    null.asInstanceOf[Time]
+  )
+
+  override def aggregator: AggregateFunction[Time, MaxAccumulator[Time]] =
+    new TimeMaxAggFunction()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
index eb620b4..e883e6d 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.aggfunctions
 
 import java.math.BigDecimal
-import java.sql.Timestamp
+import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.aggfunctions._
@@ -272,3 +272,61 @@ class TimestampMaxWithRetractAggFunctionTest
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }
+
+class DateMaxWithRetractAggFunctionTest
+  extends AggFunctionTestBase[Date, MaxWithRetractAccumulator[Date]] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new Date(0),
+      new Date(1000),
+      new Date(100),
+      null.asInstanceOf[Date],
+      new Date(10)
+    ),
+    Seq(
+      null.asInstanceOf[Date],
+      null.asInstanceOf[Date],
+      null.asInstanceOf[Date]
+    )
+  )
+
+  override def expectedResults: Seq[Date] = Seq(
+    new Date(1000),
+    null.asInstanceOf[Date]
+  )
+
+  override def aggregator: AggregateFunction[Date, 
MaxWithRetractAccumulator[Date]] =
+    new DateMaxWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
+}
+
+class TimeMaxWithRetractAggFunctionTest
+  extends AggFunctionTestBase[Time, MaxWithRetractAccumulator[Time]] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new Time(0),
+      new Time(1000),
+      new Time(100),
+      null.asInstanceOf[Time],
+      new Time(10)
+    ),
+    Seq(
+      null.asInstanceOf[Time],
+      null.asInstanceOf[Time],
+      null.asInstanceOf[Time]
+    )
+  )
+
+  override def expectedResults: Seq[Time] = Seq(
+    new Time(1000),
+    null.asInstanceOf[Time]
+  )
+
+  override def aggregator: AggregateFunction[Time, 
MaxWithRetractAccumulator[Time]] =
+    new TimeMaxWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
index 992d1fc..caded56 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.aggfunctions
 
 import java.math.BigDecimal
-import java.sql.Timestamp
+import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.aggfunctions._
@@ -258,3 +258,55 @@ class TimestampMinAggFunctionTest
   override def aggregator: AggregateFunction[Timestamp, 
MinAccumulator[Timestamp]] =
     new TimestampMinAggFunction()
 }
+
+class DateMinAggFunctionTest
+  extends AggFunctionTestBase[Date, MinAccumulator[Date]] {
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new Date(0),
+      new Date(1000),
+      new Date(100),
+      null.asInstanceOf[Date],
+      new Date(10)
+    ),
+    Seq(
+      null.asInstanceOf[Date],
+      null.asInstanceOf[Date],
+      null.asInstanceOf[Date]
+    )
+  )
+
+  override def expectedResults: Seq[Date] = Seq(
+    new Date(0),
+    null.asInstanceOf[Date]
+  )
+
+  override def aggregator: AggregateFunction[Date, MinAccumulator[Date]] =
+    new DateMinAggFunction()
+}
+
+class TimeMinAggFunctionTest
+  extends AggFunctionTestBase[Time, MinAccumulator[Time]] {
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new Time(0),
+      new Time(1000),
+      new Time(100),
+      null.asInstanceOf[Time],
+      new Time(10)
+    ),
+    Seq(
+      null.asInstanceOf[Time],
+      null.asInstanceOf[Time],
+      null.asInstanceOf[Time]
+    )
+  )
+
+  override def expectedResults: Seq[Time] = Seq(
+    new Time(0),
+    null.asInstanceOf[Time]
+  )
+
+  override def aggregator: AggregateFunction[Time, MinAccumulator[Time]] =
+    new TimeMinAggFunction()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44c603d2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
index 323a651..2d6ff53 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.aggfunctions
 
 import java.math.BigDecimal
-import java.sql.Timestamp
+import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.aggfunctions._
@@ -274,3 +274,65 @@ class TimestampMinWithRetractAggFunctionTest
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }
+
+class DateMinWithRetractAggFunctionTest
+  extends AggFunctionTestBase[Date, MinWithRetractAccumulator[Date]] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new Date(0),
+      new Date(1000),
+      new Date(100),
+      null.asInstanceOf[Date],
+      new Date(10)
+    ),
+    Seq(
+      null,
+      null,
+      null,
+      null,
+      null
+    )
+  )
+
+  override def expectedResults: Seq[Date] = Seq(
+    new Date(0),
+    null
+  )
+
+  override def aggregator: AggregateFunction[Date, 
MinWithRetractAccumulator[Date]] =
+    new DateMinWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
+}
+
+class TimeMinWithRetractAggFunctionTest
+  extends AggFunctionTestBase[Time, MinWithRetractAccumulator[Time]] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new Time(0),
+      new Time(1000),
+      new Time(100),
+      null.asInstanceOf[Time],
+      new Time(10)
+    ),
+    Seq(
+      null,
+      null,
+      null,
+      null,
+      null
+    )
+  )
+
+  override def expectedResults: Seq[Time] = Seq(
+    new Time(0),
+    null
+  )
+
+  override def aggregator: AggregateFunction[Time, 
MinWithRetractAccumulator[Time]] =
+    new TimeMinWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
+}

Reply via email to