Au-Miner commented on code in PR #27099:
URL: https://github.com/apache/flink/pull/27099#discussion_r2438878155


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/AsyncDeltaJoinRunner.java:
##########
@@ -191,26 +274,45 @@ public static final class JoinedRowResultFuture implements ResultFuture<Object>
         private final TableFunctionResultFuture<RowData> joinConditionResultFuture;
         private final DataStructureConverter<RowData, Object> resultConverter;
 
+        private final boolean enableCache;
+        private final DeltaJoinCache cache;
+
         private final DelegateResultFuture delegate;
         private final boolean treatRightAsLookupTable;
 
+        private final RowDataKeySelector leftUpsertKeySelector;
+        private final RowDataKeySelector rightUpsertKeySelector;
+
+        private @Nullable RowData streamJoinKey;
         private RowData streamRow;
         private ResultFuture<RowData> realOutput;
 
         private JoinedRowResultFuture(
                 BlockingQueue<JoinedRowResultFuture> resultFutureBuffer,
                 TableFunctionResultFuture<RowData> joinConditionResultFuture,
                 DataStructureConverter<RowData, Object> resultConverter,
-                boolean treatRightAsLookupTable) {
+                boolean enableCache,
+                DeltaJoinCache cache,
+                boolean treatRightAsLookupTable,

Review Comment:
   JoinedRowResultFuture should follow the same ordering as AsyncDeltaJoinRunner: place enableCache, leftUpsertKeySelector, rightUpsertKeySelector, and cache after treatRightAsLookupTable.
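   For example, roughly like this (just a sketch of the suggested ordering; the member names are taken from the diff above and the assignments stay the same):
   ```java
   // Sketch only: keep the existing members, but declare the cache-related ones
   // after treatRightAsLookupTable so the order mirrors AsyncDeltaJoinRunner.
   private final DelegateResultFuture delegate;
   private final boolean treatRightAsLookupTable;

   private final boolean enableCache;
   private final RowDataKeySelector leftUpsertKeySelector;
   private final RowDataKeySelector rightUpsertKeySelector;
   private final DeltaJoinCache cache;

   private JoinedRowResultFuture(
           BlockingQueue<JoinedRowResultFuture> resultFutureBuffer,
           TableFunctionResultFuture<RowData> joinConditionResultFuture,
           DataStructureConverter<RowData, Object> resultConverter,
           boolean treatRightAsLookupTable,
           boolean enableCache,
           RowDataKeySelector leftUpsertKeySelector,
           RowDataKeySelector rightUpsertKeySelector,
           DeltaJoinCache cache) {
       // ... same assignments as in the PR, only the declaration order changes
   }
   ```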



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala:
##########
@@ -129,7 +136,38 @@ class DeltaJoinITCase extends StreamingTestBase {
       .hasMessageContaining("The current sql doesn't support to do delta join 
optimization.")
   }
 
-  @Test
+  @TestTemplate
+  def testDataWithSameJoinKeyColValues(): Unit = {

Review Comment:
   Perhaps we can rename testDataWithSameJoinKeyColValues to testWithSameJoinKeyColValues here.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala:
##########
@@ -129,7 +136,38 @@ class DeltaJoinITCase extends StreamingTestBase {
       .hasMessageContaining("The current sql doesn't support to do delta join 
optimization.")
   }
 
-  @Test
+  @TestTemplate
+  def testDataWithSameJoinKeyColValues(): Unit = {
+    val data1 = List(
+      changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 1)),
+      changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2022, 2, 2, 2, 2, 2)),
+      // mismatch
+      changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2023, 3, 3, 3, 3, 3))
+    )
+
+    val data2 = List(
+      changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 1, 1, 1, 1, 1)),
+      changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2022, 2, 2, 2, 2, 22)),
+      // mismatch
+      changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 2, 2, 2, 2, 2))
+    )
+
+    // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the RowKind from
+    // "+U" to "+I"
+    val expected = List(
+      "+I[1.0, 1, 2022-02-02T02:02:02, 1, 1.0, 2022-02-02T02:02:22]"
+    )
+    testUpsertResult(
+      List("a1"),
+      List("b1"),
+      data1,
+      data2,
+      "a1 = b1",
+      expected,
+      if (enableCache) 4 else 6)
+  }
+

Review Comment:
   Can you add a test verifying that the result is as expected when enableCache is used in the JoinKeyContainsIndex case as well as the SameJoinKeyColValues case?
   
   We could name the two tests testWithSameJoinKeyColValues1 and testWithSameJoinKeyColValues2, and add a note explaining the difference between them.


