[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842243#comment-17842243
 ] 

Roman Boyko commented on FLINK-34380:
-------------------------------------

[~xuyangzhong], [~xu_shuai_], what do you think? May I create a PR to fix it?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-34380
>                 URL: https://issues.apache.org/jira/browse/FLINK-34380
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.19.0
>            Reporter: xuyang
>            Priority: Major
>             Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcITCase
> @Test
>   def test(): Unit = {
>     env.setParallelism(1)
>     val rows = Seq(
>       changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>       changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>       changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>       changelogRow("-D", java.lang.Integer.valueOf(1), "99")
>     )
>     val dataId = TestValuesTableFactory.registerData(rows)
>     val ddl =
>       s"""
>          |CREATE TABLE t1 (
>          |  a int,
>          |  b string
>          |) WITH (
>          |  'connector' = 'values',
>          |  'data-id' = '$dataId',
>          |  'bounded' = 'false'
>          |)
>        """.stripMargin
>     tEnv.executeSql(ddl)
>     val ddl2 =
>       s"""
>          |CREATE TABLE t2 (
>          |  a int,
>          |  b string
>          |) WITH (
>          |  'connector' = 'values',
>          |  'data-id' = '$dataId',
>          |  'bounded' = 'false'
>          |)
>        """.stripMargin
>     tEnv.executeSql(ddl2)
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
>     println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>     tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}


