becketqin commented on code in PR #26567:
URL: https://github.com/apache/flink/pull/26567#discussion_r2208478502


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/AsyncTableUtil.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+
+import java.time.Duration;
+
+import static org.apache.flink.table.runtime.operators.calc.async.RetryPredicates.ANY_EXCEPTION;
+import static org.apache.flink.table.runtime.operators.calc.async.RetryPredicates.EMPTY_RESPONSE;
+
+/** Contains utilities for {@link org.apache.flink.table.functions.AsyncTableFunction}. */
+public class AsyncTableUtil extends FunctionCallUtil {
+
+    /**
+     * Gets the options required to run the operator.
+     *
+     * @param config The config from which to fetch the options
+     * @return Extracted options
+     */
+    public static AsyncOptions getAsyncOptions(ExecNodeConfig config) {
+        return new AsyncOptions(
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_TABLE_MAX_CONCURRENT_OPERATIONS),
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_TABLE_TIMEOUT).toMillis(),
+                false,
+                AsyncDataStream.OutputMode.ORDERED);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static AsyncRetryStrategy<RowData> getResultRetryStrategy(ExecNodeConfig config) {
+        ExecutionConfigOptions.RetryStrategy retryStrategy =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_TABLE_RETRY_STRATEGY);
+        Duration retryDelay = config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_TABLE_RETRY_DELAY);
+        int retryMaxAttempts =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_TABLE_MAX_RETRIES);
+        // Only fixed delay is allowed at the moment, so just ignore the config.
+        if (retryStrategy == ExecutionConfigOptions.RetryStrategy.FIXED_DELAY) {
+            return new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<RowData>(
+                            retryMaxAttempts, retryDelay.toMillis())
+                    .ifResult(EMPTY_RESPONSE)

Review Comment:
   Do we want to hard-code this to retry on an empty response? I think it should at least be configurable. An empty response can be a legitimate result in many cases.
   
   I think the decision of whether to retry on an empty result should be left to the end users. There are a few possible behaviors:
   1. treat an empty result as a valid result, without retries.
   2. treat an empty result as invalid and retry, throwing an exception when retries are exhausted.
   3. treat an empty result (or some other result) as invalid and retry, falling back to some result (e.g. an empty result) when retries are exhausted.
   
   The desired behavior may also vary from record to record, e.g. behavior 1 for one record and behavior 3 for another.
   
   The current code forces behavior 3 for all records. I am not sure whether this is the agreed-upon behavior in the FLIP.
   
   One approach that supports all of these behaviors is the following (see the sketch after this list):
   * Do not specify any result predicate.
   * To achieve behavior 1, the user's function can simply complete the future with an empty result.
   * To achieve behavior 2, the user's function implementation can complete the future with an exception.
   * To achieve behavior 3, the user's function implementation can complete the future with a special exception such as `TableFunctionExceptionWithFallbackValue`. When retries are exhausted, the fallback value carried in the exception would be used.
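   
   A minimal sketch of what that could look like from the user side — note that `TableFunctionExceptionWithFallbackValue`, `lookup`, and `isOptional` are hypothetical names used for illustration, not existing Flink APIs:
   
   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionStage;
   
   // Hypothetical exception carrying a fallback value to emit once retries are exhausted.
   class TableFunctionExceptionWithFallbackValue extends RuntimeException {
       private final Collection<?> fallbackValue;
   
       TableFunctionExceptionWithFallbackValue(String message, Collection<?> fallbackValue) {
           super(message);
           this.fallbackValue = fallbackValue;
       }
   
       Collection<?> getFallbackValue() {
           return fallbackValue;
       }
   }
   
   // Sketch of a user function picking the behavior per record.
   class UserLookupFunctionSketch {
       // Placeholder for the user's real async client call.
       CompletionStage<Collection<String>> lookup(String key) {
           return CompletableFuture.completedFuture(Collections.emptyList());
       }
   
       // Assumed per-record predicate: is an empty result acceptable for this key?
       boolean isOptional(String key) {
           return false;
       }
   
       public void eval(CompletableFuture<Collection<String>> future, String key) {
           lookup(key).whenComplete((rows, err) -> {
               if (err != null) {
                   future.completeExceptionally(err); // behavior 2: retry, then fail
               } else if (!rows.isEmpty() || isOptional(key)) {
                   future.complete(rows); // behavior 1: an empty result is valid here
               } else {
                   // behavior 3: retry, then fall back to an empty result
                   future.completeExceptionally(new TableFunctionExceptionWithFallbackValue(
                           "Empty result for key " + key, Collections.emptyList()));
               }
           });
       }
   }
   ```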



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/correlate/async/AsyncCorrelateRunner.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.correlate.async;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Async function runner for {@link org.apache.flink.table.functions.AsyncTableFunction}. It invokes
+ * the UDF for each of the input rows, joining the responses with the input.
+ */
+public class AsyncCorrelateRunner extends RichAsyncFunction<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher;
+    private final DataStructureConverter<RowData, Object> fetcherConverter;
+    private transient AsyncFunction<RowData, Object> fetcher;
+
+    public AsyncCorrelateRunner(
+            GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher,
+            DataStructureConverter<RowData, Object> fetcherConverter) {
+        this.generatedFetcher = generatedFetcher;
+        this.fetcherConverter = fetcherConverter;
+    }
+
+    @Override
+    public void open(OpenContext openContext) throws Exception {
+        super.open(openContext);
+
+        ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
+        this.fetcher = generatedFetcher.newInstance(cl);
+
+        FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext());
+        FunctionUtils.openFunction(fetcher, openContext);
+
+        fetcherConverter.open(cl);
+    }
+
+    @Override
+    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
+        try {
+            JoinedRowResultFuture outResultFuture =
+                    new JoinedRowResultFuture(input, resultFuture, fetcherConverter);
+
+            fetcher.asyncInvoke(input, outResultFuture);
+        } catch (Throwable t) {
+            resultFuture.completeExceptionally(t);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        FunctionUtils.closeFunction(fetcher);
+    }
+
+    private static final class JoinedRowResultFuture implements ResultFuture<Object> {
+        private final DataStructureConverter<RowData, Object> resultConverter;
+
+        private RowData leftRow;
+        private ResultFuture<RowData> realOutput;
+
+        private JoinedRowResultFuture(
+                RowData row,
+                ResultFuture<RowData> resultFuture,
+                DataStructureConverter<RowData, Object> resultConverter) {
+            this.leftRow = row;
+            this.realOutput = resultFuture;
+            this.resultConverter = resultConverter;
+        }
+
+        @Override
+        public void complete(Collection<Object> result) {
+            try {
+                Collection<RowData> rightRows = wrapPrimitivesAndConvert(result);
+                completeResultFuture(rightRows);
+            } catch (Throwable t) {
+                realOutput.completeExceptionally(t);
+            }
+        }
+
+        private void completeResultFuture(Collection<RowData> rightRows) {
+            realOutput.complete(

Review Comment:
   I was trying to follow the execution flow here and found it a little confusing:
   1. `RetryableResultHandlerDelegator#complete()` will always enter the retry branch first.
   2. Inside the retry branch, depending on whether the function result is empty or not, it may or may not actually retry. (See the other comment about `ResultPredicate`.)
   
   Can we clean up the logic? One possible shape is sketched below.
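   
   For illustration only — a flatter dispatch where the retry path is entered only when a retry will actually happen. All names here are illustrative, not the actual fields in this PR:
   
   ```java
   import java.util.Collection;
   import java.util.function.Predicate;
   
   // Sketch of a flatter complete(): the retry branch is entered only when a
   // retry will actually happen; the common path falls straight through.
   class RetryDispatchSketch<T> {
       private final boolean retryEnabled;
       private final Predicate<Collection<T>> retryPredicate;
       private int attemptsRemaining;
   
       RetryDispatchSketch(boolean retryEnabled, Predicate<Collection<T>> retryPredicate, int maxAttempts) {
           this.retryEnabled = retryEnabled;
           this.retryPredicate = retryPredicate;
           this.attemptsRemaining = maxAttempts;
       }
   
       void complete(Collection<T> results) {
           if (retryEnabled && attemptsRemaining > 0 && retryPredicate.test(results)) {
               attemptsRemaining--;
               scheduleRetry(); // re-issue the async call after the retry delay
               return;
           }
           emit(results); // common case: pass the result straight through
       }
   
       void scheduleRetry() { /* elided: timer-based re-invocation */ }
   
       void emit(Collection<T> results) { /* elided: hand off to the real ResultFuture */ }
   }
   ```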



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/correlate/async/DelegatingAsyncTableResultFuture.java:
##########
@@ -29,27 +33,46 @@
  * is used as a bridge between {@link org.apache.flink.table.functions.AsyncTableFunction} and
  * {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
  */
-public class DelegatingResultFuture<OUT> implements BiConsumer<Collection<OUT>, Throwable> {
+public class DelegatingAsyncTableResultFuture implements BiConsumer<Collection<Object>, Throwable> {
 
-    private final ResultFuture<OUT> delegatedResultFuture;
-    private final CompletableFuture<Collection<OUT>> completableFuture;
+    private final ResultFuture<Object> delegatedResultFuture;
+    private final boolean needsWrapping;
+    private final boolean isInternalResultType;
 
-    public DelegatingResultFuture(ResultFuture<OUT> delegatedResultFuture) {
+    private final CompletableFuture<Collection<Object>> completableFuture;
+
+    public DelegatingAsyncTableResultFuture(
+            ResultFuture<Object> delegatedResultFuture,
+            boolean needsWrapping,
+            boolean isInternalResultType) {
         this.delegatedResultFuture = delegatedResultFuture;
+        this.needsWrapping = needsWrapping;
+        this.isInternalResultType = isInternalResultType;
         this.completableFuture = new CompletableFuture<>();
         this.completableFuture.whenComplete(this);
     }
 
     @Override
-    public void accept(Collection<OUT> outs, Throwable throwable) {
+    public void accept(Collection<Object> outs, Throwable throwable) {
         if (throwable != null) {
             delegatedResultFuture.completeExceptionally(throwable);
         } else {
+            if (needsWrapping) {

Review Comment:
   nit: arguably, the `needsWrapping` and `isInternalResultType` flags could be handled as part of the CodeGen to avoid branching in the per-record handling. But I guess the JIT and CPU branch prediction will likely kick in and reduce the performance impact. A non-codegen sketch of the same idea follows.
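   
   Short of doing it in CodeGen, a similar effect can be had by resolving the strategy once at construction time — a sketch only, with illustrative names rather than the actual fields of this class:
   
   ```java
   import java.util.Collection;
   import java.util.function.UnaryOperator;
   
   // Sketch: decide the post-processing step once in the constructor, so
   // accept() applies it unconditionally instead of branching per record.
   final class ResultPostProcessorSketch {
       private final UnaryOperator<Collection<Object>> postProcessor;
   
       ResultPostProcessorSketch(boolean needsWrapping) {
           this.postProcessor =
                   needsWrapping ? ResultPostProcessorSketch::wrap : UnaryOperator.identity();
       }
   
       void accept(Collection<Object> outs) {
           Collection<Object> processed = postProcessor.apply(outs); // no flag check here
           // elided: complete the delegated ResultFuture with 'processed'
       }
   
       private static Collection<Object> wrap(Collection<Object> outs) {
           // elided: wrap each primitive element into RowData
           return outs;
       }
   }
   ```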


