xuyang created FLINK-34378:
------------------------------

             Summary: Minibatch join disrupted the original order of input records
                 Key: FLINK-34378
                 URL: https://issues.apache.org/jira/browse/FLINK-34378
             Project: Flink
          Issue Type: Technical Debt
            Reporter: xuyang


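I'm not sure whether this is a bug, but the following case reproduces the issue: with parallelism 1, the joined rows are expected to come out in the same order as the input rows (a = 1 through 8), but with mini-batch enabled the join emits them in a different order.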
{code:scala}
// Reproducer: add this test to CalcITCase.
/**
 * Reproduces FLINK-34378: with mini-batch enabled, the joined rows are not emitted in the
 * order the input rows arrive, even at parallelism 1.
 *
 * Both `t1` and `t2` are unbounded `values` tables over the same eight rows
 * (a = 1..8, b = a.toString). The test prints the plan and then the join result;
 * it makes no assertions, so the output ordering has to be inspected manually.
 */
@Test
def test(): Unit = {
  env.setParallelism(1)

  // The same rows as (1, "1"), (2, "2"), ..., (8, "8").
  val rows = Seq.tabulate(8) { idx =>
    val n = idx + 1
    row(n, n.toString)
  }
  val dataId = TestValuesTableFactory.registerData(rows)

  // DDL for an unbounded `values` table that reads the rows registered above.
  def valuesTableDdl(tableName: String): String =
    s"""
       |CREATE TABLE $tableName (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
     """.stripMargin

  // Both sides of the join read the same data; t1 is created before t2.
  Seq("t1", "t2").foreach(tableName => tEnv.executeSql(valuesTableDdl(tableName)))

  // Enable mini-batch; note the batch size (20) is larger than the 8 input rows.
  val conf = tEnv.getConfig.getConfiguration
  conf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  conf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
  conf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  val joinQuery = "SELECT * from t1 join t2 on t1.a = t2.a"
  println(tEnv.sqlQuery(joinQuery).explain())
  tEnv.executeSql(joinQuery).print()
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to