lincoln-lil commented on code in PR #20482:
URL: https://github.com/apache/flink/pull/20482#discussion_r940082428


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala:
##########
@@ -543,6 +554,201 @@ class LookupJoinTest(legacyTableSource: Boolean) extends 
TableTestBase with Seri
     util.verifyExecPlan(sql)
   }
 
+  @Test
+  def testInvalidJoinHint(): Unit = {

Review Comment:
   ok



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -179,6 +181,9 @@ public abstract class CommonExecLookupJoin extends 
ExecNodeBase<RowData>
     @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE)
     private final ChangelogMode inputChangelogMode;
 
+    @JsonProperty(FIELD_NAME_JOIN_HINT)
+    private final @Nullable LookupJoinHintSpec joinHintSpec;

Review Comment:
   ok



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala:
##########
@@ -543,6 +554,201 @@ class LookupJoinTest(legacyTableSource: Boolean) extends 
TableTestBase with Seri
     util.verifyExecPlan(sql)
   }
 
+  @Test
+  def testInvalidJoinHint(): Unit = {
+    // lost required hint option 'table'
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('tableName'='LookupTable') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint: incomplete required option(s): [Key: 'table' , 
default: null (fallback keys: [])]",
+      classOf[AssertionError]
+    )
+
+    // invalid async option value
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'async'='yes') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint options, parsing error: Could not parse value 'yes' 
for key 'async'",

Review Comment:
   will change to "Invalid LOOKUP hint options: Could not..."



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java:
##########
@@ -144,102 +152,210 @@ private LookupJoinUtil() {
         // no instantiation
     }
 
+    /** AsyncLookupOptions includes async related options. */
+    public static class AsyncLookupOptions {
+        public final int asyncBufferCapacity;
+        public final long asyncTimeout;
+        public final ExecutionConfigOptions.AsyncOutputMode asyncOutputMode;
+
+        public AsyncLookupOptions(
+                int asyncBufferCapacity,
+                long asyncTimeout,
+                ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
+            this.asyncBufferCapacity = asyncBufferCapacity;
+            this.asyncTimeout = asyncTimeout;
+            this.asyncOutputMode = asyncOutputMode;
+        }
+    }
+
     /** Gets lookup keys sorted by index in ascending order. */
     public static int[] getOrderedLookupKeys(Collection<Integer> 
allLookupKeys) {
         List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys);
         lookupKeyIndicesInOrder.sort(Integer::compareTo);
         return 
lookupKeyIndicesInOrder.stream().mapToInt(Integer::intValue).toArray();
     }
 
-    /** Gets LookupFunction from temporal table according to the given lookup 
keys. */
-    public static UserDefinedFunction getLookupFunction(
-            RelOptTable temporalTable, Collection<Integer> lookupKeys) {
-        return getLookupFunction(temporalTable, lookupKeys, false);
-    }
-
     /**
-     * Gets LookupFunction from temporal table according to the given lookup 
keys. If specifies
-     * requireSyncLookup, then only sync function will be created or raise an 
error if not
-     * implemented.
+     * Gets LookupFunction from temporal table according to the given lookup 
keys with preference.
+     *
+     * @return the UserDefinedFunction by preferable lookup mode, if require
      */
     public static UserDefinedFunction getLookupFunction(
-            RelOptTable temporalTable, Collection<Integer> lookupKeys, boolean 
requireSyncLookup) {
+            RelOptTable temporalTable,
+            Collection<Integer> lookupKeys,
+            LookupJoinHintSpec joinHintSpec,
+            boolean upsertMaterialize) {
+        // async & sync lookup candidates
+        Tuple2<UserDefinedFunction, UserDefinedFunction> lookupFunctions = new 
Tuple2<>();

Review Comment:
   ok, will add a private static class LookupFunctionCandidates



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to