bgeng777 commented on code in PR #27438: URL: https://github.com/apache/flink/pull/27438#discussion_r2756894889
########## flink-python/pyflink/fn_execution/table/async_function/operations.py: ########## @@ -0,0 +1,222 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import asyncio + +from pyflink.datastream import async_retry_strategies +from pyflink.fn_execution.datastream.operations import Operation, AsyncOperationMixin +from pyflink.fn_execution.datastream.process.async_function.operation import Emitter, \ + AsyncFunctionRunner, ResultHandler, RetryableResultHandler +from pyflink.fn_execution.datastream.process.async_function.queue import OrderedStreamElementQueue +from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup +from pyflink.fn_execution.utils import operation_utils +from pyflink.table import FunctionContext + +ASYNC_SCALAR_FUNCTION_URN = "flink:transform:async_scalar_function:v1" + + +class AsyncScalarFunctionOperation(Operation, AsyncOperationMixin): + """ + Operation for executing Python async scalar functions. 
+ + This operation implements true asynchronous execution by leveraging the async + infrastructure from DataStream API's AsyncOperation: + - AsyncFunctionRunner: Manages asyncio event loop in a separate thread + - Queue: Maintains in-flight async operations with configurable capacity + - Emitter: Collects and emits results asynchronously + - Non-blocking: Multiple async operations can be in-flight simultaneously + + This provides high performance for I/O-bound async operations compared to + synchronous blocking execution. + """ + + def __init__(self, serialized_fn): + if serialized_fn.metric_enabled: + self.base_metric_group = GenericMetricGroup(None, None) + else: + self.base_metric_group = None + + self._capacity = serialized_fn.async_options.max_concurrent_operations + self._timeout = serialized_fn.async_options.timeout_ms / 1000.0 + self._retry_enabled = serialized_fn.async_options.retry_enabled + self._max_attempts = serialized_fn.async_options.retry_max_attempts + self._retry_delay = serialized_fn.async_options.retry_delay_ms / 1000.0 + + scalar_function, variable_dict, self.user_defined_funcs = \ + operation_utils.extract_user_defined_function( + serialized_fn.udfs[0], one_arg_optimization=False) + + # Create the eval function + self._eval_func = eval('lambda value: %s' % scalar_function, variable_dict) + + # Create ordered queue to maintain result order + self._queue = OrderedStreamElementQueue(self._capacity, self._raise_exception_if_exists) + + # Async execution components + self._async_function_runner = None + self._emitter = None + self._exception = None + self._output_processor = None + + # Job parameters + self.job_parameters = {p.key: p.value for p in serialized_fn.job_parameters} Review Comment: Why doesn't the `job_parameters` field start with '_' as well? ########## flink-python/pyflink/table/tests/test_async_scalar_function.py: ########## @@ -0,0 +1,285 @@ +################################################################################ +# Licensed to 
the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import asyncio +import uuid + +from pyflink.table import DataTypes +from pyflink.table.udf import AsyncScalarFunction, udf, FunctionContext +from pyflink.testing import source_sink_utils +from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, run_with_config + + +def generate_random_table_name(): + return "Table{0}".format(str(uuid.uuid1()).replace("-", "_")) + + +class AsyncScalarFunctionTests(PyFlinkStreamTableTestCase): + """ + Integration tests for Python Async Scalar Function. 
+ """ + + def test_basic_async_scalar_function(self): + + class AsyncFunctionWithLifecycle(AsyncScalarFunction): + def open(self, function_context: FunctionContext): + self.prefix = "opened_" + + async def eval(self, value): + await asyncio.sleep(0.001) + return self.prefix + value + + def close(self): + pass + + async_func = udf( + AsyncFunctionWithLifecycle(), + input_types=[DataTypes.STRING()], + result_type=DataTypes.STRING() + ) + + sink_table = generate_random_table_name() + self.t_env.execute_sql(f""" + CREATE TABLE {sink_table}(a STRING, b STRING) + WITH ('connector'='test-sink') + """) + + t = self.t_env.from_elements([("test1",), ("test2",)], ['a']) + t.select(t.a, async_func(t.a).alias('b')).execute_insert(sink_table).wait() + + actual = source_sink_utils.results() + self.assert_equals(actual, [ + "+I[test1, opened_test1]", + "+I[test2, opened_test2]" + ]) + + def test_raise_exception_in_async_eval(self): + """Test async scalar function that raises exception during evaluation.""" + + class ExceptionAsyncFunction(AsyncScalarFunction): + async def eval(self, value: str) -> str: + raise ValueError("Test exception in async eval") + + async_func = udf( + ExceptionAsyncFunction(), + input_types=[DataTypes.STRING()], + result_type=DataTypes.STRING() + ) + + sink_table = generate_random_table_name() + self.t_env.execute_sql(f""" + CREATE TABLE {sink_table}(a STRING, b STRING) + WITH ('connector'='test-sink') + """) + + t = self.t_env.from_elements([("test1",)], ['a']) + + with self.assertRaises(Exception) as context: + t.select(t.a, async_func(t.a).alias('b')).execute_insert(sink_table).wait() + + # Verify exception message is propagated + self.assertIn("Test exception in async eval", str(context.exception)) + + def test_async_function_with_retry_logic(self): + """Test async scalar function with custom retry logic.""" Review Comment: How do we enable retry for this test? 
I don't see any retry configuration being set here. ########## flink-python/pyflink/table/udf.py: ########## @@ -671,7 +782,7 @@ def udf(f: Union[Callable, ScalarFunction, Type] = None, (default: general) :param udf_type: the type of the python function, available value: general, pandas, Review Comment: It looks like `udf_type` is no longer among the input parameters, so we can remove this `:param udf_type` doc line. ########## flink-python/pyflink/fn_execution/table/async_function/operations.py: ########## @@ -0,0 +1,222 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import asyncio + +from pyflink.datastream import async_retry_strategies +from pyflink.fn_execution.datastream.operations import Operation, AsyncOperationMixin +from pyflink.fn_execution.datastream.process.async_function.operation import Emitter, \ + AsyncFunctionRunner, ResultHandler, RetryableResultHandler +from pyflink.fn_execution.datastream.process.async_function.queue import OrderedStreamElementQueue +from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup +from pyflink.fn_execution.utils import operation_utils +from pyflink.table import FunctionContext + +ASYNC_SCALAR_FUNCTION_URN = "flink:transform:async_scalar_function:v1" + + +class AsyncScalarFunctionOperation(Operation, AsyncOperationMixin): + """ + Operation for executing Python async scalar functions. + + This operation implements true asynchronous execution by leveraging the async + infrastructure from DataStream API's AsyncOperation: + - AsyncFunctionRunner: Manages asyncio event loop in a separate thread + - Queue: Maintains in-flight async operations with configurable capacity + - Emitter: Collects and emits results asynchronously + - Non-blocking: Multiple async operations can be in-flight simultaneously + + This provides high performance for I/O-bound async operations compared to + synchronous blocking execution. 
+ """ + + def __init__(self, serialized_fn): + if serialized_fn.metric_enabled: + self.base_metric_group = GenericMetricGroup(None, None) + else: + self.base_metric_group = None + + self._capacity = serialized_fn.async_options.max_concurrent_operations + self._timeout = serialized_fn.async_options.timeout_ms / 1000.0 + self._retry_enabled = serialized_fn.async_options.retry_enabled + self._max_attempts = serialized_fn.async_options.retry_max_attempts + self._retry_delay = serialized_fn.async_options.retry_delay_ms / 1000.0 + + scalar_function, variable_dict, self.user_defined_funcs = \ + operation_utils.extract_user_defined_function( + serialized_fn.udfs[0], one_arg_optimization=False) + + # Create the eval function + self._eval_func = eval('lambda value: %s' % scalar_function, variable_dict) + + # Create ordered queue to maintain result order + self._queue = OrderedStreamElementQueue(self._capacity, self._raise_exception_if_exists) + + # Async execution components + self._async_function_runner = None + self._emitter = None + self._exception = None + self._output_processor = None + + # Job parameters + self.job_parameters = {p.key: p.value for p in serialized_fn.job_parameters} + + def set_output_processor(self, output_processor): + """Set the output processor for emitting results. + + This method is called by FunctionOperation for AsyncOperationMixin implementations. 
+ """ + self._output_processor = output_processor + + def open(self): + # Open user defined functions + for user_defined_func in self.user_defined_funcs: + if hasattr(user_defined_func, 'open'): + user_defined_func.open( + FunctionContext(self.base_metric_group, self.job_parameters)) + + # Start emitter thread to collect async results + self._emitter = Emitter(self._mark_exception, self._output_processor, self._queue) + self._emitter.daemon = True + self._emitter.start() + + # Start async function runner with event loop + self._async_function_runner = AsyncFunctionRunner() + self._async_function_runner.daemon = True + self._async_function_runner.start() + self._async_function_runner.wait_ready() + + def close(self): + # Stop emitter + if self._emitter is not None: + self._emitter.stop() + self._emitter = None + + # Stop async function runner + if self._async_function_runner is not None: + self._async_function_runner.stop() + self._async_function_runner = None + + self._exception = None + + # Close user defined functions + for user_defined_func in self.user_defined_funcs: + if hasattr(user_defined_func, 'close'): + user_defined_func.close() + + def process_element(self, value): + """ + Process an input element asynchronously. + + This is non-blocking - it submits the async operation and returns immediately, + allowing multiple operations to be in-flight simultaneously. + """ + self._raise_exception_if_exists() + + entry = self._queue.put(None, 0, 0, value) + + async def execute_async(result_handler): Review Comment: Line 149 directly uses `result_handler` as a variable name. I believe the code is correct, but renaming the input parameter here to a different name would make the code more readable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
