This is an automated email from the ASF dual-hosted git repository.
ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 085859ec19f [FLINK-25476][table-planner] Support CHAR type in built-in
function MAX and MIN
085859ec19f is described below
commit 085859ec19f59d2f8e73632e96fae59f7821c7d0
Author: xuyang <[email protected]>
AuthorDate: Mon Jan 17 12:27:02 2022 +0800
[FLINK-25476][table-planner] Support CHAR type in built-in function MAX and
MIN
This closes #18375
---
.../planner/plan/utils/AggFunctionFactory.scala | 12 ++--
.../runtime/stream/sql/AggregateITCase.scala | 77 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 6 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index cbabddae03a..861f537f6c2 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -25,7 +25,7 @@ import
org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFu
import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction
import
org.apache.flink.table.planner.functions.sql.{SqlFirstLastValueAggFunction,
SqlListAggFunction}
import org.apache.flink.table.planner.functions.utils.AggSqlFunction
-import
org.apache.flink.table.runtime.functions.aggregate.{BuiltInAggregateFunction,
CollectAggFunction, FirstValueAggFunction, FirstValueWithRetractAggFunction,
JsonArrayAggFunction, JsonObjectAggFunction, LagAggFunction,
LastValueAggFunction, LastValueWithRetractAggFunction,
ListAggWithRetractAggFunction, ListAggWsWithRetractAggFunction,
MaxWithRetractAggFunction, MinWithRetractAggFunction}
+import org.apache.flink.table.runtime.functions.aggregate._
import
org.apache.flink.table.runtime.functions.aggregate.BatchApproxCountDistinctAggFunctions._
import org.apache.flink.table.types.logical._
import org.apache.flink.table.types.logical.LogicalTypeRoot._
@@ -273,8 +273,8 @@ class AggFunctionFactory(
val valueType = argTypes(0)
if (aggCallNeedRetractions(index)) {
valueType.getTypeRoot match {
- case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN
| VARCHAR | DECIMAL |
- TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE |
+ case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN
| VARCHAR | CHAR |
+ DECIMAL | TIME_WITHOUT_TIME_ZONE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
new MinWithRetractAggFunction(argTypes(0))
case t =>
@@ -382,8 +382,8 @@ class AggFunctionFactory(
val valueType = argTypes(0)
if (aggCallNeedRetractions(index)) {
valueType.getTypeRoot match {
- case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN
| VARCHAR | DECIMAL |
- TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE |
+ case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN
| VARCHAR | CHAR |
+ DECIMAL | TIME_WITHOUT_TIME_ZONE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
new MaxWithRetractAggFunction(argTypes(0))
case t =>
@@ -407,7 +407,7 @@ class AggFunctionFactory(
new MaxAggFunction.DoubleMaxAggFunction
case BOOLEAN =>
new MaxAggFunction.BooleanMaxAggFunction
- case VARCHAR =>
+ case VARCHAR | CHAR =>
new MaxAggFunction.StringMaxAggFunction
case DATE =>
new MaxAggFunction.DateMaxAggFunction
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 99d5e2a5f26..0d11b69909a 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1332,6 +1332,83 @@ class AggregateITCase(aggMode: AggMode, miniBatch:
MiniBatchMode, backend: State
assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}
+ @TestTemplate
+ def testMinMaxWithChar(): Unit = {
+ val data =
+ List(
+ rowOf(1, "a", "gg"),
+ rowOf(1, "b", "hh"),
+ rowOf(2, "d", "j"),
+ rowOf(2, "c", "i")
+ )
+ val dataId = TestValuesTableFactory.registerData(data)
+ tEnv.executeSql(s"""
+ |CREATE TABLE src(
+ | `id` INT,
+ | `char1` CHAR(1),
+ | `char2` CHAR(2)
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$dataId'
+ |)
+ |""".stripMargin)
+
+ val sql =
+ """
+ |select `id`, count(*), min(`char1`), max(`char1`), min(`char2`),
max(`char2`) from src group by `id`
+ """.stripMargin
+
+ val sink = new TestingRetractSink()
+ tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = List("1,2,a,b,gg,hh", "2,2,c,d,i,j")
+ assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+ }
+
+ @TestTemplate
+ def testRetractMinMaxWithChar(): Unit = {
+ val data =
+ List(
+ changelogRow("+I", Int.box(1), "a", "ee"),
+ changelogRow("+I", Int.box(1), "b", "ff"),
+ changelogRow("+I", Int.box(1), "c", "gg"),
+ changelogRow("-D", Int.box(1), "c", "gg"),
+ changelogRow("-D", Int.box(1), "a", "ee"),
+ changelogRow("+I", Int.box(2), "a", "e"),
+ changelogRow("+I", Int.box(2), "b", "f"),
+ changelogRow("+I", Int.box(2), "c", "g"),
+ changelogRow("-U", Int.box(2), "b", "f"),
+ changelogRow("+U", Int.box(2), "d", "h"),
+ changelogRow("-U", Int.box(2), "a", "e"),
+ changelogRow("+U", Int.box(2), "b", "f")
+ )
+ val dataId = TestValuesTableFactory.registerData(data)
+ tEnv.executeSql(s"""
+ |CREATE TABLE src(
+ | `id` INT,
+ | `char1` CHAR(1),
+ | `char2` CHAR(2)
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$dataId',
+ | 'changelog-mode' = 'I,UA,UB,D'
+ |)
+ |""".stripMargin)
+
+ val sql =
+ """
+ |select `id`, count(*), min(`char1`), max(`char1`), min(`char2`),
max(`char2`) from src group by `id`
+ """.stripMargin
+
+ val sink = new TestingRetractSink()
+ tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = List("1,1,b,b,ff,ff", "2,3,b,d,f,h")
+ assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+ }
+
@TestTemplate
def testCollectOnClusteredFields(): Unit = {
val data = List(