GitHub user Sxnan edited a discussion: Improve Python Async Execution API

## Background

The Python API of Flink Agents provides the `execute_async` method, allowing 
users to asynchronously execute time-consuming operations (such as LLM API 
calls) within actions, avoiding blocking the main thread of Flink operators.

The current implementation uses `yield from` syntax:

```python
def my_action(event, context):
    result = yield from context.execute_async(llm_call, prompt)
    context.send_event(OutputEvent(output=result))
```

This approach has the following issues:

- **Unintuitive syntax**: `yield from` is Python Generator syntax, requiring 
users to understand how Generators work
- **Not idiomatic modern Python**: Python 3.5 introduced `async/await` syntax, 
which has since become the standard for asynchronous programming
- **Steep learning curve**: For users unfamiliar with Generators, understanding 
and using this API is challenging

## Design Goals

Change the usage of `execute_async` to standard `async/await` syntax:

```python
async def my_action(event, context):
    result = await context.execute_async(slow_llm_call, prompt)
    context.send_event(OutputEvent(output=result))
```

## Design Principles

1. **User-friendly syntax**: Use standard Python `async/await` syntax to reduce 
the learning curve
2. **Lazy submission**: Tasks are submitted to the thread pool only when the 
result is awaited, maintaining serial execution semantics

## Technical Detail

### Core Principle

Both Python Coroutines and Generators can be driven via the `send()` method:

| Operation | Generator | Coroutine |
|-----------|-----------|-----------|
| Advance execution | `gen.send(None)` | `coro.send(None)` |
| Suspend | `yield` | `yield` inside `__await__` |
| Complete | Raises `StopIteration` | Raises `StopIteration` |

Therefore, the Java side can directly control Coroutines using `send()`, 
without converting to Generators.
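
The table can be checked with a small experiment. The sketch below drives a generator and a coroutine with the same `send(None)` loop. `Suspend` and `drive` are hypothetical names used only for illustration; they are not part of Flink Agents.

```python
class Suspend:
    """Hypothetical awaitable whose __await__ yields once, then returns a value."""

    def __init__(self, value):
        self._value = value

    def __await__(self):
        yield  # suspension point: control returns to whoever called send()
        return self._value


def gen_action():
    yield  # suspend once
    return "from generator"


async def coro_action():
    return await Suspend("from coroutine")


def drive(awaitable):
    """Advance with send(None) until StopIteration; count the suspensions."""
    steps = 0
    while True:
        try:
            awaitable.send(None)
        except StopIteration as stop:
            return stop.value, steps
        steps += 1


print(drive(gen_action()))   # ('from generator', 1)
print(drive(coro_action()))  # ('from coroutine', 1)
```

The driver never checks which kind of object it received, which is the property the Java side relies on.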

### Architecture Design

```
┌─────────────────────────────────────────────────────────────────┐
│  User Code                                                      │
│                                                                 │
│  async def my_action(event, ctx):                               │
│      result = await ctx.execute_async(slow_func, arg)           │
│      ctx.send_event(OutputEvent(output=result))                 │
└─────────────────────────────────────────────────────────────────┘
                              │
                              │ Calling async function returns Coroutine
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│  function.py (Python-side adaptation)                           │
│                                                                 │
│  Detect if function return value is a Coroutine,                │
│  return directly for Java to call                               │
└─────────────────────────────────────────────────────────────────┘
                              │
                              │ Coroutine
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│  Java Side (No modifications needed)                            │
│                                                                 │
│  PythonActionExecutor: Detect if Coroutine is returned          │
│  PythonGeneratorActionTask: Repeatedly call send() to advance   │
│  ActionExecutionOperator: Schedule via mailbox                  │
└─────────────────────────────────────────────────────────────────┘
```

### Key Component Design

#### AsyncExecutionResult

Add a new `AsyncExecutionResult` class that implements `__await__` to make it 
awaitable:

```python
class AsyncExecutionResult:
    """Lazy async task that submits to thread pool only when awaited."""
    
    def __init__(self, executor, func, args, kwargs):
        self._executor = executor
        self._func = func
        self._args = args
        self._kwargs = kwargs
    
    def __await__(self):
        # Submit task to thread pool only when awaited (lazy submission)
        future = self._executor.submit(self._func, *self._args, **self._kwargs)
        while not future.done():
            yield  # Yield control to the scheduler
        return future.result()
```

#### execute_async Method

Modify the `execute_async` method to return `AsyncExecutionResult` instead of a 
generator:

```python
def execute_async(self, func, *args, **kwargs):
    # Only create task descriptor, don't submit to thread pool immediately
    return AsyncExecutionResult(self.executor, func, args, kwargs)
```

#### call_python_function Modification

Modify `call_python_function` to change Generator detection to Coroutine 
detection, returning the Coroutine object directly:

```python
def call_python_function(module, qualname, func_args):
    # ... get and call function ...
    func_result = python_func(*func_args)
    
    # Return result directly (may be Coroutine or regular value).
    # No longer need coroutine_to_generator conversion or
    # PythonGeneratorWrapper wrapping.
    return func_result
```

#### call_python_awaitable Modification

Rename the original `call_python_generator` function to 
`call_python_awaitable`, using `send()` instead of `next()` to support both 
Coroutines and Generators:

```python
_ASYNCIO_ERROR_MESSAGE = (
    "asyncio functions (gather/wait/create_task/sleep) are not supported "
    "in Flink Agents. Only 'await ctx.execute_async(...)' is supported."
)

def call_python_awaitable(awaitable):
    """Advance execution of a coroutine or generator."""
    try:
        result = awaitable.send(None)
    except StopIteration as e:
        # StopIteration always carries a `value` attribute in Python 3
        return True, e.value
    except RuntimeError as e:
        err_msg = str(e)
        if ("no running event loop" in err_msg
                or "await wasn't used with future" in err_msg):
            raise RuntimeError(_ASYNCIO_ERROR_MESSAGE) from e
        raise
    else:
        return False, result
```
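
To see how the pieces above fit together, here is a minimal end-to-end sketch. It is not the real runtime: `FakeContext`, `slow_upper`, and `run_to_completion` are hypothetical stand-ins, `call_python_awaitable` is reduced to its happy path, and the polling loop with `time.sleep` only approximates what the Java operator does by rescheduling through its mailbox.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class AsyncExecutionResult:
    """Same shape as the class above: submit on await, yield until done."""

    def __init__(self, executor, func, args, kwargs):
        self._executor = executor
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __await__(self):
        future = self._executor.submit(self._func, *self._args, **self._kwargs)
        while not future.done():
            yield
        return future.result()


class FakeContext:
    """Hypothetical stand-in for the runner context (assumption)."""

    def __init__(self, executor):
        self.executor = executor
        self.sent = []

    def execute_async(self, func, *args, **kwargs):
        return AsyncExecutionResult(self.executor, func, args, kwargs)

    def send_event(self, event):
        self.sent.append(event)


def call_python_awaitable(awaitable):
    """Simplified: advance one step and return (finished, value)."""
    try:
        result = awaitable.send(None)
    except StopIteration as e:
        return True, e.value
    return False, result


def slow_upper(text):
    time.sleep(0.05)  # stands in for a slow LLM call
    return text.upper()


async def my_action(event, ctx):
    result = await ctx.execute_async(slow_upper, event)
    ctx.send_event(result)


def run_to_completion(coro):
    """Hypothetical driver that polls until the coroutine finishes."""
    polls = 0
    while True:
        finished, _ = call_python_awaitable(coro)
        polls += 1
        if finished:
            return polls
        time.sleep(0.005)  # the real operator would yield to its mailbox here


with ThreadPoolExecutor(max_workers=1) as pool:
    ctx = FakeContext(pool)
    polls = run_to_completion(my_action("hello", ctx))

print(ctx.sent)  # ['HELLO']
```

Each call that returns `False` corresponds to one suspension inside `__await__`, so the caller is free to do other work between polls.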

## Execution Semantics

### Lazy Submission

Tasks are submitted to the thread pool when the result is awaited, not when 
`execute_async` is called:

```python
async def my_action(event, ctx):
    # Not submitted yet; each call only creates an AsyncExecutionResult
    task1 = ctx.execute_async(func1, arg1)
    task2 = ctx.execute_async(func2, arg2)
    result1 = await task1  # Submits func1 here, waits for completion
    result2 = await task2  # After func1 completes, submits func2, waits
```

### Serial Execution

Due to lazy submission, multiple `await`s execute serially:

```
func1: |████████████|
func2:              |████████████|
Total: |========================|
```
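
The ordering can be observed directly. The sketch below wraps a thread pool in a hypothetical `RecordingExecutor` that logs every `submit()` call. The condensed `AsyncExecutionResult`, the `drive` helper, and the `func1`/`func2` bodies are illustrative assumptions, not the project's code.

```python
from concurrent.futures import ThreadPoolExecutor

log = []  # shared event log; list.append is safe across threads in CPython


class RecordingExecutor:
    """Hypothetical wrapper that logs each submit() before delegating."""

    def __init__(self, inner):
        self._inner = inner

    def submit(self, func, *args, **kwargs):
        log.append(f"submit {func.__name__}")
        return self._inner.submit(func, *args, **kwargs)


class AsyncExecutionResult:
    """Condensed copy of the lazy awaitable described earlier."""

    def __init__(self, executor, func, args, kwargs):
        self._executor, self._func = executor, func
        self._args, self._kwargs = args, kwargs

    def __await__(self):
        future = self._executor.submit(self._func, *self._args, **self._kwargs)
        while not future.done():
            yield
        return future.result()


def func1():
    log.append("run func1")
    return 1


def func2():
    log.append("run func2")
    return 2


async def my_action(executor):
    task1 = AsyncExecutionResult(executor, func1, (), {})
    task2 = AsyncExecutionResult(executor, func2, (), {})
    log.append("both created")  # nothing has been submitted at this point
    return (await task1) + (await task2)


def drive(coro):
    """Spin on send(None) until the coroutine completes."""
    while True:
        try:
            coro.send(None)
        except StopIteration as stop:
            return stop.value


with ThreadPoolExecutor(max_workers=2) as pool:
    total = drive(my_action(RecordingExecutor(pool)))

print(total)  # 3
print(log)
# ['both created', 'submit func1', 'run func1', 'submit func2', 'run func2']
```

Even with two worker threads available, `func2` is not submitted until `func1` has finished, so the pool size does not introduce parallelism within a single action.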

### Error Propagation

Exception propagation chain:

1. The function submitted to the thread pool raises an exception → the 
`Future` captures it
2. `future.result()` re-raises it → it propagates from `__await__` into the 
coroutine
3. In `call_python_awaitable`, `send()` raises the exception, which is caught, 
logged, and then re-raised
4. The Java-side `PythonActionExecutor` catches it and wraps it in a 
`PythonActionExecutionException`
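
Steps 1 to 3 of the chain can be reproduced in plain Python. This is a sketch under assumptions: `failing_call` and `drive` are hypothetical, the awaitable is a condensed copy of `AsyncExecutionResult`, and the Java-side wrapping in step 4 is out of scope here.

```python
from concurrent.futures import ThreadPoolExecutor


class AsyncExecutionResult:
    """Condensed copy of the lazy awaitable described earlier."""

    def __init__(self, executor, func, args, kwargs):
        self._executor, self._func = executor, func
        self._args, self._kwargs = args, kwargs

    def __await__(self):
        future = self._executor.submit(self._func, *self._args, **self._kwargs)
        while not future.done():
            yield
        return future.result()  # re-raises the worker's exception (step 2)


def failing_call(prompt):
    # Step 1: raised in the worker thread and captured by the Future
    raise ValueError(f"model rejected prompt: {prompt!r}")


async def my_action(executor):
    return await AsyncExecutionResult(executor, failing_call, ("hi",), {})


def drive(coro):
    """Step 3: the exception surfaces from send() in the driving thread."""
    while True:
        try:
            coro.send(None)
        except StopIteration as stop:
            return stop.value


caught = None
with ThreadPoolExecutor(max_workers=1) as pool:
    try:
        drive(my_action(pool))
    except ValueError as exc:
        caught = exc

print(type(caught).__name__, caught)  # ValueError model rejected prompt: 'hi'
```

Because the exception is re-raised inside the coroutine, an action can also wrap its `await` in an ordinary `try`/`except` and handle the failure itself.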

## Limitations

This design uses `async/await` syntax but is **not based on the asyncio event 
loop**. Instead, the Java side drives Coroutine execution directly via 
`send()`. Therefore:

| Feature | Support Status |
|---------|---------------|
| `await ctx.execute_async(...)` | ✅ Supported |
| `asyncio.gather` | ❌ Not supported |
| `asyncio.wait` | ❌ Not supported |
| `asyncio.create_task` | ❌ Not supported |
| `asyncio.sleep` | ❌ Not supported |

Users can only use the `await ctx.execute_async(...)` pattern; other asyncio 
module features are not available.

Using unsupported asyncio APIs will raise one of the following errors at 
runtime:
- `RuntimeError: await wasn't used with future`
- `RuntimeError: no running event loop`

`call_python_awaitable` catches these errors and raises a more user-friendly 
message.



GitHub link: https://github.com/apache/flink-agents/discussions/403
