This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e30e5e9fcd [FLINK-33489][table-planner] forbid generating partial-final agg with LISTAGG to avoid wrong result
7e30e5e9fcd is described below

commit 7e30e5e9fcd51382f48d48c9848bb8df14293e22
Author: xuyang <[email protected]>
AuthorDate: Thu Nov 9 12:28:14 2023 +0800

    [FLINK-33489][table-planner] forbid generating partial-final agg with LISTAGG to avoid wrong result
---
 .../table/planner/plan/utils/AggregateUtil.scala   |  5 +-
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  | 86 ++++++++++++++++++++++
 .../stream/sql/agg/IncrementalAggregateTest.xml    | 22 ++++++
 .../stream/sql/agg/DistinctAggregateTest.scala     |  5 ++
 .../runtime/stream/sql/SplitAggregateITCase.scala  | 31 ++++++++
 5 files changed, 147 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index ef25844477f..73c5f2a09da 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -953,9 +953,10 @@ object AggregateUtil extends Enumeration {
         aggCall.getAggregation match {
           case _: SqlCountAggFunction | _: SqlAvgAggFunction | _: SqlMinMaxAggFunction |
               _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction |
-              _: SqlSingleValueAggFunction | _: SqlListAggFunction =>
+              _: SqlSingleValueAggFunction =>
             true
-          case _: SqlFirstLastValueAggFunction => aggCall.getArgList.size() == 1
+          case _: SqlFirstLastValueAggFunction | _: SqlListAggFunction =>
+            aggCall.getArgList.size() == 1
           case _ => false
         }
     }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index cf1f3660adb..6ea406b774f 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -287,6 +287,92 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], 
select=[b, FIRST_VALUE_RET
                               +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS 
$f2], changelogMode=[I])
                                  +- MiniBatchAssigner(interval=[1000ms], 
mode=[ProcTime], changelogMode=[I])
                                     +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testListAggWithDistinctMultiArgs[splitDistinctAggEnabled=false, 
aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG(DISTINCT $1, $2)])
++- LogicalProject(a=[$0], c=[$2], $f2=[_UTF-16LE'#'])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+GroupAggregate(groupBy=[a], select=[a, LISTAGG(DISTINCT c, $f2) AS EXPR$1])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, c, '#' AS $f2])
+      +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testListAggWithDistinctMultiArgs[splitDistinctAggEnabled=false, 
aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG(DISTINCT $1, $2)])
++- LogicalProject(a=[$0], c=[$2], $f2=[_UTF-16LE'#'])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0 
(accDelimiter$0, concatAcc$1)) AS EXPR$1])
++- Exchange(distribution=[hash[a]])
+   +- LocalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0 c, $f2) 
AS (accDelimiter$0, concatAcc$1), DISTINCT(c, $f2) AS distinct$0])
+      +- Calc(select=[a, c, '#' AS $f2])
+         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testListAggWithDistinctMultiArgs[splitDistinctAggEnabled=true, 
aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG(DISTINCT $1, $2)])
++- LogicalProject(a=[$0], c=[$2], $f2=[_UTF-16LE'#'])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+GroupAggregate(groupBy=[a], select=[a, LISTAGG(DISTINCT c, $f2) AS EXPR$1])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, c, '#' AS $f2])
+      +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testListAggWithDistinctMultiArgs[splitDistinctAggEnabled=true, 
aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG(DISTINCT $1, $2)])
++- LogicalProject(a=[$0], c=[$2], $f2=[_UTF-16LE'#'])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0 
(accDelimiter$0, concatAcc$1)) AS EXPR$1])
++- Exchange(distribution=[hash[a]])
+   +- LocalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0 c, $f2) 
AS (accDelimiter$0, concatAcc$1), DISTINCT(c, $f2) AS distinct$0])
+      +- Calc(select=[a, c, '#' AS $f2])
+         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index 3962d19ab57..8585541ae0c 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -85,6 +85,28 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], 
select=[b, FIRST_VALUE_RET
                            +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS 
$f2], changelogMode=[I])
                               +- MiniBatchAssigner(interval=[1000ms], 
mode=[ProcTime], changelogMode=[I])
                                  +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testListAggWithDistinctMultiArgs[splitDistinctAggEnabled=true, 
aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG(DISTINCT $1, $2)])
++- LogicalProject(a=[$0], c=[$2], $f2=[_UTF-16LE'#'])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0 
(accDelimiter$0, concatAcc$1)) AS EXPR$1])
++- Exchange(distribution=[hash[a]])
+   +- LocalGroupAggregate(groupBy=[a], select=[a, LISTAGG(distinct$0 c, $f2) 
AS (accDelimiter$0, concatAcc$1), DISTINCT(c, $f2) AS distinct$0])
+      +- Calc(select=[a, c, '#' AS $f2])
+         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index e4ae95bbfb4..6feab3b9f6f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -216,6 +216,11 @@ class DistinctAggregateTest(
        """.stripMargin
     util.verifyRelPlan(sqlQuery, ExplainDetail.CHANGELOG_MODE)
   }
+
+  @Test
+  def testListAggWithDistinctMultiArgs(): Unit = {
+    util.verifyExecPlan("SELECT a, LISTAGG(DISTINCT c, '#') FROM MyTable GROUP BY a")
+  }
 }
 
 object DistinctAggregateTest {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index 3e6b11bfd1f..c3be7d12b1c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -514,6 +514,37 @@ class SplitAggregateITCase(
     val expected = List("1,1,3,2,3,1", "2,3,24,8,29,3", "3,1,null,2,10,5", "4,2,6,4,21,5")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
+
+  @Test
+  def testListAggWithDistinctMultiArgs(): Unit = {
+    val t1 = tEnv.sqlQuery(s"""
+                              |SELECT
+                              |  a,
+                              |  LISTAGG(DISTINCT c, '#')
+                              |FROM T
+                              |GROUP BY a
+       """.stripMargin)
+
+    val sink = new TestingRetractSink
+    t1.toRetractStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = Map[String, List[String]](
+      "1" -> List("Hello 0", "Hello 1"),
+      "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
+      "3" -> List("Hello 0", "Hello 1"),
+      "4" -> List("Hello 1", "Hello 2", "Hello 3")
+    )
+    val actualData = sink.getRetractResults.sorted
+    val actualMap = actualData.map {
+      str =>
+        // key and value are split by ','
+        val list = str.split(",")
+        val values = list(1).split("#").toList.sorted
+        (list(0), values)
+    }.toMap
+    assertMapStrEquals(expected.toString, actualMap.toString)
+  }
 }
 
 object SplitAggregateITCase {

Reply via email to