bgeng777 commented on code in PR #27395: URL: https://github.com/apache/flink/pull/27395#discussion_r2703070589
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonAsyncCalc.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAsyncCalc; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.calcite.rex.RexNode; + +import java.util.Collections; +import java.util.List; + +/** Batch {@link ExecNode} for Python Async ScalarFunctions. 
*/ +public class BatchExecPythonAsyncCalc extends CommonExecPythonAsyncCalc Review Comment: I notice that for Python batch exec nodes, we do not add metadata annotations the way their streaming counterparts do. I believe it is not strictly required, but I wonder which approach is preferred. It looks like https://issues.apache.org/jira/browse/FLINK-35797 has declared some extra requirements. ``` @ExecNodeMetadata( name = "batch-exec-python-async-calc", version = 1, producedTransformations = CommonExecPythonAsyncCalc.PYTHON_ASYNC_CALC_TRANSFORMATION, minPlanVersion = FlinkVersion.v2_0, minStateVersion = FlinkVersion.v2_0) ``` ########## flink-python/pyflink/proto/flink-fn-execution.proto: ########## @@ -71,6 +71,13 @@ message UserDefinedFunctions { repeated OverWindow windows = 3; bool profile_enabled = 4; repeated JobParameter job_parameters = 5; + // Async execution configuration for async scalar functions + int32 async_max_concurrent_operations = 6; // Max number of concurrent async operations + int64 async_timeout_ms = 7; // Timeout in milliseconds for async operations + // Async retry strategy configuration + bool async_retry_enabled = 8; // Whether retry is enabled + int32 async_retry_max_attempts = 9; // Maximum number of retry attempts + int64 async_retry_delay_ms = 10; // Delay between retries in milliseconds Review Comment: It looks like we need 5 extra fields for async execution. Is it possible to introduce an `AsyncOptions` message to hold these 5 fields? ########## flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/async/PythonAsyncScalarFunctionOperator.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.python.scalar.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.fnexecution.v1.FlinkFnApi; +import org.apache.flink.table.functions.AsyncScalarFunction; +import org.apache.flink.table.functions.python.PythonFunctionInfo; +import org.apache.flink.table.runtime.generated.GeneratedProjection; +import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperator; +import org.apache.flink.table.types.logical.RowType; + +/** The Python {@link AsyncScalarFunction} operator for Table API. */ +@Internal +public class PythonAsyncScalarFunctionOperator extends PythonScalarFunctionOperator { + + private static final long serialVersionUID = 1L; + + /** + * The maximum number of async operations that can be in-flight at the same time. This controls + * the buffer capacity for async execution. + */ + private final int asyncMaxConcurrentOperations; + + /** The timeout in milliseconds for async operations. */ + private final long asyncTimeout; + + /** Whether retry is enabled for async operations. */ + private final boolean asyncRetryEnabled; + + /** Maximum number of retry attempts. */ + private final int asyncRetryMaxAttempts; + + /** Delay between retries in milliseconds. 
*/ + private final long asyncRetryDelayMs; + + public PythonAsyncScalarFunctionOperator( + Configuration config, + PythonFunctionInfo[] scalarFunctions, + RowType inputType, + RowType udfInputType, + RowType udfOutputType, + GeneratedProjection udfInputGeneratedProjection, + GeneratedProjection forwardedFieldGeneratedProjection, + int asyncBufferCapacity, + long asyncTimeout, + boolean asyncRetryEnabled, + int asyncRetryMaxAttempts, + long asyncRetryDelayMs) { + super( + config, + scalarFunctions, + inputType, + udfInputType, + udfOutputType, + udfInputGeneratedProjection, + forwardedFieldGeneratedProjection); + this.asyncMaxConcurrentOperations = asyncBufferCapacity; + this.asyncTimeout = asyncTimeout; + this.asyncRetryEnabled = asyncRetryEnabled; + this.asyncRetryMaxAttempts = asyncRetryMaxAttempts; + this.asyncRetryDelayMs = asyncRetryDelayMs; + } + + @Override + public String getFunctionUrn() { + // Use a dedicated URN for async scalar functions + return "flink:transform:async_scalar_function:v1"; Review Comment: Could we introduce a constant such as `private static final String ASYNC_SCALAR_FUNCTION_URN = "flink:transform:async_scalar_function:v1";` and return it here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
