[
https://issues.apache.org/jira/browse/FLINK-34039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
xuyang updated FLINK-34039:
---------------------------
Description:
Add the test in GroupWindowITCase to reproduce this bug.
{code:java}
@TestTemplate
def test(): Unit = {
env.setParallelism(1)
val upsertSourceDataId = registerData(
List(
changelogRow("+I", "a", "no1", localDateTime(1L)),
changelogRow("+I", "a", "no1", localDateTime(2L)),
changelogRow("+I", "a", "no1", localDateTime(6L)),
changelogRow("+I", "a", "no1", localDateTime(9L)),
changelogRow("-D", "a", "no1", localDateTime(6L))
))
tEnv.executeSql(s"""
|CREATE TABLE upsert_currency (
| pk STRING,
| str STRING,
| currency_time TIMESTAMP(3),
| WATERMARK FOR currency_time AS currency_time - interval '5' SECOND
|) WITH (
| 'connector' = 'values',
| 'changelog-mode' = 'I,UB,UA,D',
| 'data-id' = '$upsertSourceDataId'
|)
|""".stripMargin)
val sql =
"""
|SELECT
|pk,
|COUNT(*) AS cnt,
|SESSION_START(currency_time, INTERVAL '5' SECOND) as w_start,
|SESSION_END(currency_time, INTERVAL '5' SECOND) as w_end
|FROM upsert_currency
|GROUP BY pk, SESSION(currency_time, INTERVAL '5' SECOND)
|""".stripMargin
val sink = new TestingAppendSink
tEnv.sqlQuery(sql).toDataStream.addSink(sink)
env.execute()
println(sink.getAppendResults.sorted)
}{code}
The result is:
{code:java}
a,3,1970-01-01T00:00:01,1970-01-01T00:00:14{code}
But if we change the source data as below:
{code:java}
val upsertSourceDataId = registerData(
List(
changelogRow("+I", "a", "no1", localDateTime(1L)),
changelogRow("+I", "a", "no1", localDateTime(2L)),
// changelogRow("+I", "a", "no1", localDateTime(6L)),
changelogRow("+I", "a", "no1", localDateTime(9L))
// changelogRow("-D", "a", "no1", localDateTime(6L))
)) {code}
The result will be:
{code:java}
a,1,1970-01-01T00:00:09,1970-01-01T00:00:14
a,2,1970-01-01T00:00:01,1970-01-01T00:00:07{code}
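The two inputs are logically equivalent, because the row at 6s is inserted and
then deleted again, so both runs should produce the same two session windows.
The session group window agg operator, however, does not split the already
merged session window when it processes the -D record. As a result, when there
is a minibatch operator upstream and CDC messages may be folded, the results of
the session group window agg node may be different.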
was:
Add the test in GroupWindowITCase to reproduce this bug.
{code:java}
@TestTemplate
def test(): Unit = {
env.setParallelism(1)
val upsertSourceDataId = registerData(
List(
changelogRow("+I", "a", "no1", localDateTime(1L)),
changelogRow("+I", "a", "no1", localDateTime(2L)),
changelogRow("+I", "a", "no1", localDateTime(6L)),
changelogRow("+I", "a", "no1", localDateTime(9L)),
changelogRow("-D", "a", "no1", localDateTime(6L))
))
tEnv.executeSql(s"""
|CREATE TABLE upsert_currency (
| pk STRING,
| str STRING,
| currency_time TIMESTAMP(3),
| WATERMARK FOR currency_time AS currency_time - interval
'5' SECOND
|) WITH (
| 'connector' = 'values',
| 'changelog-mode' = 'I,UB,UA,D',
| 'data-id' = '$upsertSourceDataId'
|)
|""".stripMargin)
val sql =
"""
|SELECT
|pk,
|COUNT(*) AS cnt,
|SESSION_START(currency_time, INTERVAL '5' SECOND) as w_start,
|SESSION_END(currency_time, INTERVAL '5' SECOND) as w_end
|FROM upsert_currency
|GROUP BY pk, SESSION(currency_time, INTERVAL '5' SECOND)
|""".stripMargin
val sink = new TestingAppendSink
tEnv.sqlQuery(sql).toDataStream.addSink(sink)
env.execute()
println(sink.getAppendResults.sorted)
}{code}
The result is:
{code:java}
a,3,1970-01-01T00:00:01,1970-01-01T00:00:14{code}
But if we change the source data as below:
{code:java}
val upsertSourceDataId = registerData(
List(
changelogRow("+I", "a", "no1", localDateTime(1L)),
changelogRow("+I", "a", "no1", localDateTime(2L)),
// changelogRow("+I", "a", "no1", localDateTime(6L)),
changelogRow("+I", "a", "no1", localDateTime(9L))
// changelogRow("-D", "a", "no1", localDateTime(6L))
)) {code}
The result will be:
{code:java}
a,1,1970-01-01T00:00:09,1970-01-01T00:00:14
a,2,1970-01-01T00:00:01,1970-01-01T00:00:07{code}
When there is a minibatch operator upstream and CDC messages may be folded, the
results of the session group window agg node may be different.
> The session group window agg operator does not split the session window when
> processing retract records.
> --------------------------------------------------------------------------------------------------------
>
> Key: FLINK-34039
> URL: https://issues.apache.org/jira/browse/FLINK-34039
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: xuyang
> Priority: Major
>
> Add the test in GroupWindowITCase to reproduce this bug.
> {code:java}
> @TestTemplate
> def test(): Unit = {
> env.setParallelism(1)
> val upsertSourceDataId = registerData(
> List(
> changelogRow("+I", "a", "no1", localDateTime(1L)),
> changelogRow("+I", "a", "no1", localDateTime(2L)),
> changelogRow("+I", "a", "no1", localDateTime(6L)),
> changelogRow("+I", "a", "no1", localDateTime(9L)),
> changelogRow("-D", "a", "no1", localDateTime(6L))
> ))
> tEnv.executeSql(s"""
> |CREATE TABLE upsert_currency (
> | pk STRING,
> | str STRING,
> | currency_time TIMESTAMP(3),
> | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND
> |) WITH (
> | 'connector' = 'values',
> | 'changelog-mode' = 'I,UB,UA,D',
> | 'data-id' = '$upsertSourceDataId'
> |)
> |""".stripMargin)
> val sql =
> """
> |SELECT
> |pk,
> |COUNT(*) AS cnt,
> |SESSION_START(currency_time, INTERVAL '5' SECOND) as w_start,
> |SESSION_END(currency_time, INTERVAL '5' SECOND) as w_end
> |FROM upsert_currency
> |GROUP BY pk, SESSION(currency_time, INTERVAL '5' SECOND)
> |""".stripMargin
> val sink = new TestingAppendSink
> tEnv.sqlQuery(sql).toDataStream.addSink(sink)
> env.execute()
> println(sink.getAppendResults.sorted)
> }{code}
> The result is:
> {code:java}
> a,3,1970-01-01T00:00:01,1970-01-01T00:00:14{code}
> But if we change the source data as below:
> {code:java}
> val upsertSourceDataId = registerData(
> List(
> changelogRow("+I", "a", "no1", localDateTime(1L)),
> changelogRow("+I", "a", "no1", localDateTime(2L)),
> // changelogRow("+I", "a", "no1", localDateTime(6L)),
> changelogRow("+I", "a", "no1", localDateTime(9L))
> // changelogRow("-D", "a", "no1", localDateTime(6L))
> )) {code}
> The result will be:
> {code:java}
> a,1,1970-01-01T00:00:09,1970-01-01T00:00:14
> a,2,1970-01-01T00:00:01,1970-01-01T00:00:07{code}
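> The two inputs are logically equivalent, because the row at 6s is inserted and
> then deleted again, so both runs should produce the same two session windows.
> The session group window agg operator, however, does not split the already
> merged session window when it processes the -D record. As a result, when there
> is a minibatch operator upstream and CDC messages may be folded, the results
> of the session group window agg node may be different.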
--
This message was sent by Atlassian Jira
(v8.20.10#820010)