[ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34378:
---------------------------
    Description: 
I'm not sure if it's a bug. The following case can re-produce this situation.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
    row(1, "1"),
    row(2, "2"),
    row(3, "3"),
    row(4, "4"),
    row(5, "5"),
    row(6, "6"),
    row(7, "7"),
    row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
    s"""
       |CREATE TABLE t1 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
     """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
    s"""
       |CREATE TABLE t2 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
     """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}

  was:
I'm not sure if it's a bug, the following case can re-produce this bug.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
    row(1, "1"),
    row(2, "2"),
    row(3, "3"),
    row(4, "4"),
    row(5, "5"),
    row(6, "6"),
    row(7, "7"),
    row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
    s"""
       |CREATE TABLE t1 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
     """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
    s"""
       |CREATE TABLE t2 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
     """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}


> Minibatch join disrupted the original order of input records
> ------------------------------------------------------------
>
>                 Key: FLINK-34378
>                 URL: https://issues.apache.org/jira/browse/FLINK-34378
>             Project: Flink
>          Issue Type: Technical Debt
>            Reporter: xuyang
>            Priority: Major
>
> I'm not sure if it's a bug. The following case can re-produce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
>     row(1, "1"),
>     row(2, "2"),
>     row(3, "3"),
>     row(4, "4"),
>     row(5, "5"),
>     row(6, "6"),
>     row(7, "7"),
>     row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
>     s"""
>        |CREATE TABLE t1 (
>        |  a int,
>        |  b string
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$dataId',
>        |  'bounded' = 'false'
>        |)
>      """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
>     s"""
>        |CREATE TABLE t2 (
>        |  a int,
>        |  b string
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$dataId',
>        |  'bounded' = 'false'
>        |)
>      """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to