This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7e30e5e9fcd [FLINK-33489][table-planner] forbid generating partial-final agg with LISTAGG to avoid wrong result
7e30e5e9fcd is described below
commit 7e30e5e9fcd51382f48d48c9848bb8df14293e22
Author: xuyang <[email protected]>
AuthorDate: Thu Nov 9 12:28:14 2023 +0800
[FLINK-33489][table-planner] forbid generating partial-final agg with LISTAGG to avoid wrong result
---
.../table/planner/plan/utils/AggregateUtil.scala | 5 +-
.../plan/stream/sql/agg/DistinctAggregateTest.xml | 86 ++++++++++++++++++++++
.../stream/sql/agg/IncrementalAggregateTest.xml | 22 ++++++
.../stream/sql/agg/DistinctAggregateTest.scala | 5 ++
.../runtime/stream/sql/SplitAggregateITCase.scala | 31 ++++++++
5 files changed, 147 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index ef25844477f..73c5f2a09da 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -953,9 +953,10 @@ object AggregateUtil extends Enumeration {
aggCall.getAggregation match {
case _: SqlCountAggFunction | _: SqlAvgAggFunction | _: SqlMinMaxAggFunction |
_: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction |
- _: SqlSingleValueAggFunction | _: SqlListAggFunction =>
+ _: SqlSingleValueAggFunction =>
true
- case _: SqlFirstLastValueAggFunction => aggCall.getArgList.size() == 1
+ case _: SqlFirstLastValueAggFunction | _: SqlListAggFunction =>
+ aggCall.getArgList.size() == 1
case _ => false
}
}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index cf1f3660adb..6ea406b774f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -287,6 +287,92 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
+- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS
$f2], changelogMode=[I])
+- MiniBatchAssigner(interval=[1000ms],
mode=[ProcTime], changelogMode=[I])
+-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testListAggWithDistinctMultiArgs[splitDistinctAggEnabled=false,
aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG(DISTINCT $1, $2)])
++- LogicalProject(a=[$0], c=[$2], $f2=[_UTF-16LE'#'])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+GroupAggregate(groupBy=[a], select=[a, LISTAGG(DISTINCT c, $f2) AS EXPR$1])
++- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, c, '#' AS $f2])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testListAggWithDistinctMultiArgs[splitDistinctAggEnabled=false,
aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG(DISTINCT $1, $2)])
++- LogicalProject(a=[$0], c=[$2], $f2=[_UTF-16LE'#'])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0
(accDelimiter$0, concatAcc$1)) AS EXPR$1])
++- Exchange(distribution=[hash[a]])
+ +- LocalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0 c, $f2)
AS (accDelimiter$0, concatAcc$1), DISTINCT(c, $f2) AS distinct$0])
+ +- Calc(select=[a, c, '#' AS $f2])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testListAggWithDistinctMultiArgs[splitDistinctAggEnabled=true,
aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG(DISTINCT $1, $2)])
++- LogicalProject(a=[$0], c=[$2], $f2=[_UTF-16LE'#'])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+GroupAggregate(groupBy=[a], select=[a, LISTAGG(DISTINCT c, $f2) AS EXPR$1])
++- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, c, '#' AS $f2])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testListAggWithDistinctMultiArgs[splitDistinctAggEnabled=true,
aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG(DISTINCT $1, $2)])
++- LogicalProject(a=[$0], c=[$2], $f2=[_UTF-16LE'#'])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0
(accDelimiter$0, concatAcc$1)) AS EXPR$1])
++- Exchange(distribution=[hash[a]])
+ +- LocalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0 c, $f2)
AS (accDelimiter$0, concatAcc$1), DISTINCT(c, $f2) AS distinct$0])
+ +- Calc(select=[a, c, '#' AS $f2])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index 3962d19ab57..8585541ae0c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -85,6 +85,28 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
+- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS
$f2], changelogMode=[I])
+- MiniBatchAssigner(interval=[1000ms],
mode=[ProcTime], changelogMode=[I])
+-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testListAggWithDistinctMultiArgs[splitDistinctAggEnabled=true,
aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG(DISTINCT $1, $2)])
++- LogicalProject(a=[$0], c=[$2], $f2=[_UTF-16LE'#'])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0
(accDelimiter$0, concatAcc$1)) AS EXPR$1])
++- Exchange(distribution=[hash[a]])
+ +- LocalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0 c, $f2)
AS (accDelimiter$0, concatAcc$1), DISTINCT(c, $f2) AS distinct$0])
+ +- Calc(select=[a, c, '#' AS $f2])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index e4ae95bbfb4..6feab3b9f6f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -216,6 +216,11 @@ class DistinctAggregateTest(
""".stripMargin
util.verifyRelPlan(sqlQuery, ExplainDetail.CHANGELOG_MODE)
}
+
+ @Test
+ def testListAggWithDistinctMultiArgs(): Unit = {
+ util.verifyExecPlan("SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a")
+ }
}
object DistinctAggregateTest {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index 3e6b11bfd1f..c3be7d12b1c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -514,6 +514,37 @@ class SplitAggregateITCase(
val expected = List("1,1,3,2,3,1", "2,3,24,8,29,3", "3,1,null,2,10,5", "4,2,6,4,21,5")
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
+
+ @Test
+ def testListAggWithDistinctMultiArgs(): Unit = {
+ val t1 = tEnv.sqlQuery(s"""
+ |SELECT
+ | a,
+ | LISTAGG(DISTINCT c, '#')
+ |FROM T
+ |GROUP BY a
+ """.stripMargin)
+
+ val sink = new TestingRetractSink
+ t1.toRetractStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = Map[String, List[String]](
+ "1" -> List("Hello 0", "Hello 1"),
+ "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
+ "3" -> List("Hello 0", "Hello 1"),
+ "4" -> List("Hello 1", "Hello 2", "Hello 3")
+ )
+ val actualData = sink.getRetractResults.sorted
+ val actualMap = actualData.map {
+ str =>
+ // key and value are split by ','
+ val list = str.split(",")
+ val values = list(1).split("#").toList.sorted
+ (list(0), values)
+ }.toMap
+ assertMapStrEquals(expected.toString, actualMap.toString)
+ }
}
object SplitAggregateITCase {