This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch revert-2546-fluss-delta-joins-tests-docs
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 3effa3fcf38b9a07f79812905cfbf7dff29293ce
Author: Jark Wu <[email protected]>
AuthorDate: Thu Feb 5 10:45:33 2026 +0800

    Revert "[flink] Delta Join additional IT tests and docs improvement (#2546)"
    
    This reverts commit f2c44179d1fbd323a30eef3764bebab304d508f8.
---
 .../fluss/flink/source/Flink22DeltaJoinITCase.java | 1083 --------------------
 .../flink/source/Flink22TableSourceITCase.java     |  290 +++++-
 .../FlussTableLakeSnapshotCommitterTest.java       |    2 +-
 .../committer/TieringCommitOperatorTest.java       |    2 +-
 .../apache/fluss/flink/utils/FlinkTestBase.java    |    2 +-
 website/docs/engine-flink/delta-joins.md           |   19 +-
 6 files changed, 301 insertions(+), 1097 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java
deleted file mode 100644
index 0fe8fb73c..000000000
--- 
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java
+++ /dev/null
@@ -1,1083 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.source;
-
-import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.flink.utils.FlinkTestBase;
-import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableMap;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
-import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
-import static org.apache.fluss.testutils.DataTestUtils.row;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** IT case for Delta Join optimization in Flink 2.2. */
-public class Flink22DeltaJoinITCase extends FlinkTestBase {
-
-    private static final String CATALOG_NAME = "test_catalog";
-
-    private StreamTableEnvironment tEnv;
-
-    @BeforeEach
-    public void beforeEach() {
-        bootstrapServers = 
conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0);
-
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inStreamingMode());
-
-        tEnv.executeSql(
-                String.format(
-                        "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
-                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
-        tEnv.useCatalog(CATALOG_NAME);
-        tEnv.executeSql(String.format("create database if not exists `%s`", 
DEFAULT_DB));
-        tEnv.useDatabase(DEFAULT_DB);
-
-        // start two jobs for this test: one for DML involving the delta join, 
and the other for DQL
-        // to query the results of the sink table
-        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 2);
-        // Set FORCE strategy for delta join
-        tEnv.getConfig()
-                .set(
-                        
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
-                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
-    }
-
-    @AfterEach
-    void after() {
-        tEnv.useDatabase(BUILTIN_DATABASE);
-        tEnv.executeSql(String.format("drop database `%s` cascade", 
DEFAULT_DB));
-    }
-
-    /**
-     * Creates a source table with specified schema and options.
-     *
-     * @param tableName the name of the table
-     * @param columns column definitions (e.g., "a1 int, b1 varchar, c1 
bigint, d1 int, e1 bigint")
-     * @param primaryKey primary key columns (e.g., "c1, d1")
-     * @param bucketKey bucket key column (e.g., "c1")
-     * @param extraOptions additional WITH options (e.g., "lookup.cache" = 
"partial"), or null
-     */
-    private void createSource(
-            String tableName,
-            String columns,
-            String primaryKey,
-            String bucketKey,
-            @Nullable String partitionKey,
-            @Nullable Map<String, String> extraOptions) {
-        Map<String, String> withOptions = new HashMap<>();
-        if (extraOptions != null) {
-            withOptions.putAll(extraOptions);
-        }
-        withOptions.put("connector", "fluss");
-        withOptions.put("bucket.key", bucketKey);
-        StringBuilder ddlBuilder = new StringBuilder();
-        ddlBuilder.append(
-                String.format(
-                        "create table %s ( %s, primary key (%s) NOT ENFORCED 
)",
-                        tableName, columns, primaryKey));
-        if (partitionKey != null) {
-            ddlBuilder.append(String.format(" partitioned by (%s)", 
partitionKey));
-            withOptions.put("table.auto-partition.enabled", "true");
-            withOptions.put("table.auto-partition.time-unit", "year");
-        }
-        ddlBuilder.append(
-                String.format(
-                        " with (%s)",
-                        withOptions.entrySet().stream()
-                                .map(e -> String.format("'%s' = '%s'", 
e.getKey(), e.getValue()))
-                                .collect(Collectors.joining(", "))));
-
-        tEnv.executeSql(ddlBuilder.toString());
-    }
-
-    /**
-     * Creates a sink table with specified columns.
-     *
-     * @param tableName the name of the table
-     * @param columns the column definitions (e.g., "a1 int, c1 bigint, a2 
int")
-     * @param primaryKey the primary key columns (e.g., "c1, d1, c2, d2")
-     */
-    private void createSink(String tableName, String columns, String 
primaryKey) {
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ("
-                                + " %s, "
-                                + " primary key (%s) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss'"
-                                + ")",
-                        tableName, columns, primaryKey));
-    }
-
-    @Test
-    void testDeltaJoin() throws Exception {
-        // disable cache to get stable results with updating
-        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED,
 false);
-        String leftTableName = "left_table";
-        String rightTableName = "right_table";
-        String sinkTableName = "sink_table";
-
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-        createSink(
-                sinkTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 
varchar, c2 bigint, d2 int, e2 bigint",
-                "c1, d1, c2, d2");
-
-        List<InternalRow> rows1 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 100L, 2, 20000L),
-                        row(3, "v3", 300L, 3, 30000L),
-                        row(4, "v4", 400L, 4, 40000L),
-                        // update
-                        row(5, "v5", 100L, 1, 50000L));
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        writeRows(conn, leftTablePath, rows1, false);
-        // wait for the first snapshot to finish to get the stable result
-        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath);
-
-        List<InternalRow> rows2 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v3", 330L, 4, 30000L),
-                        row(4, "v4", 500L, 4, 50000L),
-                        // update
-                        row(6, "v6", 100L, 1, 60000L));
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        writeRows(conn, rightTablePath, rows2, false);
-        // wait for the first snapshot to finish to get the stable result
-        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath);
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
-        List<String> expected =
-                Arrays.asList(
-                        "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
-                        "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
-                        "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
-                        "+I[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]",
-                        "-U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]",
-                        "+U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-
-    @Test
-    void testDeltaJoinOnPrimaryKey() throws Exception {
-        // disable cache to get stable results with updating
-        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED,
 false);
-        String leftTableName = "left_table";
-        String rightTableName = "right_table";
-        String sinkTableName = "sink_table";
-
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-        createSink(
-                sinkTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 
varchar, c2 bigint, d2 int, e2 bigint",
-                "c1, d1, c2, d2");
-
-        List<InternalRow> rows1 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v3", 300L, 3, 30000L),
-                        row(4, "v4", 400L, 4, 40000L),
-                        // update
-                        row(5, "v5", 100L, 1, 50000L));
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        writeRows(conn, leftTablePath, rows1, false);
-        // wait for the first snapshot to finish to get the stable result
-        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath);
-
-        List<InternalRow> rows2 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v3", 300L, 4, 30000L),
-                        row(4, "v4", 500L, 4, 50000L),
-                        // update
-                        row(6, "v6", 100L, 1, 60000L));
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        writeRows(conn, rightTablePath, rows2, false);
-        // wait for the first snapshot to finish to get the stable result
-        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath);
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
-        List<String> expected =
-                Arrays.asList(
-                        "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
-                        "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
-                        "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
-                        "+I[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]",
-                        "-U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]",
-                        "+U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-
-    @Test
-    void testDeltaJoinWithCalc() throws Exception {
-        String leftTableName = "left_table_proj";
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        List<InternalRow> rows1 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v1", 300L, 3, 30000L));
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        writeRows(conn, leftTablePath, rows1, false);
-
-        String rightTableName = "right_table_proj";
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2",
-                "c2",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        List<InternalRow> rows2 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v3", 300L, 4, 30000L));
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        writeRows(conn, rightTablePath, rows2, false);
-
-        String sinkTableName = "sink_table_proj";
-        createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, 
d1");
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c1, d1, a2 FROM ("
-                                + " SELECT * FROM %s WHERE d1 > 1"
-                                + ") INNER JOIN ("
-                                + " SELECT * FROM %s WHERE c2 < 300"
-                                + ") ON c1 = c2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
-        List<String> expected =
-                Arrays.asList("+I[2, 200, 2, 2]", "-U[2, 200, 2, 2]", "+U[2, 
200, 2, 2]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-
-    @Test
-    void testDeltaJoinWithAppendOnlySourceAndCalc() throws Exception {
-        String leftTableName = "left_table_proj";
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.merge-engine", "first_row"));
-
-        List<InternalRow> rows1 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v1", 300L, 3, 30000L));
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        writeRows(conn, leftTablePath, rows1, false);
-
-        String rightTableName = "right_table_proj";
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.merge-engine", "first_row"));
-
-        List<InternalRow> rows2 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v3", 200L, 2, 20000L),
-                        row(3, "v4", 400L, 4, 30000L));
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        writeRows(conn, rightTablePath, rows2, false);
-
-        String sinkTableName = "sink_table_proj";
-        createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1");
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN 
%s ON c1 = c2 WHERE a1 > 1",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
-        List<String> expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 
2]", "+U[2, 200, 2]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-
-    @Test
-    void testDeltaJoinFailsWhenFilterOnNonUpsertKeys() {
-        String leftTableName = "left_table_force_fail";
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String rightTableName = "right_table_force_fail";
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String sinkTableName = "sink_table_force_fail";
-        createSink(
-                sinkTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 
varchar, c2 bigint, d2 int, e2 bigint",
-                "c1, d1, c2, d2");
-
-        // Filter on e1 > e2, where e1 and e2 are NOT part of the upsert key
-        // TODO we can add a UpsertFilterOperator that can convert the 
un-match-filter UPSERT record
-        //  into DELETE record.
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2 WHERE e1 > e2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThatThrownBy(() -> tEnv.explainSql(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("doesn't support to do delta join 
optimization");
-
-        // Non-equiv-cond on e1 > e2, where e1 and e2 are NOT part of the 
upsert key
-        String sql2 =
-                String.format(
-                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2 AND e1 > e2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThatThrownBy(() -> tEnv.explainSql(sql2))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("doesn't support to do delta join 
optimization");
-    }
-
-    @Test
-    void testDeltaJoinWithNonEquiConditionOnUpsertKeys() throws Exception {
-        String leftTableName = "left_table_nonequi_upsert";
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1, e1",
-                "c1, d1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        List<InternalRow> rows1 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20001L),
-                        row(3, "v3", 300L, 3, 30000L),
-                        // Add row with same PK (100, 1) to generate UPDATE in 
CDC mode
-                        // This row is filtered out by (e2 / 100) <> c2, so 
doesn't affect join
-                        // result
-                        row(4, "v1_updated", 100L, 1, 10000L));
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        writeRows(conn, leftTablePath, rows1, false);
-
-        String rightTableName = "right_table_nonequi_upsert";
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        List<InternalRow> rows2 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v4", 200L, 2, 20000L),
-                        row(3, "v5", 300L, 4, 40000L),
-                        // Add row with same PK (100, 1) to generate UPDATE in 
CDC mode
-                        // This row is filtered out by (e2 / 100) <> c2, so 
doesn't affect join
-                        // result
-                        row(5, "v1_updated", 100L, 1, 10000L));
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        writeRows(conn, rightTablePath, rows2, false);
-
-        String sinkTableName = "sink_table_nonequi_upsert";
-        createSink(sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 
int", "c1, d1, e1");
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c1, d1, e1, a2 FROM %s 
INNER JOIN %s "
-                                + "ON c1 = c2 AND d1 = d2 AND e1 <> (c2 * 
100)",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains(
-                        "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND 
(d1 = d2) AND (e1 <> (c2 * 100)))]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
-        List<String> expected =
-                Arrays.asList(
-                        "+I[2, 200, 2, 20001, 2]",
-                        "-U[2, 200, 2, 20001, 2]",
-                        "+U[2, 200, 2, 20001, 2]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-
-    @Test
-    void testDeltaJoinWithAppendOnlySourceAndNonEquiCondition() throws 
Exception {
-        String leftTableName = "left_table_nonequi_insert";
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.merge-engine", "first_row"));
-
-        List<InternalRow> rows1 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v3", 300L, 3, 5000L));
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        writeRows(conn, leftTablePath, rows1, false);
-
-        String rightTableName = "right_table_nonequi_insert";
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.merge-engine", "first_row"));
-
-        List<InternalRow> rows2 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 8000L),
-                        row(2, "v4", 200L, 2, 15000L),
-                        row(3, "v5", 300L, 3, 3000L));
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        writeRows(conn, rightTablePath, rows2, false);
-
-        String sinkTableName = "sink_table_nonequi_insert";
-        createSink(
-                sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, 
e2 bigint", "c1, d1");
-
-        // INSERT_ONLY sources with non-equi condition on non-upsert key 
fields (e1, e2)
-        // This should succeed because INSERT_ONLY mode allows any non-equi 
conditions
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s 
INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 > e2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains(
-                        "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND 
(d1 = d2) AND (e1 > e2))]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
-        // Rows where e1 > e2:
-        // Row 1: e1=10000 > e2=8000 ✓
-        // Row 2: e1=20000 > e2=15000 ✓
-        // Row 3: e1=5000 > e2=3000 ✓
-        List<String> expected =
-                Arrays.asList(
-                        "+I[1, 100, 1, 10000, 1, 8000]",
-                        "-U[1, 100, 1, 10000, 1, 8000]",
-                        "+U[1, 100, 1, 10000, 1, 8000]",
-                        "+I[2, 200, 2, 20000, 2, 15000]",
-                        "-U[2, 200, 2, 20000, 2, 15000]",
-                        "+U[2, 200, 2, 20000, 2, 15000]",
-                        "+I[3, 300, 3, 5000, 3, 3000]",
-                        "-U[3, 300, 3, 5000, 3, 3000]",
-                        "+U[3, 300, 3, 5000, 3, 3000]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-
-    @Test
-    void testDeltaJoinWithLookupCache() throws Exception {
-        String leftTableName = "left_table_cache";
-        createSource(
-                leftTableName,
-                "a1 int, c1 bigint, d1 int",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-        List<InternalRow> rows1 = Collections.singletonList(row(1, 100L, 1));
-        writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false);
-
-        String rightTableName = "right_table_cache";
-        createSource(
-                rightTableName,
-                "a2 int, c2 bigint, d2 int",
-                "c2",
-                "c2",
-                null,
-                ImmutableMap.of(
-                        "table.delete.behavior",
-                        "IGNORE",
-                        "lookup.cache",
-                        "partial",
-                        "lookup.partial-cache.max-rows",
-                        "100"));
-        List<InternalRow> rows2 = Collections.singletonList(row(1, 100L, 1));
-        writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, 
false);
-
-        String sinkTableName = "sink_table_cache";
-        createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, 
d1");
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c1, d1, a2 FROM %s AS T1 
INNER JOIN %s AS T2 ON T1.c1 = T2.c2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
-        List<String> expected =
-                Arrays.asList("+I[1, 100, 1, 1]", "-U[1, 100, 1, 1]", "+U[1, 
100, 1, 1]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-
-    @Test
-    void testDeltaJoinWithPartitionedTable() throws Exception {
-        String leftTableName = "left_table_partitioned";
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar",
-                "c1, d1, pt1",
-                "c1",
-                "pt1",
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        Iterator<String> leftPartitionIterator =
-                
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), leftTablePath)
-                        .values()
-                        .iterator();
-        // pick two partition to insert data
-        String leftPartition1 = leftPartitionIterator.next();
-        String leftPartition2 = leftPartitionIterator.next();
-        List<InternalRow> rows1 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1000, leftPartition1),
-                        row(2, "v2", 200L, 2000, leftPartition1),
-                        row(3, "v3", 100L, 3000, leftPartition2),
-                        row(4, "v4", 400L, 4000, leftPartition2));
-        writeRows(conn, leftTablePath, rows1, false);
-
-        String rightTableName = "right_table_partitioned";
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, pt2 varchar",
-                "c2, pt2",
-                "c2",
-                "pt2",
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        Iterator<String> rightPartitionIterator =
-                
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), 
rightTablePath)
-                        .values()
-                        .iterator();
-        // pick two partition to insert data
-        String rightPartition1 = rightPartitionIterator.next();
-        String rightPartition2 = rightPartitionIterator.next();
-        List<InternalRow> rows2 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1000, rightPartition1),
-                        row(4, "v4", 400L, 3000, rightPartition2));
-        writeRows(conn, rightTablePath, rows2, false);
-
-        String sinkTableName = "sink_table";
-        createSink(
-                sinkTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar, b2 
varchar",
-                "c1, d1, pt1");
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, b1, c1, d1, pt1, b2 FROM %s 
AS T1 INNER JOIN %s AS T2 "
-                                + "ON T1.c1 = T2.c2 AND T1.pt1 = T2.pt2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (pt1 = pt2))]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
-        List<String> expected =
-                Arrays.asList(
-                        String.format("+I[1, v1, 100, 1000, %s, v1]", 
leftPartition1),
-                        String.format("-U[1, v1, 100, 1000, %s, v1]", 
leftPartition1),
-                        String.format("+U[1, v1, 100, 1000, %s, v1]", 
leftPartition1),
-                        String.format("+I[4, v4, 400, 4000, %s, v4]", 
leftPartition2),
-                        String.format("-U[4, v4, 400, 4000, %s, v4]", 
leftPartition2),
-                        String.format("+U[4, v4, 400, 4000, %s, v4]", 
leftPartition2));
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-
-    @Test
-    void testDeltaJoinFailsWhenSourceHasDelete() {
-        String leftTableName = "left_table_delete_force";
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
-                "c1",
-                null,
-                null);
-
-        String rightTableName = "right_table_delete_force";
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
-                "c2",
-                null,
-                null);
-
-        String sinkTableName = "sink_table_delete_force";
-        createSink(
-                sinkTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 
varchar, c2 bigint, d2 int, e2 bigint",
-                "c1, d1, c2, d2");
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThatThrownBy(() -> tEnv.explainSql(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("doesn't support to do delta join 
optimization");
-    }
-
-    @Test
-    void testDeltaJoinWithJoinKeyExceedsPrimaryKey() {
-        String leftTableName = "left_table_exceed_pk";
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String rightTableName = "right_table_exceed_pk";
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String sinkTableName = "sink_table_exceed_pk";
-        createSink(
-                sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, 
e2 bigint", "c1, d1");
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s 
INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThatThrownBy(() -> tEnv.explainSql(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("doesn't support to do delta join 
optimization");
-    }
-
-    @Test
-    void testDeltaJoinWithAppendOnlySourceAndJoinKeyExceedsPrimaryKey() throws 
Exception {
-        String leftTableName = "left_table_exceed_pk";
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.merge-engine", "first_row"));
-
-        List<InternalRow> rows1 =
-                Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v2", 
200L, 2, 20000L));
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        writeRows(conn, leftTablePath, rows1, false);
-
-        String rightTableName = "right_table_exceed_pk";
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.merge-engine", "first_row"));
-
-        List<InternalRow> rows2 =
-                Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v3", 
200L, 2, 99999L));
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        writeRows(conn, rightTablePath, rows2, false);
-
-        String sinkTableName = "sink_table_exceed_pk";
-        createSink(
-                sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, 
e2 bigint", "c1, d1");
-
-        // Join on PK (c1, d1) + additional non-PK field (e1)
-        // This should succeed because join keys {c1, d1, e1} contain complete 
PK {c1, d1}
-        // The e1 = e2 condition is applied as a post-lookup equi-condition 
filter
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s 
INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains(
-                        "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND 
(d1 = d2) AND (e1 = e2))]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
-        // Only first row should match (e1 = e2 = 10000)
-        // Second row filtered out (e1 = 20000 AND != e2 = 99999)
-        List<String> expected =
-                Arrays.asList(
-                        "+I[1, 100, 1, 10000, 1, 10000]",
-                        "-U[1, 100, 1, 10000, 1, 10000]",
-                        "+U[1, 100, 1, 10000, 1, 10000]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-
-    @Test
-    void testDeltaJoinFailsWhenJoinKeyNotContainIndex() {
-        String leftTableName = "left_table_no_idx_force";
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String rightTableName = "right_table_no_idx_force";
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String sinkTableName = "sink_table_no_idx_force";
-        createSink(
-                sinkTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 
varchar, c2 bigint, d2 int, e2 bigint",
-                "a1, a2");
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON a1 = 
a2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThatThrownBy(() -> tEnv.explainSql(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("doesn't support to do delta join 
optimization");
-    }
-
-    @Test
-    void testDeltaJoinFailsWithOuterJoin() {
-        String leftTableName = "left_table_outer_fail";
-        createSource(
-                leftTableName,
-                "a1 int, c1 bigint, d1 int",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String rightTableName = "right_table_outer_fail";
-        createSource(
-                rightTableName,
-                "a2 int, c2 bigint, d2 int",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String sinkTableName = "sink_table_outer_fail";
-        createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1");
-
-        // Test LEFT JOIN
-        String leftJoinSql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s LEFT JOIN %s 
ON c1 = c2 AND d1 = d2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThatThrownBy(() -> tEnv.explainSql(leftJoinSql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("doesn't support to do delta join 
optimization");
-
-        // Test RIGHT JOIN
-        String rightJoinSql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c2, a2 FROM %s RIGHT JOIN 
%s ON c1 = c2 AND d1 = d2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThatThrownBy(() -> tEnv.explainSql(rightJoinSql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("doesn't support to do delta join 
optimization");
-
-        // Test FULL OUTER JOIN
-        String fullOuterJoinSql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s FULL OUTER 
JOIN %s ON c1 = c2 AND d1 = d2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThatThrownBy(() -> tEnv.explainSql(fullOuterJoinSql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("doesn't support to do delta join 
optimization");
-    }
-
-    @Test
-    void testDeltaJoinFailsWithCascadeJoin() {
-        String table1 = "cascade_table1";
-        createSource(
-                table1,
-                "a1 int, c1 bigint, d1 int",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String table2 = "cascade_table2";
-        createSource(
-                table2,
-                "a2 int, c2 bigint, d2 int",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String table3 = "cascade_table3";
-        createSource(
-                table3,
-                "a3 int, c3 bigint, d3 int",
-                "c3, d3",
-                "c3",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String sinkTableName = "cascade_sink";
-        createSink(
-                sinkTableName,
-                "a1 int, c1 bigint, a2 int, c2 bigint, a3 int, c3 bigint",
-                "c1, c2, c3");
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c1, a2, c2, a3, c3 "
-                                + "FROM %s "
-                                + "INNER JOIN %s ON c1 = c2 AND d1 = d2 "
-                                + "INNER JOIN %s ON c1 = c3 AND d1 = d3",
-                        sinkTableName, table1, table2, table3);
-
-        assertThatThrownBy(() -> tEnv.explainSql(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("doesn't support to do delta join 
optimization");
-    }
-
-    @Test
-    void testDeltaJoinFailsWithSinkMaterializer() {
-        // With CDC sources, when sink PK doesn't match upstream update key,
-        // Flink would insert SinkUpsertMaterializer which prevents delta join
-        String leftTableName = "left_table_materializer";
-        createSource(
-                leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String rightTableName = "right_table_materializer";
-        createSource(
-                rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        // Sink PK (a1, a2) doesn't match upstream update key (c1, d1, c2, d2)
-        // TODO: this depends on Fluss supports MVCC/point-in-time lookup to 
support change upsert
-        //  keys
-        String sinkTableName = "sink_table_materializer";
-        createSink(
-                sinkTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 
varchar, c2 bigint, d2 int, e2 bigint",
-                "a1, a2");
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThatThrownBy(() -> tEnv.explainSql(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("doesn't support to do delta join 
optimization");
-    }
-
-    @Test
-    void testDeltaJoinFailsWithNonDeterministicFunctions() {
-        String leftTableName = "left_table_nondeterministic";
-        createSource(
-                leftTableName,
-                "a1 int, c1 bigint, d1 int",
-                "c1, d1",
-                "c1",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String rightTableName = "right_table_nondeterministic";
-        createSource(
-                rightTableName,
-                "a2 int, c2 bigint, d2 int",
-                "c2, d2",
-                "c2",
-                null,
-                ImmutableMap.of("table.delete.behavior", "IGNORE"));
-
-        String sinkTableName = "sink_table_nondeterministic";
-        // TODO this should be supported in Flink in future for 
non-deterministic functions before
-        //  sinking
-        createSink(sinkTableName, "c1 bigint, d1 bigint, rand_val double", 
"c1");
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT c1, d1, RAND() FROM %s INNER 
JOIN %s ON c1 = c2 AND d1 = d2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThatThrownBy(() -> tEnv.explainSql(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("doesn't support to do delta join 
optimization");
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
index 507527fb3..725372048 100644
--- 
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
@@ -17,5 +17,293 @@
 
 package org.apache.fluss.flink.source;
 
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** IT case for {@link FlinkTableSource} in Flink 2.2. */
-public class Flink22TableSourceITCase extends FlinkTableSourceITCase {}
+public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
+
+    @Test
+    void testDeltaJoin() throws Exception {
+        // start two jobs for this test: one for DML involving the delta join, 
and the other for DQL
+        // to query the results of the sink table
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 2);
+
+        String leftTableName = "left_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " b1 varchar, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " e1 bigint, "
+                                + " primary key (c1, d1) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c1', "
+                                // currently, delta join only support 
append-only source
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        leftTableName));
+        List<InternalRow> rows1 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v2", 200L, 2, 20000L),
+                        row(3, "v1", 300L, 3, 30000L),
+                        row(4, "v4", 400L, 4, 40000L));
+        // write records
+        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+        writeRows(conn, leftTablePath, rows1, false);
+
+        String rightTableName = "right_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " a2 int, "
+                                + " b2 varchar, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " e2 bigint, "
+                                + " primary key (c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c2', "
+                                // currently, delta join only support 
append-only source
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        rightTableName));
+        List<InternalRow> rows2 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v3", 200L, 2, 20000L),
+                        row(3, "v4", 300L, 4, 30000L),
+                        row(4, "v4", 500L, 4, 50000L));
+        // write records
+        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+        writeRows(conn, rightTablePath, rows2, false);
+
+        String sinkTableName = "sink_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " b1 varchar, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " e1 bigint, "
+                                + " a2 int, "
+                                + " b2 varchar, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " e2 bigint, "
+                                + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss' "
+                                + ")",
+                        sinkTableName));
+
+        tEnv.getConfig()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+        String sql =
+                String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2",
+                        sinkTableName, leftTableName, rightTableName);
+
+        assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
+
+        tEnv.executeSql(sql);
+
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+                        "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+                        "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+                        "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+                        "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+                        "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+
+    @Test
+    void testDeltaJoinWithProjectionAndFilter() throws Exception {
+        // start two jobs for this test: one for DML involving the delta join, 
and the other for DQL
+        // to query the results of the sink table
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 2);
+
+        String leftTableName = "left_table_proj";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " b1 varchar, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " e1 bigint, "
+                                + " primary key (c1, d1) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c1', "
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        leftTableName));
+        List<InternalRow> rows1 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v2", 200L, 2, 20000L),
+                        row(3, "v1", 300L, 3, 30000L));
+        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+        writeRows(conn, leftTablePath, rows1, false);
+
+        String rightTableName = "right_table_proj";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " a2 int, "
+                                + " b2 varchar, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " e2 bigint, "
+                                + " primary key (c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c2', "
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        rightTableName));
+        List<InternalRow> rows2 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v3", 200L, 2, 20000L),
+                        row(3, "v4", 300L, 4, 30000L));
+        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+        writeRows(conn, rightTablePath, rows2, false);
+
+        String sinkTableName = "sink_table_proj";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " c1 bigint, "
+                                + " a2 int, "
+                                + " primary key (c1) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss' "
+                                + ")",
+                        sinkTableName));
+
+        tEnv.getConfig()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+        // Test with projection and filter
+        String sql =
+                String.format(
+                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN 
%s ON c1 = c2 AND d1 = d2 WHERE a1 > 1",
+                        sinkTableName, leftTableName, rightTableName);
+
+        assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
+
+        tEnv.executeSql(sql);
+
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
+        List<String> expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 
2]", "+U[2, 200, 2]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+
+    @Test
+    void testDeltaJoinWithLookupCache() throws Exception {
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 2);
+
+        String leftTableName = "left_table_cache";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " primary key (c1, d1) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c1', "
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        leftTableName));
+        List<InternalRow> rows1 = Arrays.asList(row(1, 100L, 1));
+        writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false);
+
+        String rightTableName = "right_table_cache";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " a2 int, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " primary key (c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c2', "
+                                + " 'table.merge-engine' = 'first_row', "
+                                + " 'lookup.cache' = 'partial', "
+                                + " 'lookup.partial-cache.max-rows' = '100' "
+                                + ")",
+                        rightTableName));
+        List<InternalRow> rows2 = Arrays.asList(row(1, 100L, 1));
+        writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, 
false);
+
+        String sinkTableName = "sink_table_cache";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " a2 int, "
+                                + " primary key (a1) NOT ENFORCED" // Dummy PK
+                                + ") with ("
+                                + " 'connector' = 'fluss' "
+                                + ")",
+                        sinkTableName));
+
+        tEnv.getConfig()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+        String sql =
+                String.format(
+                        "INSERT INTO %s SELECT T1.a1, T2.a2 FROM %s AS T1 
INNER JOIN %s AS T2 ON T1.c1 = T2.c2 AND T1.d1 = T2.d2",
+                        sinkTableName, leftTableName, rightTableName);
+
+        assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
+
+        tEnv.executeSql(sql);
+
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
+        List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 
1]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
index 0421cc561..abb74ebf6 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
@@ -49,7 +49,7 @@ class FlussTableLakeSnapshotCommitterTest extends 
FlinkTestBase {
     private FlussTableLakeSnapshotCommitter flussTableLakeSnapshotCommitter;
 
     @BeforeEach
-    public void beforeEach() {
+    void beforeEach() {
         flussTableLakeSnapshotCommitter =
                 new 
FlussTableLakeSnapshotCommitter(FLUSS_CLUSTER_EXTENSION.getClientConfig());
         flussTableLakeSnapshotCommitter.open();
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 70d86bb2c..bdecfa8fa 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -69,7 +69,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
     private StreamOperatorParameters<CommittableMessage<TestingCommittable>> 
parameters;
 
     @BeforeEach
-    public void beforeEach() throws Exception {
+    void beforeEach() throws Exception {
         mockOperatorEventGateway = new MockOperatorEventGateway();
         MockOperatorEventDispatcher mockOperatorEventDispatcher =
                 new MockOperatorEventDispatcher(mockOperatorEventGateway);
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
index 7c80bbc0c..bfa814ce2 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
@@ -155,7 +155,7 @@ public class FlinkTestBase extends AbstractTestBase {
     }
 
     @BeforeEach
-    public void beforeEach() throws Exception {
+    void beforeEach() throws Exception {
         admin.createDatabase(DEFAULT_DB, DatabaseDescriptor.EMPTY, true).get();
     }
 
diff --git a/website/docs/engine-flink/delta-joins.md 
b/website/docs/engine-flink/delta-joins.md
index 8388b71c8..96f097496 100644
--- a/website/docs/engine-flink/delta-joins.md
+++ b/website/docs/engine-flink/delta-joins.md
@@ -169,25 +169,24 @@ There is a known issue 
([FLINK-38399](https://issues.apache.org/jira/browse/FLIN
 
 #### Supported Features
 
-- CDC sources are now supported in delta join, provided they do not produce 
DELETE messages.
-  - Set `'table.delete.behavior' = 'IGNORE'` or `'DISABLE'` on the source 
table to suppress deletes.
-  - The `'table.merge-engine' = 'first_row'` option is no longer required.
-- Projection and filter operations are now supported between source and delta 
join.
-- Lookup cache is now supported in delta join.
+- Support for optimizing a dual-stream join from CDC sources that do not 
include delete messages into a delta join.
+  - Disable delete on the source table to guarantee there is no delete message 
in the table, by adding the option `'table.delete.behavior' = 'IGNORE'` or 
`'DISABLE'` on the table.
+  - The source table is no more required to be a `first_row` merge engine 
table since this version.
+- Support `Project` and `Filter` between source and delta join.
+- Support cache in delta join.
 
 #### Limitations
 
 - The primary key or the prefix key of the tables must be included as part of 
the equivalence conditions in the join.
-- The join must be an INNER join. LEFT JOIN, RIGHT JOIN, and FULL OUTER JOIN 
are not supported.
-- Cascade joins (e.g., `A JOIN B JOIN C`) are not supported. Each join input 
must come directly from a table source.
+- The join must be a INNER join.
 - The downstream node of the join must support idempotent updates, typically 
it's an upsert sink and should not have a `SinkUpsertMaterializer` node before 
it.
-  - Flink planner automatically inserts a `SinkUpsertMaterializer` when the 
sink's primary key does not fully cover the upstream update key.
+  - Flink planner automatically inserts a `SinkUpsertMaterializer` when the 
sink’s primary key does not fully cover the upstream update key.
   - You can learn more details about `SinkUpsertMaterializer` by reading this 
[blog](https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-events).
 - Since delta join does not support to handle update-before messages, it is 
necessary to ensure that the entire pipeline can safely discard update-before 
messages. That means when consuming a CDC stream:
   - The join key used in the delta join must be part of the primary key.
   - The sink's primary key must be the same as the upstream update key.
-  - All filters (including non-equi join conditions) must be applied on the 
upsert key.
-- Filters, projections, and non-equi join conditions must not contain 
non-deterministic functions.
+  - All filters must be applied on the upsert key.
+- Neither filters nor projections should contain non-deterministic functions.
 
 ## Future Plan
 

Reply via email to