[
https://issues.apache.org/jira/browse/FLINK-33760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810625#comment-17810625
]
Yunhong Zheng commented on FLINK-33760:
---------------------------------------
After an offline discussion with [~lincoln.86xy] and [~xuyangzhong], the key
question is this: when -D data exists alone in the current window, should we
retract the data of the window that has already been emitted, to ensure
eventual consistency, or simply discard the retracted records? There is
currently no final solution; a more detailed design is needed.
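
To make the two options concrete, here is a minimal sketch in plain Scala. It is a toy model, not Flink code: `Policy`, `Change`, `Acc` and `closeWindow` are hypothetical names introduced only for illustration, and it assumes a window that tracks a running sum plus the net number of accumulated rows per key. For a window holding a single -D record, one policy yields an empty result and the other yields a sum of -1, which resembles the two outputs quoted in the issue description.

```scala
// Toy model of a window that receives only retraction (-D) records.
// All names here are hypothetical; none of this is Flink's implementation.
object RetractOnlyWindowSketch {
  sealed trait Policy
  case object DiscardRetractOnly extends Policy // drop keys with no net accumulated row
  case object EmitRetractOnly extends Policy    // emit whatever the accumulator holds

  final case class Change(kind: String, value: BigDecimal, key: String)

  // Per-key state: running sum plus the net number of accumulated rows.
  final case class Acc(sum: BigDecimal, count: Long)

  def closeWindow(changes: Seq[Change], policy: Policy): Map[String, BigDecimal] = {
    val accs = changes.foldLeft(Map.empty[String, Acc]) { (state, c) =>
      val acc = state.getOrElse(c.key, Acc(BigDecimal(0), 0L))
      val next = c.kind match {
        case "+I" | "+U" => Acc(acc.sum + c.value, acc.count + 1)
        case "-D" | "-U" => Acc(acc.sum - c.value, acc.count - 1)
        case other       => throw new IllegalArgumentException(s"unknown kind: $other")
      }
      state.updated(c.key, next)
    }
    policy match {
      case DiscardRetractOnly => accs.collect { case (k, a) if a.count > 0 => k -> a.sum }
      case EmitRetractOnly    => accs.map { case (k, a) => k -> a.sum }
    }
  }

  def main(args: Array[String]): Unit = {
    val window = Seq(Change("-D", BigDecimal(1), "a"))
    println(closeWindow(window, DiscardRetractOnly))
    println(closeWindow(window, EmitRetractOnly))
  }
}
```

Neither policy in the sketch addresses the harder part of the question, namely correcting a result that an earlier window has already emitted; that would need state kept beyond the window's lifetime.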
> Group Window agg has different result when only consuming -D records while
> using or not using minibatch
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-33760
> URL: https://issues.apache.org/jira/browse/FLINK-33760
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: xuyang
> Assignee: Yunhong Zheng
> Priority: Major
>
> Add the following test to AggregateITCase to reproduce this bug.
>
> {code:java}
> @Test
> def test(): Unit = {
>   // The source emits only -D (delete) records.
>   val upsertSourceCurrencyData = List(
>     changelogRow("-D", 1.bigDecimal, "a"),
>     changelogRow("-D", 1.bigDecimal, "b"),
>     changelogRow("-D", 1.bigDecimal, "b")
>   )
>   val upsertSourceDataId = registerData(upsertSourceCurrencyData)
>   tEnv.executeSql(s"""
>     |CREATE TABLE T (
>     | `a` DECIMAL(32, 8),
>     | `d` STRING,
>     | proctime as proctime()
>     |) WITH (
>     | 'connector' = 'values',
>     | 'data-id' = '$upsertSourceDataId',
>     | 'changelog-mode' = 'I,UA,UB,D',
>     | 'failing-source' = 'true'
>     |)
>     |""".stripMargin)
>   val sql =
>     "SELECT max(a), sum(a), min(a), " +
>       "TUMBLE_START(proctime, INTERVAL '0.005' SECOND), " +
>       "TUMBLE_END(proctime, INTERVAL '0.005' SECOND), d " +
>       "FROM T GROUP BY d, TUMBLE(proctime, INTERVAL '0.005' SECOND)"
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
>   env.execute()
>   // Use the result precision/scale calculated for sum and don't override it with
>   // the one calculated for plus()/minus(), which results in losing a decimal digit.
>   val expected =
>     List("6.41671935,65947.23071935707000000000,609.02867403703699700000")
>   assertEquals(expected, sink.getRetractResults.sorted)
> }
> {code}
> When MiniBatch is ON, the result is `List()`.
>
> When MiniBatch is OFF, the result is
> `List(null,-1.00000000,null,2023-12-06T11:29:21.895,2023-12-06T11:29:21.900,a)`.
>