[ 
https://issues.apache.org/jira/browse/FLINK-34039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34039:
---------------------------
    Description: 
Add the test in GroupWindowITCase to reproduce this bug.
{code:java}
@TestTemplate
def test(): Unit = {
  env.setParallelism(1)
  val upsertSourceDataId = registerData(
    List(
      changelogRow("+I", "a", "no1", localDateTime(1L)),
      changelogRow("+I", "a", "no1", localDateTime(2L)),
      changelogRow("+I", "a", "no1", localDateTime(6L)),
      changelogRow("+I", "a", "no1", localDateTime(9L)),
      changelogRow("-D", "a", "no1", localDateTime(6L))
    ))
  tEnv.executeSql(s"""
                     |CREATE TABLE upsert_currency (
                     |  pk STRING,
                     |  str STRING,
                     |  currency_time TIMESTAMP(3),
                     |  WATERMARK FOR currency_time AS currency_time - interval 
'5' SECOND
                     |) WITH (
                     |  'connector' = 'values',
                     |  'changelog-mode' = 'I,UB,UA,D',
                     |  'data-id' = '$upsertSourceDataId'
                     |)
                     |""".stripMargin)
  val sql =
    """
      |SELECT
      |pk,
      |COUNT(*) AS cnt,
      |SESSION_START(currency_time, INTERVAL '5' SECOND) as w_start,
      |SESSION_END(currency_time, INTERVAL '5' SECOND) as w_end
      |FROM upsert_currency
      |GROUP BY pk, SESSION(currency_time, INTERVAL '5' SECOND)
      |""".stripMargin
  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toDataStream.addSink(sink)
  env.execute()
  println(sink.getAppendResults.sorted)
}{code}
The result is: 
{code:java}
a,3,1970-01-01T00:00:01,1970-01-01T00:00:14{code}
But if we change the source data as below:
{code:java}
val upsertSourceDataId = registerData(
      List(
        changelogRow("+I", "a", "no1", localDateTime(1L)),
        changelogRow("+I", "a", "no1", localDateTime(2L)),             
        // changelogRow("+I", "a", "no1", localDateTime(6L)),
        changelogRow("+I", "a", "no1", localDateTime(9L))
        // changelogRow("-D", "a", "no1", localDateTime(6L))
      )) {code}
The result will be:
{code:java}
a,1,1970-01-01T00:00:09,1970-01-01T00:00:14
a,2,1970-01-01T00:00:01,1970-01-01T00:00:07{code}
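When an upstream mini-batch operator folds CDC messages (e.g. the +I and -D 
rows for localDateTime(6L) cancel each other out before they reach the window 
operator), the session group window agg node can produce a different result 
for the same logical input, as the two outputs above show.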

  was:
Add the test in GroupWindowITCase to reproduce this bug.

 
{code:java}
@TestTemplate
def test(): Unit = {
  env.setParallelism(1)
  val upsertSourceDataId = registerData(
    List(
      changelogRow("+I", "a", "no1", localDateTime(1L)),
      changelogRow("+I", "a", "no1", localDateTime(2L)),
      changelogRow("+I", "a", "no1", localDateTime(6L)),
      changelogRow("+I", "a", "no1", localDateTime(9L)),
      changelogRow("-D", "a", "no1", localDateTime(6L))
    ))
  tEnv.executeSql(s"""
                     |CREATE TABLE upsert_currency (
                     |  pk STRING,
                     |  str STRING,
                     |  currency_time TIMESTAMP(3),
                     |  WATERMARK FOR currency_time AS currency_time - interval 
'5' SECOND
                     |) WITH (
                     |  'connector' = 'values',
                     |  'changelog-mode' = 'I,UB,UA,D',
                     |  'data-id' = '$upsertSourceDataId'
                     |)
                     |""".stripMargin)
  val sql =
    """
      |SELECT
      |pk,
      |COUNT(*) AS cnt,
      |SESSION_START(currency_time, INTERVAL '5' SECOND) as w_start,
      |SESSION_END(currency_time, INTERVAL '5' SECOND) as w_end
      |FROM upsert_currency
      |GROUP BY pk, SESSION(currency_time, INTERVAL '5' SECOND)
      |""".stripMargin
  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toDataStream.addSink(sink)
  env.execute()
  println(sink.getAppendResults.sorted)
}{code}
The result is: 

 

 
{code:java}
a,3,1970-01-01T00:00:01,1970-01-01T00:00:14{code}
But if we change the source data as below:
{code:java}
val upsertSourceDataId = registerData(
      List(
        changelogRow("+I", "a", "no1", localDateTime(1L)),
        changelogRow("+I", "a", "no1", localDateTime(2L)),             
        // changelogRow("+I", "a", "no1", localDateTime(6L)),
        changelogRow("+I", "a", "no1", localDateTime(9L))
        // changelogRow("-D", "a", "no1", localDateTime(6L))
      )) {code}
 

The result will be:
{code:java}
a,1,1970-01-01T00:00:09,1970-01-01T00:00:14
a,2,1970-01-01T00:00:01,1970-01-01T00:00:07{code}
When there is a minibatch operator upstream and CDC messages may be folded, the 
results of the session group window agg node may be different.


> The session group window agg operator does not split the session window when 
> processing retract records.
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34039
>                 URL: https://issues.apache.org/jira/browse/FLINK-34039
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>            Reporter: xuyang
>            Priority: Major
>
> Add the test in GroupWindowITCase to reproduce this bug.
> {code:java}
> @TestTemplate
> def test(): Unit = {
>   env.setParallelism(1)
>   val upsertSourceDataId = registerData(
>     List(
>       changelogRow("+I", "a", "no1", localDateTime(1L)),
>       changelogRow("+I", "a", "no1", localDateTime(2L)),
>       changelogRow("+I", "a", "no1", localDateTime(6L)),
>       changelogRow("+I", "a", "no1", localDateTime(9L)),
>       changelogRow("-D", "a", "no1", localDateTime(6L))
>     ))
>   tEnv.executeSql(s"""
>                      |CREATE TABLE upsert_currency (
>                      |  pk STRING,
>                      |  str STRING,
>                      |  currency_time TIMESTAMP(3),
>                      |  WATERMARK FOR currency_time AS currency_time - 
> interval '5' SECOND
>                      |) WITH (
>                      |  'connector' = 'values',
>                      |  'changelog-mode' = 'I,UB,UA,D',
>                      |  'data-id' = '$upsertSourceDataId'
>                      |)
>                      |""".stripMargin)
>   val sql =
>     """
>       |SELECT
>       |pk,
>       |COUNT(*) AS cnt,
>       |SESSION_START(currency_time, INTERVAL '5' SECOND) as w_start,
>       |SESSION_END(currency_time, INTERVAL '5' SECOND) as w_end
>       |FROM upsert_currency
>       |GROUP BY pk, SESSION(currency_time, INTERVAL '5' SECOND)
>       |""".stripMargin
>   val sink = new TestingAppendSink
>   tEnv.sqlQuery(sql).toDataStream.addSink(sink)
>   env.execute()
>   println(sink.getAppendResults.sorted)
> }{code}
> The result is: 
> {code:java}
> a,3,1970-01-01T00:00:01,1970-01-01T00:00:14{code}
> But if we change the source data as below:
> {code:java}
> val upsertSourceDataId = registerData(
>       List(
>         changelogRow("+I", "a", "no1", localDateTime(1L)),
>         changelogRow("+I", "a", "no1", localDateTime(2L)),             
>         // changelogRow("+I", "a", "no1", localDateTime(6L)),
>         changelogRow("+I", "a", "no1", localDateTime(9L))
>         // changelogRow("-D", "a", "no1", localDateTime(6L))
>       )) {code}
> The result will be:
> {code:java}
> a,1,1970-01-01T00:00:09,1970-01-01T00:00:14
> a,2,1970-01-01T00:00:01,1970-01-01T00:00:07{code}
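> When an upstream mini-batch operator folds CDC messages (e.g. the +I and -D 
> rows for localDateTime(6L) cancel each other out before they reach the window 
> operator), the session group window agg node can produce a different result 
> for the same logical input, as the two outputs above show.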



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to