GitHub user Sxnan edited a discussion: Improve Python Async Execution API
## Background
The Python API of Flink Agents provides the `execute_async` method, allowing
users to asynchronously execute time-consuming operations (such as LLM API
calls) within actions, avoiding blocking the main thread of Flink operators.
The current implementation uses `yield from` syntax:
```python
def my_action(event, context):
    result = yield from context.execute_async(llm_call, prompt)
    context.send_event(OutputEvent(output=result))
```
This approach has the following issues:
- **Unintuitive syntax**: `yield from` is Python Generator syntax, requiring
users to understand how Generators work
- **Not idiomatic modern Python**: `async/await` syntax, introduced in Python
3.5, has become the standard for asynchronous programming
- **Steep learning curve**: For users unfamiliar with Generators, understanding
and using this API is challenging
## Design Goals
Change the usage of `execute_async` to standard `async/await` syntax:
```python
async def my_action(event, context):
    result = await context.execute_async(slow_llm_call, prompt)
    context.send_event(OutputEvent(output=result))
```
## Design Principles
1. **User-friendly syntax**: Use standard Python `async/await` syntax to reduce
the learning curve
2. **Lazy submission**: A task is submitted to the thread pool only when its
result is awaited, maintaining serial execution semantics
## Technical Detail
### Core Principle
Both Python Coroutines and Generators can be driven via the `send()` method:
| Operation | Generator | Coroutine |
|-----------|-----------|-----------|
| Advance execution | `gen.send(None)` | `coro.send(None)` |
| Suspend | `yield` | `yield` inside `await` |
| Complete | Raises `StopIteration` | Raises `StopIteration` |
Therefore, the Java side can directly control Coroutines using `send()`,
without converting to Generators.
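To make the table concrete, here is a minimal sketch that drives a Generator and a Coroutine with the same `send(None)` loop. The names `gen_action`, `coro_action`, `Suspend`, and `drive` are hypothetical and exist only for this illustration; they are not part of Flink Agents.

```python
def gen_action():
    """Generator-based action: suspends once, then returns a value."""
    yield
    return "from generator"


class Suspend:
    """Hypothetical awaitable whose __await__ yields exactly once."""

    def __await__(self):
        yield
        return None


async def coro_action():
    """Coroutine-based action: suspends once inside `await`."""
    await Suspend()
    return "from coroutine"


def drive(task):
    """Advance a generator or coroutine with send(None) until it completes.

    Returns (number of suspensions, final return value).
    """
    suspensions = 0
    while True:
        try:
            task.send(None)
        except StopIteration as stop:
            # Both generators and coroutines signal completion this way.
            return suspensions, stop.value
        suspensions += 1


gen_steps, gen_value = drive(gen_action())
coro_steps, coro_value = drive(coro_action())
```

The same `drive` function handles both objects without inspecting their type, which is the property the Java side relies on.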
### Architecture Design
```
┌─────────────────────────────────────────────────────────────────┐
│ User Code │
│ │
│ async def my_action(event, ctx): │
│ result = await ctx.execute_async(slow_func, arg) │
│ ctx.send_event(OutputEvent(output=result)) │
└─────────────────────────────────────────────────────────────────┘
│
│ Calling async function returns Coroutine
▼
┌─────────────────────────────────────────────────────────────────┐
│ function.py (Python-side adaptation) │
│ │
│ Detect if function return value is a Coroutine, │
│ return directly for Java to call │
└─────────────────────────────────────────────────────────────────┘
│
│ Coroutine
▼
┌─────────────────────────────────────────────────────────────────┐
│ Java Side (No modifications needed) │
│ │
│ PythonActionExecutor: Detect if Coroutine is returned │
│ PythonGeneratorActionTask: Repeatedly call send() to advance │
│ ActionExecutionOperator: Schedule via mailbox │
└─────────────────────────────────────────────────────────────────┘
```
### Key Component Design
#### AsyncExecutionResult
Add a new `AsyncExecutionResult` class that implements `__await__` to make it
awaitable:
```python
class AsyncExecutionResult:
    """Lazy async task that submits to thread pool only when awaited."""

    def __init__(self, executor, func, args, kwargs):
        self._executor = executor
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __await__(self):
        # Submit task to thread pool only when awaited (lazy submission)
        future = self._executor.submit(self._func, *self._args, **self._kwargs)
        while not future.done():
            yield  # Yield control to the scheduler
        return future.result()
```
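The following sketch shows how such an awaitable might be driven outside of Flink. It repeats the class so that it runs on its own and uses a plain `ThreadPoolExecutor`. The function `run_to_completion` is a hypothetical stand-in for the Java-side loop, and `slow_double` stands in for a slow LLM call; neither is part of the actual implementation.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class AsyncExecutionResult:
    """Copy of the class above, repeated so this sketch is self-contained."""

    def __init__(self, executor, func, args, kwargs):
        self._executor = executor
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __await__(self):
        future = self._executor.submit(self._func, *self._args, **self._kwargs)
        while not future.done():
            yield
        return future.result()


def slow_double(x):
    """Hypothetical stand-in for a time-consuming call."""
    time.sleep(0.05)
    return x * 2


async def my_action(executor):
    return await AsyncExecutionResult(executor, slow_double, (21,), {})


def run_to_completion(coro):
    """Hypothetical driver: call send(None) until StopIteration is raised."""
    polls = 0
    while True:
        try:
            coro.send(None)
        except StopIteration as stop:
            return stop.value, polls
        polls += 1
        # A real scheduler would process other work here instead of sleeping.
        time.sleep(0.001)


with ThreadPoolExecutor(max_workers=1) as pool:
    result, polls = run_to_completion(my_action(pool))
```

Each `send(None)` that returns without raising means the task is still running, so the driver is free to do other work before polling again.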
#### execute_async Method
Modify the `execute_async` method to return `AsyncExecutionResult` instead of a
generator:
```python
def execute_async(self, func, *args, **kwargs):
    # Only create task descriptor, don't submit to thread pool immediately
    return AsyncExecutionResult(self.executor, func, args, kwargs)
```
#### call_python_function Modification
Modify `call_python_function` to change Generator detection to Coroutine
detection, returning the Coroutine object directly:
```python
def call_python_function(module, qualname, func_args):
    # ... get and call function ...
    func_result = python_func(*func_args)
    # Return result directly (may be Coroutine or regular value).
    # No longer need coroutine_to_generator conversion or
    # PythonGeneratorWrapper wrapping.
    return func_result
```
#### call_python_awaitable Modification
Rename the original `call_python_generator` function to
`call_python_awaitable`, using `send()` instead of `next()` to support both
Coroutines and Generators:
```python
_ASYNCIO_ERROR_MESSAGE = (
    "asyncio functions (gather/wait/create_task/sleep) are not supported "
    "in Flink Agents. Only 'await ctx.execute_async(...)' is supported."
)


def call_python_awaitable(awaitable):
    """Advance execution of a coroutine or generator."""
    try:
        result = awaitable.send(None)
    except StopIteration as e:
        return True, e.value if hasattr(e, 'value') else None
    except RuntimeError as e:
        err_msg = str(e)
        if ("no running event loop" in err_msg
                or "await wasn't used with future" in err_msg):
            raise RuntimeError(_ASYNCIO_ERROR_MESSAGE) from e
        raise
    else:
        return False, result
## Execution Semantics
### Lazy Submission
A task is submitted to the thread pool when the object returned by
`execute_async` is awaited, not when `execute_async` is called:
```python
async def my_action(event, ctx):
    # Not submitted; each call only creates an AsyncExecutionResult
    task1 = ctx.execute_async(func1, arg1)
    task2 = ctx.execute_async(func2, arg2)

    # Submits func1 here and waits for completion
    result1 = await task1
    # After func1 completes, submits func2 and waits for completion
    result2 = await task2
```
### Serial Execution
Due to lazy submission, multiple `await`s execute serially:
```
func1: |████████████|
func2: |████████████|
Total: |========================|
```
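The ordering can be checked with a small experiment. This is a sketch under assumptions: `LazyTask` is a condensed, hypothetical copy of `AsyncExecutionResult`, `work` is a placeholder for the user function, and the `while` loop stands in for the Java-side driver. The pool has two workers, so the tasks could run in parallel if both were submitted up front.

```python
import time
from concurrent.futures import ThreadPoolExecutor

log = []  # list.append is atomic in CPython, so no lock is used here


class LazyTask:
    """Condensed, hypothetical copy of AsyncExecutionResult."""

    def __init__(self, executor, func, *args):
        self._executor, self._func, self._args = executor, func, args

    def __await__(self):
        # Submission happens here, not in __init__.
        future = self._executor.submit(self._func, *self._args)
        while not future.done():
            yield
        return future.result()


def work(name):
    log.append(f"{name} start")
    time.sleep(0.02)
    log.append(f"{name} end")
    return name


async def my_action(pool):
    task1 = LazyTask(pool, work, "func1")
    task2 = LazyTask(pool, work, "func2")
    log.append("tasks created")  # nothing has been submitted yet
    await task1
    await task2


with ThreadPoolExecutor(max_workers=2) as pool:
    coro = my_action(pool)
    while True:
        try:
            coro.send(None)
        except StopIteration:
            break
```

Even with two idle workers, `func2` starts only after `func1` has finished, because the second submission happens inside the second `await`.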
### Error Propagation
Exception propagation chain:
1. The function running in the thread pool raises an exception → the `Future`
captures it
2. `future.result()` re-raises it → it propagates from `__await__` into the
coroutine
3. In `call_python_awaitable`, `send()` raises the exception, which is caught
and logged, then re-raised
4. The Java-side `PythonActionExecutor` catches it and wraps it as
`PythonActionExecutionException`
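Steps 1 to 3 of this chain can be reproduced in plain Python. The sketch below repeats `AsyncExecutionResult` so that it is self-contained; `failing_call` is a hypothetical function that always raises. The logging in step 3 and the Java-side wrapping in step 4 are not modeled.

```python
from concurrent.futures import ThreadPoolExecutor


class AsyncExecutionResult:
    """Copy of the class from the design, repeated for self-containment."""

    def __init__(self, executor, func, args, kwargs):
        self._executor = executor
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __await__(self):
        future = self._executor.submit(self._func, *self._args, **self._kwargs)
        while not future.done():
            yield
        # Re-raises whatever the worker thread raised (step 2).
        return future.result()


def failing_call():
    """Hypothetical user function that fails inside the thread pool (step 1)."""
    raise ValueError("LLM request failed")


async def my_action(pool):
    return await AsyncExecutionResult(pool, failing_call, (), {})


caught = None
with ThreadPoolExecutor(max_workers=1) as pool:
    coro = my_action(pool)
    try:
        while True:
            coro.send(None)  # the exception surfaces from send() (step 3)
    except ValueError as exc:
        caught = exc
```

Because the exception travels through the coroutine, user code could also handle it with an ordinary `try`/`except` around the `await`.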
## Limitations
This design uses `async/await` syntax but is **not based on the asyncio event
loop**. Instead, the Java side drives Coroutine execution directly via
`send()`. Therefore:
| Feature | Support Status |
|---------|---------------|
| `await ctx.execute_async(...)` | ✅ Supported |
| `asyncio.gather` | ❌ Not supported |
| `asyncio.wait` | ❌ Not supported |
| `asyncio.create_task` | ❌ Not supported |
| `asyncio.sleep` | ❌ Not supported |
Users can only use the `await ctx.execute_async(...)` pattern; other asyncio
module features are not available.
Using unsupported asyncio APIs will raise one of the following errors at
runtime:
- `RuntimeError: await wasn't used with future`
- `RuntimeError: no running event loop`
`call_python_awaitable` catches these errors and raises a more user-friendly
message.
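To see where such an error comes from, the sketch below drives a coroutine that awaits `asyncio.sleep` with a bare `send(None)`, as the Java side would. The name `bad_action` is hypothetical. Only the `asyncio.sleep` case with a positive delay is shown; the exact behavior of other asyncio functions without a running loop can vary between Python versions.

```python
import asyncio


async def bad_action():
    # asyncio.sleep with a positive delay looks up the running event loop,
    # which does not exist when the coroutine is driven via send().
    await asyncio.sleep(1)


coro = bad_action()
try:
    coro.send(None)
except RuntimeError as e:
    message = str(e)
else:
    message = None
    coro.close()  # avoid a "never awaited" warning if nothing was raised
```

In this setup `message` contains `no running event loop`, which is one of the two substrings that `call_python_awaitable` checks for.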
GitHub link: https://github.com/apache/flink-agents/discussions/403