[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846520#comment-17846520
 ] 

Roman Boyko commented on FLINK-34380:
-------------------------------------

Hi [~xuyangzhong] ! Thank you for your reply.

Yes, you're right: the RowKind is still not fixed in this example. But I think we 
should fix the RowKind in a separate issue because:

1) The incorrect RowKind in your example is a common problem of the MiniBatch 
functionality. It happens every time the +I and -U records are assigned to the 
first batch and the +U record is then assigned to the second batch. It can't be 
fixed easily, and it can't be fixed for the Join operator alone: we should try 
to reproduce the same behavior for the Aggregate operator as well.

2) While the incorrect RowKind is not such a serious problem, the incorrect order 
of output records might be really critical because it leads to incorrect results.

So I suggest fixing only the incorrect order in this issue and creating a separate 
one for the incorrect RowKind.
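
To make point 1 concrete, here is a toy sketch of how a purely count-based 
batch boundary can separate a -U record from its +U partner. It is not Flink's 
actual MiniBatch implementation: the class MiniBatchSplitSketch, the helpers 
toBatches and splitsUpdatePair, and the batch size of 2 are all hypothetical 
and chosen only so that the boundary falls between -U and +U.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT Flink's MiniBatch operator logic.
final class MiniBatchSplitSketch {

    // Cuts the changelog into consecutive batches of at most batchSize records.
    static List<List<String>> toBatches(List<String> changelog, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < changelog.size(); i += batchSize) {
            int end = Math.min(i + batchSize, changelog.size());
            batches.add(new ArrayList<>(changelog.subList(i, end)));
        }
        return batches;
    }

    // Returns true if some batch ends with -U, which means the matching +U
    // (the next record in the changelog) lands in the following batch.
    static boolean splitsUpdatePair(List<List<String>> batches) {
        for (List<String> batch : batches) {
            if (!batch.isEmpty() && batch.get(batch.size() - 1).equals("-U")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Same RowKind sequence as the source rows in the issue description.
        List<String> changelog = List.of("+I", "-U", "+U", "-D");
        List<List<String>> batches = toBatches(changelog, 2);
        System.out.println(batches);
        System.out.println("update pair split: " + splitsUpdatePair(batches));
    }
}
```

In this toy model the first batch holds +I and -U and the second holds +U and 
-D, so an operator that processes each batch independently never sees the -U 
and +U of one update together.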

> Strange RowKind and records about intermediate output when using minibatch 
> join
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-34380
>                 URL: https://issues.apache.org/jira/browse/FLINK-34380
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.19.0
>            Reporter: xuyang
>            Priority: Major
>             Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
>     env.setParallelism(1)
>     val rows = Seq(
>       changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>       changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>       changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>       changelogRow("-D", java.lang.Integer.valueOf(1), "99")
>     )
>     val dataId = TestValuesTableFactory.registerData(rows)
>     val ddl =
>       s"""
>          |CREATE TABLE t1 (
>          |  a int,
>          |  b string
>          |) WITH (
>          |  'connector' = 'values',
>          |  'data-id' = '$dataId',
>          |  'bounded' = 'false'
>          |)
>        """.stripMargin
>     tEnv.executeSql(ddl)
>     val ddl2 =
>       s"""
>          |CREATE TABLE t2 (
>          |  a int,
>          |  b string
>          |) WITH (
>          |  'connector' = 'values',
>          |  'data-id' = '$dataId',
>          |  'bounded' = 'false'
>          |)
>        """.stripMargin
>     tEnv.executeSql(ddl2)
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
>     tEnv.getConfig.getConfiguration
>       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
>     println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>     tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> +----+-------------+-----------------+-------------+---------+
> | op |           a |               b |          a0 |      b0 |
> +----+-------------+-----------------+-------------+---------+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> +----+-------------+-----------------+-------------+---------+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
