xuyangzhong commented on code in PR #2268:
URL: https://github.com/apache/fluss/pull/2268#discussion_r2703112017
##########
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java:
##########
@@ -33,6 +34,7 @@
import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT case for {@link FlinkTableSource} in Flink 2.2. */
public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
Review Comment:
I’m thinking it might be better to move these into a separate `Flink22DeltaJoinITCase`, since there are now quite a few delta-join-related tests and there’s some common logic we could factor out, for example setting the FORCE strategy, setting the default parallelism to 2, and creating the source and sink tables. WDYT?
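Something along these lines, as a rough sketch only: whether it should extend `FlinkTableSourceITCase` or a more general test base is an open question, and the class and helper names (`Flink22DeltaJoinITCase`, `createDeltaJoinSourceTable`) are purely illustrative.
```
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.junit.jupiter.api.BeforeEach;

/** Hypothetical IT case collecting the delta-join tests; names are illustrative. */
public class Flink22DeltaJoinITCase extends FlinkTableSourceITCase {

    @BeforeEach
    void setUpDeltaJoin() {
        // Common setup shared by every delta-join test: parallelism 2 and the FORCE strategy.
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
        tEnv.getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
    }

    /** Creates a source table with the common (c, d) primary key and c as the bucket key. */
    private void createDeltaJoinSourceTable(String tableName, String colSuffix, String extraOptions) {
        tEnv.executeSql(
                String.format(
                        "create table %1$s ( "
                                + " a%2$s int, "
                                + " b%2$s varchar, "
                                + " c%2$s bigint, "
                                + " d%2$s int, "
                                + " e%2$s bigint, "
                                + " primary key (c%2$s, d%2$s) NOT ENFORCED"
                                + ") with ("
                                + " 'connector' = 'fluss', "
                                + " 'bucket.key' = 'c%2$s'"
                                + "%3$s"
                                + ")",
                        tableName, colSuffix, extraOptions));
    }
}
```
Tests would then call e.g. `createDeltaJoinSourceTable("left_table", "1", ", 'table.merge-engine' = 'first_row'")` and keep only the test-specific DDL inline.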
##########
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java:
##########
@@ -234,6 +236,83 @@ void testDeltaJoinWithProjectionAndFilter() throws Exception {
assertResultsIgnoreOrder(collected, expected, true);
}
+ @Test
+ void testDeltaJoinFailsWhenFilterOnNonUpsertKeys() throws Exception {
+ // When filtering on non-upsert-key columns with CDC sources,
+ // the optimizer can't use DeltaJoin
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_force_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.delete.behavior' = 'IGNORE' "
+ + ")",
+ leftTableName));
+
+ String rightTableName = "right_table_force_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.delete.behavior' = 'IGNORE' "
+ + ")",
+ rightTableName));
+
+ String sinkTableName = "sink_table_force_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ // Use FORCE strategy - should throw exception
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ // Filter on e1 > e2, where e1 and e2 are NOT part of the upsert key
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE e1 > e2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
Review Comment:
Could you add one more test here?
```
// Non-equiv-cond on e1 > e2, where e1 and e2 are NOT part of the upsert key
String sql2 =
        String.format(
                "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 > e2",
                sinkTableName, leftTableName, rightTableName);
assertThatThrownBy(() -> tEnv.explainSql(sql2))
        .isInstanceOf(ValidationException.class)
        .hasMessageContaining("doesn't support to do delta join optimization");
```
##########
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java:
##########
@@ -306,4 +385,622 @@ void testDeltaJoinWithLookupCache() throws Exception {
        List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 1]");
assertResultsIgnoreOrder(collected, expected, true);
}
+
+ @Test
+ void testDeltaJoinWithPrimaryKeyTableNoDeletes() throws Exception {
+ // Test delta join with normal primary key tables (not first_row) using
+ // table.delete.behavior=IGNORE
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_normal_pk";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.delete.behavior' = 'IGNORE' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v3", 300L, 3, 30000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_normal_pk";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.delete.behavior' = 'IGNORE' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v4", 200L, 2, 20000L),
+ row(3, "v5", 400L, 4, 40000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_normal_pk";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, b1, c1, d1, a2, b2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, v1, 100, 1, 1, v1]",
+ "-U[1, v1, 100, 1, 1, v1]",
+ "+U[1, v1, 100, 1, 1, v1]",
+ "+I[2, v2, 200, 2, 2, v4]",
+ "-U[2, v2, 200, 2, 2, v4]",
+ "+U[2, v2, 200, 2, 2, v4]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinOnBucketKey() throws Exception {
+ // Test delta join on bucket key only (not full primary key)
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_bucket_key";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 100L, 2, 20000L),
+ row(3, "v3", 200L, 1, 30000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_bucket_key";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+                Arrays.asList(row(10, "r1", 100L, 5, 50000L), row(20, "r2", 200L, 6, 60000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_bucket_key";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " primary key (a1, a2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ // Join on bucket key only (c1 = c2), not full primary key
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, b1, c1, a2, b2 FROM %s INNER JOIN %s ON c1 = c2",
+ sinkTableName, leftTableName, rightTableName);
+
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[=(c1, c2)]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ // Each left row with c1=100 joins with the right row with c2=100
+ // Each left row with c1=200 joins with the right row with c2=200
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, v1, 100, 10, r1]",
+ "-U[1, v1, 100, 10, r1]",
+ "+U[1, v1, 100, 10, r1]",
+ "+I[2, v2, 100, 10, r1]",
+ "-U[2, v2, 100, 10, r1]",
+ "+U[2, v2, 100, 10, r1]",
+ "+I[3, v3, 200, 20, r2]",
+ "-U[3, v3, 200, 20, r2]",
+ "+U[3, v3, 200, 20, r2]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinFailsWhenSourceHasDelete() throws Exception {
+ // When source can produce DELETE records, DeltaJoin is not applicable
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ // No merge-engine or delete.behavior - regular PK tables with full CDC
+ String leftTableName = "left_table_delete_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1' "
+ + ")",
+ leftTableName));
+
+ String rightTableName = "right_table_delete_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2' "
+ + ")",
+ rightTableName));
+
+ String sinkTableName = "sink_table_delete_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinFailsWhenJoinKeyNotContainIndex() throws Exception {
+        // When join key doesn't include at least one complete index, DeltaJoin isn't applicable
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_no_idx_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+
+ String rightTableName = "right_table_no_idx_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+
+ String sinkTableName = "sink_table_no_idx_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (a1, a2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ // Join on a1 = a2, but index is on c1/c2 (bucket.key), not a1/a2
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON a1 = a2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinFailsWithLeftJoin() throws Exception {
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_left_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+
+ String rightTableName = "right_table_left_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+
+ String sinkTableName = "sink_table_left_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " a2 int, "
+ + " primary key (c1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s LEFT JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinFailsWithRightJoin() throws Exception {
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_right_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+
+ String rightTableName = "right_table_right_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+
+ String sinkTableName = "sink_table_right_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c2 bigint, "
+ + " a2 int, "
+ + " primary key (c2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c2, a2 FROM %s RIGHT JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinFailsWithFullOuterJoin() throws Exception {
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_full_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+
+ String rightTableName = "right_table_full_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+
+ String sinkTableName = "sink_table_full_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " c2 bigint, "
+ + " a2 int, "
+ + " primary key (c1, c2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c1, c2, a2 FROM %s FULL OUTER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinFailsWithCascadeJoin() throws Exception {
Review Comment:
What about adding more tests from the doc, such as:
1. the join key includes more fields than the primary key.
2. sink materializer (the sink's primary key differs from the primary keys of the two upstream sources).
3. filters or projections that contain non-deterministic functions like `rand` (see the sketch below for case 3).
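For case 3, a rough sketch of what such a test could look like. It reuses the `left_table_force_fail` / `right_table_force_fail` / `sink_table_force_fail` tables created in `testDeltaJoinFailsWhenFilterOnNonUpsertKeys` above, and it only asserts the exception type, since the exact error message for the non-deterministic case is an assumption that would need verifying.
```
    @Test
    void testDeltaJoinFailsWithNonDeterministicFilter() {
        // Hypothetical test for suggestion 3: the filter contains a non-deterministic function.
        String sql =
                String.format(
                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s "
                                + "ON c1 = c2 AND d1 = d2 WHERE e1 > RAND() * 100",
                        "sink_table_force_fail", "left_table_force_fail", "right_table_force_fail");

        // With the FORCE strategy, the optimizer is expected to reject this plan.
        assertThatThrownBy(() -> tEnv.explainSql(sql)).isInstanceOf(ValidationException.class);
    }
```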
##########
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java:
##########
@@ -306,4 +385,622 @@ void testDeltaJoinWithLookupCache() throws Exception {
        List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 1]");
assertResultsIgnoreOrder(collected, expected, true);
}
+
+ @Test
+ void testDeltaJoinWithPrimaryKeyTableNoDeletes() throws Exception {
+ // Test delta join with normal primary key tables (not first_row) using
+ // table.delete.behavior=IGNORE
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_normal_pk";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.delete.behavior' = 'IGNORE' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
Review Comment:
nit: Could we prepare some data with the same primary key to test it?
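For example, something along these lines (values are illustrative): with a primary-key table, a second write for the same (c1, d1) should surface as an update (-U/+U) on the changelog rather than a new insert, which would exercise that path in the join as well.
```
        List<InternalRow> rows1 =
                Arrays.asList(
                        row(1, "v1", 100L, 1, 10000L),
                        row(2, "v2", 200L, 2, 20000L),
                        // same primary key (c1, d1) = (200, 2) as the previous row,
                        // so this write is applied as an update to that row
                        row(4, "v2_new", 200L, 2, 25000L),
                        row(3, "v3", 300L, 3, 30000L));
```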
##########
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java:
##########
@@ -306,4 +385,622 @@ void testDeltaJoinWithLookupCache() throws Exception {
        List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 1]");
assertResultsIgnoreOrder(collected, expected, true);
}
+
+ @Test
+ void testDeltaJoinWithPrimaryKeyTableNoDeletes() throws Exception {
+ // Test delta join with normal primary key tables (not first_row) using
+ // table.delete.behavior=IGNORE
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_normal_pk";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.delete.behavior' = 'IGNORE' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v3", 300L, 3, 30000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_normal_pk";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.delete.behavior' = 'IGNORE' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v4", 200L, 2, 20000L),
+ row(3, "v5", 400L, 4, 40000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_normal_pk";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, b1, c1, d1, a2, b2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, v1, 100, 1, 1, v1]",
+ "-U[1, v1, 100, 1, 1, v1]",
+ "+U[1, v1, 100, 1, 1, v1]",
+ "+I[2, v2, 200, 2, 2, v4]",
+ "-U[2, v2, 200, 2, 2, v4]",
+ "+U[2, v2, 200, 2, 2, v4]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinOnBucketKey() throws Exception {
+ // Test delta join on bucket key only (not full primary key)
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_bucket_key";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 100L, 2, 20000L),
+ row(3, "v3", 200L, 1, 30000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_bucket_key";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+                Arrays.asList(row(10, "r1", 100L, 5, 50000L), row(20, "r2", 200L, 6, 60000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_bucket_key";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " primary key (a1, a2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ // Join on bucket key only (c1 = c2), not full primary key
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, b1, c1, a2, b2 FROM %s INNER JOIN %s ON c1 = c2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[=(c1, c2)]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ // Each left row with c1=100 joins with the right row with c2=100
+ // Each left row with c1=200 joins with the right row with c2=200
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, v1, 100, 10, r1]",
+ "-U[1, v1, 100, 10, r1]",
+ "+U[1, v1, 100, 10, r1]",
+ "+I[2, v2, 100, 10, r1]",
+ "-U[2, v2, 100, 10, r1]",
+ "+U[2, v2, 100, 10, r1]",
+ "+I[3, v3, 200, 20, r2]",
+ "-U[3, v3, 200, 20, r2]",
+ "+U[3, v3, 200, 20, r2]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinFailsWhenSourceHasDelete() throws Exception {
+ // When source can produce DELETE records, DeltaJoin is not applicable
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ // No merge-engine or delete.behavior - regular PK tables with full CDC
+ String leftTableName = "left_table_delete_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1' "
+ + ")",
+ leftTableName));
+
+ String rightTableName = "right_table_delete_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2' "
+ + ")",
+ rightTableName));
+
+ String sinkTableName = "sink_table_delete_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinFailsWhenJoinKeyNotContainIndex() throws Exception {
+        // When join key doesn't include at least one complete index, DeltaJoin isn't applicable
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_no_idx_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+
+ String rightTableName = "right_table_no_idx_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+
+ String sinkTableName = "sink_table_no_idx_force";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (a1, a2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ // Join on a1 = a2, but index is on c1/c2 (bucket.key), not a1/a2
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON a1 = a2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
+ .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("doesn't support to do delta join optimization");
+ }
+
+ @Test
+ void testDeltaJoinFailsWithLeftJoin() throws Exception {
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_left_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+
+ String rightTableName = "right_table_left_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+
+ String sinkTableName = "sink_table_left_fail";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " a2 int, "
+ + " primary key (c1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ String sql =
+ String.format(
+                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s LEFT JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThatThrownBy(() -> tEnv.explainSql(sql))
Review Comment:
nit: merge `testDeltaJoinFailsWithLeftJoin`, `testDeltaJoinFailsWithRightJoin` and `testDeltaJoinFailsWithFullOuterJoin` into a single `testDeltaJoinFailsWithOuterJoin` and assert the thrown exception three times there.
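Roughly along these lines, as a sketch only: it assumes the three cases can share one set of left/right/sink tables, so the shared DDL (and possibly the sink schema and select list, which currently differ slightly between the three tests) would need adjusting.
```
    @Test
    void testDeltaJoinFailsWithOuterJoin() throws Exception {
        // ... create the shared left/right/sink tables and set the FORCE strategy as above ...

        for (String joinType : Arrays.asList("LEFT JOIN", "RIGHT JOIN", "FULL OUTER JOIN")) {
            String sql =
                    String.format(
                            "INSERT INTO %s SELECT a1, c1, a2 FROM %s %s %s ON c1 = c2 AND d1 = d2",
                            sinkTableName, leftTableName, joinType, rightTableName);
            // Every outer join variant is expected to be rejected under the FORCE strategy.
            assertThatThrownBy(() -> tEnv.explainSql(sql))
                    .isInstanceOf(ValidationException.class)
                    .hasMessageContaining("doesn't support to do delta join optimization");
        }
    }
```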
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]