[ https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
xuyang resolved FLINK-33489.
----------------------------
Resolution: Resolved
> LISTAGG produces wrong results when a partial-final aggregate is generated
> --------------------------------------------------------------------------
>
> Key: FLINK-33489
> URL: https://issues.apache.org/jira/browse/FLINK-33489
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0
> Reporter: xuyang
> Assignee: xuyang
> Priority: Major
> Labels: pull-request-available
>
> Adding the following test case to `SplitAggregateITCase` reproduces this bug:
>
> {code:scala}
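> @Test
> def testListAggWithDistinctMultiArgs(): Unit = {
>   val t1 = tEnv.sqlQuery(s"""
>                             |SELECT
>                             |  a,
>                             |  LISTAGG(DISTINCT c, '#')
>                             |FROM T
>                             |GROUP BY a
>                             """.stripMargin)
>   val sink = new TestingRetractSink
>   t1.toRetractStream[Row].addSink(sink)
>   env.execute()
>
>   // LISTAGG(DISTINCT ...) does not guarantee element order, so each group's
>   // aggregated values are compared as a sorted list.
>   val expected = Map[String, List[String]](
>     "1" -> List("Hello 0", "Hello 1"),
>     "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
>     "3" -> List("Hello 0", "Hello 1"),
>     "4" -> List("Hello 1", "Hello 2", "Hello 3")
>   )
>   val actualData = sink.getRetractResults.sorted
>   println(actualData)
>   // Each row is rendered as "a,<listagg result>": split off the key at the
>   // first comma, then split the aggregate on the '#' delimiter.
>   val actual = actualData.map { row =>
>     val Array(key, agg) = row.split(",", 2)
>     key -> agg.split("#").toList.sorted
>   }.toMap
>   // Assumes JUnit's assertEquals is already imported in SplitAggregateITCase.
>   assertEquals(expected, actual)
> }
> {code}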
> The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello 1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`:
> the `#` delimiter is ignored and the values are joined with `,` instead.
> The execution plan is:
> {code:java}
> LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1])
> +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f3_0) AS $f1])
>    +- Exchange(distribution=[hash[a]])
>       +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0])
>          +- Exchange(distribution=[hash[a, $f3, $f4]])
>             +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), 1024) AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4])
>                +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                   +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c]) {code}
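> The final `GroupAggregate` is missing the delimiter argument. The partial phase
> calls `LISTAGG(DISTINCT c, $f2)` with `$f2 = '#'`, but the final phase calls
> `LISTAGG_RETRACT($f3_0)` without a delimiter, so the default delimiter `,` is
> used when the partial results are concatenated.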
>
>
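To make the failure mode concrete, here is a simplified, hypothetical Scala model of the partial/final split shown in the plan above. It is not Flink's implementation: `SplitListAggSketch`, `bucket`, `partial`, and `finalAgg` are illustrative names, and `String.hashCode` with `Math.floorMod` merely stands in for `MOD(HASH_CODE(c), 1024)`. Because equal values of `c` always land in the same bucket, DISTINCT can be applied per bucket in the partial phase; the final phase then only concatenates the partial strings, so it has to be given the same delimiter.

```scala
// Hypothetical, simplified model of the split-distinct (PARTIAL/FINAL) rewrite
// for LISTAGG(DISTINCT c, '#'). Not Flink code; all names are illustrative.
object SplitListAggSketch {
  // Stand-in for MOD(HASH_CODE(c), 1024) computed by the Calc node.
  def bucket(c: String, numBuckets: Int): Int =
    Math.floorMod(c.hashCode, numBuckets)

  // PARTIAL phase: group by (a, bucket) and run LISTAGG(DISTINCT c, delimiter).
  def partial(rows: Seq[(Int, String)], delimiter: String, numBuckets: Int): Seq[(Int, String)] =
    rows
      .groupBy { case (a, c) => (a, bucket(c, numBuckets)) }
      .toSeq
      .map { case ((a, _), group) =>
        a -> group.map(_._2).distinct.sorted.mkString(delimiter)
      }

  // FINAL phase: group by a and concatenate the partial strings.
  // Calling this with "," models the bug: the final aggregate lost the
  // user-supplied delimiter and fell back to the default.
  def finalAgg(partials: Seq[(Int, String)], delimiter: String): Map[Int, String] =
    partials
      .groupBy(_._1)
      .map { case (a, group) => a -> group.map(_._2).sorted.mkString(delimiter) }

  def main(args: Array[String]): Unit = {
    val rows = Seq(1 -> "Hello 0", 1 -> "Hello 1", 1 -> "Hello 0")
    val parts = partial(rows, "#", numBuckets = 1024)
    println(finalAgg(parts, ",")) // buggy final phase: Map(1 -> Hello 0,Hello 1)
    println(finalAgg(parts, "#")) // delimiter forwarded: Map(1 -> Hello 0#Hello 1)
  }
}
```

In this model, "Hello 0" and "Hello 1" fall into different buckets, so only the final phase joins them and the wrong delimiter becomes visible. With a single bucket the partial phase would already join everything with `#`, which is why the wrong delimiter shows up only once distinct values are spread across buckets.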
--
This message was sent by Atlassian Jira
(v8.20.10#820010)