becketqin commented on code in PR #26567: URL: https://github.com/apache/flink/pull/26567#discussion_r2208478502


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/AsyncTableUtil.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+
+import java.time.Duration;
+
+import static org.apache.flink.table.runtime.operators.calc.async.RetryPredicates.ANY_EXCEPTION;
+import static org.apache.flink.table.runtime.operators.calc.async.RetryPredicates.EMPTY_RESPONSE;
+
+/** Contains utilities for {@link org.apache.flink.table.functions.AsyncTableFunction}. */
+public class AsyncTableUtil extends FunctionCallUtil {
+
+    /**
+     * Gets the options required to run the operator.
+     *
+     * @param config The config from which to fetch the options
+     * @return Extracted options
+     */
+    public static AsyncOptions getAsyncOptions(ExecNodeConfig config) {
+        return new AsyncOptions(
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_TABLE_MAX_CONCURRENT_OPERATIONS),
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_TABLE_TIMEOUT).toMillis(),
+                false,
+                AsyncDataStream.OutputMode.ORDERED);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static AsyncRetryStrategy<RowData> getResultRetryStrategy(ExecNodeConfig config) {
+        ExecutionConfigOptions.RetryStrategy retryStrategy =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_TABLE_RETRY_STRATEGY);
+        Duration retryDelay = config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_TABLE_RETRY_DELAY);
+        int retryMaxAttempts =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_TABLE_MAX_RETRIES);
+        // Only fixed delay is allowed at the moment, so just ignore the config.
+        if (retryStrategy == ExecutionConfigOptions.RetryStrategy.FIXED_DELAY) {
+            return new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<RowData>(
+                            retryMaxAttempts, retryDelay.toMillis())
+                    .ifResult(EMPTY_RESPONSE)

Review Comment:
   Do we want to hard-code retrying on an empty response? I think it should at least be configurable. An empty response can be a legitimate result in many cases, so I think the decision whether to retry on an empty result should be left to end users. There are a few possible behaviors:

   1. Treat an empty result as a valid result, without retries.
   2. Treat an empty result as an invalid result and retry; throw an exception when retries are exhausted.
   3. Treat an empty result (or some other result) as an invalid result and retry; fall back to some result (e.g. an empty result) when retries are exhausted.

   The desired behavior may vary from record to record, e.g. behavior 1 for one record and behavior 3 for another. The current code forces behavior 3 for all records.
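
To make the three behaviors concrete, here is a rough, self-contained sketch. It is not Flink code: `EmptyResultBehaviors`, `Policy`, `invoke`, and the synchronous, delay-free retry loop are hypothetical stand-ins chosen only to show how the outcomes differ once retries are exhausted.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical model of the three empty-result behaviors; not part of Flink. */
final class EmptyResultBehaviors {

    /** Hypothetical names for behaviors 1, 2 and 3 listed above. */
    enum Policy { ACCEPT_EMPTY, RETRY_THEN_FAIL, RETRY_THEN_FALLBACK }

    /**
     * Calls the lookup at most (1 + maxRetries) times, synchronously and without
     * any delay, and applies the given policy to empty results.
     */
    static List<String> invoke(
            Supplier<List<String>> lookup, Policy policy, int maxRetries, List<String> fallback) {
        int attempts = 0;
        while (true) {
            List<String> result = lookup.get();
            attempts++;
            // Behavior 1: an empty result is as valid as any other result.
            if (!result.isEmpty() || policy == Policy.ACCEPT_EMPTY) {
                return result;
            }
            if (attempts > maxRetries) {
                if (policy == Policy.RETRY_THEN_FAIL) {
                    // Behavior 2: surface the failure once retries are exhausted.
                    throw new IllegalStateException(
                            "empty result after " + attempts + " attempts");
                }
                // Behavior 3: give up retrying and emit the fallback value.
                return fallback;
            }
        }
    }

    public static void main(String[] args) {
        Supplier<List<String>> alwaysEmpty = Collections::emptyList;
        System.out.println(invoke(alwaysEmpty, Policy.ACCEPT_EMPTY, 3, List.of()));
        System.out.println(
                invoke(alwaysEmpty, Policy.RETRY_THEN_FALLBACK, 3, List.of("default")));
    }
}
```

In this model the policy is a single argument per call, which is one way to picture a per-record choice; a global result predicate such as `ifResult(EMPTY_RESPONSE)` corresponds to fixing `RETRY_THEN_FALLBACK` with an empty fallback for every record.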

   I am not sure whether forcing behavior 3 is what was agreed on in the FLIP. One approach that supports all three behaviors is the following:

   * Do not specify any result predicate.
   * To achieve behavior 1, the user function can simply complete with an empty result.
   * To achieve behavior 2, the user function can complete the future with an exception.
   * To achieve behavior 3, the user function can complete the future with a special exception, `TableFunctionExceptionWithFallbackValue`. When retries are exhausted, the fallback value carried by the exception is used.


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/correlate/async/AsyncCorrelateRunner.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.correlate.async;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Async function runner for {@link org.apache.flink.table.functions.AsyncTableFunction}. It invokes
+ * the UDF for each of the input rows, joining the responses with the input.
+ */
+public class AsyncCorrelateRunner extends RichAsyncFunction<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher;
+    private final DataStructureConverter<RowData, Object> fetcherConverter;
+    private transient AsyncFunction<RowData, Object> fetcher;
+
+    public AsyncCorrelateRunner(
+            GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher,
+            DataStructureConverter<RowData, Object> fetcherConverter) {
+        this.generatedFetcher = generatedFetcher;
+        this.fetcherConverter = fetcherConverter;
+    }
+
+    @Override
+    public void open(OpenContext openContext) throws Exception {
+        super.open(openContext);
+
+        ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
+        this.fetcher = generatedFetcher.newInstance(cl);
+
+        FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext());
+        FunctionUtils.openFunction(fetcher, openContext);
+
+        fetcherConverter.open(cl);
+    }
+
+    @Override
+    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
+        try {
+            JoinedRowResultFuture outResultFuture =
+                    new JoinedRowResultFuture(input, resultFuture, fetcherConverter);
+
+            fetcher.asyncInvoke(input, outResultFuture);
+        } catch (Throwable t) {
+            resultFuture.completeExceptionally(t);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        FunctionUtils.closeFunction(fetcher);
+    }
+
+    private static final class JoinedRowResultFuture implements ResultFuture<Object> {
+        private final DataStructureConverter<RowData, Object> resultConverter;
+
+        private RowData leftRow;
+        private ResultFuture<RowData> realOutput;
+
+        private JoinedRowResultFuture(
+                RowData row,
+                ResultFuture<RowData> resultFuture,
+                DataStructureConverter<RowData, Object> resultConverter) {
+            this.leftRow = row;
+            this.realOutput = resultFuture;
+            this.resultConverter = resultConverter;
+        }
+
+        @Override
+        public void complete(Collection<Object> result) {
+            try {
+                Collection<RowData> rightRows = wrapPrimitivesAndConvert(result);
+                completeResultFuture(rightRows);
+            } catch (Throwable t) {
+                realOutput.completeExceptionally(t);
+            }
+        }
+
+        private void completeResultFuture(Collection<RowData> rightRows) {
+            realOutput.complete(

Review Comment:
   I was trying to follow the execution flow here and found it a little confusing:

   1. `RetryableResultHandlerDelegator#complete()` always enters the retry branch first.
   2. Inside the retry branch, whether a retry actually happens depends on whether the function result is empty. (See the other comment about `ResultPredicate`.)

   Can we clean up the logic?


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/correlate/async/DelegatingAsyncTableResultFuture.java:
##########
@@ -29,27 +33,46 @@
  * is used as a bridge between {@link org.apache.flink.table.functions.AsyncTableFunction} and
  * {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
  */
-public class DelegatingResultFuture<OUT> implements BiConsumer<Collection<OUT>, Throwable> {
+public class DelegatingAsyncTableResultFuture implements BiConsumer<Collection<Object>, Throwable> {
-    private final ResultFuture<OUT> delegatedResultFuture;
-    private final CompletableFuture<Collection<OUT>> completableFuture;
+    private final ResultFuture<Object> delegatedResultFuture;
+    private final boolean needsWrapping;
+    private final boolean isInternalResultType;
-    public DelegatingResultFuture(ResultFuture<OUT> delegatedResultFuture) {
+    private final CompletableFuture<Collection<Object>> completableFuture;
+
+    public DelegatingAsyncTableResultFuture(
+            ResultFuture<Object> delegatedResultFuture,
+            boolean needsWrapping,
+            boolean isInternalResultType) {
         this.delegatedResultFuture = delegatedResultFuture;
+        this.needsWrapping = needsWrapping;
+        this.isInternalResultType = isInternalResultType;
         this.completableFuture = new CompletableFuture<>();
         this.completableFuture.whenComplete(this);
     }
     @Override
-    public void accept(Collection<OUT> outs, Throwable throwable) {
+    public void accept(Collection<Object> outs, Throwable throwable) {
         if (throwable != null) {
             delegatedResultFuture.completeExceptionally(throwable);
         } else {
+            if (needsWrapping) {

Review Comment:
   nit: arguably, the `needsWrapping` and `isInternalResultType` flags could be handled as part of the CodeGen to avoid branching in the per-record handling. But I guess the JIT and CPU branch prediction may kick in to reduce the performance impact.
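
As a rough illustration of the nit on `needsWrapping` and `isInternalResultType`: short of code generation, both flags could be resolved once in the constructor so that the per-record path carries no flag checks. The sketch below is hypothetical throughout. The class name `PreResolvedResultHandler`, the wrapping step (a one-element `Object[]`), the `toInternal` converter, and the order in which they are applied are all assumptions, not the PR's actual logic.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch; the wrapping and conversion steps are stand-ins, not Flink's. */
final class PreResolvedResultHandler {

    // Composed once at construction time; applied to every result element.
    private final Function<Object, Object> perElement;

    PreResolvedResultHandler(
            boolean needsWrapping,
            boolean isInternalResultType,
            Function<Object, Object> toInternal) {
        // Both flags are inspected here, once, instead of once per record.
        Function<Object, Object> wrap =
                needsWrapping ? value -> new Object[] {value} : Function.identity();
        Function<Object, Object> convert =
                isInternalResultType ? Function.identity() : toInternal;
        this.perElement = wrap.andThen(convert);
    }

    /** Per-record path: no flag checks, only the pre-composed function. */
    Collection<Object> handle(Collection<Object> outs) {
        List<Object> converted = new ArrayList<>(outs.size());
        for (Object out : outs) {
            converted.add(perElement.apply(out));
        }
        return converted;
    }

    public static void main(String[] args) {
        PreResolvedResultHandler handler =
                new PreResolvedResultHandler(false, false, value -> "converted:" + value);
        System.out.println(handler.handle(List.of(1, 2)));
    }
}
```

This trades two branches for an indirect call, which the JIT may or may not inline, so it is not obviously faster than the flag checks; specializing the generated code would remove both.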