This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 1b674ca77d412e889af751f5f42ea97a47aa00c2
Author: xuyang <[email protected]>
AuthorDate: Mon Feb 2 11:56:02 2026 +0800

    [flink] Add and improve Delta Join IT tests (#2546)
---
 .../fluss/flink/source/Flink22DeltaJoinITCase.java | 623 ++++++++++++++-------
 .../FlussTableLakeSnapshotCommitterTest.java       |   2 +-
 .../committer/TieringCommitOperatorTest.java       |   2 +-
 .../apache/fluss/flink/utils/FlinkTestBase.java    |   2 +-
 4 files changed, 425 insertions(+), 204 deletions(-)
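All of the new IT cases follow the same skeleton: create two Fluss primary-key
source tables and a sink, write rows through the Fluss client, submit an
INSERT ... INNER JOIN, and assert that the optimized plan contains a DeltaJoin
node. A minimal sketch of that pattern (helper names are the ones defined in
the diff below; table names and data are illustrative only):

    @Test
    void testDeltaJoinSketch() throws Exception {
        // two primary-key sources whose bucket key (c1/c2) is the join key
        createSource("left_t", "a1 int, c1 bigint, d1 int", "c1, d1", "c1",
                null, ImmutableMap.of("table.delete.behavior", "IGNORE"));
        createSource("right_t", "a2 int, c2 bigint, d2 int", "c2, d2", "c2",
                null, ImmutableMap.of("table.delete.behavior", "IGNORE"));
        createSink("sink_t", "a1 int, c1 bigint, d1 int, a2 int", "c1, d1");

        // feed each side through the Fluss client
        writeRows(conn, TablePath.of(DEFAULT_DB, "left_t"),
                Collections.singletonList(row(1, 100L, 1)), false);
        writeRows(conn, TablePath.of(DEFAULT_DB, "right_t"),
                Collections.singletonList(row(2, 100L, 1)), false);

        String sql = "INSERT INTO sink_t SELECT a1, c1, d1, a2 "
                + "FROM left_t INNER JOIN right_t ON c1 = c2";
        // the FORCE strategy set in beforeEach() requires the optimizer to
        // rewrite the regular streaming join into a delta join
        assertThat(tEnv.explainSql(sql)).contains("DeltaJoin");
        tEnv.executeSql(sql);
    }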

diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java
index 933dc2835..0fe8fb73c 100644
--- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java
+++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java
@@ -17,33 +17,65 @@
 
 package org.apache.fluss.flink.source;
 
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.flink.utils.FlinkTestBase;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableMap;
 
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT case for Delta Join optimization in Flink 2.2. */
-public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
+public class Flink22DeltaJoinITCase extends FlinkTestBase {
+
+    private static final String CATALOG_NAME = "test_catalog";
+
+    private StreamTableEnvironment tEnv;
 
     @BeforeEach
-    @Override
-    void before() {
-        super.before();
+    public void beforeEach() {
+        bootstrapServers = conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
+
+        tEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
+        tEnv.useCatalog(CATALOG_NAME);
+        tEnv.executeSql(String.format("create database if not exists `%s`", 
DEFAULT_DB));
+        tEnv.useDatabase(DEFAULT_DB);
+
+        // start two jobs for this test: one for DML involving the delta join, and the other for
+        // DQL to query the results of the sink table
         tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
         // Set FORCE strategy for delta join
         tEnv.getConfig()
@@ -52,6 +84,12 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                         OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
     }
 
+    @AfterEach
+    void after() {
+        tEnv.useDatabase(BUILTIN_DATABASE);
+        tEnv.executeSql(String.format("drop database `%s` cascade", 
DEFAULT_DB));
+    }
+
     /**
      * Creates a source table with specified schema and options.
      *
@@ -59,35 +97,39 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
      * @param columns column definitions (e.g., "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint")
      * @param primaryKey primary key columns (e.g., "c1, d1")
      * @param bucketKey bucket key column (e.g., "c1")
-     * @param streamType stream type: "insert_only" for first_row, "cdc" for delete.behavior=IGNORE,
-     *     null for regular (supports deletes)
-     * @param extraOptions additional WITH options (e.g., "'lookup.cache' = 'partial'"), or null
+     * @param partitionKey partition key column (e.g., "pt1"), or null for a non-partitioned table
+     * @param extraOptions additional WITH options (e.g., "lookup.cache" = "partial"), or null
      */
     private void createSource(
             String tableName,
             String columns,
             String primaryKey,
             String bucketKey,
-            String streamType,
-            String extraOptions) {
-        StringBuilder withOptions = new StringBuilder();
-        withOptions.append("'connector' = 'fluss', ");
-        withOptions.append(String.format("'bucket.key' = '%s'", bucketKey));
-
-        if ("insert_only".equals(streamType)) {
-            withOptions.append(", 'table.merge-engine' = 'first_row'");
-        } else if ("cdc".equals(streamType)) {
-            withOptions.append(", 'table.delete.behavior' = 'IGNORE'");
-        }
-
+            @Nullable String partitionKey,
+            @Nullable Map<String, String> extraOptions) {
+        Map<String, String> withOptions = new HashMap<>();
         if (extraOptions != null) {
-            withOptions.append(", ").append(extraOptions);
+            withOptions.putAll(extraOptions);
         }
-
-        tEnv.executeSql(
+        withOptions.put("connector", "fluss");
+        withOptions.put("bucket.key", bucketKey);
+        StringBuilder ddlBuilder = new StringBuilder();
+        ddlBuilder.append(
                 String.format(
-                        "create table %s ( %s, primary key (%s) NOT ENFORCED ) 
with ( %s )",
-                        tableName, columns, primaryKey, 
withOptions.toString()));
+                        "create table %s ( %s, primary key (%s) NOT ENFORCED 
)",
+                        tableName, columns, primaryKey));
+        if (partitionKey != null) {
+            ddlBuilder.append(String.format(" partitioned by (%s)", 
partitionKey));
+            withOptions.put("table.auto-partition.enabled", "true");
+            withOptions.put("table.auto-partition.time-unit", "year");
+        }
+        ddlBuilder.append(
+                String.format(
+                        " with (%s)",
+                        withOptions.entrySet().stream()
+                                .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
+                                .collect(Collectors.joining(", "))));
+
+        tEnv.executeSql(ddlBuilder.toString());
     }
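(For illustration, the rewritten helper above assembles single-line DDL of roughly
this shape for a partitioned call such as createSource("t", "a1 int, c1 bigint,
pt1 varchar", "c1, pt1", "c1", "pt1", null); the WITH-option order follows HashMap
iteration and is not guaranteed:

    create table t ( a1 int, c1 bigint, pt1 varchar, primary key (c1, pt1) NOT ENFORCED )
        partitioned by (pt1)
        with ('connector' = 'fluss', 'bucket.key' = 'c1',
              'table.auto-partition.enabled' = 'true',
              'table.auto-partition.time-unit' = 'year')

The two auto-partition options are added by the helper itself whenever a partition
key is given, so the partitioned tests never pass them explicitly.)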
 
     /**
@@ -111,6 +153,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
 
     @Test
     void testDeltaJoin() throws Exception {
+        // disable cache to get stable results with updating
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false);
         String leftTableName = "left_table";
         String rightTableName = "right_table";
         String sinkTableName = "sink_table";
@@ -120,15 +164,91 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
                 "c1, d1",
                 "c1",
-                "insert_only",
-                null);
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
         createSource(
                 rightTableName,
                 "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
                 "c2, d2",
                 "c2",
-                "insert_only",
-                null);
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
+        createSink(
+                sinkTableName,
+                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 
varchar, c2 bigint, d2 int, e2 bigint",
+                "c1, d1, c2, d2");
+
+        List<InternalRow> rows1 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v2", 100L, 2, 20000L),
+                        row(3, "v3", 300L, 3, 30000L),
+                        row(4, "v4", 400L, 4, 40000L),
+                        // update
+                        row(5, "v5", 100L, 1, 50000L));
+        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+        writeRows(conn, leftTablePath, rows1, false);
+        // wait for the first snapshot to finish to get the stable result
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath);
+
+        List<InternalRow> rows2 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v2", 200L, 2, 20000L),
+                        row(3, "v3", 330L, 4, 30000L),
+                        row(4, "v4", 500L, 4, 50000L),
+                        // update
+                        row(6, "v6", 100L, 1, 60000L));
+        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+        writeRows(conn, rightTablePath, rows2, false);
+        // wait for the first snapshot to finish to get the stable result
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath);
+
+        String sql =
+                String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2",
+                        sinkTableName, leftTableName, rightTableName);
+
+        assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
+
+        tEnv.executeSql(sql);
+
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+                        "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+                        "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+                        "+I[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]",
+                        "-U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]",
+                        "+U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+
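(A note on the expected lists in these tests: as the old inline comment put it,
the delta join "produces duplicate changes", so every matched row shows up as an
insert followed by a redundant update pair, i.e. a +I/-U/+U triple. A hypothetical
helper, not part of this patch, could make that pattern explicit:

    // builds the +I/-U/+U triple the delta join emits per matched row
    private static List<String> changelogTriple(String fields) {
        return Arrays.asList(
                "+I[" + fields + "]", "-U[" + fields + "]", "+U[" + fields + "]");
    }

)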
+    @Test
+    void testDeltaJoinOnPrimaryKey() throws Exception {
+        // disable cache to get stable results with updating
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false);
+        String leftTableName = "left_table";
+        String rightTableName = "right_table";
+        String sinkTableName = "sink_table";
+
+        createSource(
+                leftTableName,
+                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+                "c1, d1",
+                "c1",
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
+        createSource(
+                rightTableName,
+                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+                "c2, d2",
+                "c2",
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
         createSink(
                 sinkTableName,
                 "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 
varchar, c2 bigint, d2 int, e2 bigint",
@@ -138,23 +258,27 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 Arrays.asList(
                         row(1, "v1", 100L, 1, 10000L),
                         row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v1", 300L, 3, 30000L),
+                        row(3, "v3", 300L, 3, 30000L),
                         row(4, "v4", 400L, 4, 40000L),
-                        // Add data with the same primary key as the first row
+                        // update
                         row(5, "v5", 100L, 1, 50000L));
         TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
         writeRows(conn, leftTablePath, rows1, false);
+        // wait for the first snapshot to finish to get the stable result
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath);
 
         List<InternalRow> rows2 =
                 Arrays.asList(
                         row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v3", 200L, 2, 20000L),
-                        row(3, "v4", 300L, 4, 30000L),
+                        row(2, "v2", 200L, 2, 20000L),
+                        row(3, "v3", 300L, 4, 30000L),
                         row(4, "v4", 500L, 4, 50000L),
-                        // Add data with the same primary key as the first row
+                        // update
                         row(6, "v6", 100L, 1, 60000L));
         TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
         writeRows(conn, rightTablePath, rows2, false);
+        // wait for the first snapshot to finish to get the stable result
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath);
 
         String sql =
                 String.format(
@@ -170,25 +294,85 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
         List<String> expected =
                 Arrays.asList(
-                        "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
-                        "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
-                        "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
-                        "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
-                        "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
-                        "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
+                        "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+                        "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+                        "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]",
+                        "+I[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]",
+                        "-U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]",
+                        "+U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]");
         assertResultsIgnoreOrder(collected, expected, true);
     }
 
     @Test
-    void testDeltaJoinWithProjectionAndFilter() throws Exception {
+    void testDeltaJoinWithCalc() throws Exception {
         String leftTableName = "left_table_proj";
         createSource(
                 leftTableName,
                 "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
                 "c1, d1",
                 "c1",
-                "insert_only",
-                null);
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+        List<InternalRow> rows1 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v2", 200L, 2, 20000L),
+                        row(3, "v1", 300L, 3, 30000L));
+        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+        writeRows(conn, leftTablePath, rows1, false);
+
+        String rightTableName = "right_table_proj";
+        createSource(
+                rightTableName,
+                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
+                "c2",
+                "c2",
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
+
+        List<InternalRow> rows2 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v2", 200L, 2, 20000L),
+                        row(3, "v3", 300L, 4, 30000L));
+        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+        writeRows(conn, rightTablePath, rows2, false);
+
+        String sinkTableName = "sink_table_proj";
+        createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, 
d1");
+
+        String sql =
+                String.format(
+                        "INSERT INTO %s SELECT a1, c1, d1, a2 FROM ("
+                                + " SELECT * FROM %s WHERE d1 > 1"
+                                + ") INNER JOIN ("
+                                + " SELECT * FROM %s WHERE c2 < 300"
+                                + ") ON c1 = c2",
+                        sinkTableName, leftTableName, rightTableName);
+
+        assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
+
+        tEnv.executeSql(sql);
+
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+        List<String> expected =
+                Arrays.asList("+I[2, 200, 2, 2]", "-U[2, 200, 2, 2]", "+U[2, 
200, 2, 2]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+
+    @Test
+    void testDeltaJoinWithAppendOnlySourceAndCalc() throws Exception {
+        String leftTableName = "left_table_proj";
+        createSource(
+                leftTableName,
+                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+                "c1, d1",
+                "c1",
+                null,
+                ImmutableMap.of("table.merge-engine", "first_row"));
 
         List<InternalRow> rows1 =
                 Arrays.asList(
@@ -204,14 +388,14 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
                 "c2, d2",
                 "c2",
-                "insert_only",
-                null);
+                null,
+                ImmutableMap.of("table.merge-engine", "first_row"));
 
         List<InternalRow> rows2 =
                 Arrays.asList(
                         row(1, "v1", 100L, 1, 10000L),
                         row(2, "v3", 200L, 2, 20000L),
-                        row(3, "v4", 300L, 4, 30000L));
+                        row(3, "v4", 400L, 4, 30000L));
         TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
         writeRows(conn, rightTablePath, rows2, false);
 
@@ -220,11 +404,11 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
 
         String sql =
                 String.format(
-                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN 
%s ON c1 = c2 AND d1 = d2 WHERE a1 > 1",
+                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN 
%s ON c1 = c2 WHERE a1 > 1",
                         sinkTableName, leftTableName, rightTableName);
 
         assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
 
         tEnv.executeSql(sql);
 
@@ -235,15 +419,15 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
     }
 
     @Test
-    void testDeltaJoinFailsWhenFilterOnNonUpsertKeys() throws Exception {
+    void testDeltaJoinFailsWhenFilterOnNonUpsertKeys() {
         String leftTableName = "left_table_force_fail";
         createSource(
                 leftTableName,
                 "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
                 "c1, d1",
                 "c1",
-                "cdc",
-                null);
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String rightTableName = "right_table_force_fail";
         createSource(
@@ -251,8 +435,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
                 "c2, d2",
                 "c2",
-                "cdc",
-                null);
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String sinkTableName = "sink_table_force_fail";
         createSink(
@@ -261,6 +445,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 "c1, d1, c2, d2");
 
         // Filter on e1 > e2, where e1 and e2 are NOT part of the upsert key
+        // TODO we can add an UpsertFilterOperator that converts UPSERT records that fail the
+        //  filter into DELETE records.
         String sql =
                 String.format(
                         "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2 WHERE e1 > e2",
@@ -287,20 +473,20 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
         createSource(
                 leftTableName,
                 "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
+                "c1, d1, e1",
                 "c1, d1",
-                "c1",
-                "cdc",
-                null);
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         List<InternalRow> rows1 =
                 Arrays.asList(
                         row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v3", 50L, 3, 30000L),
+                        row(2, "v2", 200L, 2, 20001L),
+                        row(3, "v3", 300L, 3, 30000L),
                         // Add row with same PK (100, 1) to generate UPDATE in CDC mode
-                        // This row is filtered out (d1=1, filter is d1>1), so doesn't affect join
+                        // This row is filtered out by e1 <> (c2 * 100), so doesn't affect join
                         // result
-                        row(4, "v1_updated", 100L, 1, 15000L));
+                        row(4, "v1_updated", 100L, 1, 10000L));
         TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
         writeRows(conn, leftTablePath, rows1, false);
 
@@ -310,8 +496,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
                 "c2, d2",
                 "c2",
-                "cdc",
-                null);
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         List<InternalRow> rows2 =
                 Arrays.asList(
@@ -319,46 +505,47 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                         row(2, "v4", 200L, 2, 20000L),
                         row(3, "v5", 300L, 4, 40000L),
                         // Add row with same PK (100, 1) to generate UPDATE in CDC mode
-                        // This row is filtered out (d2=1, filter is d2>1), so doesn't affect join
+                        // This row is filtered out by e1 <> (c2 * 100), so doesn't affect join
                         // result
-                        row(5, "v1_updated", 100L, 1, 12000L));
+                        row(5, "v1_updated", 100L, 1, 10000L));
         TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
         writeRows(conn, rightTablePath, rows2, false);
 
         String sinkTableName = "sink_table_nonequi_upsert";
-        createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1");
+        createSink(sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int", "c1, d1, e1");
 
         String sql =
                 String.format(
-                        "INSERT INTO %s SELECT a1, c1, d1, a2 FROM %s INNER 
JOIN %s ON c1 = c2 AND d1 = d2 AND d1 > 1",
+                        "INSERT INTO %s SELECT a1, c1, d1, e1, a2 FROM %s 
INNER JOIN %s "
+                                + "ON c1 = c2 AND d1 = d2 AND e1 <> (c2 * 
100)",
                         sinkTableName, leftTableName, rightTableName);
 
         assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
+                .contains(
+                        "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND 
(d1 = d2) AND (e1 <> (c2 * 100)))]");
 
         tEnv.executeSql(sql);
 
         CloseableIterator<Row> collected =
                 tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
-        // Filter (d1 > 1 AND d2 > 1) is pushed down before join
-        // PK (100, 1) has duplicate rows but d1=1 is filtered out, so doesn't appear in result
-        // PK (200, 2) passes filter: single row on each side, joins successfully
-        // Delta join produces duplicate changes pattern
         List<String> expected =
-                Arrays.asList("+I[2, 200, 2, 2]", "-U[2, 200, 2, 2]", "+U[2, 200, 2, 2]");
+                Arrays.asList(
+                        "+I[2, 200, 2, 20001, 2]",
+                        "-U[2, 200, 2, 20001, 2]",
+                        "+U[2, 200, 2, 20001, 2]");
         assertResultsIgnoreOrder(collected, expected, true);
     }
 
     @Test
-    void testDeltaJoinWithNonEquiConditionInsertOnly() throws Exception {
+    void testDeltaJoinWithAppendOnlySourceAndNonEquiCondition() throws Exception {
         String leftTableName = "left_table_nonequi_insert";
         createSource(
                 leftTableName,
                 "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
                 "c1, d1",
                 "c1",
-                "insert_only",
-                null);
+                null,
+                ImmutableMap.of("table.merge-engine", "first_row"));
 
         List<InternalRow> rows1 =
                 Arrays.asList(
@@ -374,8 +561,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
                 "c2, d2",
                 "c2",
-                "insert_only",
-                null);
+                null,
+                ImmutableMap.of("table.merge-engine", "first_row"));
 
         List<InternalRow> rows2 =
                 Arrays.asList(
@@ -426,201 +613,193 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
     void testDeltaJoinWithLookupCache() throws Exception {
         String leftTableName = "left_table_cache";
         createSource(
-                leftTableName, "a1 int, c1 bigint, d1 int", "c1, d1", "c1", "insert_only", null);
-        List<InternalRow> rows1 = Arrays.asList(row(1, 100L, 1));
+                leftTableName,
+                "a1 int, c1 bigint, d1 int",
+                "c1, d1",
+                "c1",
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
+        List<InternalRow> rows1 = Collections.singletonList(row(1, 100L, 1));
         writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false);
 
         String rightTableName = "right_table_cache";
         createSource(
                 rightTableName,
                 "a2 int, c2 bigint, d2 int",
-                "c2, d2",
                 "c2",
-                "insert_only",
-                "'lookup.cache' = 'partial', 'lookup.partial-cache.max-rows' = 
'100'");
-        List<InternalRow> rows2 = Arrays.asList(row(1, 100L, 1));
+                "c2",
+                null,
+                ImmutableMap.of(
+                        "table.delete.behavior",
+                        "IGNORE",
+                        "lookup.cache",
+                        "partial",
+                        "lookup.partial-cache.max-rows",
+                        "100"));
+        List<InternalRow> rows2 = Collections.singletonList(row(1, 100L, 1));
         writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false);
 
         String sinkTableName = "sink_table_cache";
-        createSink(sinkTableName, "a1 int, a2 int", "a1");
+        createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1");
 
         String sql =
                 String.format(
-                        "INSERT INTO %s SELECT T1.a1, T2.a2 FROM %s AS T1 
INNER JOIN %s AS T2 ON T1.c1 = T2.c2 AND T1.d1 = T2.d2",
+                        "INSERT INTO %s SELECT a1, c1, d1, a2 FROM %s AS T1 
INNER JOIN %s AS T2 ON T1.c1 = T2.c2",
                         sinkTableName, leftTableName, rightTableName);
 
         assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]");
 
         tEnv.executeSql(sql);
 
         CloseableIterator<Row> collected =
                 tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
-        List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 
1]");
+        List<String> expected =
+                Arrays.asList("+I[1, 100, 1, 1]", "-U[1, 100, 1, 1]", "+U[1, 
100, 1, 1]");
         assertResultsIgnoreOrder(collected, expected, true);
     }
 
     @Test
-    void testDeltaJoinWithPrimaryKeyTableNoDeletes() throws Exception {
-        String leftTableName = "left_table_normal_pk";
+    void testDeltaJoinWithPartitionedTable() throws Exception {
+        String leftTableName = "left_table_partitioned";
         createSource(
                 leftTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
-                "c1, d1",
+                "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar",
+                "c1, d1, pt1",
                 "c1",
-                "cdc",
-                null);
-
+                "pt1",
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
+        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+        Iterator<String> leftPartitionIterator =
+                waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), leftTablePath)
+                        .values()
+                        .iterator();
+        // pick two partitions to insert data into
+        String leftPartition1 = leftPartitionIterator.next();
+        String leftPartition2 = leftPartitionIterator.next();
         List<InternalRow> rows1 =
                 Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v3", 300L, 3, 30000L),
-                        // Add row with same PK (100, 1) to generate UPDATE in 
CDC mode
-                        row(4, "v1_updated", 100L, 1, 15000L));
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+                        row(1, "v1", 100L, 1000, leftPartition1),
+                        row(2, "v2", 200L, 2000, leftPartition1),
+                        row(3, "v3", 100L, 3000, leftPartition2),
+                        row(4, "v4", 400L, 4000, leftPartition2));
         writeRows(conn, leftTablePath, rows1, false);
 
-        String rightTableName = "right_table_normal_pk";
+        String rightTableName = "right_table_partitioned";
         createSource(
                 rightTableName,
-                "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
-                "c2, d2",
+                "a2 int, b2 varchar, c2 bigint, d2 int, pt2 varchar",
+                "c2, pt2",
                 "c2",
-                "cdc",
-                null);
-
+                "pt2",
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
+        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+        Iterator<String> rightPartitionIterator =
+                waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), rightTablePath)
+                        .values()
+                        .iterator();
+        // pick two partitions to insert data into
+        String rightPartition1 = rightPartitionIterator.next();
+        String rightPartition2 = rightPartitionIterator.next();
         List<InternalRow> rows2 =
                 Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v4", 200L, 2, 20000L),
-                        row(3, "v5", 400L, 4, 40000L),
-                        // Add row with same PK (100, 1) to generate UPDATE in 
CDC mode
-                        row(5, "v1_updated", 100L, 1, 12000L));
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+                        row(1, "v1", 100L, 1000, rightPartition1),
+                        row(4, "v4", 400L, 3000, rightPartition2));
         writeRows(conn, rightTablePath, rows2, false);
 
-        String sinkTableName = "sink_table_normal_pk";
+        String sinkTableName = "sink_table";
         createSink(
                 sinkTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, a2 int, b2 varchar",
-                "c1, d1");
+                "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar, b2 
varchar",
+                "c1, d1, pt1");
 
         String sql =
                 String.format(
-                        "INSERT INTO %s SELECT a1, b1, c1, d1, a2, b2 FROM %s 
INNER JOIN %s ON c1 = c2 AND d1 = d2",
+                        "INSERT INTO %s SELECT a1, b1, c1, d1, pt1, b2 FROM %s 
AS T1 INNER JOIN %s AS T2 "
+                                + "ON T1.c1 = T2.c2 AND T1.pt1 = T2.pt2",
                         sinkTableName, leftTableName, rightTableName);
 
         assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (pt1 = pt2))]");
 
         tEnv.executeSql(sql);
 
         CloseableIterator<Row> collected =
                 tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
-
         List<String> expected =
                 Arrays.asList(
-                        "+I[4, v1_updated, 100, 1, 5, v1_updated]",
-                        "-U[4, v1_updated, 100, 1, 5, v1_updated]",
-                        "+U[4, v1_updated, 100, 1, 5, v1_updated]",
-                        "+I[2, v2, 200, 2, 2, v4]",
-                        "-U[2, v2, 200, 2, 2, v4]",
-                        "+U[2, v2, 200, 2, 2, v4]");
+                        String.format("+I[1, v1, 100, 1000, %s, v1]", 
leftPartition1),
+                        String.format("-U[1, v1, 100, 1000, %s, v1]", 
leftPartition1),
+                        String.format("+U[1, v1, 100, 1000, %s, v1]", 
leftPartition1),
+                        String.format("+I[4, v4, 400, 4000, %s, v4]", 
leftPartition2),
+                        String.format("-U[4, v4, 400, 4000, %s, v4]", 
leftPartition2),
+                        String.format("+U[4, v4, 400, 4000, %s, v4]", 
leftPartition2));
         assertResultsIgnoreOrder(collected, expected, true);
     }
 
     @Test
-    void testDeltaJoinOnBucketKey() throws Exception {
-        String leftTableName = "left_table_bucket_key";
+    void testDeltaJoinFailsWhenSourceHasDelete() {
+        String leftTableName = "left_table_delete_force";
         createSource(
                 leftTableName,
                 "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
                 "c1, d1",
                 "c1",
-                "insert_only",
+                null,
                 null);
 
-        List<InternalRow> rows1 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 100L, 2, 20000L),
-                        row(3, "v3", 200L, 1, 30000L));
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        writeRows(conn, leftTablePath, rows1, false);
-
-        String rightTableName = "right_table_bucket_key";
+        String rightTableName = "right_table_delete_force";
         createSource(
                 rightTableName,
                 "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
                 "c2, d2",
                 "c2",
-                "insert_only",
+                null,
                 null);
 
-        List<InternalRow> rows2 =
-                Arrays.asList(row(10, "r1", 100L, 5, 50000L), row(20, "r2", 200L, 6, 60000L));
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        writeRows(conn, rightTablePath, rows2, false);
-
-        String sinkTableName = "sink_table_bucket_key";
-        createSink(sinkTableName, "a1 int, b1 varchar, c1 bigint, a2 int, b2 
varchar", "a1, a2");
+        String sinkTableName = "sink_table_delete_force";
+        createSink(
+                sinkTableName,
+                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 
varchar, c2 bigint, d2 int, e2 bigint",
+                "c1, d1, c2, d2");
 
         String sql =
                 String.format(
-                        "INSERT INTO %s SELECT a1, b1, c1, a2, b2 FROM %s 
INNER JOIN %s ON c1 = c2",
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2",
                         sinkTableName, leftTableName, rightTableName);
 
-        assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[=(c1, c2)]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
-        List<String> expected =
-                Arrays.asList(
-                        "+I[1, v1, 100, 10, r1]",
-                        "-U[1, v1, 100, 10, r1]",
-                        "+U[1, v1, 100, 10, r1]",
-                        "+I[2, v2, 100, 10, r1]",
-                        "-U[2, v2, 100, 10, r1]",
-                        "+U[2, v2, 100, 10, r1]",
-                        "+I[3, v3, 200, 20, r2]",
-                        "-U[3, v3, 200, 20, r2]",
-                        "+U[3, v3, 200, 20, r2]");
-        assertResultsIgnoreOrder(collected, expected, true);
+        assertThatThrownBy(() -> tEnv.explainSql(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join 
optimization");
     }
 
     @Test
-    void testDeltaJoinFailsWhenSourceHasDelete() throws Exception {
-        String leftTableName = "left_table_delete_force";
+    void testDeltaJoinWithJoinKeyExceedsPrimaryKey() {
+        String leftTableName = "left_table_exceed_pk";
         createSource(
                 leftTableName,
                 "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
                 "c1, d1",
                 "c1",
                 null,
-                null);
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
-        String rightTableName = "right_table_delete_force";
+        String rightTableName = "right_table_exceed_pk";
         createSource(
                 rightTableName,
                 "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
                 "c2, d2",
                 "c2",
                 null,
-                null);
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
-        String sinkTableName = "sink_table_delete_force";
+        String sinkTableName = "sink_table_exceed_pk";
         createSink(
-                sinkTableName,
-                "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 
varchar, c2 bigint, d2 int, e2 bigint",
-                "c1, d1, c2, d2");
+                sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, 
e2 bigint", "c1, d1");
 
         String sql =
                 String.format(
-                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2",
+                        "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s 
INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2",
                         sinkTableName, leftTableName, rightTableName);
 
         assertThatThrownBy(() -> tEnv.explainSql(sql))
@@ -629,15 +808,15 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
     }
 
     @Test
-    void testDeltaJoinWhenJoinKeyExceedsPrimaryKey() throws Exception {
+    void testDeltaJoinWithAppendOnlySourceAndJoinKeyExceedsPrimaryKey() throws Exception {
         String leftTableName = "left_table_exceed_pk";
         createSource(
                 leftTableName,
                 "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
                 "c1, d1",
                 "c1",
-                "insert_only",
-                null);
+                null,
+                ImmutableMap.of("table.merge-engine", "first_row"));
 
         List<InternalRow> rows1 =
                 Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v2", 200L, 2, 20000L));
@@ -650,8 +829,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
                 "c2, d2",
                 "c2",
-                "insert_only",
-                null);
+                null,
+                ImmutableMap.of("table.merge-engine", "first_row"));
 
         List<InternalRow> rows2 =
                 Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v3", 200L, 2, 99999L));
@@ -689,15 +868,15 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
     }
 
     @Test
-    void testDeltaJoinFailsWhenJoinKeyNotContainIndex() throws Exception {
+    void testDeltaJoinFailsWhenJoinKeyNotContainIndex() {
         String leftTableName = "left_table_no_idx_force";
         createSource(
                 leftTableName,
                 "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
                 "c1, d1",
                 "c1",
-                "insert_only",
-                null);
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String rightTableName = "right_table_no_idx_force";
         createSource(
@@ -705,8 +884,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
                 "c2, d2",
                 "c2",
-                "insert_only",
-                null);
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String sinkTableName = "sink_table_no_idx_force";
         createSink(
@@ -725,14 +904,24 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
     }
 
     @Test
-    void testDeltaJoinFailsWithOuterJoin() throws Exception {
+    void testDeltaJoinFailsWithOuterJoin() {
         String leftTableName = "left_table_outer_fail";
         createSource(
-                leftTableName, "a1 int, c1 bigint, d1 int", "c1, d1", "c1", "insert_only", null);
+                leftTableName,
+                "a1 int, c1 bigint, d1 int",
+                "c1, d1",
+                "c1",
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String rightTableName = "right_table_outer_fail";
         createSource(
-                rightTableName, "a2 int, c2 bigint, d2 int", "c2, d2", "c2", "insert_only", null);
+                rightTableName,
+                "a2 int, c2 bigint, d2 int",
+                "c2, d2",
+                "c2",
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String sinkTableName = "sink_table_outer_fail";
         createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1");
@@ -769,15 +958,33 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
     }
 
     @Test
-    void testDeltaJoinFailsWithCascadeJoin() throws Exception {
+    void testDeltaJoinFailsWithCascadeJoin() {
         String table1 = "cascade_table1";
-        createSource(table1, "a1 int, c1 bigint, d1 int", "c1, d1", "c1", "insert_only", null);
+        createSource(
+                table1,
+                "a1 int, c1 bigint, d1 int",
+                "c1, d1",
+                "c1",
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String table2 = "cascade_table2";
-        createSource(table2, "a2 int, c2 bigint, d2 int", "c2, d2", "c2", "insert_only", null);
+        createSource(
+                table2,
+                "a2 int, c2 bigint, d2 int",
+                "c2, d2",
+                "c2",
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String table3 = "cascade_table3";
-        createSource(table3, "a3 int, c3 bigint, d3 int", "c3, d3", "c3", "insert_only", null);
+        createSource(
+                table3,
+                "a3 int, c3 bigint, d3 int",
+                "c3, d3",
+                "c3",
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String sinkTableName = "cascade_sink";
         createSink(
@@ -799,7 +1006,7 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
     }
 
     @Test
-    void testDeltaJoinFailsWithSinkMaterializer() throws Exception {
+    void testDeltaJoinFailsWithSinkMaterializer() {
         // With CDC sources, when sink PK doesn't match upstream update key,
         // Flink would insert SinkUpsertMaterializer which prevents delta join
         String leftTableName = "left_table_materializer";
@@ -808,8 +1015,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint",
                 "c1, d1",
                 "c1",
-                "cdc",
-                null);
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String rightTableName = "right_table_materializer";
         createSource(
@@ -817,10 +1024,12 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
                 "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint",
                 "c2, d2",
                 "c2",
-                "cdc",
-                null);
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         // Sink PK (a1, a2) doesn't match upstream update key (c1, d1, c2, d2)
+        // TODO: this depends on Fluss supporting MVCC/point-in-time lookup to support changing
+        //  upsert keys
         String sinkTableName = "sink_table_materializer";
         createSink(
                 sinkTableName,
@@ -838,21 +1047,33 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {
     }
 
     @Test
-    void testDeltaJoinFailsWithNonDeterministicFunctions() throws Exception {
+    void testDeltaJoinFailsWithNonDeterministicFunctions() {
         String leftTableName = "left_table_nondeterministic";
         createSource(
-                leftTableName, "a1 int, c1 bigint, d1 int", "c1, d1", "c1", "insert_only", null);
+                leftTableName,
+                "a1 int, c1 bigint, d1 int",
+                "c1, d1",
+                "c1",
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String rightTableName = "right_table_nondeterministic";
         createSource(
-                rightTableName, "a2 int, c2 bigint, d2 int", "c2, d2", "c2", "insert_only", null);
+                rightTableName,
+                "a2 int, c2 bigint, d2 int",
+                "c2, d2",
+                "c2",
+                null,
+                ImmutableMap.of("table.delete.behavior", "IGNORE"));
 
         String sinkTableName = "sink_table_nondeterministic";
-        createSink(sinkTableName, "a1 int, c1 bigint, rand_val double", "c1");
+        // TODO this should be supported in Flink in the future for non-deterministic functions
+        //  before sinking
+        createSink(sinkTableName, "c1 bigint, d1 bigint, rand_val double", 
"c1");
 
         String sql =
                 String.format(
-                        "INSERT INTO %s SELECT a1, c1, RAND() FROM %s INNER 
JOIN %s ON c1 = c2 AND d1 = d2",
+                        "INSERT INTO %s SELECT c1, d1, RAND() FROM %s INNER 
JOIN %s ON c1 = c2 AND d1 = d2",
                         sinkTableName, leftTableName, rightTableName);
 
         assertThatThrownBy(() -> tEnv.explainSql(sql))
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
index abb74ebf6..0421cc561 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
@@ -49,7 +49,7 @@ class FlussTableLakeSnapshotCommitterTest extends FlinkTestBase {
     private FlussTableLakeSnapshotCommitter flussTableLakeSnapshotCommitter;
 
     @BeforeEach
-    void beforeEach() {
+    public void beforeEach() {
         flussTableLakeSnapshotCommitter =
                 new FlussTableLakeSnapshotCommitter(FLUSS_CLUSTER_EXTENSION.getClientConfig());
         flussTableLakeSnapshotCommitter.open();
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index bdecfa8fa..70d86bb2c 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -69,7 +69,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
     private StreamOperatorParameters<CommittableMessage<TestingCommittable>> parameters;
 
     @BeforeEach
-    void beforeEach() throws Exception {
+    public void beforeEach() throws Exception {
         mockOperatorEventGateway = new MockOperatorEventGateway();
         MockOperatorEventDispatcher mockOperatorEventDispatcher =
                 new MockOperatorEventDispatcher(mockOperatorEventGateway);
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
index bfa814ce2..7c80bbc0c 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
@@ -155,7 +155,7 @@ public class FlinkTestBase extends AbstractTestBase {
     }
 
     @BeforeEach
-    void beforeEach() throws Exception {
+    public void beforeEach() throws Exception {
         admin.createDatabase(DEFAULT_DB, DatabaseDescriptor.EMPTY, true).get();
     }
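(The visibility change here and in the two committer tests above is what lets
subclasses in other packages, such as the rewritten Flink22DeltaJoinITCase in
org.apache.fluss.flink.source, redefine the @BeforeEach hook. A hypothetical
subclass, assuming JUnit 5 override semantics:

    package org.apache.fluss.flink.source;

    import org.apache.fluss.flink.utils.FlinkTestBase;
    import org.junit.jupiter.api.BeforeEach;

    class SomeITCase extends FlinkTestBase {
        @BeforeEach
        @Override
        public void beforeEach() throws Exception {
            super.beforeEach(); // reuse base setup, then add test-specific setup
        }
    }

A package-private beforeEach() is invisible outside org.apache.fluss.flink.utils,
so the @Override above would not compile before this change.)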
 
