Repository: flink Updated Branches: refs/heads/master 86d32ac84 -> 17dd915e8
[FLINK-6124] [table] support max/min aggregations for string type This closes #3579. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17dd915e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17dd915e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17dd915e Branch: refs/heads/master Commit: 17dd915e8a18e60fa32ada9500d3632ba162720a Parents: 86d32ac Author: Zhenghua Gao <[email protected]> Authored: Mon Mar 20 19:29:31 2017 +0800 Committer: twalthr <[email protected]> Committed: Tue Mar 21 12:18:30 2017 +0100 ---------------------------------------------------------------------- .../functions/aggfunctions/MaxAggFunction.scala | 8 +++++ .../functions/aggfunctions/MinAggFunction.scala | 8 +++++ .../table/runtime/aggregate/AggregateUtil.scala | 4 +++ .../scala/batch/sql/AggregationsITCase.scala | 31 +++--------------- .../aggfunctions/MaxAggFunctionTest.scala | 33 ++++++++++++++++++++ .../aggfunctions/MinAggFunctionTest.scala | 33 ++++++++++++++++++++ 6 files changed, 90 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala index 55e3e5f..3793434 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala @@ -155,3 +155,11 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] { override def getInitValue = BigDecimal.ZERO override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO } + +/** + * Built-in String Max aggregate function + */ +class StringMaxAggFunction extends MaxAggFunction[String] { + override def getInitValue = "".toString + override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO +} http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala index 647388a..41361fd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala @@ -155,3 +155,11 @@ class DecimalMinAggFunction extends MinAggFunction[BigDecimal] { override def getInitValue: BigDecimal = BigDecimal.ZERO override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO } + +/** + * Built-in String Min aggregate function + */ +class StringMinAggFunction extends MinAggFunction[String] { + override def getInitValue = "".toString + override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO +} http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index b6b3445..9feec17 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -916,6 +916,8 @@ object AggregateUtil { new DecimalMinAggFunction case BOOLEAN => new BooleanMinAggFunction + case VARCHAR | CHAR => + new StringMinAggFunction case sqlType: SqlTypeName => throw new TableException("Min aggregate does no support type:" + sqlType) } @@ -961,6 +963,8 @@ object AggregateUtil { new DecimalMaxAggFunction case BOOLEAN => new BooleanMaxAggFunction + case VARCHAR | CHAR => + new StringMaxAggFunction case sqlType: SqlTypeName => throw new TableException("Max aggregate does no support type:" + sqlType) } http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala index a60cfaa..cceb272 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala @@ -92,36 +92,13 @@ class AggregationsITCase( } @Test - def testWorkingAggregationDataTypes(): Unit = { + def testAggregationDataTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val sqlQuery = - "SELECT avg(_1), avg(_2), avg(_3), avg(_4), avg(_5), avg(_6), count(_7), " + - " sum(CAST(_6 AS DECIMAL))" + - "FROM MyTable" - - val ds = env.fromElements( - (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), - (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")) - tEnv.registerDataSet("MyTable", ds) - - val result = tEnv.sql(sqlQuery) - - val expected = "1,1,1,1,1.5,1.5,2,3.0" - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testTableWorkingAggregationDataTypes(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g)" + - "FROM MyTable" + val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g), " + + "min(g), min('Ciao'), max(g), max('Ciao'), sum(CAST(f AS DECIMAL)) FROM MyTable" val ds = env.fromElements( (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), @@ -130,7 +107,7 @@ class AggregationsITCase( val result = tEnv.sql(sqlQuery) - val expected = "1,1,1,1,1.5,1.5,2" + val expected = "1,1,1,1,1.5,1.5,2,Ciao,Ciao,Hello,Ciao,3.0" val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala index 396be24..38ea993 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala @@ -192,3 +192,36 @@ class DecimalMaxAggFunctionTest extends AggFunctionTestBase[BigDecimal] { override def supportRetraction: Boolean = false } + +class StringMaxAggFunctionTest extends AggFunctionTestBase[String] { + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new String("a"), + new String("b"), + new String("c"), + null.asInstanceOf[String], + new String("d") + ), + Seq( + null.asInstanceOf[String], + null.asInstanceOf[String], + null.asInstanceOf[String] + ), + Seq( + new String("1House"), + new String("Household"), + new String("house"), + new String("household") + ) + ) + + override def expectedResults: Seq[String] = Seq( + new String("d"), + null.asInstanceOf[String], + new String("household") + ) + + override def aggregator: AggregateFunction[String] = new StringMaxAggFunction() + + override def supportRetraction: Boolean = false +} http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala index 7d9e52b..84e541a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala @@ -192,3 +192,36 @@ class DecimalMinAggFunctionTest extends AggFunctionTestBase[BigDecimal] { override def supportRetraction: Boolean = false } + +class StringMinAggFunctionTest extends AggFunctionTestBase[String] { + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new String("a"), + new String("b"), + new String("c"), + null.asInstanceOf[String], + new String("d") + ), + Seq( + null.asInstanceOf[String], + null.asInstanceOf[String], + null.asInstanceOf[String] + ), + Seq( + new String("1House"), + new String("Household"), + new String("house"), + new String("household") + ) + ) + + override def expectedResults: Seq[String] = Seq( + new String("a"), + null.asInstanceOf[String], + new String("1House") + ) + + override def aggregator: AggregateFunction[String] = new StringMinAggFunction() + + override def supportRetraction: Boolean = false +}
