[ https://issues.apache.org/jira/browse/FLINK-20909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu reassigned FLINK-20909:
-------------------------------
Assignee: Andy
> MiniBatch Interval derivation does not work well when enable miniBatch
> optimization in a job which contains deduplicate on row and unbounded
> aggregate.
> -------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-20909
> URL: https://issues.apache.org/jira/browse/FLINK-20909
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.12.0
> Reporter: Andy
> Assignee: Andy
> Priority: Major
> Fix For: 1.13.0
>
>
> MiniBatch interval derivation does not work well when MiniBatch optimization
> is enabled in a job that contains a rowtime-ordered deduplication followed by
> an unbounded aggregate.
> {code:scala}
> // env, tEnv, rowtimeTestData and RowtimeExtractor are assumed to come from
> // the enclosing ITCase, with MiniBatch optimization enabled there.
> @Test
> def testLastRowOnRowtime1(): Unit = {
>   // Source table T with an event-time attribute `rowtime`.
>   val t = env.fromCollection(rowtimeTestData)
>     .assignTimestampsAndWatermarks(new RowtimeExtractor)
>     .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
>   tEnv.registerTable("T", t)
>   // Sink that accepts updates, since COUNT over deduplicated rows is updating.
>   tEnv.executeSql(
>     s"""
>        |CREATE TABLE rowtime_sink (
>        |  cnt BIGINT
>        |) WITH (
>        |  'connector' = 'values',
>        |  'sink-insert-only' = 'false',
>        |  'changelog-mode' = 'I,UA,D'
>        |)
>        |""".stripMargin)
>   // Keep the last row per key b (by rowtime), then count the remaining rows.
>   val sql =
>     """
>       |INSERT INTO rowtime_sink
>       |SELECT COUNT(b) FROM (
>       |  SELECT a, b, c, rowtime
>       |  FROM (
>       |    SELECT *,
>       |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
>       |    FROM T
>       |  )
>       |  WHERE rowNum = 1
>       |)
>     """.stripMargin
>   tEnv.executeSql(sql).await()
>   val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
> }{code}
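The query keeps, for each `b`, the row with the latest `rowtime` and then counts the surviving rows. The sketch below computes only that final result in plain Scala over a small, made-up set of rows (the `Row` type and the data are hypothetical, not the test's `rowtimeTestData`); it ignores the intermediate changelog the streaming job emits and breaks rowtime ties arbitrarily, as `ROW_NUMBER()` does.

```scala
object LastRowDedupSketch {
  /** Hypothetical row type mirroring the columns of T (a, b, c, rowtime). */
  final case class Row(a: Int, b: Long, c: String, rowtime: Long)

  /** Keeps, for each key b, the row with the largest rowtime: the row that
    * ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) = 1 selects.
    * On rowtime ties, maxBy returns the first maximum it encounters. */
  def lastRowPerKey(rows: Seq[Row]): Seq[Row] =
    rows.groupBy(_.b).values.map(_.maxBy(_.rowtime)).toSeq

  /** COUNT(b) over the deduplicated rows (b is never null in this model). */
  def countAfterDedup(rows: Seq[Row]): Long = lastRowPerKey(rows).size.toLong

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Row(1, 1L, "Hi", 1L),
      Row(2, 2L, "Hello", 2L),
      Row(3, 2L, "Hello world", 3L), // supersedes the b = 2 row above
      Row(4, 3L, "Hello again", 4L)
    )
    println(countAfterDedup(rows)) // prints 3
  }
}
```

Because one row survives per key, the final count equals the number of distinct `b` values; only the path to that value (the updates the sink sees) depends on how deduplication is batched.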
> For example, with MiniBatch optimization enabled, the optimized plan for the
> SQL above is as follows:
> {code:java}
> Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
> +- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
>    +- Exchange(distribution=[single])
>       +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, COUNT_RETRACT(*) AS count1$1])
>          +- Calc(select=[b])
>             +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
>                +- Exchange(distribution=[hash[b]])
>                   +- Calc(select=[b, rowtime])
>                      +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                         +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c, rowtime]){code}
> A `StreamExecMiniBatchAssigner` is inserted in processing-time mode
> (`mode=[ProcTime]`). This is inconsistent: `Deduplicate` orders rows by
> rowtime, but `ProcTimeMiniBatchAssignerOperator` emits a watermark once per
> interval (1000 ms here) based on processing time. Such a watermark is
> unrelated to the rowtime of the incoming records, so `Deduplicate` cannot
> rely on it to indicate that all subsequent records have a rowtime greater
> than or equal to the current watermark.
>
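To make the mismatch concrete, the following is a small model in plain Scala, not Flink code. It assumes, as a simplification, that the processing-time assigner's watermark seen before a record is the start of the processing-time interval in which the record arrives (`procTime - procTime % interval`). It then flags records whose rowtime is below that watermark, i.e. the cases where "rowtime >= current watermark" does not hold. The `Rec` type and the sample timestamps are hypothetical.

```scala
object ProcTimeWatermarkDemo {
  /** Hypothetical record: when it arrives (processing time) and the event
    * time it carries (rowtime), both in milliseconds. */
  final case class Rec(procTime: Long, rowTime: Long)

  /** Simplified model of the last proc-time watermark emitted before a record
    * arriving at `procTime`: the start of its processing-time interval. */
  def lastProcTimeWatermark(procTime: Long, intervalMs: Long): Long =
    procTime - procTime % intervalMs

  /** Records that arrive after watermark w but carry a rowtime below w. */
  def violations(recs: Seq[Rec], intervalMs: Long): Seq[Rec] =
    recs.filter(r => r.rowTime < lastProcTimeWatermark(r.procTime, intervalMs))

  def main(args: Array[String]): Unit = {
    val recs = Seq(
      Rec(procTime = 1500L, rowTime = 4000L), // watermark 1000 lags far behind rowtime
      Rec(procTime = 2300L, rowTime = 1200L), // rowtime below watermark 2000
      Rec(procTime = 3100L, rowTime = 3050L)  // rowtime above watermark 3000 by chance
    )
    violations(recs, 1000L).foreach(println) // prints Rec(2300,1200)
  }
}
```

Whether a record "violates" the watermark depends only on how its arrival time happens to line up with its event time, which is the point made above: a processing-time watermark carries no information about rowtime progress.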
--
This message was sent by Atlassian Jira
(v8.3.4#803005)