loserwang1024 commented on code in PR #2289:
URL: https://github.com/apache/fluss/pull/2289#discussion_r2660456685


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -1410,6 +1412,274 @@ void testStreamingReadPartitionComplexPushDown() throws 
Exception {
         assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
     }
 
+    @Test
+    void testDeltaJoin() throws Exception {
+        Assumptions.assumeTrue(SchemaAdapter.supportIndex());
+        // start two jobs for this test: one for DML involving the delta join, 
and the other for DQL
+        // to query the results of the sink table
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 2);
+
+        String leftTableName = "left_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " b1 varchar, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " e1 bigint, "
+                                + " primary key (c1, d1) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c1', "
+                                // currently, delta join only support 
append-only source
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        leftTableName));
+        List<InternalRow> rows1 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v2", 200L, 2, 20000L),
+                        row(3, "v1", 300L, 3, 30000L),
+                        row(4, "v4", 400L, 4, 40000L));
+        // write records
+        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+        writeRows(conn, leftTablePath, rows1, false);
+
+        String rightTableName = "right_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " a2 int, "
+                                + " b2 varchar, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " e2 bigint, "
+                                + " primary key (c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c2', "
+                                // currently, delta join only support 
append-only source
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        rightTableName));
+        List<InternalRow> rows2 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v3", 200L, 2, 20000L),
+                        row(3, "v4", 300L, 4, 30000L),
+                        row(4, "v4", 500L, 4, 50000L));
+        // write records
+        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+        writeRows(conn, rightTablePath, rows2, false);
+
+        String sinkTableName = "sink_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " b1 varchar, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " e1 bigint, "
+                                + " a2 int, "
+                                + " b2 varchar, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " e2 bigint, "
+                                + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss' "
+                                + ")",
+                        sinkTableName));
+
+        tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE");

Review Comment:
   The reason for using the string key "table.optimizer.delta-join.strategy" 
rather than OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY is 
compile compatibility. Lower versions won't execute it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to