Copilot commented on code in PR #2289:
URL: https://github.com/apache/fluss/pull/2289#discussion_r2659615272
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -1410,6 +1412,274 @@ void testStreamingReadPartitionComplexPushDown() throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
}
+ @Test
+ void testDeltaJoin() throws Exception {
+ Assumptions.assumeTrue(SchemaAdapter.supportIndex());
+ // start two jobs for this test: one for DML involving the delta join, and the other for DQL
+ // to query the results of the sink table
+ tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ // currently, delta join only support append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L),
+ row(4, "v4", 400L, 4, 40000L));
+ // write records
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ // currently, delta join only support append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v3", 200L, 2, 20000L),
+ row(3, "v4", 300L, 4, 30000L),
+ row(4, "v4", 500L, 4, 50000L));
+ // write records
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE");
Review Comment:
The configuration API has been changed from a typed API to a string-based
API. The original Flink 2.2 code used
`OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY` with
`OptimizerConfigOptions.DeltaJoinStrategy.FORCE`, while the moved code uses
string literals `"table.optimizer.delta-join.strategy"` and `"FORCE"`. This
change might cause compatibility issues if the string-based configuration is
not supported in all Flink versions, or if the string values don't match the
expected configuration keys. Consider using the typed configuration API if it's
available in the common module, or verify that the string-based approach works
correctly across all supported Flink versions.
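If the typed option is reachable from wherever this test finally lives, the call might look like the sketch below. This is an assumption about module layout rather than a drop-in suggestion: per the review above, `OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY` and its `DeltaJoinStrategy` enum exist in the Flink 2.2 line, so the typed form would have to sit in a version-specific module (or behind an adapter) rather than in fluss-flink-common.
```java
// Hedged sketch: compiles only against Flink versions that ship the typed
// delta-join option (the original Flink 2.2 code used exactly this pair).
import org.apache.flink.table.api.config.OptimizerConfigOptions;

tEnv.getConfig()
        .set(
                OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
                OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
```
Either way the string key is only meaningful to planners that know it, so the `SchemaAdapter.supportIndex()` assumption remains the real version gate for these tests.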
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -1410,6 +1412,274 @@ void testStreamingReadPartitionComplexPushDown() throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
}
+ @Test
+ void testDeltaJoin() throws Exception {
+ Assumptions.assumeTrue(SchemaAdapter.supportIndex());
+ // start two jobs for this test: one for DML involving the delta join, and the other for DQL
+ // to query the results of the sink table
+ tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ // currently, delta join only support append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L),
+ row(4, "v4", 400L, 4, 40000L));
+ // write records
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ // currently, delta join only support append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v3", 200L, 2, 20000L),
+ row(3, "v4", 300L, 4, 30000L),
+ row(4, "v4", 500L, 4, 50000L));
+ // write records
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE");
+
+ String sql =
+ String.format(
+ "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+ tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+ "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+ "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinWithProjectionAndFilter() throws Exception {
+ Assumptions.assumeTrue(SchemaAdapter.supportIndex());
+ // start two jobs for this test: one for DML involving the delta join, and the other for DQL
+ // to query the results of the sink table
+ tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v3", 200L, 2, 20000L),
+ row(3, "v4", 300L, 4, 30000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " a2 int, "
+ + " primary key (c1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE");
Review Comment:
The configuration API has been changed from a typed API to a string-based
API. The original Flink 2.2 code used
`OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY` with
`OptimizerConfigOptions.DeltaJoinStrategy.FORCE`, while the moved code uses
string literals `"table.optimizer.delta-join.strategy"` and `"FORCE"`. This
change might cause compatibility issues if the string-based configuration is
not supported in all Flink versions, or if the string values don't match the
expected configuration keys. Consider using the typed configuration API if it's
available in the common module, or verify that the string-based approach works
correctly across all supported Flink versions.
##########
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java:
##########
@@ -17,158 +17,18 @@
package org.apache.fluss.flink.catalog;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
/** IT case for catalog in Flink 2.2. */
public class Flink22CatalogITCase extends FlinkCatalogITCase {
Review Comment:
This method overrides `FlinkCatalogITCase.supportIndex`; it is advisable to add an `@Override` annotation.
```suggestion
@Override
```
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java:
##########
@@ -878,6 +880,150 @@ void testCreateCatalogWithLakeProperties() throws Exception {
.containsEntry("table.datalake.paimon.jdbc.password", "pass");
}
+ @Test
+ void testGetTableWithIndex() throws Exception {
+ Assumptions.assumeTrue(SchemaAdapter.supportIndex());
+ String tableName = "table_with_pk_only";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a int, "
+ + " b varchar, "
+ + " c bigint, "
+ + " primary key (a, b) NOT ENFORCED"
+ + ") with ( "
+ + " 'connector' = 'fluss' "
+ + ")",
+ tableName));
+ CatalogTable table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+ Schema expectedSchema =
+ SchemaAdapter.withIndex(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.STRING().notNull())
+ .column("c", DataTypes.BIGINT())
+ .primaryKey("a", "b")
+ .build(),
+ Collections.singletonList(Arrays.asList("a", "b")));
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+ tableName = "table_with_prefix_bucket_key";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a int, "
+ + " b varchar, "
+ + " c bigint, "
+ + " primary key (a, b) NOT ENFORCED"
+ + ") with ( "
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'a'"
+ + ")",
+ tableName));
+
+ table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+ expectedSchema =
+ SchemaAdapter.withIndex(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.STRING().notNull())
+ .column("c", DataTypes.BIGINT())
+ .primaryKey("a", "b")
+ .build(),
+ Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("a")));
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+ tableName = "table_with_bucket_key_is_not_prefix_pk";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a int, "
+ + " b varchar, "
+ + " c bigint, "
+ + " primary key (a, b) NOT ENFORCED"
+ + ") with ( "
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'b'"
+ + ")",
+ tableName));
+
+ table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+ expectedSchema =
+ SchemaAdapter.withIndex(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.STRING().notNull())
+ .column("c", DataTypes.BIGINT())
+ .primaryKey("a", "b")
+ .build(),
+ Collections.singletonList(Arrays.asList("a", "b")));
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+ tableName = "table_with_partition_1";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a int, "
+ + " b varchar, "
+ + " c bigint, "
+ + " dt varchar, "
+ + " primary key (a, b, dt) NOT ENFORCED "
+ + ") "
+ + " partitioned by (dt) "
+ + " with ( "
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'a'"
+ + ")",
+ tableName));
+
+ table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+ expectedSchema =
+ SchemaAdapter.withIndex(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.STRING().notNull())
+ .column("c", DataTypes.BIGINT())
+ .column("dt", DataTypes.STRING().notNull())
+ .primaryKey("a", "b", "dt")
+ .build(),
+ Arrays.asList(Arrays.asList("a", "b", "dt"), Arrays.asList("a", "dt")));
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+ tableName = "table_with_partition_2";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a int, "
+ + " b varchar, "
+ + " c bigint, "
+ + " dt varchar, "
+ + " primary key (dt, a, b) NOT ENFORCED "
+ + ") "
+ + " partitioned by (dt) "
+ + " with ( "
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'a'"
+ + ")",
+ tableName));
+
+ table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+ expectedSchema =
+ SchemaAdapter.withIndex(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.STRING().notNull())
+ .column("c", DataTypes.BIGINT())
+ .column("dt", DataTypes.STRING().notNull())
+ .primaryKey("dt", "a", "b")
+ .build(),
+ Arrays.asList(Arrays.asList("dt", "a", "b"), Arrays.asList("a", "dt")));
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+ }
+
+ protected boolean supportIndex() {
+ return false;
+ }
Review Comment:
The `supportIndex()` method is defined but never used within this class. The
tests use `SchemaAdapter.supportIndex()` instead. If this method is not
intended to be overridden by subclasses for a specific purpose, consider
removing it to avoid confusion. If it is meant to be used by subclasses, add
documentation explaining its purpose.
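If the hook is meant to be kept as an extension point (the version-specific subclasses do override it, per the other comments in this review), a documented form might look like the following sketch; the Javadoc wording is an assumption about its intent:
```java
/**
 * Whether the Flink version under test exposes index metadata in table
 * schemas. Defaults to {@code false}; version-specific subclasses (e.g. the
 * Flink 2.2 IT cases) override this to return {@code true}. Intentionally
 * unused in this base class: it exists only as an override hook.
 */
protected boolean supportIndex() {
    return false;
}
```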
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -1410,6 +1412,274 @@ void testStreamingReadPartitionComplexPushDown() throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
}
+ @Test
+ void testDeltaJoin() throws Exception {
+ Assumptions.assumeTrue(SchemaAdapter.supportIndex());
+ // start two jobs for this test: one for DML involving the delta join, and the other for DQL
+ // to query the results of the sink table
+ tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ // currently, delta join only support append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L),
+ row(4, "v4", 400L, 4, 40000L));
+ // write records
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ // currently, delta join only support append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v3", 200L, 2, 20000L),
+ row(3, "v4", 300L, 4, 30000L),
+ row(4, "v4", 500L, 4, 50000L));
+ // write records
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE");
+
+ String sql =
+ String.format(
+ "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+ tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+ "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+ "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinWithProjectionAndFilter() throws Exception {
+ Assumptions.assumeTrue(SchemaAdapter.supportIndex());
+ // start two jobs for this test: one for DML involving the delta join, and the other for DQL
+ // to query the results of the sink table
+ tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v3", 200L, 2, 20000L),
+ row(3, "v4", 300L, 4, 30000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " a2 int, "
+ + " primary key (c1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE");
+
+ // Test with projection and filter
+ String sql =
+ String.format(
+ "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE a1 > 1",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+ tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 2]", "+U[2, 200, 2]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinWithLookupCache() throws Exception {
+ Assumptions.assumeTrue(SchemaAdapter.supportIndex());
+ tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_cache";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 = Arrays.asList(row(1, 100L, 1));
+ writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false);
+
+ String rightTableName = "right_table_cache";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row', "
+ + " 'lookup.cache' = 'partial', "
+ + " 'lookup.partial-cache.max-rows' = '100' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 = Arrays.asList(row(1, 100L, 1));
+ writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false);
+
+ String sinkTableName = "sink_table_cache";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " a2 int, "
+ + " primary key (a1) NOT ENFORCED" // Dummy PK
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE");
+
+ String sql =
+ String.format(
+ "INSERT INTO %s SELECT T1.a1, T2.a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2 AND T1.d1 = T2.d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+ tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 1]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ protected boolean supportIndex() {
+ return false;
+ }
+
Review Comment:
The `supportIndex()` method is defined but never used within this class. The
tests use `SchemaAdapter.supportIndex()` instead. If this method is not
intended to be overridden by subclasses for a specific purpose, consider
removing it to avoid confusion. If it is meant to be used by subclasses, add
documentation explaining its purpose.
```suggestion
```
##########
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java:
##########
@@ -17,293 +17,10 @@
package org.apache.fluss.flink.source;
-import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
-import static org.apache.fluss.testutils.DataTestUtils.row;
-import static org.assertj.core.api.Assertions.assertThat;
-
/** IT case for {@link FlinkTableSource} in Flink 2.2. */
public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
Review Comment:
This method overrides `FlinkTableSourceITCase.supportIndex`; it is advisable to add an `@Override` annotation.
```suggestion
@Override
```
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -1410,6 +1412,274 @@ void testStreamingReadPartitionComplexPushDown() throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
}
+ @Test
+ void testDeltaJoin() throws Exception {
+ Assumptions.assumeTrue(SchemaAdapter.supportIndex());
+ // start two jobs for this test: one for DML involving the delta join, and the other for DQL
+ // to query the results of the sink table
+ tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ // currently, delta join only support append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L),
+ row(4, "v4", 400L, 4, 40000L));
+ // write records
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ // currently, delta join only support append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v3", 200L, 2, 20000L),
+ row(3, "v4", 300L, 4, 30000L),
+ row(4, "v4", 500L, 4, 50000L));
+ // write records
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE");
+
+ String sql =
+ String.format(
+ "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+ tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+ "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+ "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinWithProjectionAndFilter() throws Exception {
+ Assumptions.assumeTrue(SchemaAdapter.supportIndex());
+ // start two jobs for this test: one for DML involving the delta join, and the other for DQL
+ // to query the results of the sink table
+ tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v3", 200L, 2, 20000L),
+ row(3, "v4", 300L, 4, 30000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " a2 int, "
+ + " primary key (c1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE");
+
+ // Test with projection and filter
+ String sql =
+ String.format(
+ "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE a1 > 1",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+ tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+ List<String> expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 2]", "+U[2, 200, 2]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinWithLookupCache() throws Exception {
+ Assumptions.assumeTrue(SchemaAdapter.supportIndex());
+ tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ String leftTableName = "left_table_cache";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 = Arrays.asList(row(1, 100L, 1));
+ writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false);
+
+ String rightTableName = "right_table_cache";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row', "
+ + " 'lookup.cache' = 'partial', "
+ + " 'lookup.partial-cache.max-rows' = '100' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 = Arrays.asList(row(1, 100L, 1));
+ writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false);
+
+ String sinkTableName = "sink_table_cache";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " a2 int, "
+ + " primary key (a1) NOT ENFORCED" // Dummy PK
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE");
Review Comment:
The configuration API has been changed from a typed API to a string-based
API. The original Flink 2.2 code used
`OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY` with
`OptimizerConfigOptions.DeltaJoinStrategy.FORCE`, while the moved code uses
string literals `"table.optimizer.delta-join.strategy"` and `"FORCE"`. This
change might cause compatibility issues if the string-based configuration is
not supported in all Flink versions, or if the string values don't match the
expected configuration keys. Consider using the typed configuration API if it's
available in the common module, or verify that the string-based approach works
correctly across all supported Flink versions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]