[
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
xuyang updated FLINK-34378:
---------------------------
Description:
I'm not sure if it's a bug. The following case can reproduce this situation.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
env.setParallelism(1)
val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
val dataId = TestValuesTableFactory.registerData(rows)
val ddl =
s"""
|CREATE TABLE t1 (
| a int,
| b string
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$dataId',
| 'bounded' = 'false'
|)
""".stripMargin
tEnv.executeSql(ddl)
val ddl2 =
s"""
|CREATE TABLE t2 (
| a int,
| b string
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$dataId',
| 'bounded' = 'false'
|)
""".stripMargin
tEnv.executeSql(ddl2)
tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
Duration.ofSeconds(5))
tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}
Result
{code:java}
+----+---+---+---+---+
| op | a | b | a0| b0|
+----+---+---+---+---+
| +I | 3 | 3 | 3 | 3 |
| +I | 7 | 7 | 7 | 7 |
| +I | 2 | 2 | 2 | 2 |
| +I | 5 | 5 | 5 | 5 |
| +I | 1 | 1 | 1 | 1 |
| +I | 6 | 6 | 6 | 6 |
| +I | 4 | 4 | 4 | 4 |
| +I | 8 | 8 | 8 | 8 |
+----+---+---+---+---+
{code}
was:
I'm not sure if it's a bug. The following case can re-produce this situation.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
env.setParallelism(1)
val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
val dataId = TestValuesTableFactory.registerData(rows)
val ddl =
s"""
|CREATE TABLE t1 (
| a int,
| b string
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$dataId',
| 'bounded' = 'false'
|)
""".stripMargin
tEnv.executeSql(ddl)
val ddl2 =
s"""
|CREATE TABLE t2 (
| a int,
| b string
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$dataId',
| 'bounded' = 'false'
|)
""".stripMargin
tEnv.executeSql(ddl2)
tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
Duration.ofSeconds(5))
tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}
Result
{code:java}
// code placeholder
{code}
> Minibatch join disrupted the original order of input records
> ------------------------------------------------------------
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
> Issue Type: Technical Debt
> Components: Table SQL / Runtime
> Affects Versions: 1.19.0
> Reporter: xuyang
> Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
> s"""
> |CREATE TABLE t1 (
> | a int,
> | b string
> |) WITH (
> | 'connector' = 'values',
> | 'data-id' = '$dataId',
> | 'bounded' = 'false'
> |)
> """.stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
> s"""
> |CREATE TABLE t2 (
> | a int,
> | b string
> |) WITH (
> | 'connector' = 'values',
> | 'data-id' = '$dataId',
> | 'bounded' = 'false'
> |)
> """.stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}
> Result
> {code:java}
> +----+---+---+---+---+
> | op | a | b | a0| b0|
> +----+---+---+---+---+
> | +I | 3 | 3 | 3 | 3 |
> | +I | 7 | 7 | 7 | 7 |
> | +I | 2 | 2 | 2 | 2 |
> | +I | 5 | 5 | 5 | 5 |
> | +I | 1 | 1 | 1 | 1 |
> | +I | 6 | 6 | 6 | 6 |
> | +I | 4 | 4 | 4 | 4 |
> | +I | 8 | 8 | 8 | 8 |
> +----+---+---+---+---+
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)