godfreyhe commented on code in PR #20482:
URL: https://github.com/apache/flink/pull/20482#discussion_r940046214
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala:
##########
@@ -543,6 +554,201 @@ class LookupJoinTest(legacyTableSource: Boolean) extends
TableTestBase with Seri
util.verifyExecPlan(sql)
}
+ @Test
+ def testInvalidJoinHint(): Unit = {
+ // lost required hint option 'table'
+ expectExceptionThrown(
+ """
+ |SELECT /*+ LOOKUP('tableName'='LookupTable') */ *
+ |FROM MyTable AS T
+ |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ | ON T.a = D.id
+ |""".stripMargin,
+ "Invalid LOOKUP hint: incomplete required option(s): [Key: 'table' ,
default: null (fallback keys: [])]",
+ classOf[AssertionError]
+ )
+
+ // invalid async option value
+ expectExceptionThrown(
+ """
+ |SELECT /*+ LOOKUP('table'='LookupTable', 'async'='yes') */ *
+ |FROM MyTable AS T
+ |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ | ON T.a = D.id
+ |""".stripMargin,
+ "Invalid LOOKUP hint options, parsing error: Could not parse value 'yes'
for key 'async'",
Review Comment:
` parsing error` will cause misunderstanding, just remove it
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java:
##########
@@ -144,102 +152,210 @@ private LookupJoinUtil() {
// no instantiation
}
+ /** AsyncLookupOptions includes async related options. */
+ public static class AsyncLookupOptions {
+ public final int asyncBufferCapacity;
+ public final long asyncTimeout;
+ public final ExecutionConfigOptions.AsyncOutputMode asyncOutputMode;
+
+ public AsyncLookupOptions(
+ int asyncBufferCapacity,
+ long asyncTimeout,
+ ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
+ this.asyncBufferCapacity = asyncBufferCapacity;
+ this.asyncTimeout = asyncTimeout;
+ this.asyncOutputMode = asyncOutputMode;
+ }
+ }
+
/** Gets lookup keys sorted by index in ascending order. */
public static int[] getOrderedLookupKeys(Collection<Integer>
allLookupKeys) {
List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys);
lookupKeyIndicesInOrder.sort(Integer::compareTo);
return
lookupKeyIndicesInOrder.stream().mapToInt(Integer::intValue).toArray();
}
- /** Gets LookupFunction from temporal table according to the given lookup
keys. */
- public static UserDefinedFunction getLookupFunction(
- RelOptTable temporalTable, Collection<Integer> lookupKeys) {
- return getLookupFunction(temporalTable, lookupKeys, false);
- }
-
/**
- * Gets LookupFunction from temporal table according to the given lookup
keys. If specifies
- * requireSyncLookup, then only sync function will be created or raise an
error if not
- * implemented.
+ * Gets LookupFunction from temporal table according to the given lookup
keys with preference.
+ *
+ * @return the UserDefinedFunction by preferable lookup mode, if require
*/
public static UserDefinedFunction getLookupFunction(
- RelOptTable temporalTable, Collection<Integer> lookupKeys, boolean
requireSyncLookup) {
+ RelOptTable temporalTable,
+ Collection<Integer> lookupKeys,
+ LookupJoinHintSpec joinHintSpec,
+ boolean upsertMaterialize) {
+ // async & sync lookup candidates
+ Tuple2<UserDefinedFunction, UserDefinedFunction> lookupFunctions = new
Tuple2<>();
Review Comment:
introduce a new class to represent the functions, which is more clear and
readable
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -179,6 +181,9 @@ public abstract class CommonExecLookupJoin extends
ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE)
private final ChangelogMode inputChangelogMode;
+ @JsonProperty(FIELD_NAME_JOIN_HINT)
+ private final @Nullable LookupJoinHintSpec joinHintSpec;
Review Comment:
ignore if joinHintSpec is null
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/LookupJoinHintSpec.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.spec;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.retryable.RetryPredicates;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.planner.hint.LookupJoinHintOptions;
+import
org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rel.hint.RelHint;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import static
org.apache.flink.table.planner.hint.LookupJoinHintOptions.ASYNC_CAPACITY;
+import static
org.apache.flink.table.planner.hint.LookupJoinHintOptions.ASYNC_LOOKUP;
+import static
org.apache.flink.table.planner.hint.LookupJoinHintOptions.ASYNC_OUTPUT_MODE;
+import static
org.apache.flink.table.planner.hint.LookupJoinHintOptions.ASYNC_TIMEOUT;
+import static
org.apache.flink.table.planner.hint.LookupJoinHintOptions.FIXED_DELAY;
+import static
org.apache.flink.table.planner.hint.LookupJoinHintOptions.MAX_ATTEMPTS;
+import static
org.apache.flink.table.planner.hint.LookupJoinHintOptions.RETRY_PREDICATE;
+import static
org.apache.flink.table.planner.hint.LookupJoinHintOptions.RETRY_STRATEGY;
+import static
org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy.NO_RETRY_STRATEGY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * LookupJoinHintSpec describes the user specified hint options for lookup
join.
+ *
+ * <p>This class corresponds to {@link
+ * org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin}
rel node.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LookupJoinHintSpec {
+ public static final String FIELD_NAME_ASYNC = "async";
+ public static final String FIELD_NAME_ASYNC_OUTPUT_MODE = "output-mode";
+ public static final String FIELD_NAME_ASYNC_CAPACITY = "capacity";
+ public static final String FIELD_NAME_ASYNC_TIMEOUT = "timeout";
+ public static final String FIELD_NAME_RETRY_PREDICATE = "retry-predicate";
+ public static final String FIELD_NAME_RETRY_STRATEGY = "retry-strategy";
+ public static final String FIELD_NAME_RETRY_FIXED_DELAY = "fixed-delay";
+ public static final String FIELD_NAME_RETRY_MAX_ATTEMPTS = "max-attempts";
+
+ @JsonProperty(FIELD_NAME_ASYNC)
+ private final @Nullable Boolean async;
+
+ @JsonProperty(FIELD_NAME_ASYNC_OUTPUT_MODE)
+ private final ExecutionConfigOptions.AsyncOutputMode asyncOutputMode;
+
+ @JsonProperty(FIELD_NAME_ASYNC_CAPACITY)
+ private final Integer asyncCapacity;
+
+ @JsonProperty(FIELD_NAME_ASYNC_TIMEOUT)
+ private final Long asyncTimeout;
+
+ @JsonProperty(FIELD_NAME_RETRY_PREDICATE)
+ private final @Nullable String retryPredicate;
Review Comment:
ignore empty: @JsonInclude(JsonInclude.Include.NON_NULL)
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala:
##########
@@ -543,6 +554,201 @@ class LookupJoinTest(legacyTableSource: Boolean) extends
TableTestBase with Seri
util.verifyExecPlan(sql)
}
+ @Test
+ def testInvalidJoinHint(): Unit = {
Review Comment:
please add some tests which hints refers an inner table name
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]