godfreyhe commented on code in PR #20531:
URL: https://github.com/apache/flink/pull/20531#discussion_r945446291
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java:
##########
@@ -196,9 +198,22 @@ public static int[]
getOrderedLookupKeys(Collection<Integer> allLookupKeys) {
}
/**
- * Gets LookupFunction from temporal table according to the given lookup
keys with preference.
+ * Gets lookup function (async or sync) from temporal table according to
the given lookup keys
+ * with considering {@link LookupJoinHintSpec} and required
upsertMaterialize. Note: if required
+ * upsertMaterialize is true, will return synchronous lookup function
only, otherwise prefers
+ * asynchronous lookup function except there's a hint option 'async' =
'false', will raise an
+ * error if both candidates not found.
*
- * @return the UserDefinedFunction by preferable lookup mode, if require
+ * <pre>{@code
+ * 1. if upsertMaterialize == true : require sync lookup orElse error
Review Comment:
Should `orElse` be written as "or else" here?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/RetryableAsyncLookupFunctionDelegator.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.FunctionContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+
+/** A delegator holds user's {@link AsyncLookupFunction} to handle retries. */
+public class RetryableAsyncLookupFunctionDelegator extends AsyncLookupFunction
{
+
+ private final AsyncLookupFunction userLookupFunction;
+
+ private final ResultRetryStrategy retryStrategy;
+
+ private final boolean retryEnabled;
+
+ private transient Predicate<Collection<RowData>> retryResultPredicate;
+
+ public RetryableAsyncLookupFunctionDelegator(
+ @Nonnull AsyncLookupFunction userLookupFunction,
+ @Nonnull ResultRetryStrategy retryStrategy) {
+ this.userLookupFunction = userLookupFunction;
+ this.retryStrategy = retryStrategy;
+ this.retryEnabled =
retryStrategy.getRetryPredicate().resultPredicate().isPresent();
+ }
+
+ @Override
+ public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
+ if (!retryEnabled) {
+ return userLookupFunction.asyncLookup(keyRow);
+ }
+ CompletableFuture<Collection<RowData>> resultFuture = new
CompletableFuture<>();
+ lookupWithRetry(resultFuture, 1, keyRow);
+ return resultFuture;
+ }
+
+ private void lookupWithRetry(
+ final CompletableFuture<Collection<RowData>> resultFuture,
+ final int currentAttempts,
+ final RowData keyRow) {
+ CompletableFuture<Collection<RowData>> lookupFuture =
+ userLookupFunction.asyncLookup(keyRow);
+
+ lookupFuture.whenCompleteAsync(
+ (result, throwable) -> {
+ if (retryResultPredicate.test(result)
+ && retryStrategy.canRetry(currentAttempts)) {
+ long backoff =
retryStrategy.getBackoffTimeMillis(currentAttempts);
+ try {
+ Thread.sleep(backoff);
+ } catch (InterruptedException e) {
+ // Do not raise an error when interrupted, just
complete with last
+ // result intermediately.
+ resultFuture.complete(result);
+ return;
+ }
+ lookupWithRetry(resultFuture, currentAttempts + 1,
keyRow);
+ } else {
+ resultFuture.complete(result);
+ }
+ });
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ super.open(context);
+ userLookupFunction.open(context);
+ retryResultPredicate =
+
retryStrategy.getRetryPredicate().resultPredicate().orElse(ignore -> false);
+ }
Review Comment:
nit: the `open` method can be moved to right after the constructor.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/RetryableAsyncLookupFunctionDelegator.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.FunctionContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+
+/** A delegator holds user's {@link AsyncLookupFunction} to handle retries. */
+public class RetryableAsyncLookupFunctionDelegator extends AsyncLookupFunction
{
+
+ private final AsyncLookupFunction userLookupFunction;
+
+ private final ResultRetryStrategy retryStrategy;
+
+ private final boolean retryEnabled;
+
+ private transient Predicate<Collection<RowData>> retryResultPredicate;
+
+ public RetryableAsyncLookupFunctionDelegator(
+ @Nonnull AsyncLookupFunction userLookupFunction,
+ @Nonnull ResultRetryStrategy retryStrategy) {
+ this.userLookupFunction = userLookupFunction;
+ this.retryStrategy = retryStrategy;
Review Comment:
Please add null checks for the given arguments.
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RetryableAsyncLookupFunctionDelegatorTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join;
+
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.retryable.RetryPredicates;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import
org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy;
+import
org.apache.flink.table.runtime.operators.join.lookup.RetryableAsyncLookupFunctionDelegator;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.table.data.StringData.fromString;
+
+/** Harness tests for {@link RetryableAsyncLookupFunctionDelegator}. */
+public class RetryableAsyncLookupFunctionDelegatorTest {
+
+ private final AsyncLookupFunction userLookupFunc = new
TestingAsyncLookupFunction();
+
+ private final ResultRetryStrategy retryStrategy =
+ ResultRetryStrategy.fixedDelayRetry(3, 10,
RetryPredicates.EMPTY_RESULT_PREDICATE);
+
+ private final RetryableAsyncLookupFunctionDelegator delegator =
+ new RetryableAsyncLookupFunctionDelegator(userLookupFunc,
retryStrategy);
+
+ private static final Map<RowData, Collection<RowData>> data = new
HashMap<>();
+
+ static {
+ data.put(
+ GenericRowData.of(1),
+ Collections.singletonList(GenericRowData.of(1,
fromString("Julian"))));
+ data.put(
+ GenericRowData.of(3),
+ Arrays.asList(
+ GenericRowData.of(3, fromString("Jark")),
+ GenericRowData.of(3, fromString("Jackson"))));
+ data.put(
+ GenericRowData.of(4),
+ Collections.singletonList(GenericRowData.of(4,
fromString("Fabian"))));
+ }
+
+ private final RowDataHarnessAssertor assertor =
+ new RowDataHarnessAssertor(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
DataTypes.STRING().getLogicalType()
+ });
+
+ @Test
+ public void testLookupWithRetry() throws Exception {
Review Comment:
Could you add more tests to cover the other branches:
1. retry is disabled (`retryEnabled` is false)
2. different `ResultRetryStrategy` configurations
3. ...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]