[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846626#comment-17846626 ]
Shuai Xu commented on FLINK-34380: ---------------------------------- Hi [~rovboyko] , sorry for late reply. For the incorrect order of output records, the minibatch optimization is designed to guanrantee final consistency. And the fix you mentioned has been considered when the pr was reviewed. Flink is a distributed realtime processing system. The order of output could be guanranteed on a node by using LinkedHashMap, however, it could not be guranteed when join operator runs on multiple nodes. So I think it makes little sense to keep the order here. For the Rowkind, it was also reviewed. As you mentioned, it is a common problem of MiniBatch functionality. It does not influence final result. From the benefit perspective, this problem could be tolerable. > Strange RowKind and records about intermediate output when using minibatch > join > ------------------------------------------------------------------------------- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.19.0 > Reporter: xuyang > Priority: Major > Fix For: 1.20.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = > t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > +----+-------------+-----------------+-------------+---------+ > | op | a | b | a0 | b0 | > +----+-------------+-----------------+-------------+---------+ > | +U | 1 | 1 | 1 | 99 | > | +U | 1 | 99 | 1 | 99 | > | -U | 1 | 1 | 1 | 99 | > | -D | 1 | 99 | 1 | 99 | > +----+-------------+-----------------+-------------+---------+{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)