This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch revert-2546-fluss-delta-joins-tests-docs in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 3effa3fcf38b9a07f79812905cfbf7dff29293ce Author: Jark Wu <[email protected]> AuthorDate: Thu Feb 5 10:45:33 2026 +0800 Revert "[flink] Delta Join additional IT tests and docs improvement (#2546)" This reverts commit f2c44179d1fbd323a30eef3764bebab304d508f8. --- .../fluss/flink/source/Flink22DeltaJoinITCase.java | 1083 -------------------- .../flink/source/Flink22TableSourceITCase.java | 290 +++++- .../FlussTableLakeSnapshotCommitterTest.java | 2 +- .../committer/TieringCommitOperatorTest.java | 2 +- .../apache/fluss/flink/utils/FlinkTestBase.java | 2 +- website/docs/engine-flink/delta-joins.md | 19 +- 6 files changed, 301 insertions(+), 1097 deletions(-) diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java deleted file mode 100644 index 0fe8fb73c..000000000 --- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java +++ /dev/null @@ -1,1083 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.fluss.flink.source; - -import org.apache.fluss.config.ConfigOptions; -import org.apache.fluss.flink.utils.FlinkTestBase; -import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.row.InternalRow; -import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableMap; - -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import javax.annotation.Nullable; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; -import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; -import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; -import static org.apache.fluss.testutils.DataTestUtils.row; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** IT case for Delta Join optimization in Flink 2.2. 
*/ -public class Flink22DeltaJoinITCase extends FlinkTestBase { - - private static final String CATALOG_NAME = "test_catalog"; - - private StreamTableEnvironment tEnv; - - @BeforeEach - public void beforeEach() { - bootstrapServers = conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode()); - - tEnv.executeSql( - String.format( - "create catalog %s with ('type' = 'fluss', '%s' = '%s')", - CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers)); - tEnv.useCatalog(CATALOG_NAME); - tEnv.executeSql(String.format("create database if not exists `%s`", DEFAULT_DB)); - tEnv.useDatabase(DEFAULT_DB); - - // start two jobs for this test: one for DML involving the delta join, and the other for DQL - // to query the results of the sink table - tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); - // Set FORCE strategy for delta join - tEnv.getConfig() - .set( - OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, - OptimizerConfigOptions.DeltaJoinStrategy.FORCE); - } - - @AfterEach - void after() { - tEnv.useDatabase(BUILTIN_DATABASE); - tEnv.executeSql(String.format("drop database `%s` cascade", DEFAULT_DB)); - } - - /** - * Creates a source table with specified schema and options. 
- * - * @param tableName the name of the table - * @param columns column definitions (e.g., "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint") - * @param primaryKey primary key columns (e.g., "c1, d1") - * @param bucketKey bucket key column (e.g., "c1") - * @param extraOptions additional WITH options (e.g., "lookup.cache" = "partial"), or null - */ - private void createSource( - String tableName, - String columns, - String primaryKey, - String bucketKey, - @Nullable String partitionKey, - @Nullable Map<String, String> extraOptions) { - Map<String, String> withOptions = new HashMap<>(); - if (extraOptions != null) { - withOptions.putAll(extraOptions); - } - withOptions.put("connector", "fluss"); - withOptions.put("bucket.key", bucketKey); - StringBuilder ddlBuilder = new StringBuilder(); - ddlBuilder.append( - String.format( - "create table %s ( %s, primary key (%s) NOT ENFORCED )", - tableName, columns, primaryKey)); - if (partitionKey != null) { - ddlBuilder.append(String.format(" partitioned by (%s)", partitionKey)); - withOptions.put("table.auto-partition.enabled", "true"); - withOptions.put("table.auto-partition.time-unit", "year"); - } - ddlBuilder.append( - String.format( - " with (%s)", - withOptions.entrySet().stream() - .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue())) - .collect(Collectors.joining(", ")))); - - tEnv.executeSql(ddlBuilder.toString()); - } - - /** - * Creates a sink table with specified columns. 
- * - * @param tableName the name of the table - * @param columns the column definitions (e.g., "a1 int, c1 bigint, a2 int") - * @param primaryKey the primary key columns (e.g., "c1, d1, c2, d2") - */ - private void createSink(String tableName, String columns, String primaryKey) { - tEnv.executeSql( - String.format( - "create table %s (" - + " %s, " - + " primary key (%s) NOT ENFORCED" - + ") with (" - + " 'connector' = 'fluss'" - + ")", - tableName, columns, primaryKey)); - } - - @Test - void testDeltaJoin() throws Exception { - // disable cache to get stable results with updating - tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false); - String leftTableName = "left_table"; - String rightTableName = "right_table"; - String sinkTableName = "sink_table"; - - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - createSink( - sinkTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c1, d1, c2, d2"); - - List<InternalRow> rows1 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 100L, 2, 20000L), - row(3, "v3", 300L, 3, 30000L), - row(4, "v4", 400L, 4, 40000L), - // update - row(5, "v5", 100L, 1, 50000L)); - TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); - writeRows(conn, leftTablePath, rows1, false); - // wait for the first snapshot to finish to get the stable result - FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath); - - List<InternalRow> rows2 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 200L, 2, 20000L), - row(3, "v3", 330L, 4, 30000L), - row(4, "v4", 500L, 4, 50000L), - // update - row(6, "v6", 100L, 1, 60000L)); - 
TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); - writeRows(conn, rightTablePath, rows2, false); - // wait for the first snapshot to finish to get the stable result - FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath); - - String sql = - String.format( - "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2", - sinkTableName, leftTableName, rightTableName); - - assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); - - tEnv.executeSql(sql); - - CloseableIterator<Row> collected = - tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - List<String> expected = - Arrays.asList( - "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", - "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", - "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", - "+I[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]", - "-U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]", - "+U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]"); - assertResultsIgnoreOrder(collected, expected, true); - } - - @Test - void testDeltaJoinOnPrimaryKey() throws Exception { - // disable cache to get stable results with updating - tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false); - String leftTableName = "left_table"; - String rightTableName = "right_table"; - String sinkTableName = "sink_table"; - - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - createSink( - sinkTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c1, d1, c2, d2"); - - List<InternalRow> rows1 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 200L, 2, 20000L), 
- row(3, "v3", 300L, 3, 30000L), - row(4, "v4", 400L, 4, 40000L), - // update - row(5, "v5", 100L, 1, 50000L)); - TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); - writeRows(conn, leftTablePath, rows1, false); - // wait for the first snapshot to finish to get the stable result - FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath); - - List<InternalRow> rows2 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 200L, 2, 20000L), - row(3, "v3", 300L, 4, 30000L), - row(4, "v4", 500L, 4, 50000L), - // update - row(6, "v6", 100L, 1, 60000L)); - TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); - writeRows(conn, rightTablePath, rows2, false); - // wait for the first snapshot to finish to get the stable result - FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath); - - String sql = - String.format( - "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", - sinkTableName, leftTableName, rightTableName); - - assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); - - tEnv.executeSql(sql); - - CloseableIterator<Row> collected = - tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - List<String> expected = - Arrays.asList( - "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", - "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", - "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", - "+I[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]", - "-U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]", - "+U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]"); - assertResultsIgnoreOrder(collected, expected, true); - } - - @Test - void testDeltaJoinWithCalc() throws Exception { - String leftTableName = "left_table_proj"; - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - List<InternalRow> rows1 = - Arrays.asList( - 
row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 200L, 2, 20000L), - row(3, "v1", 300L, 3, 30000L)); - TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); - writeRows(conn, leftTablePath, rows1, false); - - String rightTableName = "right_table_proj"; - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2", - "c2", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - List<InternalRow> rows2 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 200L, 2, 20000L), - row(3, "v3", 300L, 4, 30000L)); - TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); - writeRows(conn, rightTablePath, rows2, false); - - String sinkTableName = "sink_table_proj"; - createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1"); - - String sql = - String.format( - "INSERT INTO %s SELECT a1, c1, d1, a2 FROM (" - + " SELECT * FROM %s WHERE d1 > 1" - + ") INNER JOIN (" - + " SELECT * FROM %s WHERE c2 < 300" - + ") ON c1 = c2", - sinkTableName, leftTableName, rightTableName); - - assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); - - tEnv.executeSql(sql); - - CloseableIterator<Row> collected = - tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - List<String> expected = - Arrays.asList("+I[2, 200, 2, 2]", "-U[2, 200, 2, 2]", "+U[2, 200, 2, 2]"); - assertResultsIgnoreOrder(collected, expected, true); - } - - @Test - void testDeltaJoinWithAppendOnlySourceAndCalc() throws Exception { - String leftTableName = "left_table_proj"; - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.merge-engine", "first_row")); - - List<InternalRow> rows1 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 200L, 2, 20000L), - row(3, "v1", 300L, 3, 30000L)); - TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); - 
writeRows(conn, leftTablePath, rows1, false); - - String rightTableName = "right_table_proj"; - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.merge-engine", "first_row")); - - List<InternalRow> rows2 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v3", 200L, 2, 20000L), - row(3, "v4", 400L, 4, 30000L)); - TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); - writeRows(conn, rightTablePath, rows2, false); - - String sinkTableName = "sink_table_proj"; - createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1"); - - String sql = - String.format( - "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 WHERE a1 > 1", - sinkTableName, leftTableName, rightTableName); - - assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); - - tEnv.executeSql(sql); - - CloseableIterator<Row> collected = - tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - List<String> expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 2]", "+U[2, 200, 2]"); - assertResultsIgnoreOrder(collected, expected, true); - } - - @Test - void testDeltaJoinFailsWhenFilterOnNonUpsertKeys() { - String leftTableName = "left_table_force_fail"; - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String rightTableName = "right_table_force_fail"; - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String sinkTableName = "sink_table_force_fail"; - createSink( - sinkTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c1, d1, c2, d2"); - - // Filter on e1 > e2, where e1 and e2 are NOT part of the upsert key - 
// TODO we can add a UpsertFilterOperator that can convert the un-match-filter UPSERT record - // into DELETE record. - String sql = - String.format( - "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE e1 > e2", - sinkTableName, leftTableName, rightTableName); - - assertThatThrownBy(() -> tEnv.explainSql(sql)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("doesn't support to do delta join optimization"); - - // Non-equiv-cond on e1 > e2, where e1 and e2 are NOT part of the upsert key - String sql2 = - String.format( - "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 > e2", - sinkTableName, leftTableName, rightTableName); - - assertThatThrownBy(() -> tEnv.explainSql(sql2)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("doesn't support to do delta join optimization"); - } - - @Test - void testDeltaJoinWithNonEquiConditionOnUpsertKeys() throws Exception { - String leftTableName = "left_table_nonequi_upsert"; - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1, e1", - "c1, d1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - List<InternalRow> rows1 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 200L, 2, 20001L), - row(3, "v3", 300L, 3, 30000L), - // Add row with same PK (100, 1) to generate UPDATE in CDC mode - // This row is filtered out by (e2 / 100) <> c2, so doesn't affect join - // result - row(4, "v1_updated", 100L, 1, 10000L)); - TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); - writeRows(conn, leftTablePath, rows1, false); - - String rightTableName = "right_table_nonequi_upsert"; - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - List<InternalRow> rows2 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v4", 200L, 2, 20000L), - 
row(3, "v5", 300L, 4, 40000L), - // Add row with same PK (100, 1) to generate UPDATE in CDC mode - // This row is filtered out by (e2 / 100) <> c2, so doesn't affect join - // result - row(5, "v1_updated", 100L, 1, 10000L)); - TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); - writeRows(conn, rightTablePath, rows2, false); - - String sinkTableName = "sink_table_nonequi_upsert"; - createSink(sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int", "c1, d1, e1"); - - String sql = - String.format( - "INSERT INTO %s SELECT a1, c1, d1, e1, a2 FROM %s INNER JOIN %s " - + "ON c1 = c2 AND d1 = d2 AND e1 <> (c2 * 100)", - sinkTableName, leftTableName, rightTableName); - - assertThat(tEnv.explainSql(sql)) - .contains( - "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 <> (c2 * 100)))]"); - - tEnv.executeSql(sql); - - CloseableIterator<Row> collected = - tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - List<String> expected = - Arrays.asList( - "+I[2, 200, 2, 20001, 2]", - "-U[2, 200, 2, 20001, 2]", - "+U[2, 200, 2, 20001, 2]"); - assertResultsIgnoreOrder(collected, expected, true); - } - - @Test - void testDeltaJoinWithAppendOnlySourceAndNonEquiCondition() throws Exception { - String leftTableName = "left_table_nonequi_insert"; - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.merge-engine", "first_row")); - - List<InternalRow> rows1 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 200L, 2, 20000L), - row(3, "v3", 300L, 3, 5000L)); - TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); - writeRows(conn, leftTablePath, rows1, false); - - String rightTableName = "right_table_nonequi_insert"; - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.merge-engine", "first_row")); - - 
List<InternalRow> rows2 = - Arrays.asList( - row(1, "v1", 100L, 1, 8000L), - row(2, "v4", 200L, 2, 15000L), - row(3, "v5", 300L, 3, 3000L)); - TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); - writeRows(conn, rightTablePath, rows2, false); - - String sinkTableName = "sink_table_nonequi_insert"; - createSink( - sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1"); - - // INSERT_ONLY sources with non-equi condition on non-upsert key fields (e1, e2) - // This should succeed because INSERT_ONLY mode allows any non-equi conditions - String sql = - String.format( - "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 > e2", - sinkTableName, leftTableName, rightTableName); - - assertThat(tEnv.explainSql(sql)) - .contains( - "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 > e2))]"); - - tEnv.executeSql(sql); - - CloseableIterator<Row> collected = - tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - // Rows where e1 > e2: - // Row 1: e1=10000 > e2=8000 ✓ - // Row 2: e1=20000 > e2=15000 ✓ - // Row 3: e1=5000 > e2=3000 ✓ - List<String> expected = - Arrays.asList( - "+I[1, 100, 1, 10000, 1, 8000]", - "-U[1, 100, 1, 10000, 1, 8000]", - "+U[1, 100, 1, 10000, 1, 8000]", - "+I[2, 200, 2, 20000, 2, 15000]", - "-U[2, 200, 2, 20000, 2, 15000]", - "+U[2, 200, 2, 20000, 2, 15000]", - "+I[3, 300, 3, 5000, 3, 3000]", - "-U[3, 300, 3, 5000, 3, 3000]", - "+U[3, 300, 3, 5000, 3, 3000]"); - assertResultsIgnoreOrder(collected, expected, true); - } - - @Test - void testDeltaJoinWithLookupCache() throws Exception { - String leftTableName = "left_table_cache"; - createSource( - leftTableName, - "a1 int, c1 bigint, d1 int", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - List<InternalRow> rows1 = Collections.singletonList(row(1, 100L, 1)); - writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false); 
- - String rightTableName = "right_table_cache"; - createSource( - rightTableName, - "a2 int, c2 bigint, d2 int", - "c2", - "c2", - null, - ImmutableMap.of( - "table.delete.behavior", - "IGNORE", - "lookup.cache", - "partial", - "lookup.partial-cache.max-rows", - "100")); - List<InternalRow> rows2 = Collections.singletonList(row(1, 100L, 1)); - writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false); - - String sinkTableName = "sink_table_cache"; - createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1"); - - String sql = - String.format( - "INSERT INTO %s SELECT a1, c1, d1, a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2", - sinkTableName, leftTableName, rightTableName); - - assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); - - tEnv.executeSql(sql); - - CloseableIterator<Row> collected = - tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - List<String> expected = - Arrays.asList("+I[1, 100, 1, 1]", "-U[1, 100, 1, 1]", "+U[1, 100, 1, 1]"); - assertResultsIgnoreOrder(collected, expected, true); - } - - @Test - void testDeltaJoinWithPartitionedTable() throws Exception { - String leftTableName = "left_table_partitioned"; - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar", - "c1, d1, pt1", - "c1", - "pt1", - ImmutableMap.of("table.delete.behavior", "IGNORE")); - TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); - Iterator<String> leftPartitionIterator = - waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), leftTablePath) - .values() - .iterator(); - // pick two partition to insert data - String leftPartition1 = leftPartitionIterator.next(); - String leftPartition2 = leftPartitionIterator.next(); - List<InternalRow> rows1 = - Arrays.asList( - row(1, "v1", 100L, 1000, leftPartition1), - row(2, "v2", 200L, 2000, leftPartition1), - row(3, "v3", 100L, 3000, leftPartition2), - row(4, "v4", 400L, 
4000, leftPartition2)); - writeRows(conn, leftTablePath, rows1, false); - - String rightTableName = "right_table_partitioned"; - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, pt2 varchar", - "c2, pt2", - "c2", - "pt2", - ImmutableMap.of("table.delete.behavior", "IGNORE")); - TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); - Iterator<String> rightPartitionIterator = - waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), rightTablePath) - .values() - .iterator(); - // pick two partition to insert data - String rightPartition1 = rightPartitionIterator.next(); - String rightPartition2 = rightPartitionIterator.next(); - List<InternalRow> rows2 = - Arrays.asList( - row(1, "v1", 100L, 1000, rightPartition1), - row(4, "v4", 400L, 3000, rightPartition2)); - writeRows(conn, rightTablePath, rows2, false); - - String sinkTableName = "sink_table"; - createSink( - sinkTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar, b2 varchar", - "c1, d1, pt1"); - - String sql = - String.format( - "INSERT INTO %s SELECT a1, b1, c1, d1, pt1, b2 FROM %s AS T1 INNER JOIN %s AS T2 " - + "ON T1.c1 = T2.c2 AND T1.pt1 = T2.pt2", - sinkTableName, leftTableName, rightTableName); - - assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (pt1 = pt2))]"); - - tEnv.executeSql(sql); - - CloseableIterator<Row> collected = - tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - List<String> expected = - Arrays.asList( - String.format("+I[1, v1, 100, 1000, %s, v1]", leftPartition1), - String.format("-U[1, v1, 100, 1000, %s, v1]", leftPartition1), - String.format("+U[1, v1, 100, 1000, %s, v1]", leftPartition1), - String.format("+I[4, v4, 400, 4000, %s, v4]", leftPartition2), - String.format("-U[4, v4, 400, 4000, %s, v4]", leftPartition2), - String.format("+U[4, v4, 400, 4000, %s, v4]", leftPartition2)); - assertResultsIgnoreOrder(collected, expected, true); - } - - 
@Test - void testDeltaJoinFailsWhenSourceHasDelete() { - String leftTableName = "left_table_delete_force"; - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", - "c1", - null, - null); - - String rightTableName = "right_table_delete_force"; - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", - "c2", - null, - null); - - String sinkTableName = "sink_table_delete_force"; - createSink( - sinkTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c1, d1, c2, d2"); - - String sql = - String.format( - "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", - sinkTableName, leftTableName, rightTableName); - - assertThatThrownBy(() -> tEnv.explainSql(sql)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("doesn't support to do delta join optimization"); - } - - @Test - void testDeltaJoinWithJoinKeyExceedsPrimaryKey() { - String leftTableName = "left_table_exceed_pk"; - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String rightTableName = "right_table_exceed_pk"; - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String sinkTableName = "sink_table_exceed_pk"; - createSink( - sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1"); - - String sql = - String.format( - "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2", - sinkTableName, leftTableName, rightTableName); - - assertThatThrownBy(() -> tEnv.explainSql(sql)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("doesn't support to do delta join optimization"); - } - - @Test - void 
testDeltaJoinWithAppendOnlySourceAndJoinKeyExceedsPrimaryKey() throws Exception { - String leftTableName = "left_table_exceed_pk"; - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.merge-engine", "first_row")); - - List<InternalRow> rows1 = - Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v2", 200L, 2, 20000L)); - TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); - writeRows(conn, leftTablePath, rows1, false); - - String rightTableName = "right_table_exceed_pk"; - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.merge-engine", "first_row")); - - List<InternalRow> rows2 = - Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v3", 200L, 2, 99999L)); - TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); - writeRows(conn, rightTablePath, rows2, false); - - String sinkTableName = "sink_table_exceed_pk"; - createSink( - sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1"); - - // Join on PK (c1, d1) + additional non-PK field (e1) - // This should succeed because join keys {c1, d1, e1} contain complete PK {c1, d1} - // The e1 = e2 condition is applied as a post-lookup equi-condition filter - String sql = - String.format( - "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2", - sinkTableName, leftTableName, rightTableName); - - assertThat(tEnv.explainSql(sql)) - .contains( - "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 = e2))]"); - - tEnv.executeSql(sql); - - CloseableIterator<Row> collected = - tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - // Only first row should match (e1 = e2 = 10000) - // Second row filtered out (e1 = 20000 AND != e2 = 99999) - List<String> expected = - Arrays.asList( - "+I[1, 100, 1, 10000, 
1, 10000]", - "-U[1, 100, 1, 10000, 1, 10000]", - "+U[1, 100, 1, 10000, 1, 10000]"); - assertResultsIgnoreOrder(collected, expected, true); - } - - @Test - void testDeltaJoinFailsWhenJoinKeyNotContainIndex() { - String leftTableName = "left_table_no_idx_force"; - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String rightTableName = "right_table_no_idx_force"; - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String sinkTableName = "sink_table_no_idx_force"; - createSink( - sinkTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "a1, a2"); - - String sql = - String.format( - "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON a1 = a2", - sinkTableName, leftTableName, rightTableName); - - assertThatThrownBy(() -> tEnv.explainSql(sql)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("doesn't support to do delta join optimization"); - } - - @Test - void testDeltaJoinFailsWithOuterJoin() { - String leftTableName = "left_table_outer_fail"; - createSource( - leftTableName, - "a1 int, c1 bigint, d1 int", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String rightTableName = "right_table_outer_fail"; - createSource( - rightTableName, - "a2 int, c2 bigint, d2 int", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String sinkTableName = "sink_table_outer_fail"; - createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1"); - - // Test LEFT JOIN - String leftJoinSql = - String.format( - "INSERT INTO %s SELECT a1, c1, a2 FROM %s LEFT JOIN %s ON c1 = c2 AND d1 = d2", - sinkTableName, leftTableName, rightTableName); - - assertThatThrownBy(() -> tEnv.explainSql(leftJoinSql)) - 
.isInstanceOf(ValidationException.class) - .hasMessageContaining("doesn't support to do delta join optimization"); - - // Test RIGHT JOIN - String rightJoinSql = - String.format( - "INSERT INTO %s SELECT a1, c2, a2 FROM %s RIGHT JOIN %s ON c1 = c2 AND d1 = d2", - sinkTableName, leftTableName, rightTableName); - - assertThatThrownBy(() -> tEnv.explainSql(rightJoinSql)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("doesn't support to do delta join optimization"); - - // Test FULL OUTER JOIN - String fullOuterJoinSql = - String.format( - "INSERT INTO %s SELECT a1, c1, a2 FROM %s FULL OUTER JOIN %s ON c1 = c2 AND d1 = d2", - sinkTableName, leftTableName, rightTableName); - - assertThatThrownBy(() -> tEnv.explainSql(fullOuterJoinSql)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("doesn't support to do delta join optimization"); - } - - @Test - void testDeltaJoinFailsWithCascadeJoin() { - String table1 = "cascade_table1"; - createSource( - table1, - "a1 int, c1 bigint, d1 int", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String table2 = "cascade_table2"; - createSource( - table2, - "a2 int, c2 bigint, d2 int", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String table3 = "cascade_table3"; - createSource( - table3, - "a3 int, c3 bigint, d3 int", - "c3, d3", - "c3", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String sinkTableName = "cascade_sink"; - createSink( - sinkTableName, - "a1 int, c1 bigint, a2 int, c2 bigint, a3 int, c3 bigint", - "c1, c2, c3"); - - String sql = - String.format( - "INSERT INTO %s SELECT a1, c1, a2, c2, a3, c3 " - + "FROM %s " - + "INNER JOIN %s ON c1 = c2 AND d1 = d2 " - + "INNER JOIN %s ON c1 = c3 AND d1 = d3", - sinkTableName, table1, table2, table3); - - assertThatThrownBy(() -> tEnv.explainSql(sql)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("doesn't support to do delta 
join optimization"); - } - - @Test - void testDeltaJoinFailsWithSinkMaterializer() { - // With CDC sources, when sink PK doesn't match upstream update key, - // Flink would insert SinkUpsertMaterializer which prevents delta join - String leftTableName = "left_table_materializer"; - createSource( - leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String rightTableName = "right_table_materializer"; - createSource( - rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - // Sink PK (a1, a2) doesn't match upstream update key (c1, d1, c2, d2) - // TODO: this depends on Fluss supports MVCC/point-in-time lookup to support change upsert - // keys - String sinkTableName = "sink_table_materializer"; - createSink( - sinkTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "a1, a2"); - - String sql = - String.format( - "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", - sinkTableName, leftTableName, rightTableName); - - assertThatThrownBy(() -> tEnv.explainSql(sql)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("doesn't support to do delta join optimization"); - } - - @Test - void testDeltaJoinFailsWithNonDeterministicFunctions() { - String leftTableName = "left_table_nondeterministic"; - createSource( - leftTableName, - "a1 int, c1 bigint, d1 int", - "c1, d1", - "c1", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String rightTableName = "right_table_nondeterministic"; - createSource( - rightTableName, - "a2 int, c2 bigint, d2 int", - "c2, d2", - "c2", - null, - ImmutableMap.of("table.delete.behavior", "IGNORE")); - - String sinkTableName = "sink_table_nondeterministic"; - // TODO this should be supported in Flink in future for non-deterministic 
functions before - // sinking - createSink(sinkTableName, "c1 bigint, d1 bigint, rand_val double", "c1"); - - String sql = - String.format( - "INSERT INTO %s SELECT c1, d1, RAND() FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", - sinkTableName, leftTableName, rightTableName); - - assertThatThrownBy(() -> tEnv.explainSql(sql)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("doesn't support to do delta join optimization"); - } -} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java index 507527fb3..725372048 100644 --- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java @@ -17,5 +17,293 @@ package org.apache.fluss.flink.source; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; + +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + /** IT case for {@link FlinkTableSource} in Flink 2.2. 
*/ -public class Flink22TableSourceITCase extends FlinkTableSourceITCase {} +public class Flink22TableSourceITCase extends FlinkTableSourceITCase { + + @Test + void testDeltaJoin() throws Exception { + // start two jobs for this test: one for DML involving the delta join, and the other for DQL + // to query the results of the sink table + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + // currently, delta join only support append-only source + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + List<InternalRow> rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v1", 300L, 3, 30000L), + row(4, "v4", 400L, 4, 40000L)); + // write records + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + // currently, delta join only support append-only source + + " 'table.merge-engine' = 'first_row' " + + ")", + rightTableName)); + List<InternalRow> rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v3", 200L, 2, 20000L), + row(3, "v4", 300L, 4, 30000L), + row(4, "v4", 500L, 4, 50000L)); + // write records + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table"; + tEnv.executeSql( + 
String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c1, d1, c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator<Row> collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List<String> expected = + Arrays.asList( + "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", + "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", + "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", + "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]", + "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]", + "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithProjectionAndFilter() throws Exception { + // start two jobs for this test: one for DML involving the delta join, and the other for DQL + // to query the results of the sink table + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_proj"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + 
leftTableName)); + List<InternalRow> rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v1", 300L, 3, 30000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_proj"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row' " + + ")", + rightTableName)); + List<InternalRow> rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v3", 200L, 2, 20000L), + row(3, "v4", 300L, 4, 30000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_proj"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " a2 int, " + + " primary key (c1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + // Test with projection and filter + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE a1 > 1", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator<Row> collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List<String> expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 2]", "+U[2, 200, 2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void 
testDeltaJoinWithLookupCache() throws Exception { + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_cache"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " d1 int, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + List<InternalRow> rows1 = Arrays.asList(row(1, 100L, 1)); + writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false); + + String rightTableName = "right_table_cache"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " c2 bigint, " + + " d2 int, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row', " + + " 'lookup.cache' = 'partial', " + + " 'lookup.partial-cache.max-rows' = '100' " + + ")", + rightTableName)); + List<InternalRow> rows2 = Arrays.asList(row(1, 100L, 1)); + writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false); + + String sinkTableName = "sink_table_cache"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " a2 int, " + + " primary key (a1) NOT ENFORCED" // Dummy PK + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + String sql = + String.format( + "INSERT INTO %s SELECT T1.a1, T2.a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2 AND T1.d1 = T2.d2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator<Row> collected = + 
tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 1]"); + assertResultsIgnoreOrder(collected, expected, true); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java index 0421cc561..abb74ebf6 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java @@ -49,7 +49,7 @@ class FlussTableLakeSnapshotCommitterTest extends FlinkTestBase { private FlussTableLakeSnapshotCommitter flussTableLakeSnapshotCommitter; @BeforeEach - public void beforeEach() { + void beforeEach() { flussTableLakeSnapshotCommitter = new FlussTableLakeSnapshotCommitter(FLUSS_CLUSTER_EXTENSION.getClientConfig()); flussTableLakeSnapshotCommitter.open(); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java index 70d86bb2c..bdecfa8fa 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java @@ -69,7 +69,7 @@ class TieringCommitOperatorTest extends FlinkTestBase { private StreamOperatorParameters<CommittableMessage<TestingCommittable>> parameters; @BeforeEach - public void beforeEach() throws Exception { + void beforeEach() throws Exception { mockOperatorEventGateway = new 
MockOperatorEventGateway(); MockOperatorEventDispatcher mockOperatorEventDispatcher = new MockOperatorEventDispatcher(mockOperatorEventGateway); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java index 7c80bbc0c..bfa814ce2 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java @@ -155,7 +155,7 @@ public class FlinkTestBase extends AbstractTestBase { } @BeforeEach - public void beforeEach() throws Exception { + void beforeEach() throws Exception { admin.createDatabase(DEFAULT_DB, DatabaseDescriptor.EMPTY, true).get(); } diff --git a/website/docs/engine-flink/delta-joins.md b/website/docs/engine-flink/delta-joins.md index 8388b71c8..96f097496 100644 --- a/website/docs/engine-flink/delta-joins.md +++ b/website/docs/engine-flink/delta-joins.md @@ -169,25 +169,24 @@ There is a known issue ([FLINK-38399](https://issues.apache.org/jira/browse/FLIN #### Supported Features -- CDC sources are now supported in delta join, provided they do not produce DELETE messages. - - Set `'table.delete.behavior' = 'IGNORE'` or `'DISABLE'` on the source table to suppress deletes. - - The `'table.merge-engine' = 'first_row'` option is no longer required. -- Projection and filter operations are now supported between source and delta join. -- Lookup cache is now supported in delta join. +- Support for optimizing a dual-stream join from CDC sources that do not include delete messages into a delta join. + - Disable delete on the source table to guarantee there is no delete message in the table, by adding the option `'table.delete.behavior' = 'IGNORE'` or `'DISABLE'` on the table. + - The source table is no longer required to be a `first_row` merge engine table since this version.
+- Support `Project` and `Filter` between source and delta join. +- Support lookup cache in delta join. #### Limitations - The primary key or the prefix key of the tables must be included as part of the equivalence conditions in the join. -- The join must be an INNER join. LEFT JOIN, RIGHT JOIN, and FULL OUTER JOIN are not supported. -- Cascade joins (e.g., `A JOIN B JOIN C`) are not supported. Each join input must come directly from a table source. +- The join must be an INNER join. - The downstream node of the join must support idempotent updates, typically it's an upsert sink and should not have a `SinkUpsertMaterializer` node before it. - - Flink planner automatically inserts a `SinkUpsertMaterializer` when the sink's primary key does not fully cover the upstream update key. + - Flink planner automatically inserts a `SinkUpsertMaterializer` when the sink’s primary key does not fully cover the upstream update key. - You can learn more details about `SinkUpsertMaterializer` by reading this [blog](https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-events). - Since delta join does not support to handle update-before messages, it is necessary to ensure that the entire pipeline can safely discard update-before messages. That means when consuming a CDC stream: - The join key used in the delta join must be part of the primary key. - The sink's primary key must be the same as the upstream update key. - - All filters (including non-equi join conditions) must be applied on the upsert key. -- Filters, projections, and non-equi join conditions must not contain non-deterministic functions. + - All filters must be applied on the upsert key. +- Neither filters nor projections should contain non-deterministic functions. ## Future Plan
