This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new f2c44179d [flink] Delta Join additional IT tests and docs improvement (#2546)
f2c44179d is described below
commit f2c44179d1fbd323a30eef3764bebab304d508f8
Author: Xuyang <[email protected]>
AuthorDate: Thu Feb 5 10:44:31 2026 +0800
[flink] Delta Join additional IT tests and docs improvement (#2546)
---
.../fluss/flink/source/Flink22DeltaJoinITCase.java | 1083 ++++++++++++++++++++
.../flink/source/Flink22TableSourceITCase.java | 290 +-----
.../FlussTableLakeSnapshotCommitterTest.java | 2 +-
.../committer/TieringCommitOperatorTest.java | 2 +-
.../apache/fluss/flink/utils/FlinkTestBase.java | 2 +-
website/docs/engine-flink/delta-joins.md | 19 +-
6 files changed, 1097 insertions(+), 301 deletions(-)
diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java
new file mode 100644
index 000000000..0fe8fb73c
--- /dev/null
+++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java
@@ -0,0 +1,1083 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.flink.utils.FlinkTestBase;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableMap;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for Delta Join optimization in Flink 2.2. */
+public class Flink22DeltaJoinITCase extends FlinkTestBase {
+
+ private static final String CATALOG_NAME = "test_catalog";
+
+ private StreamTableEnvironment tEnv;
+
+ @BeforeEach
+ public void beforeEach() {
+        bootstrapServers = conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
+
+        tEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
+        tEnv.useCatalog(CATALOG_NAME);
+        tEnv.executeSql(String.format("create database if not exists `%s`", DEFAULT_DB));
+        tEnv.useDatabase(DEFAULT_DB);
+
+        // start two jobs for this test: one for DML involving the delta join, and the other for DQL
+        // to query the results of the sink table
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+        // Set FORCE strategy for delta join
+        tEnv.getConfig()
+                .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+ }
+
+ @AfterEach
+ void after() {
+ tEnv.useDatabase(BUILTIN_DATABASE);
+        tEnv.executeSql(String.format("drop database `%s` cascade", DEFAULT_DB));
+ }
+
+ /**
+ * Creates a source table with specified schema and options.
+ *
+ * @param tableName the name of the table
+     * @param columns column definitions (e.g., "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint")
+     * @param primaryKey primary key columns (e.g., "c1, d1")
+     * @param bucketKey bucket key column (e.g., "c1")
+     * @param partitionKey partition key column (e.g., "pt1"), or null for a non-partitioned table
+     * @param extraOptions additional WITH options (e.g., "lookup.cache" = "partial"), or null
+ */
+ private void createSource(
+ String tableName,
+ String columns,
+ String primaryKey,
+ String bucketKey,
+ @Nullable String partitionKey,
+ @Nullable Map<String, String> extraOptions) {
+ Map<String, String> withOptions = new HashMap<>();
+ if (extraOptions != null) {
+ withOptions.putAll(extraOptions);
+ }
+ withOptions.put("connector", "fluss");
+ withOptions.put("bucket.key", bucketKey);
+ StringBuilder ddlBuilder = new StringBuilder();
+        ddlBuilder.append(
+                String.format(
+                        "create table %s ( %s, primary key (%s) NOT ENFORCED )",
+                        tableName, columns, primaryKey));
+        if (partitionKey != null) {
+            ddlBuilder.append(String.format(" partitioned by (%s)", partitionKey));
+            withOptions.put("table.auto-partition.enabled", "true");
+            withOptions.put("table.auto-partition.time-unit", "year");
+        }
+        ddlBuilder.append(
+                String.format(
+                        " with (%s)",
+                        withOptions.entrySet().stream()
+                                .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
+                                .collect(Collectors.joining(", "))));
+
+ tEnv.executeSql(ddlBuilder.toString());
+ }
+
+ /**
+ * Creates a sink table with specified columns.
+ *
+ * @param tableName the name of the table
+     * @param columns the column definitions (e.g., "a1 int, c1 bigint, a2 int")
+ * @param primaryKey the primary key columns (e.g., "c1, d1, c2, d2")
+ */
+    private void createSink(String tableName, String columns, String primaryKey) {
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " %s, "
+ + " primary key (%s) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss'"
+ + ")",
+ tableName, columns, primaryKey));
+ }
+
+ @Test
+ void testDeltaJoin() throws Exception {
+ // disable cache to get stable results with updating
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false);
+ String leftTableName = "left_table";
+ String rightTableName = "right_table";
+ String sinkTableName = "sink_table";
+
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+ createSink(
+ sinkTableName,
+                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c1, d1, c2, d2");
+
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 100L, 2, 20000L),
+ row(3, "v3", 300L, 3, 30000L),
+ row(4, "v4", 400L, 4, 40000L),
+ // update
+ row(5, "v5", 100L, 1, 50000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+ // wait for the first snapshot to finish to get the stable result
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath);
+
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v3", 330L, 4, 30000L),
+ row(4, "v4", 500L, 4, 50000L),
+ // update
+ row(6, "v6", 100L, 1, 60000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+ // wait for the first snapshot to finish to get the stable result
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath);
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+ Arrays.asList(
+ "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+ "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+ "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+ "+I[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]",
+ "-U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]",
+ "+U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinOnPrimaryKey() throws Exception {
+ // disable cache to get stable results with updating
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false);
+ String leftTableName = "left_table";
+ String rightTableName = "right_table";
+ String sinkTableName = "sink_table";
+
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+ createSink(
+ sinkTableName,
+                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c1, d1, c2, d2");
+
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v3", 300L, 3, 30000L),
+ row(4, "v4", 400L, 4, 40000L),
+ // update
+ row(5, "v5", 100L, 1, 50000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+ // wait for the first snapshot to finish to get the stable result
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath);
+
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v3", 300L, 4, 30000L),
+ row(4, "v4", 500L, 4, 50000L),
+ // update
+ row(6, "v6", 100L, 1, 60000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+ // wait for the first snapshot to finish to get the stable result
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath);
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+ Arrays.asList(
+ "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+ "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+ "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+ "+I[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]",
+ "-U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]",
+ "+U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinWithCalc() throws Exception {
+ String leftTableName = "left_table_proj";
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_proj";
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2",
+ "c2",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v3", 300L, 4, 30000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_proj";
+        createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1");
+
+ String sql =
+ String.format(
+ "INSERT INTO %s SELECT a1, c1, d1, a2 FROM ("
+ + " SELECT * FROM %s WHERE d1 > 1"
+ + ") INNER JOIN ("
+ + " SELECT * FROM %s WHERE c2 < 300"
+ + ") ON c1 = c2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+                Arrays.asList("+I[2, 200, 2, 2]", "-U[2, 200, 2, 2]", "+U[2, 200, 2, 2]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinWithAppendOnlySourceAndCalc() throws Exception {
+ String leftTableName = "left_table_proj";
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.merge-engine", "first_row"));
+
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_proj";
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.merge-engine", "first_row"));
+
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v3", 200L, 2, 20000L),
+ row(3, "v4", 400L, 4, 30000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_proj";
+ createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1");
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 WHERE a1 > 1",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+        List<String> expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 2]", "+U[2, 200, 2]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinFailsWhenFilterOnNonUpsertKeys() {
+ String leftTableName = "left_table_force_fail";
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String rightTableName = "right_table_force_fail";
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String sinkTableName = "sink_table_force_fail";
+ createSink(
+ sinkTableName,
+                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c1, d1, c2, d2");
+
+ // Filter on e1 > e2, where e1 and e2 are NOT part of the upsert key
+        // TODO we can add an UpsertFilterOperator that converts UPSERT records failing the
+        // filter into DELETE records.
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE e1 > e2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+
+        // Non-equi condition e1 > e2, where e1 and e2 are NOT part of the upsert key
+ String sql2 =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 > e2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql2))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinWithNonEquiConditionOnUpsertKeys() throws Exception {
+ String leftTableName = "left_table_nonequi_upsert";
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1, e1",
+ "c1, d1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20001L),
+ row(3, "v3", 300L, 3, 30000L),
+                        // Add a row with the same PK (100, 1) to generate an UPDATE in CDC mode.
+                        // This row is filtered out by e1 <> (c2 * 100), so it doesn't affect the
+                        // join result.
+                        row(4, "v1_updated", 100L, 1, 10000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_nonequi_upsert";
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v4", 200L, 2, 20000L),
+ row(3, "v5", 300L, 4, 40000L),
+                        // Add a row with the same PK (100, 1) to generate an UPDATE in CDC mode.
+                        // This row is filtered out by e1 <> (c2 * 100), so it doesn't affect the
+                        // join result.
+                        row(5, "v1_updated", 100L, 1, 10000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_nonequi_upsert";
+        createSink(sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int", "c1, d1, e1");
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c1, d1, e1, a2 FROM %s INNER JOIN %s "
+                                + "ON c1 = c2 AND d1 = d2 AND e1 <> (c2 * 100)",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains(
+                        "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 <> (c2 * 100)))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+ Arrays.asList(
+ "+I[2, 200, 2, 20001, 2]",
+ "-U[2, 200, 2, 20001, 2]",
+ "+U[2, 200, 2, 20001, 2]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+    void testDeltaJoinWithAppendOnlySourceAndNonEquiCondition() throws Exception {
+ String leftTableName = "left_table_nonequi_insert";
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.merge-engine", "first_row"));
+
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v3", 300L, 3, 5000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_nonequi_insert";
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.merge-engine", "first_row"));
+
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 8000L),
+ row(2, "v4", 200L, 2, 15000L),
+ row(3, "v5", 300L, 3, 3000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_nonequi_insert";
+ createSink(
+                sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1");
+
+        // INSERT_ONLY sources with a non-equi condition on non-upsert-key fields (e1, e2).
+        // This should succeed because INSERT_ONLY mode allows any non-equi conditions.
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 > e2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains(
+                        "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 > e2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ // Rows where e1 > e2:
+ // Row 1: e1=10000 > e2=8000 ✓
+ // Row 2: e1=20000 > e2=15000 ✓
+ // Row 3: e1=5000 > e2=3000 ✓
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, 100, 1, 10000, 1, 8000]",
+ "-U[1, 100, 1, 10000, 1, 8000]",
+ "+U[1, 100, 1, 10000, 1, 8000]",
+ "+I[2, 200, 2, 20000, 2, 15000]",
+ "-U[2, 200, 2, 20000, 2, 15000]",
+ "+U[2, 200, 2, 20000, 2, 15000]",
+ "+I[3, 300, 3, 5000, 3, 3000]",
+ "-U[3, 300, 3, 5000, 3, 3000]",
+ "+U[3, 300, 3, 5000, 3, 3000]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinWithLookupCache() throws Exception {
+ String leftTableName = "left_table_cache";
+ createSource(
+ leftTableName,
+ "a1 int, c1 bigint, d1 int",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+ List<InternalRow> rows1 = Collections.singletonList(row(1, 100L, 1));
+ writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false);
+
+ String rightTableName = "right_table_cache";
+ createSource(
+ rightTableName,
+ "a2 int, c2 bigint, d2 int",
+ "c2",
+ "c2",
+ null,
+ ImmutableMap.of(
+ "table.delete.behavior",
+ "IGNORE",
+ "lookup.cache",
+ "partial",
+ "lookup.partial-cache.max-rows",
+ "100"));
+ List<InternalRow> rows2 = Collections.singletonList(row(1, 100L, 1));
+        writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false);
+
+ String sinkTableName = "sink_table_cache";
+        createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1");
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c1, d1, a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+                Arrays.asList("+I[1, 100, 1, 1]", "-U[1, 100, 1, 1]", "+U[1, 100, 1, 1]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinWithPartitionedTable() throws Exception {
+ String leftTableName = "left_table_partitioned";
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar",
+ "c1, d1, pt1",
+ "c1",
+ "pt1",
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ Iterator<String> leftPartitionIterator =
+                waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), leftTablePath)
+                        .values()
+                        .iterator();
+        // pick two partitions to insert data into
+ String leftPartition1 = leftPartitionIterator.next();
+ String leftPartition2 = leftPartitionIterator.next();
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1000, leftPartition1),
+ row(2, "v2", 200L, 2000, leftPartition1),
+ row(3, "v3", 100L, 3000, leftPartition2),
+ row(4, "v4", 400L, 4000, leftPartition2));
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_partitioned";
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, pt2 varchar",
+ "c2, pt2",
+ "c2",
+ "pt2",
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ Iterator<String> rightPartitionIterator =
+                waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), rightTablePath)
+                        .values()
+                        .iterator();
+        // pick two partitions to insert data into
+ String rightPartition1 = rightPartitionIterator.next();
+ String rightPartition2 = rightPartitionIterator.next();
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1000, rightPartition1),
+ row(4, "v4", 400L, 3000, rightPartition2));
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table";
+ createSink(
+ sinkTableName,
+                "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar, b2 varchar",
+ "c1, d1, pt1");
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, b1, c1, d1, pt1, b2 FROM %s AS T1 INNER JOIN %s AS T2 "
+ + "ON T1.c1 = T2.c2 AND T1.pt1 = T2.pt2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (pt1 = pt2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+ Arrays.asList(
+                        String.format("+I[1, v1, 100, 1000, %s, v1]", leftPartition1),
+                        String.format("-U[1, v1, 100, 1000, %s, v1]", leftPartition1),
+                        String.format("+U[1, v1, 100, 1000, %s, v1]", leftPartition1),
+                        String.format("+I[4, v4, 400, 4000, %s, v4]", leftPartition2),
+                        String.format("-U[4, v4, 400, 4000, %s, v4]", leftPartition2),
+                        String.format("+U[4, v4, 400, 4000, %s, v4]", leftPartition2));
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinFailsWhenSourceHasDelete() {
+ String leftTableName = "left_table_delete_force";
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1",
+ "c1",
+ null,
+ null);
+
+ String rightTableName = "right_table_delete_force";
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2, d2",
+ "c2",
+ null,
+ null);
+
+ String sinkTableName = "sink_table_delete_force";
+ createSink(
+ sinkTableName,
+                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c1, d1, c2, d2");
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinWithJoinKeyExceedsPrimaryKey() {
+ String leftTableName = "left_table_exceed_pk";
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String rightTableName = "right_table_exceed_pk";
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String sinkTableName = "sink_table_exceed_pk";
+ createSink(
+                sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1");
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+    void testDeltaJoinWithAppendOnlySourceAndJoinKeyExceedsPrimaryKey() throws Exception {
+ String leftTableName = "left_table_exceed_pk";
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.merge-engine", "first_row"));
+
+ List<InternalRow> rows1 =
+                Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v2", 200L, 2, 20000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_exceed_pk";
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.merge-engine", "first_row"));
+
+ List<InternalRow> rows2 =
+                Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v3", 200L, 2, 99999L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_exceed_pk";
+ createSink(
+                sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1");
+
+ // Join on PK (c1, d1) + additional non-PK field (e1)
+        // This should succeed because the join keys {c1, d1, e1} contain the complete PK {c1, d1}.
+        // The e1 = e2 condition is applied as a post-lookup equi-condition filter.
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains(
+                        "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 = e2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ // Only first row should match (e1 = e2 = 10000)
+        // Second row is filtered out (e1 = 20000, e2 = 99999)
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, 100, 1, 10000, 1, 10000]",
+ "-U[1, 100, 1, 10000, 1, 10000]",
+ "+U[1, 100, 1, 10000, 1, 10000]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinFailsWhenJoinKeyNotContainIndex() {
+ String leftTableName = "left_table_no_idx_force";
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String rightTableName = "right_table_no_idx_force";
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String sinkTableName = "sink_table_no_idx_force";
+ createSink(
+ sinkTableName,
+                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "a1, a2");
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON a1 = a2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinFailsWithOuterJoin() {
+ String leftTableName = "left_table_outer_fail";
+ createSource(
+ leftTableName,
+ "a1 int, c1 bigint, d1 int",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String rightTableName = "right_table_outer_fail";
+ createSource(
+ rightTableName,
+ "a2 int, c2 bigint, d2 int",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String sinkTableName = "sink_table_outer_fail";
+ createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1");
+
+ // Test LEFT JOIN
+ String leftJoinSql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s LEFT JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(leftJoinSql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+
+ // Test RIGHT JOIN
+ String rightJoinSql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c2, a2 FROM %s RIGHT JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(rightJoinSql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+
+ // Test FULL OUTER JOIN
+ String fullOuterJoinSql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s FULL OUTER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(fullOuterJoinSql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinFailsWithCascadeJoin() {
+ String table1 = "cascade_table1";
+ createSource(
+ table1,
+ "a1 int, c1 bigint, d1 int",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String table2 = "cascade_table2";
+ createSource(
+ table2,
+ "a2 int, c2 bigint, d2 int",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String table3 = "cascade_table3";
+ createSource(
+ table3,
+ "a3 int, c3 bigint, d3 int",
+ "c3, d3",
+ "c3",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String sinkTableName = "cascade_sink";
+ createSink(
+ sinkTableName,
+ "a1 int, c1 bigint, a2 int, c2 bigint, a3 int, c3 bigint",
+ "c1, c2, c3");
+
+ String sql =
+ String.format(
+ "INSERT INTO %s SELECT a1, c1, a2, c2, a3, c3 "
+ + "FROM %s "
+ + "INNER JOIN %s ON c1 = c2 AND d1 = d2 "
+ + "INNER JOIN %s ON c1 = c3 AND d1 = d3",
+ sinkTableName, table1, table2, table3);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinFailsWithSinkMaterializer() {
+ // With CDC sources, when sink PK doesn't match upstream update key,
+ // Flink would insert SinkUpsertMaterializer which prevents delta join
+ String leftTableName = "left_table_materializer";
+ createSource(
+ leftTableName,
+ "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String rightTableName = "right_table_materializer";
+ createSource(
+ rightTableName,
+ "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ // Sink PK (a1, a2) doesn't match upstream update key (c1, d1, c2, d2)
+        // TODO: this depends on Fluss supporting MVCC/point-in-time lookup to allow changing
+        // upsert keys
+ String sinkTableName = "sink_table_materializer";
+ createSink(
+ sinkTableName,
+                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+ "a1, a2");
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinFailsWithNonDeterministicFunctions() {
+ String leftTableName = "left_table_nondeterministic";
+ createSource(
+ leftTableName,
+ "a1 int, c1 bigint, d1 int",
+ "c1, d1",
+ "c1",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String rightTableName = "right_table_nondeterministic";
+ createSource(
+ rightTableName,
+ "a2 int, c2 bigint, d2 int",
+ "c2, d2",
+ "c2",
+ null,
+ ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+ String sinkTableName = "sink_table_nondeterministic";
+        // TODO this should be supported by Flink in the future for non-deterministic functions
+        // before sinking
+        createSink(sinkTableName, "c1 bigint, d1 bigint, rand_val double", "c1");
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT c1, d1, RAND() FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+}
diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
index 725372048..507527fb3 100644
--- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
+++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
@@ -17,293 +17,5 @@
package org.apache.fluss.flink.source;
-import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
-import static org.apache.fluss.testutils.DataTestUtils.row;
-import static org.assertj.core.api.Assertions.assertThat;
-
/** IT case for {@link FlinkTableSource} in Flink 2.2. */
-public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
-
- @Test
- void testDeltaJoin() throws Exception {
-        // start two jobs for this test: one for DML involving the delta join, and the other for DQL
-        // to query the results of the sink table
-        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-
- String leftTableName = "left_table";
- tEnv.executeSql(
- String.format(
- "create table %s ( "
- + " a1 int, "
- + " b1 varchar, "
- + " c1 bigint, "
- + " d1 int, "
- + " e1 bigint, "
- + " primary key (c1, d1) NOT ENFORCED"
- + ") with ("
- + " 'connector' = 'fluss', "
- + " 'bucket.key' = 'c1', "
-                        // currently, delta join only support append-only source
- + " 'table.merge-engine' = 'first_row' "
- + ")",
- leftTableName));
- List<InternalRow> rows1 =
- Arrays.asList(
- row(1, "v1", 100L, 1, 10000L),
- row(2, "v2", 200L, 2, 20000L),
- row(3, "v1", 300L, 3, 30000L),
- row(4, "v4", 400L, 4, 40000L));
- // write records
- TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
- writeRows(conn, leftTablePath, rows1, false);
-
- String rightTableName = "right_table";
- tEnv.executeSql(
- String.format(
- "create table %s ("
- + " a2 int, "
- + " b2 varchar, "
- + " c2 bigint, "
- + " d2 int, "
- + " e2 bigint, "
- + " primary key (c2, d2) NOT ENFORCED"
- + ") with ("
- + " 'connector' = 'fluss', "
- + " 'bucket.key' = 'c2', "
-                        // currently, delta join only support append-only source
- + " 'table.merge-engine' = 'first_row' "
- + ")",
- rightTableName));
- List<InternalRow> rows2 =
- Arrays.asList(
- row(1, "v1", 100L, 1, 10000L),
- row(2, "v3", 200L, 2, 20000L),
- row(3, "v4", 300L, 4, 30000L),
- row(4, "v4", 500L, 4, 50000L));
- // write records
- TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
- writeRows(conn, rightTablePath, rows2, false);
-
- String sinkTableName = "sink_table";
- tEnv.executeSql(
- String.format(
- "create table %s ( "
- + " a1 int, "
- + " b1 varchar, "
- + " c1 bigint, "
- + " d1 int, "
- + " e1 bigint, "
- + " a2 int, "
- + " b2 varchar, "
- + " c2 bigint, "
- + " d2 int, "
- + " e2 bigint, "
- + " primary key (c1, d1, c2, d2) NOT ENFORCED"
- + ") with ("
- + " 'connector' = 'fluss' "
- + ")",
- sinkTableName));
-
- tEnv.getConfig()
- .set(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
- OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
-
- String sql =
- String.format(
-                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
- sinkTableName, leftTableName, rightTableName);
-
- assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
-
- tEnv.executeSql(sql);
-
- CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
- List<String> expected =
- Arrays.asList(
- "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
- "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
- "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
- "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
- "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
- "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
- assertResultsIgnoreOrder(collected, expected, true);
- }
-
- @Test
- void testDeltaJoinWithProjectionAndFilter() throws Exception {
-        // start two jobs for this test: one for DML involving the delta join, and the other for DQL
-        // to query the results of the sink table
-        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-
- String leftTableName = "left_table_proj";
- tEnv.executeSql(
- String.format(
- "create table %s ( "
- + " a1 int, "
- + " b1 varchar, "
- + " c1 bigint, "
- + " d1 int, "
- + " e1 bigint, "
- + " primary key (c1, d1) NOT ENFORCED"
- + ") with ("
- + " 'connector' = 'fluss', "
- + " 'bucket.key' = 'c1', "
- + " 'table.merge-engine' = 'first_row' "
- + ")",
- leftTableName));
- List<InternalRow> rows1 =
- Arrays.asList(
- row(1, "v1", 100L, 1, 10000L),
- row(2, "v2", 200L, 2, 20000L),
- row(3, "v1", 300L, 3, 30000L));
- TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
- writeRows(conn, leftTablePath, rows1, false);
-
- String rightTableName = "right_table_proj";
- tEnv.executeSql(
- String.format(
- "create table %s ("
- + " a2 int, "
- + " b2 varchar, "
- + " c2 bigint, "
- + " d2 int, "
- + " e2 bigint, "
- + " primary key (c2, d2) NOT ENFORCED"
- + ") with ("
- + " 'connector' = 'fluss', "
- + " 'bucket.key' = 'c2', "
- + " 'table.merge-engine' = 'first_row' "
- + ")",
- rightTableName));
- List<InternalRow> rows2 =
- Arrays.asList(
- row(1, "v1", 100L, 1, 10000L),
- row(2, "v3", 200L, 2, 20000L),
- row(3, "v4", 300L, 4, 30000L));
- TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
- writeRows(conn, rightTablePath, rows2, false);
-
- String sinkTableName = "sink_table_proj";
- tEnv.executeSql(
- String.format(
- "create table %s ( "
- + " a1 int, "
- + " c1 bigint, "
- + " a2 int, "
- + " primary key (c1) NOT ENFORCED"
- + ") with ("
- + " 'connector' = 'fluss' "
- + ")",
- sinkTableName));
-
- tEnv.getConfig()
- .set(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
- OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
-
- // Test with projection and filter
- String sql =
- String.format(
-                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE a1 > 1",
- sinkTableName, leftTableName, rightTableName);
-
- assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
-
- tEnv.executeSql(sql);
-
- CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
-        List<String> expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 2]", "+U[2, 200, 2]");
- assertResultsIgnoreOrder(collected, expected, true);
- }
-
- @Test
- void testDeltaJoinWithLookupCache() throws Exception {
-        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-
- String leftTableName = "left_table_cache";
- tEnv.executeSql(
- String.format(
- "create table %s ( "
- + " a1 int, "
- + " c1 bigint, "
- + " d1 int, "
- + " primary key (c1, d1) NOT ENFORCED"
- + ") with ("
- + " 'connector' = 'fluss', "
- + " 'bucket.key' = 'c1', "
- + " 'table.merge-engine' = 'first_row' "
- + ")",
- leftTableName));
- List<InternalRow> rows1 = Arrays.asList(row(1, 100L, 1));
- writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false);
-
- String rightTableName = "right_table_cache";
- tEnv.executeSql(
- String.format(
- "create table %s ("
- + " a2 int, "
- + " c2 bigint, "
- + " d2 int, "
- + " primary key (c2, d2) NOT ENFORCED"
- + ") with ("
- + " 'connector' = 'fluss', "
- + " 'bucket.key' = 'c2', "
- + " 'table.merge-engine' = 'first_row', "
- + " 'lookup.cache' = 'partial', "
- + " 'lookup.partial-cache.max-rows' = '100' "
- + ")",
- rightTableName));
- List<InternalRow> rows2 = Arrays.asList(row(1, 100L, 1));
-        writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false);
-
- String sinkTableName = "sink_table_cache";
- tEnv.executeSql(
- String.format(
- "create table %s ( "
- + " a1 int, "
- + " a2 int, "
- + " primary key (a1) NOT ENFORCED" // Dummy PK
- + ") with ("
- + " 'connector' = 'fluss' "
- + ")",
- sinkTableName));
-
- tEnv.getConfig()
- .set(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
- OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
-
- String sql =
- String.format(
-                        "INSERT INTO %s SELECT T1.a1, T2.a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2 AND T1.d1 = T2.d2",
- sinkTableName, leftTableName, rightTableName);
-
- assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
-
- tEnv.executeSql(sql);
-
- CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
-        List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 1]");
- assertResultsIgnoreOrder(collected, expected, true);
- }
-}
+public class Flink22TableSourceITCase extends FlinkTableSourceITCase {}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
index abb74ebf6..0421cc561 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
@@ -49,7 +49,7 @@ class FlussTableLakeSnapshotCommitterTest extends FlinkTestBase {
private FlussTableLakeSnapshotCommitter flussTableLakeSnapshotCommitter;
@BeforeEach
- void beforeEach() {
+ public void beforeEach() {
flussTableLakeSnapshotCommitter =
                new FlussTableLakeSnapshotCommitter(FLUSS_CLUSTER_EXTENSION.getClientConfig());
flussTableLakeSnapshotCommitter.open();
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index bdecfa8fa..70d86bb2c 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -69,7 +69,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
    private StreamOperatorParameters<CommittableMessage<TestingCommittable>> parameters;
@BeforeEach
- void beforeEach() throws Exception {
+ public void beforeEach() throws Exception {
mockOperatorEventGateway = new MockOperatorEventGateway();
MockOperatorEventDispatcher mockOperatorEventDispatcher =
new MockOperatorEventDispatcher(mockOperatorEventGateway);
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
index bfa814ce2..7c80bbc0c 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
@@ -155,7 +155,7 @@ public class FlinkTestBase extends AbstractTestBase {
}
@BeforeEach
- void beforeEach() throws Exception {
+ public void beforeEach() throws Exception {
admin.createDatabase(DEFAULT_DB, DatabaseDescriptor.EMPTY, true).get();
}
diff --git a/website/docs/engine-flink/delta-joins.md b/website/docs/engine-flink/delta-joins.md
index 96f097496..8388b71c8 100644
--- a/website/docs/engine-flink/delta-joins.md
+++ b/website/docs/engine-flink/delta-joins.md
@@ -169,24 +169,25 @@ There is a known issue ([FLINK-38399](https://issues.apache.org/jira/browse/FLIN
#### Supported Features
-- Support for optimizing a dual-stream join from CDC sources that do not include delete messages into a delta join.
-  - Disable delete on the source table to guarantee there is no delete message in the table, by adding the option `'table.delete.behavior' = 'IGNORE'` or `'DISABLE'` on the table.
-  - The source table is no more required to be a `first_row` merge engine table since this version.
-- Support `Project` and `Filter` between source and delta join.
-- Support cache in delta join.
+- CDC sources are now supported in delta join, provided they do not produce DELETE messages.
+  - Set `'table.delete.behavior' = 'IGNORE'` or `'DISABLE'` on the source table to suppress deletes.
+  - The `'table.merge-engine' = 'first_row'` option is no longer required.
+- Projection and filter operations are now supported between source and delta join.
+- Lookup cache is now supported in delta join.
#### Limitations
 - The primary key or the prefix key of the tables must be included as part of the equivalence conditions in the join.
-- The join must be a INNER join.
+- The join must be an INNER join. LEFT JOIN, RIGHT JOIN, and FULL OUTER JOIN are not supported.
+- Cascade joins (e.g., `A JOIN B JOIN C`) are not supported. Each join input must come directly from a table source.
 - The downstream node of the join must support idempotent updates; typically it is an upsert sink, and it should not have a `SinkUpsertMaterializer` node before it.
-  - Flink planner automatically inserts a `SinkUpsertMaterializer` when the sink’s primary key does not fully cover the upstream update key.
+  - Flink planner automatically inserts a `SinkUpsertMaterializer` when the sink's primary key does not fully cover the upstream update key.
   - You can learn more details about `SinkUpsertMaterializer` by reading this [blog](https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-events).
 - Since delta join does not support handling update-before messages, it is necessary to ensure that the entire pipeline can safely discard update-before messages. That means when consuming a CDC stream:
   - The join key used in the delta join must be part of the primary key.
   - The sink's primary key must be the same as the upstream update key.
-  - All filters must be applied on the upsert key.
-- Neither filters nor projections should contain non-deterministic functions.
+  - All filters (including non-equi join conditions) must be applied on the upsert key.
+- Filters, projections, and non-equi join conditions must not contain non-deterministic functions.
## Future Plan
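
To make the docs change above concrete, here is a minimal Flink SQL sketch of an eligible delta join. It is illustrative only: the table names and schemas are hypothetical and not part of this commit, and it assumes the strategy option key `table.optimizer.delta-join.strategy` backing the `OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY` setting used in the tests.

```sql
-- Illustrative sketch, not from this commit. Two delete-free CDC sources:
-- 'table.delete.behavior' = 'IGNORE' suppresses deletes so the changelog
-- qualifies for delta join without the 'first_row' merge engine.
CREATE TABLE orders (
    order_id BIGINT,
    item_id  BIGINT,
    amount   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'fluss',
    'bucket.key' = 'order_id',
    'table.delete.behavior' = 'IGNORE'
);

CREATE TABLE shipments (
    order_id BIGINT,
    status   STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'fluss',
    'bucket.key' = 'order_id',
    'table.delete.behavior' = 'IGNORE'
);

-- Sink primary key matches the upstream update key, so the planner does not
-- insert a SinkUpsertMaterializer before the sink.
CREATE TABLE order_status (
    order_id BIGINT,
    item_id  BIGINT,
    amount   INT,
    status   STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'fluss');

-- FORCE makes the planner fail instead of silently falling back to a regular
-- streaming join when the plan is not eligible.
SET 'table.optimizer.delta-join.strategy' = 'FORCE';

-- INNER join whose equivalence condition covers the complete primary key:
-- the plan should contain a DeltaJoin node.
INSERT INTO order_status
SELECT o.order_id, o.item_id, o.amount, s.status
FROM orders AS o
INNER JOIN shipments AS s ON o.order_id = s.order_id;
```

With FORCE set, an ineligible plan (an outer join, a cascade join, a filter on a non-upsert-key column, or a `SinkUpsertMaterializer` before the sink) fails with the ValidationException asserted throughout Flink22DeltaJoinITCase above.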