xuyang created FLINK-34380:
------------------------------
Summary: Strange RowKind and records about intermediate output when using minibatch join
Key: FLINK-34380
URL: https://issues.apache.org/jira/browse/FLINK-34380
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.19.0
Reporter: xuyang
Fix For: 1.19.0
{code:java}
// Add this test to CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  // Both tables are backed by the same unbounded changelog for key a = 1:
  // +I(1, "1"), -U(1, "1"), +U(1, "99"), -D(1, "99")
  val rows = Seq(
    changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
    changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
    changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
    changelogRow("-D", java.lang.Integer.valueOf(1), "99")
  )
  val dataId = TestValuesTableFactory.registerData(rows)
  val ddl =
    s"""
       |CREATE TABLE t1 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
       """.stripMargin
  tEnv.executeSql(ddl)
  val ddl2 =
    s"""
       |CREATE TABLE t2 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
       """.stripMargin
  tEnv.executeSql(ddl2)
  // Enable mini-batch processing: buffer at most 3 records for at most 5 seconds
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
  // Print the plan, then the changelog produced by the join
  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}
{code}
Output (the first record is emitted as +U, although no +I or -U precedes it):
{code:java}
+----+-------------+-----------------+-------------+---------+
| op |           a |               b |          a0 |      b0 |
+----+-------------+-----------------+-------------+---------+
| +U |           1 |               1 |           1 |      99 |
| +U |           1 |              99 |           1 |      99 |
| -U |           1 |               1 |           1 |      99 |
| -D |           1 |              99 |           1 |      99 |
+----+-------------+-----------------+-------------+---------+{code}
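To make the RowKind problem concrete, the printed changelog can be replayed offline. The following is a minimal sketch, not part of Flink: `ChangelogCheck` and `replay` are hypothetical names, and the checker assumes the usual retract-stream convention that every `-U`/`-D` retracts a row that is currently present and that every `+U` directly follows a `-U`. Applied to the reported output, it finds that the net result is empty (every joined row is eventually retracted), while both `+U` records arrive without a preceding `-U` and no `+I` is ever emitted.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of Flink: replays a printed changelog
// (RowKind short string + joined row) into a multiset and reports records
// that break the retract-stream conventions assumed here:
//   1. -U / -D must retract a row that is currently present;
//   2. +U must directly follow a -U.
object ChangelogCheck {
  type Row = (Int, String, Int, String)

  final case class Report(finalRows: Map[Row, Int], violations: Seq[String])

  def replay(changelog: Seq[(String, Row)]): Report = {
    var state = Map.empty[Row, Int]
    val violations = ArrayBuffer.empty[String]
    var previousKind = ""
    changelog.zipWithIndex.foreach { case ((kind, row), i) =>
      kind match {
        case "+I" | "+U" =>
          if (kind == "+U" && previousKind != "-U")
            violations += s"record $i: +U $row is not preceded by -U"
          // Accumulating records add one copy of the row.
          state = state.updated(row, state.getOrElse(row, 0) + 1)
        case "-U" | "-D" =>
          // Retracting records remove one copy of the row, if present.
          state.get(row) match {
            case Some(n) if n > 1 => state = state.updated(row, n - 1)
            case Some(_)          => state = state - row
            case None =>
              violations += s"record $i: $kind $row retracts a row that is not present"
          }
        case other =>
          violations += s"record $i: unknown RowKind $other"
      }
      previousKind = kind
    }
    Report(state, violations.toSeq)
  }

  // The output reported in this issue.
  val reported: Seq[(String, Row)] = Seq(
    ("+U", (1, "1", 1, "99")),
    ("+U", (1, "99", 1, "99")),
    ("-U", (1, "1", 1, "99")),
    ("-D", (1, "99", 1, "99"))
  )

  def main(args: Array[String]): Unit = {
    val report = replay(reported)
    println(s"final rows: ${report.finalRows}")
    report.violations.foreach(println)
  }
}
```

Running `main` should print an empty final state followed by the two `+U` violations. Because the check only looks at the printed rows, it says nothing about where inside the mini-batch join the RowKinds are assigned.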