[ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816252#comment-17816252
 ] 

Jeyhun Karimov commented on FLINK-34378:
----------------------------------------

Hi [~xuyangzhong], the ordering differs even with parallelism 1 because the 
{{MiniBatch}} operator uses a {{Set}}. IMO this is expected behavior.
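
To illustrate the point, here is a minimal standalone sketch, not the actual operator code. It assumes the buffered records end up in a hash-based {{Set}}, whose iteration order is unspecified. {{BufferOrderSketch}} and {{bufferAndFlush}} are hypothetical names made up for this example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;

public class BufferOrderSketch {

    // Hypothetical helper: buffer all records in the given collection,
    // then "flush" them in whatever order the collection iterates.
    static List<String> bufferAndFlush(Collection<String> buffer, List<String> input) {
        buffer.addAll(input);
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        List<String> input = List.of(
                "row-1", "row-2", "row-3", "row-4",
                "row-5", "row-6", "row-7", "row-8");

        // HashSet gives no ordering guarantee: elements come back in hash-bucket order.
        System.out.println("HashSet:       " + bufferAndFlush(new HashSet<>(), input));

        // LinkedHashSet iterates in insertion order, so arrival order is preserved.
        System.out.println("LinkedHashSet: " + bufferAndFlush(new LinkedHashSet<>(), input));
    }
}
```

Only the {{LinkedHashSet}} variant is guaranteed to flush records in arrival order. The {{HashSet}} variant may or may not, depending on the hash codes of the buffered elements. A streaming join without an {{ORDER BY}} makes no ordering promise either way.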

> Minibatch join disrupted the original order of input records
> ------------------------------------------------------------
>
>                 Key: FLINK-34378
>                 URL: https://issues.apache.org/jira/browse/FLINK-34378
>             Project: Flink
>          Issue Type: Technical Debt
>          Components: Table SQL / Runtime
>    Affects Versions: 1.19.0
>            Reporter: xuyang
>            Priority: Major
>             Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case reproduces the situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
>     row(1, "1"),
>     row(2, "2"),
>     row(3, "3"),
>     row(4, "4"),
>     row(5, "5"),
>     row(6, "6"),
>     row(7, "7"),
>     row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
>     s"""
>        |CREATE TABLE t1 (
>        |  a int,
>        |  b string
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$dataId',
>        |  'bounded' = 'false'
>        |)
>      """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
>     s"""
>        |CREATE TABLE t2 (
>        |  a int,
>        |  b string
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$dataId',
>        |  'bounded' = 'false'
>        |)
>      """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}
> Result
> {code:java}
> +----+---+---+---+---+ 
> | op | a | b | a0| b0| 
> +----+---+---+---+---+ 
> | +I | 3 | 3 | 3 | 3 | 
> | +I | 7 | 7 | 7 | 7 | 
> | +I | 2 | 2 | 2 | 2 | 
> | +I | 5 | 5 | 5 | 5 | 
> | +I | 1 | 1 | 1 | 1 | 
> | +I | 6 | 6 | 6 | 6 | 
> | +I | 4 | 4 | 4 | 4 | 
> | +I | 8 | 8 | 8 | 8 | 
> +----+---+---+---+---+
> {code}
> When I do not use minibatch join, the result is:
> {code:java}
> +----+---+---+----+----+
> | op | a | b | a0 | b0 |
> +----+---+---+----+----+
> | +I | 1 | 1 |  1 |  1 |
> | +I | 2 | 2 |  2 |  2 |
> | +I | 3 | 3 |  3 |  3 |
> | +I | 4 | 4 |  4 |  4 |
> | +I | 5 | 5 |  5 |  5 |
> | +I | 6 | 6 |  6 |  6 |
> | +I | 7 | 7 |  7 |  7 |
> | +I | 8 | 8 |  8 |  8 |
> +----+---+---+----+----+
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)