[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17820924#comment-17820924 ]
xuyang commented on FLINK-34378:
--------------------------------
Thanks for your answer, [~xu_shuai_]. That sounds good to me. I'll close this Jira.
> Minibatch join disrupted the original order of input records
> ------------------------------------------------------------
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
> Issue Type: Technical Debt
> Components: Table SQL / Runtime
> Affects Versions: 1.19.0
> Reporter: xuyang
> Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure whether this is a bug. The following test case reproduces it.
> {code:scala}
> // Add this test to CalcITCase; env, tEnv and the row(...) helper come from the test class
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
>     row(1, "1"),
>     row(2, "2"),
>     row(3, "3"),
>     row(4, "4"),
>     row(5, "5"),
>     row(6, "6"),
>     row(7, "7"),
>     row(8, "8"))
>   // Both unbounded source tables read the same eight rows
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
>     s"""
>        |CREATE TABLE t1 (
>        |  a int,
>        |  b string
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$dataId',
>        |  'bounded' = 'false'
>        |)
>        """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
>     s"""
>        |CREATE TABLE t2 (
>        |  a int,
>        |  b string
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$dataId',
>        |  'bounded' = 'false'
>        |)
>        """.stripMargin
>   tEnv.executeSql(ddl2)
>   // Enable minibatch: buffer up to 20 input records or 5 seconds before a batch is processed
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   // Print the plan, then run the join and print its changelog
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }
> {code}
> Result with minibatch join enabled:
> {code:java}
> +----+---+---+----+----+
> | op | a | b | a0 | b0 |
> +----+---+---+----+----+
> | +I | 3 | 3 |  3 |  3 |
> | +I | 7 | 7 |  7 |  7 |
> | +I | 2 | 2 |  2 |  2 |
> | +I | 5 | 5 |  5 |  5 |
> | +I | 1 | 1 |  1 |  1 |
> | +I | 6 | 6 |  6 |  6 |
> | +I | 4 | 4 |  4 |  4 |
> | +I | 8 | 8 |  8 |  8 |
> +----+---+---+----+----+
> {code}
> Without minibatch join, the result is:
> {code:java}
> +----+---+---+----+----+
> | op | a | b | a0 | b0 |
> +----+---+---+----+----+
> | +I | 1 | 1 |  1 |  1 |
> | +I | 2 | 2 |  2 |  2 |
> | +I | 3 | 3 |  3 |  3 |
> | +I | 4 | 4 |  4 |  4 |
> | +I | 5 | 5 |  5 |  5 |
> | +I | 6 | 6 |  6 |  6 |
> | +I | 7 | 7 |  7 |  7 |
> | +I | 8 | 8 |  8 |  8 |
> +----+---+---+----+----+
> {code}
>
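A minimal sketch of one plausible mechanism behind the reordering, offered as an assumption rather than a description of Flink's actual minibatch join operator. Suppose a batch is buffered in a map keyed by join key and emitted by iterating that map when the batch fires. In that case the output follows the map's iteration order instead of arrival order. `MiniBatchOrderSketch`, `JoinKey` and `bufferAndFlush` are hypothetical names. The multiplicative `hashCode` only stands in for hashing a serialized key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a keyed minibatch buffer, NOT Flink's operator code.
// It only shows that the flush order depends on the buffer's Map implementation.
public class MiniBatchOrderSketch {

    // Stand-in join key. Assumption: real keys hash their serialized bytes,
    // so hash order is unrelated to value or arrival order.
    public static final class JoinKey {
        final int value;

        JoinKey(int value) {
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof JoinKey && ((JoinKey) o).value == value;
        }

        @Override
        public int hashCode() {
            return value * 0x9E3779B9; // multiplicative mixing; int overflow is intended
        }
    }

    // Buffers each arriving record under its key, then emits the batch by iterating the map.
    public static List<Integer> bufferAndFlush(Map<JoinKey, List<Integer>> buffer, int[] arrivals) {
        for (int a : arrivals) {
            buffer.computeIfAbsent(new JoinKey(a), k -> new ArrayList<>()).add(a);
        }
        List<Integer> emitted = new ArrayList<>();
        for (List<Integer> records : buffer.values()) {
            emitted.addAll(records);
        }
        buffer.clear();
        return emitted;
    }

    public static void main(String[] args) {
        int[] arrivals = {1, 2, 3, 4, 5, 6, 7, 8};
        // HashMap iterates in bucket order, which is derived from key hashes.
        System.out.println("HashMap flush:       " + bufferAndFlush(new HashMap<>(), arrivals));
        // LinkedHashMap iterates in insertion order: [1, 2, 3, 4, 5, 6, 7, 8]
        System.out.println("LinkedHashMap flush: " + bufferAndFlush(new LinkedHashMap<>(), arrivals));
    }
}
```

With `HashMap`, the flushed batch contains the same eight records but in an order set by the key hashes. With `LinkedHashMap`, arrival order within the batch survives, at the cost of maintaining a linked list over the entries.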