This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 1b674ca77d412e889af751f5f42ea97a47aa00c2 Author: xuyang <[email protected]> AuthorDate: Mon Feb 2 11:56:02 2026 +0800 [flink] Add and improve Delta Join IT tests (#2546) --- .../fluss/flink/source/Flink22DeltaJoinITCase.java | 623 ++++++++++++++------- .../FlussTableLakeSnapshotCommitterTest.java | 2 +- .../committer/TieringCommitOperatorTest.java | 2 +- .../apache/fluss/flink/utils/FlinkTestBase.java | 2 +- 4 files changed, 425 insertions(+), 204 deletions(-) diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java index 933dc2835..0fe8fb73c 100644 --- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22DeltaJoinITCase.java @@ -17,33 +17,65 @@ package org.apache.fluss.flink.source; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableMap; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import javax.annotation.Nullable; + import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; -import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows; +import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** IT case for Delta Join optimization in Flink 2.2. */ -public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { +public class Flink22DeltaJoinITCase extends FlinkTestBase { + + private static final String CATALOG_NAME = "test_catalog"; + + private StreamTableEnvironment tEnv; @BeforeEach - @Override - void before() { - super.before(); + public void beforeEach() { + bootstrapServers = conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode()); + + tEnv.executeSql( + String.format( + "create catalog %s with ('type' = 'fluss', '%s' = '%s')", + CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers)); + tEnv.useCatalog(CATALOG_NAME); + tEnv.executeSql(String.format("create database if not exists `%s`", DEFAULT_DB)); + tEnv.useDatabase(DEFAULT_DB); + + // start two jobs for this test: one for DML involving the delta join, and the other for DQL + // to query the results of the sink table tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); // Set FORCE strategy for delta join tEnv.getConfig() @@ -52,6 +84,12 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { OptimizerConfigOptions.DeltaJoinStrategy.FORCE); } + @AfterEach + void after() { + tEnv.useDatabase(BUILTIN_DATABASE); + tEnv.executeSql(String.format("drop database `%s` cascade", DEFAULT_DB)); + } + /** * Creates a source table with specified schema and options. * @@ -59,35 +97,39 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { * @param columns column definitions (e.g., "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint") * @param primaryKey primary key columns (e.g., "c1, d1") * @param bucketKey bucket key column (e.g., "c1") - * @param streamType stream type: "insert_only" for first_row, "cdc" for delete.behavior=IGNORE, - * null for regular (supports deletes) - * @param extraOptions additional WITH options (e.g., "'lookup.cache' = 'partial'"), or null + * @param extraOptions additional WITH options (e.g., "lookup.cache" = "partial"), or null */ private void createSource( String tableName, String columns, String primaryKey, String bucketKey, - String streamType, - String extraOptions) { - StringBuilder withOptions = new StringBuilder(); - withOptions.append("'connector' = 'fluss', "); - withOptions.append(String.format("'bucket.key' = '%s'", bucketKey)); - - if ("insert_only".equals(streamType)) { - withOptions.append(", 'table.merge-engine' = 'first_row'"); - } else if ("cdc".equals(streamType)) { - withOptions.append(", 'table.delete.behavior' = 'IGNORE'"); - } - + @Nullable String partitionKey, + @Nullable Map<String, String> extraOptions) { + Map<String, String> withOptions = new HashMap<>(); if (extraOptions != null) { - withOptions.append(", ").append(extraOptions); + withOptions.putAll(extraOptions); } - - tEnv.executeSql( + withOptions.put("connector", "fluss"); + withOptions.put("bucket.key", bucketKey); + StringBuilder ddlBuilder = new StringBuilder(); + ddlBuilder.append( String.format( - "create table %s ( %s, primary key (%s) NOT ENFORCED ) with ( %s )", - tableName, columns, primaryKey, withOptions.toString())); + "create table %s ( %s, primary key (%s) NOT ENFORCED )", + tableName, columns, primaryKey)); + if (partitionKey != null) { + ddlBuilder.append(String.format(" partitioned by (%s)", partitionKey)); + withOptions.put("table.auto-partition.enabled", "true"); + withOptions.put("table.auto-partition.time-unit", "year"); + } + ddlBuilder.append( + String.format( + " with (%s)", + withOptions.entrySet().stream() + .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue())) + .collect(Collectors.joining(", ")))); + + tEnv.executeSql(ddlBuilder.toString()); } /** @@ -111,6 +153,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { @Test void testDeltaJoin() throws Exception { + // disable cache to get stable results with updating + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false); String leftTableName = "left_table"; String rightTableName = "right_table"; String sinkTableName = "sink_table"; @@ -120,15 +164,91 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", "c1, d1", "c1", - "insert_only", - null); + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); createSource( rightTableName, "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", "c2, d2", "c2", - "insert_only", - null); + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c1, d1, c2, d2"); + + List<InternalRow> rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 100L, 2, 20000L), + row(3, "v3", 300L, 3, 30000L), + row(4, "v4", 400L, 4, 40000L), + // update + row(5, "v5", 100L, 1, 50000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath); + + List<InternalRow> rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 330L, 4, 30000L), + row(4, "v4", 500L, 4, 50000L), + // update + row(6, "v6", 100L, 1, 60000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); + + tEnv.executeSql(sql); + + CloseableIterator<Row> collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List<String> expected = + Arrays.asList( + "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+I[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]", + "-U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]", + "+U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinOnPrimaryKey() throws Exception { + // disable cache to get stable results with updating + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false); + String leftTableName = "left_table"; + String rightTableName = "right_table"; + String sinkTableName = "sink_table"; + + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); createSink( sinkTableName, "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", @@ -138,23 +258,27 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { Arrays.asList( row(1, "v1", 100L, 1, 10000L), row(2, "v2", 200L, 2, 20000L), - row(3, "v1", 300L, 3, 30000L), + row(3, "v3", 300L, 3, 30000L), row(4, "v4", 400L, 4, 40000L), - // Add data with the same primary key as the first row + // update row(5, "v5", 100L, 1, 50000L)); TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); writeRows(conn, leftTablePath, rows1, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath); List<InternalRow> rows2 = Arrays.asList( row(1, "v1", 100L, 1, 10000L), - row(2, "v3", 200L, 2, 20000L), - row(3, "v4", 300L, 4, 30000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 300L, 4, 30000L), row(4, "v4", 500L, 4, 50000L), - // Add data with the same primary key as the first row + // update row(6, "v6", 100L, 1, 60000L)); TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); writeRows(conn, rightTablePath, rows2, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath); String sql = String.format( @@ -170,25 +294,85 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); List<String> expected = Arrays.asList( - "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", - "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", - "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", - "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]", - "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]", - "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]"); + "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+I[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]", + "-U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]", + "+U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]"); assertResultsIgnoreOrder(collected, expected, true); } @Test - void testDeltaJoinWithProjectionAndFilter() throws Exception { + void testDeltaJoinWithCalc() throws Exception { String leftTableName = "left_table_proj"; createSource( leftTableName, "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", "c1, d1", "c1", - "insert_only", - null); + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + List<InternalRow> rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v1", 300L, 3, 30000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_proj"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + List<InternalRow> rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 300L, 4, 30000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_proj"; + createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, a2 FROM (" + + " SELECT * FROM %s WHERE d1 > 1" + + ") INNER JOIN (" + + " SELECT * FROM %s WHERE c2 < 300" + + ") ON c1 = c2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); + + tEnv.executeSql(sql); + + CloseableIterator<Row> collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List<String> expected = + Arrays.asList("+I[2, 200, 2, 2]", "-U[2, 200, 2, 2]", "+U[2, 200, 2, 2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithAppendOnlySourceAndCalc() throws Exception { + String leftTableName = "left_table_proj"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.merge-engine", "first_row")); List<InternalRow> rows1 = Arrays.asList( @@ -204,14 +388,14 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", "c2, d2", "c2", - "insert_only", - null); + null, + ImmutableMap.of("table.merge-engine", "first_row")); List<InternalRow> rows2 = Arrays.asList( row(1, "v1", 100L, 1, 10000L), row(2, "v3", 200L, 2, 20000L), - row(3, "v4", 300L, 4, 30000L)); + row(3, "v4", 400L, 4, 30000L)); TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); writeRows(conn, rightTablePath, rows2, false); @@ -220,11 +404,11 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { String sql = String.format( - "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE a1 > 1", + "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 WHERE a1 > 1", sinkTableName, leftTableName, rightTableName); assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); tEnv.executeSql(sql); @@ -235,15 +419,15 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { } @Test - void testDeltaJoinFailsWhenFilterOnNonUpsertKeys() throws Exception { + void testDeltaJoinFailsWhenFilterOnNonUpsertKeys() { String leftTableName = "left_table_force_fail"; createSource( leftTableName, "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", "c1, d1", "c1", - "cdc", - null); + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String rightTableName = "right_table_force_fail"; createSource( @@ -251,8 +435,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", "c2, d2", "c2", - "cdc", - null); + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String sinkTableName = "sink_table_force_fail"; createSink( @@ -261,6 +445,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { "c1, d1, c2, d2"); // Filter on e1 > e2, where e1 and e2 are NOT part of the upsert key + // TODO we can add a UpsertFilterOperator that can convert the un-match-filter UPSERT record + // into DELETE record. String sql = String.format( "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE e1 > e2", @@ -287,20 +473,20 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { createSource( leftTableName, "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1, e1", "c1, d1", - "c1", - "cdc", - null); + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); List<InternalRow> rows1 = Arrays.asList( row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 200L, 2, 20000L), - row(3, "v3", 50L, 3, 30000L), + row(2, "v2", 200L, 2, 20001L), + row(3, "v3", 300L, 3, 30000L), // Add row with same PK (100, 1) to generate UPDATE in CDC mode - // This row is filtered out (d1=1, filter is d1>1), so doesn't affect join + // This row is filtered out by (e2 / 100) <> c2, so doesn't affect join // result - row(4, "v1_updated", 100L, 1, 15000L)); + row(4, "v1_updated", 100L, 1, 10000L)); TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); writeRows(conn, leftTablePath, rows1, false); @@ -310,8 +496,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", "c2, d2", "c2", - "cdc", - null); + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); List<InternalRow> rows2 = Arrays.asList( @@ -319,46 +505,47 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { row(2, "v4", 200L, 2, 20000L), row(3, "v5", 300L, 4, 40000L), // Add row with same PK (100, 1) to generate UPDATE in CDC mode - // This row is filtered out (d2=1, filter is d2>1), so doesn't affect join + // This row is filtered out by (e2 / 100) <> c2, so doesn't affect join // result - row(5, "v1_updated", 100L, 1, 12000L)); + row(5, "v1_updated", 100L, 1, 10000L)); TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); writeRows(conn, rightTablePath, rows2, false); String sinkTableName = "sink_table_nonequi_upsert"; - createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1"); + createSink(sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int", "c1, d1, e1"); String sql = String.format( - "INSERT INTO %s SELECT a1, c1, d1, a2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND d1 > 1", + "INSERT INTO %s SELECT a1, c1, d1, e1, a2 FROM %s INNER JOIN %s " + + "ON c1 = c2 AND d1 = d2 AND e1 <> (c2 * 100)", sinkTableName, leftTableName, rightTableName); assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + .contains( + "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 <> (c2 * 100)))]"); tEnv.executeSql(sql); CloseableIterator<Row> collected = tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - // Filter (d1 > 1 AND d2 > 1) is pushed down before join - // PK (100, 1) has duplicate rows but d1=1 is filtered out, so doesn't appear in result - // PK (200, 2) passes filter: single row on each side, joins successfully - // Delta join produces duplicate changes pattern List<String> expected = - Arrays.asList("+I[2, 200, 2, 2]", "-U[2, 200, 2, 2]", "+U[2, 200, 2, 2]"); + Arrays.asList( + "+I[2, 200, 2, 20001, 2]", + "-U[2, 200, 2, 20001, 2]", + "+U[2, 200, 2, 20001, 2]"); assertResultsIgnoreOrder(collected, expected, true); } @Test - void testDeltaJoinWithNonEquiConditionInsertOnly() throws Exception { + void testDeltaJoinWithAppendOnlySourceAndNonEquiCondition() throws Exception { String leftTableName = "left_table_nonequi_insert"; createSource( leftTableName, "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", "c1, d1", "c1", - "insert_only", - null); + null, + ImmutableMap.of("table.merge-engine", "first_row")); List<InternalRow> rows1 = Arrays.asList( @@ -374,8 +561,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", "c2, d2", "c2", - "insert_only", - null); + null, + ImmutableMap.of("table.merge-engine", "first_row")); List<InternalRow> rows2 = Arrays.asList( @@ -426,201 +613,193 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { void testDeltaJoinWithLookupCache() throws Exception { String leftTableName = "left_table_cache"; createSource( - leftTableName, "a1 int, c1 bigint, d1 int", "c1, d1", "c1", "insert_only", null); - List<InternalRow> rows1 = Arrays.asList(row(1, 100L, 1)); + leftTableName, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + List<InternalRow> rows1 = Collections.singletonList(row(1, 100L, 1)); writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false); String rightTableName = "right_table_cache"; createSource( rightTableName, "a2 int, c2 bigint, d2 int", - "c2, d2", "c2", - "insert_only", - "'lookup.cache' = 'partial', 'lookup.partial-cache.max-rows' = '100'"); - List<InternalRow> rows2 = Arrays.asList(row(1, 100L, 1)); + "c2", + null, + ImmutableMap.of( + "table.delete.behavior", + "IGNORE", + "lookup.cache", + "partial", + "lookup.partial-cache.max-rows", + "100")); + List<InternalRow> rows2 = Collections.singletonList(row(1, 100L, 1)); writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false); String sinkTableName = "sink_table_cache"; - createSink(sinkTableName, "a1 int, a2 int", "a1"); + createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1"); String sql = String.format( - "INSERT INTO %s SELECT T1.a1, T2.a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2 AND T1.d1 = T2.d2", + "INSERT INTO %s SELECT a1, c1, d1, a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2", sinkTableName, leftTableName, rightTableName); assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); tEnv.executeSql(sql); CloseableIterator<Row> collected = tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 1]"); + List<String> expected = + Arrays.asList("+I[1, 100, 1, 1]", "-U[1, 100, 1, 1]", "+U[1, 100, 1, 1]"); assertResultsIgnoreOrder(collected, expected, true); } @Test - void testDeltaJoinWithPrimaryKeyTableNoDeletes() throws Exception { - String leftTableName = "left_table_normal_pk"; + void testDeltaJoinWithPartitionedTable() throws Exception { + String leftTableName = "left_table_partitioned"; createSource( leftTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", - "c1, d1", + "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar", + "c1, d1, pt1", "c1", - "cdc", - null); - + "pt1", + ImmutableMap.of("table.delete.behavior", "IGNORE")); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + Iterator<String> leftPartitionIterator = + waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), leftTablePath) + .values() + .iterator(); + // pick two partition to insert data + String leftPartition1 = leftPartitionIterator.next(); + String leftPartition2 = leftPartitionIterator.next(); List<InternalRow> rows1 = Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 200L, 2, 20000L), - row(3, "v3", 300L, 3, 30000L), - // Add row with same PK (100, 1) to generate UPDATE in CDC mode - row(4, "v1_updated", 100L, 1, 15000L)); - TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + row(1, "v1", 100L, 1000, leftPartition1), + row(2, "v2", 200L, 2000, leftPartition1), + row(3, "v3", 100L, 3000, leftPartition2), + row(4, "v4", 400L, 4000, leftPartition2)); writeRows(conn, leftTablePath, rows1, false); - String rightTableName = "right_table_normal_pk"; + String rightTableName = "right_table_partitioned"; createSource( rightTableName, - "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c2, d2", + "a2 int, b2 varchar, c2 bigint, d2 int, pt2 varchar", + "c2, pt2", "c2", - "cdc", - null); - + "pt2", + ImmutableMap.of("table.delete.behavior", "IGNORE")); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + Iterator<String> rightPartitionIterator = + waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), rightTablePath) + .values() + .iterator(); + // pick two partition to insert data + String rightPartition1 = rightPartitionIterator.next(); + String rightPartition2 = rightPartitionIterator.next(); List<InternalRow> rows2 = Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v4", 200L, 2, 20000L), - row(3, "v5", 400L, 4, 40000L), - // Add row with same PK (100, 1) to generate UPDATE in CDC mode - row(5, "v1_updated", 100L, 1, 12000L)); - TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + row(1, "v1", 100L, 1000, rightPartition1), + row(4, "v4", 400L, 3000, rightPartition2)); writeRows(conn, rightTablePath, rows2, false); - String sinkTableName = "sink_table_normal_pk"; + String sinkTableName = "sink_table"; createSink( sinkTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, a2 int, b2 varchar", - "c1, d1"); + "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar, b2 varchar", + "c1, d1, pt1"); String sql = String.format( - "INSERT INTO %s SELECT a1, b1, c1, d1, a2, b2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + "INSERT INTO %s SELECT a1, b1, c1, d1, pt1, b2 FROM %s AS T1 INNER JOIN %s AS T2 " + + "ON T1.c1 = T2.c2 AND T1.pt1 = T2.pt2", sinkTableName, leftTableName, rightTableName); assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (pt1 = pt2))]"); tEnv.executeSql(sql); CloseableIterator<Row> collected = tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - List<String> expected = Arrays.asList( - "+I[4, v1_updated, 100, 1, 5, v1_updated]", - "-U[4, v1_updated, 100, 1, 5, v1_updated]", - "+U[4, v1_updated, 100, 1, 5, v1_updated]", - "+I[2, v2, 200, 2, 2, v4]", - "-U[2, v2, 200, 2, 2, v4]", - "+U[2, v2, 200, 2, 2, v4]"); + String.format("+I[1, v1, 100, 1000, %s, v1]", leftPartition1), + String.format("-U[1, v1, 100, 1000, %s, v1]", leftPartition1), + String.format("+U[1, v1, 100, 1000, %s, v1]", leftPartition1), + String.format("+I[4, v4, 400, 4000, %s, v4]", leftPartition2), + String.format("-U[4, v4, 400, 4000, %s, v4]", leftPartition2), + String.format("+U[4, v4, 400, 4000, %s, v4]", leftPartition2)); assertResultsIgnoreOrder(collected, expected, true); } @Test - void testDeltaJoinOnBucketKey() throws Exception { - String leftTableName = "left_table_bucket_key"; + void testDeltaJoinFailsWhenSourceHasDelete() { + String leftTableName = "left_table_delete_force"; createSource( leftTableName, "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", "c1, d1", "c1", - "insert_only", + null, null); - List<InternalRow> rows1 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 100L, 2, 20000L), - row(3, "v3", 200L, 1, 30000L)); - TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); - writeRows(conn, leftTablePath, rows1, false); - - String rightTableName = "right_table_bucket_key"; + String rightTableName = "right_table_delete_force"; createSource( rightTableName, "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", "c2, d2", "c2", - "insert_only", + null, null); - List<InternalRow> rows2 = - Arrays.asList(row(10, "r1", 100L, 5, 50000L), row(20, "r2", 200L, 6, 60000L)); - TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); - writeRows(conn, rightTablePath, rows2, false); - - String sinkTableName = "sink_table_bucket_key"; - createSink(sinkTableName, "a1 int, b1 varchar, c1 bigint, a2 int, b2 varchar", "a1, a2"); + String sinkTableName = "sink_table_delete_force"; + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c1, d1, c2, d2"); String sql = String.format( - "INSERT INTO %s SELECT a1, b1, c1, a2, b2 FROM %s INNER JOIN %s ON c1 = c2", + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", sinkTableName, leftTableName, rightTableName); - assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[=(c1, c2)]"); - - tEnv.executeSql(sql); - - CloseableIterator<Row> collected = - tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - List<String> expected = - Arrays.asList( - "+I[1, v1, 100, 10, r1]", - "-U[1, v1, 100, 10, r1]", - "+U[1, v1, 100, 10, r1]", - "+I[2, v2, 100, 10, r1]", - "-U[2, v2, 100, 10, r1]", - "+U[2, v2, 100, 10, r1]", - "+I[3, v3, 200, 20, r2]", - "-U[3, v3, 200, 20, r2]", - "+U[3, v3, 200, 20, r2]"); - assertResultsIgnoreOrder(collected, expected, true); + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); } @Test - void testDeltaJoinFailsWhenSourceHasDelete() throws Exception { - String leftTableName = "left_table_delete_force"; + void testDeltaJoinWithJoinKeyExceedsPrimaryKey() { + String leftTableName = "left_table_exceed_pk"; createSource( leftTableName, "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", "c1, d1", "c1", null, - null); + ImmutableMap.of("table.delete.behavior", "IGNORE")); - String rightTableName = "right_table_delete_force"; + String rightTableName = "right_table_exceed_pk"; createSource( rightTableName, "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", "c2, d2", "c2", null, - null); + ImmutableMap.of("table.delete.behavior", "IGNORE")); - String sinkTableName = "sink_table_delete_force"; + String sinkTableName = "sink_table_exceed_pk"; createSink( - sinkTableName, - "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", - "c1, d1, c2, d2"); + sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1"); String sql = String.format( - "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2", sinkTableName, leftTableName, rightTableName); assertThatThrownBy(() -> tEnv.explainSql(sql)) @@ -629,15 +808,15 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { } @Test - void testDeltaJoinWhenJoinKeyExceedsPrimaryKey() throws Exception { + void testDeltaJoinWithAppendOnlySourceAndJoinKeyExceedsPrimaryKey() throws Exception { String leftTableName = "left_table_exceed_pk"; createSource( leftTableName, "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", "c1, d1", "c1", - "insert_only", - null); + null, + ImmutableMap.of("table.merge-engine", "first_row")); List<InternalRow> rows1 = Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v2", 200L, 2, 20000L)); @@ -650,8 +829,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", "c2, d2", "c2", - "insert_only", - null); + null, + ImmutableMap.of("table.merge-engine", "first_row")); List<InternalRow> rows2 = Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v3", 200L, 2, 99999L)); @@ -689,15 +868,15 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { } @Test - void testDeltaJoinFailsWhenJoinKeyNotContainIndex() throws Exception { + void testDeltaJoinFailsWhenJoinKeyNotContainIndex() { String leftTableName = "left_table_no_idx_force"; createSource( leftTableName, "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", "c1, d1", "c1", - "insert_only", - null); + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String rightTableName = "right_table_no_idx_force"; createSource( @@ -705,8 +884,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", "c2, d2", "c2", - "insert_only", - null); + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String sinkTableName = "sink_table_no_idx_force"; createSink( @@ -725,14 +904,24 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { } @Test - void testDeltaJoinFailsWithOuterJoin() throws Exception { + void testDeltaJoinFailsWithOuterJoin() { String leftTableName = "left_table_outer_fail"; createSource( - leftTableName, "a1 int, c1 bigint, d1 int", "c1, d1", "c1", "insert_only", null); + leftTableName, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String rightTableName = "right_table_outer_fail"; createSource( - rightTableName, "a2 int, c2 bigint, d2 int", "c2, d2", "c2", "insert_only", null); + rightTableName, + "a2 int, c2 bigint, d2 int", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String sinkTableName = "sink_table_outer_fail"; createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1"); @@ -769,15 +958,33 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { } @Test - void testDeltaJoinFailsWithCascadeJoin() throws Exception { + void testDeltaJoinFailsWithCascadeJoin() { String table1 = "cascade_table1"; - createSource(table1, "a1 int, c1 bigint, d1 int", "c1, d1", "c1", "insert_only", null); + createSource( + table1, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String table2 = "cascade_table2"; - createSource(table2, "a2 int, c2 bigint, d2 int", "c2, d2", "c2", "insert_only", null); + createSource( + table2, + "a2 int, c2 bigint, d2 int", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String table3 = "cascade_table3"; - createSource(table3, "a3 int, c3 bigint, d3 int", "c3, d3", "c3", "insert_only", null); + createSource( + table3, + "a3 int, c3 bigint, d3 int", + "c3, d3", + "c3", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String sinkTableName = "cascade_sink"; createSink( @@ -799,7 +1006,7 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { } @Test - void testDeltaJoinFailsWithSinkMaterializer() throws Exception { + void testDeltaJoinFailsWithSinkMaterializer() { // With CDC sources, when sink PK doesn't match upstream update key, // Flink would insert SinkUpsertMaterializer which prevents delta join String leftTableName = "left_table_materializer"; @@ -808,8 +1015,8 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", "c1, d1", "c1", - "cdc", - null); + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String rightTableName = "right_table_materializer"; createSource( @@ -817,10 +1024,12 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", "c2, d2", "c2", - "cdc", - null); + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); // Sink PK (a1, a2) doesn't match upstream update key (c1, d1, c2, d2) + // TODO: this depends on Fluss supports MVCC/point-in-time lookup to support change upsert + // keys String sinkTableName = "sink_table_materializer"; createSink( sinkTableName, @@ -838,21 +1047,33 @@ public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase { } @Test - void testDeltaJoinFailsWithNonDeterministicFunctions() throws Exception { + void testDeltaJoinFailsWithNonDeterministicFunctions() { String leftTableName = "left_table_nondeterministic"; createSource( - leftTableName, "a1 int, c1 bigint, d1 int", "c1, d1", "c1", "insert_only", null); + leftTableName, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String rightTableName = "right_table_nondeterministic"; createSource( - rightTableName, "a2 int, c2 bigint, d2 int", "c2, d2", "c2", "insert_only", null); + rightTableName, + "a2 int, c2 bigint, d2 int", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); String sinkTableName = "sink_table_nondeterministic"; - createSink(sinkTableName, "a1 int, c1 bigint, rand_val double", "c1"); + // TODO this should be supported in Flink in future for non-deterministic functions before + // sinking + createSink(sinkTableName, "c1 bigint, d1 bigint, rand_val double", "c1"); String sql = String.format( - "INSERT INTO %s SELECT a1, c1, RAND() FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + "INSERT INTO %s SELECT c1, d1, RAND() FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", sinkTableName, leftTableName, rightTableName); assertThatThrownBy(() -> tEnv.explainSql(sql)) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java index abb74ebf6..0421cc561 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java @@ -49,7 +49,7 @@ class FlussTableLakeSnapshotCommitterTest extends FlinkTestBase { private FlussTableLakeSnapshotCommitter flussTableLakeSnapshotCommitter; @BeforeEach - void beforeEach() { + public void beforeEach() { flussTableLakeSnapshotCommitter = new FlussTableLakeSnapshotCommitter(FLUSS_CLUSTER_EXTENSION.getClientConfig()); flussTableLakeSnapshotCommitter.open(); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java index bdecfa8fa..70d86bb2c 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java @@ -69,7 +69,7 @@ class TieringCommitOperatorTest extends FlinkTestBase { private StreamOperatorParameters<CommittableMessage<TestingCommittable>> parameters; @BeforeEach - void beforeEach() throws Exception { + public void beforeEach() throws Exception { mockOperatorEventGateway = new MockOperatorEventGateway(); MockOperatorEventDispatcher mockOperatorEventDispatcher = new MockOperatorEventDispatcher(mockOperatorEventGateway); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java index bfa814ce2..7c80bbc0c 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java @@ -155,7 +155,7 @@ public class FlinkTestBase extends AbstractTestBase { } @BeforeEach - void beforeEach() throws Exception { + public void beforeEach() throws Exception { admin.createDatabase(DEFAULT_DB, DatabaseDescriptor.EMPTY, true).get(); }
