xuyang created FLINK-34378:
------------------------------
Summary: Minibatch join disrupted the original order of input
records
Key: FLINK-34378
URL: https://issues.apache.org/jira/browse/FLINK-34378
Project: Flink
Issue Type: Technical Debt
Reporter: xuyang
I'm not sure whether this is a bug, but the following test case reproduces the behavior.
{code:java}
// add it in CalcITCase
/**
 * Reproduction case for this issue: joins two tables on column `a` with
 * mini-batch enabled, printing first the plan and then the join result so
 * the order of the emitted records can be inspected.
 */
@Test
def test(): Unit = {
// Parallelism is fixed to 1 (presumably to rule out reordering across parallel subtasks).
env.setParallelism(1)
// Eight input rows: integer keys 1..8, each paired with the same number as a string.
val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
// Register the rows once; the returned id is referenced by both table definitions below.
val dataId = TestValuesTableFactory.registerData(rows)
// t1: 'values' connector table over the registered rows, declared with 'bounded' = 'false'.
val ddl =
s"""
|CREATE TABLE t1 (
| a int,
| b string
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$dataId',
| 'bounded' = 'false'
|)
""".stripMargin
tEnv.executeSql(ddl)
// t2: same schema and the same data-id as t1, so both join inputs carry identical rows.
val ddl2 =
s"""
|CREATE TABLE t2 (
| a int,
| b string
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$dataId',
| 'bounded' = 'false'
|)
""".stripMargin
tEnv.executeSql(ddl2)
// Enable mini-batch with a 5 second allowed latency and a size of 20.
// Note: the configured size (20) is larger than the 8 rows registered above.
tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
Duration.ofSeconds(5))
tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
// Print the plan for the join query (presumably to confirm which join operator is planned).
println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
// Execute the same join and print its result; this is the output whose order is in question.
tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)