[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846626#comment-17846626
 ] 

Shuai Xu commented on FLINK-34380:
----------------------------------

Hi [~rovboyko], sorry for the late reply.
Regarding the incorrect order of output records: the minibatch optimization is 
designed to guarantee final consistency only. The fix you mentioned was 
considered when the PR was reviewed. Flink is a distributed real-time processing 
system. The order of output could be guaranteed on a single node by using 
LinkedHashMap; however, it cannot be guaranteed when the join operator runs on 
multiple nodes. So I think it makes little sense to preserve the order here.
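
To illustrate the ordering argument, here is a minimal sketch. It is not 
Flink's minibatch join operator: the class MiniBatchOrderSketch and its 
methods are hypothetical, and each "subtask" is modeled as a plain in-memory 
buffer. It shows that a LinkedHashMap makes one buffer's flush order 
deterministic, while the order in which two buffers reach a downstream 
consumer is still undetermined.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only; this is not Flink's minibatch join operator.
final class MiniBatchOrderSketch {

    // Buffer records per key. LinkedHashMap iterates keys in first-insertion order.
    static Map<Integer, List<String>> bufferOf(int[] keys, String[] records) {
        Map<Integer, List<String>> buffer = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            buffer.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(records[i]);
        }
        return buffer;
    }

    // Flush one buffer key by key; the result is deterministic for that buffer.
    static List<String> flush(Map<Integer, List<String>> buffer) {
        List<String> out = new ArrayList<>();
        buffer.values().forEach(out::addAll);
        return out;
    }

    // What a downstream consumer sees if `first` arrives before `second`.
    static List<String> arrival(List<String> first, List<String> second) {
        List<String> seen = new ArrayList<>(first);
        seen.addAll(second);
        return seen;
    }

    public static void main(String[] args) {
        // One subtask: the flush order is fixed by the buffer.
        System.out.println(
            flush(bufferOf(new int[] {2, 1, 2}, new String[] {"a", "b", "c"}))); // [a, c, b]

        // Two subtasks, each owning one key: either arrival order is possible.
        List<String> s1 = flush(bufferOf(new int[] {2, 2}, new String[] {"a", "c"}));
        List<String> s2 = flush(bufferOf(new int[] {1}, new String[] {"b"}));
        System.out.println(arrival(s1, s2)); // [a, c, b]
        System.out.println(arrival(s2, s1)); // [b, a, c]
    }
}
```

Both arrival orders contain the same records, which is why only the final 
result, not the intermediate order, is a meaningful guarantee here.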

Regarding the RowKind: this was also reviewed. As you mentioned, it is a common 
problem of the MiniBatch functionality. It does not affect the final result. 
Weighed against the benefit of minibatch, I think this problem is tolerable.
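
As a rough check of the "final result" claim, the sketch below folds the four 
output rows quoted in the issue into a materialized view. The class 
ChangelogFoldSketch and the string encoding of rows are hypothetical, and the 
fold rule (+I and +U add one copy of a row, -U and -D retract one copy) is a 
simplified model of how a changelog is applied, not Flink's sink code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only; a simplified model of applying a changelog.
final class ChangelogFoldSketch {

    // Each entry is {rowKind, row}. Returns the rows that remain, with their counts.
    static Map<String, Integer> materialize(List<String[]> changelog) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] entry : changelog) {
            String kind = entry[0];
            String row = entry[1];
            // +I / +U add one copy of the row; -U / -D retract one copy.
            int delta = (kind.equals("+I") || kind.equals("+U")) ? 1 : -1;
            counts.merge(row, delta, Integer::sum);
            if (counts.get(row) == 0) {
                counts.remove(row); // fully retracted rows disappear from the view
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The four rows from the output quoted in the issue, in the reported order.
        List<String[]> reported = List.of(
            new String[] {"+U", "1,1,1,99"},
            new String[] {"+U", "1,99,1,99"},
            new String[] {"-U", "1,1,1,99"},
            new String[] {"-D", "1,99,1,99"});
        System.out.println(materialize(reported).isEmpty()); // true
    }
}
```

Under this model the reported output nets out to an empty result even though 
the intermediate RowKinds and their order look unusual.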

> Strange RowKind and records about intermediate output when using minibatch 
> join
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-34380
>                 URL: https://issues.apache.org/jira/browse/FLINK-34380
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.19.0
>            Reporter: xuyang
>            Priority: Major
>             Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
>     env.setParallelism(1)
>     val rows = Seq(
>       changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>       changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>       changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>       changelogRow("-D", java.lang.Integer.valueOf(1), "99")
>     )
>     val dataId = TestValuesTableFactory.registerData(rows)
>     val ddl =
>       s"""
>          |CREATE TABLE t1 (
>          |  a int,
>          |  b string
>          |) WITH (
>          |  'connector' = 'values',
>          |  'data-id' = '$dataId',
>          |  'bounded' = 'false'
>          |)
>        """.stripMargin
>     tEnv.executeSql(ddl)
>     val ddl2 =
>       s"""
>          |CREATE TABLE t2 (
>          |  a int,
>          |  b string
>          |) WITH (
>          |  'connector' = 'values',
>          |  'data-id' = '$dataId',
>          |  'bounded' = 'false'
>          |)
>        """.stripMargin
>     tEnv.executeSql(ddl2)
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
>     println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>     tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
