Au-Miner commented on code in PR #27111:
URL: https://github.com/apache/flink/pull/27111#discussion_r2454137575


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java:
##########
@@ -135,6 +140,10 @@ public class StreamExecDeltaJoin extends 
ExecNodeBase<RowData>
     @JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS)
     private final int[] leftJoinKeys;
 
+    @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final int[] leftUpsertKeys;

Review Comment:
   An additional @Nullable annotation is required to indicate that it can be 
null



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java:
##########
@@ -117,6 +119,9 @@ public class StreamExecDeltaJoin extends 
ExecNodeBase<RowData>
             "lookupRightTableJoinSpec";

Review Comment:
   I don't quite understand why the DELTA_JOIN_TRANSFORMATION here doesn't use 
deltaJoin but to use delta-join.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -240,7 +248,28 @@ private static List<List<String>> 
getAllIndexesColumnsOfTable(
     private static boolean areJoinConditionsSupported(StreamPhysicalJoin join) 
{
         JoinInfo joinInfo = join.analyzeCondition();
         // there must be one pair of join key
-        return !joinInfo.pairs().isEmpty();
+        if (joinInfo.pairs().isEmpty()) {
+            return false;
+        }
+
+        // if this join output cdc records, the non-equiv condition must be 
applied on upsert key

Review Comment:
   Besides, I don't quite understand why there is such a restriction here



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala:
##########
@@ -188,6 +231,30 @@ class DeltaJoinTest extends TableTestBase {
         "where a3 > b0")
   }
 
+  @Test
+  def testFilterOnNonUpsertKeysAfterJoinWithCdcSourceWithoutDelete(): Unit = {

Review Comment:
   Can we set deltaJoin to force and use assertThatThrownBy to display the 
error reason in the test for failing to convert to deltaJoin



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -240,7 +248,28 @@ private static List<List<String>> 
getAllIndexesColumnsOfTable(
     private static boolean areJoinConditionsSupported(StreamPhysicalJoin join) 
{
         JoinInfo joinInfo = join.analyzeCondition();
         // there must be one pair of join key
-        return !joinInfo.pairs().isEmpty();
+        if (joinInfo.pairs().isEmpty()) {
+            return false;
+        }
+
+        // if this join output cdc records, the non-equiv condition must be 
applied on upsert key

Review Comment:
    if this join output cdc records and has non-equiv condition, upsert key 
must contain non-equiv condition



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to