xuyang created FLINK-34039:
------------------------------
Summary: The session group window agg operator does not split the session window when processing retract records.
Key: FLINK-34039
URL: https://issues.apache.org/jira/browse/FLINK-34039
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: xuyang
Add the following test to GroupWindowITCase to reproduce this bug:
{code:scala}
@TestTemplate
def test(): Unit = {
  env.setParallelism(1)
  val upsertSourceDataId = registerData(
    List(
      // With a 5s gap, the rows at 1s, 2s, 6s and 9s all merge into one session.
      changelogRow("+I", "a", "no1", localDateTime(1L)),
      changelogRow("+I", "a", "no1", localDateTime(2L)),
      changelogRow("+I", "a", "no1", localDateTime(6L)),
      changelogRow("+I", "a", "no1", localDateTime(9L)),
      // Retract the 6s row: the 7s gap between 2s and 9s now exceeds the 5s session gap.
      changelogRow("-D", "a", "no1", localDateTime(6L))
    ))
  tEnv.executeSql(s"""
                     |CREATE TABLE upsert_currency (
                     |  pk STRING,
                     |  str STRING,
                     |  currency_time TIMESTAMP(3),
                     |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND
                     |) WITH (
                     |  'connector' = 'values',
                     |  'changelog-mode' = 'I,UB,UA,D',
                     |  'data-id' = '$upsertSourceDataId'
                     |)
                     |""".stripMargin)
  val sql =
    """
      |SELECT
      |  pk,
      |  COUNT(*) AS cnt,
      |  SESSION_START(currency_time, INTERVAL '5' SECOND) AS w_start,
      |  SESSION_END(currency_time, INTERVAL '5' SECOND) AS w_end
      |FROM upsert_currency
      |GROUP BY pk, SESSION(currency_time, INTERVAL '5' SECOND)
      |""".stripMargin
  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toDataStream.addSink(sink)
  env.execute()
  println(sink.getAppendResults.sorted)
}
{code}
The result is:
{code:java}
a,3,1970-01-01T00:00:01,1970-01-01T00:00:14{code}
However, if we remove both the insertion of the 6s row and its retraction from the source data, as below:
{code:scala}
val upsertSourceDataId = registerData(
  List(
    changelogRow("+I", "a", "no1", localDateTime(1L)),
    changelogRow("+I", "a", "no1", localDateTime(2L)),
    // changelogRow("+I", "a", "no1", localDateTime(6L)),
    changelogRow("+I", "a", "no1", localDateTime(9L))
    // changelogRow("-D", "a", "no1", localDateTime(6L))
  ))
{code}
The result will be:
{code:java}
a,1,1970-01-01T00:00:09,1970-01-01T00:00:14
a,2,1970-01-01T00:00:01,1970-01-01T00:00:07{code}
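Both inputs have the same net content: once the 6s row is retracted, only the rows at 1s, 2s and 9s remain, so the two runs would be expected to agree. The following is a minimal, hypothetical model in plain Scala, not Flink's operator code (`SessionModel`, `recompute` and `incrementalWithoutSplit` are illustrative names). It assumes timestamps in seconds, a 5s gap, and that a session spans from its first timestamp to its last timestamp plus the gap. Recomputing sessions over the remaining rows yields the split result, while merging windows on insert and only decrementing the count on retraction reproduces the single [1s, 14s) window from the first run.

{code:scala}
// Hypothetical model of session windows; not Flink's implementation.
object SessionModel {
  // A session window [start, end); count is the number of live rows in it.
  final case class Session(start: Long, end: Long, count: Int)

  // Session gap, mirroring INTERVAL '5' SECOND in the query.
  val Gap = 5L

  // Reference behavior: rebuild sessions from scratch over the live timestamps.
  def recompute(timestamps: Seq[Long]): List[Session] =
    timestamps.sorted.foldLeft(List.empty[Session]) {
      case (cur :: rest, t) if t <= cur.end =>
        // t overlaps or touches the current session: extend it.
        Session(cur.start, math.max(cur.end, t + Gap), cur.count + 1) :: rest
      case (acc, t) =>
        // First timestamp, or the gap was exceeded: open a new session.
        Session(t, t + Gap, 1) :: acc
    }.reverse

  // Buggy incremental model: inserts merge windows, but a retraction only
  // decrements the count of the session containing it and never splits it.
  def incrementalWithoutSplit(changes: Seq[(String, Long)]): List[Session] =
    changes.foldLeft(List.empty[Session]) {
      case (sessions, ("+I", t)) =>
        val (overlapping, others) =
          sessions.partition(s => t <= s.end && t + Gap >= s.start)
        val merged = overlapping.foldLeft(Session(t, t + Gap, 1)) { (a, s) =>
          Session(math.min(a.start, s.start), math.max(a.end, s.end), a.count + s.count)
        }
        (merged :: others).sortBy(_.start)
      case (sessions, ("-D", t)) =>
        sessions.map { s =>
          if (t >= s.start && t < s.end) s.copy(count = s.count - 1) else s
        }
      case (sessions, _) => sessions // other change kinds are not modeled
    }

  def main(args: Array[String]): Unit = {
    val changes = Seq("+I" -> 1L, "+I" -> 2L, "+I" -> 6L, "+I" -> 9L, "-D" -> 6L)
    println(incrementalWithoutSplit(changes)) // List(Session(1,14,3))
    println(recompute(Seq(1L, 2L, 9L)))       // List(Session(1,7,2), Session(9,14,1))
  }
}
{code}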
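When there is a mini-batch operator upstream, CDC messages may be folded (for example, the +I and -D of the 6s row can cancel each other out), so the results of the session group window agg node may differ depending on whether that folding happened.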
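The folding effect can be illustrated with a hypothetical sketch that is not Flink's mini-batch implementation (`MiniBatchFoldModel` and `fold` are illustrative names). It assumes only +I and -D messages, and identifies rows by their full content because the example table declares no primary key. In this model, if the +I and -D for the 6s row fall into the same batch they cancel, and the downstream aggregate only sees the rows at 1s, 2s and 9s, which corresponds to the second result above.

{code:scala}
// Hypothetical model of folding a changelog within one mini-batch.
object MiniBatchFoldModel {
  // (pk, str, currency_time in seconds); the full row acts as its identity.
  type Row = (String, String, Long)

  // Net change per row inside the batch: +1 per +I, -1 per -D
  // (only these two change kinds are modeled). Rows netting to zero vanish.
  def fold(batch: Seq[(String, Row)]): Seq[(String, Row)] = {
    val order = batch.map(_._2).distinct // keep first-appearance order
    val net = batch.groupMapReduce(_._2) { case (kind, _) =>
      if (kind == "+I") 1 else -1
    }(_ + _)
    order.flatMap { row =>
      val n = net(row)
      if (n > 0) Seq.fill(n)(("+I", row)) else Seq.fill(-n)(("-D", row))
    }
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      ("+I", ("a", "no1", 1L)),
      ("+I", ("a", "no1", 2L)),
      ("+I", ("a", "no1", 6L)),
      ("+I", ("a", "no1", 9L)),
      ("-D", ("a", "no1", 6L)))
    // Only the +I rows for 1s, 2s and 9s remain after folding.
    fold(batch).foreach(println)
  }
}
{code}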
--
This message was sent by Atlassian Jira
(v8.20.10#820010)