[
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
xuyang updated FLINK-34378:
---------------------------
Component/s: Table SQL / Runtime
> Minibatch join disrupted the original order of input records
> ------------------------------------------------------------
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
> Issue Type: Technical Debt
> Components: Table SQL / Runtime
> Reporter: xuyang
> Priority: Major
>
> I'm not sure whether this is a bug. The following test case reproduces the issue: with mini-batch enabled, the join disrupts the original order of the input records.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
>     row(1, "1"),
>     row(2, "2"),
>     row(3, "3"),
>     row(4, "4"),
>     row(5, "5"),
>     row(6, "6"),
>     row(7, "7"),
>     row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
>     s"""
>        |CREATE TABLE t1 (
>        | a int,
>        | b string
>        |) WITH (
>        | 'connector' = 'values',
>        | 'data-id' = '$dataId',
>        | 'bounded' = 'false'
>        |)
>      """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
>     s"""
>        |CREATE TABLE t2 (
>        | a int,
>        | b string
>        |) WITH (
>        | 'connector' = 'values',
>        | 'data-id' = '$dataId',
>        | 'bounded' = 'false'
>        |)
>      """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
>       Boolean.box(true))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
>       Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}