weiqingy opened a new issue, #508: URL: https://github.com/apache/flink-agents/issues/508
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.

### Description

When a Python async action fails mid-execution (e.g., LLM API timeout), the job attempts to restart from the last checkpoint. During restore, the Python awaitable (coroutine) reference is `None`, causing:

```
AttributeError: 'NoneType' object has no attribute 'send'
```

This happens because Python coroutines/generators **cannot be serialized**. The checkpoint saves a reference to the awaitable, but not the coroutine object itself. On restore, the reference exists but points to `None`.

### How to reproduce

1. Run `react_agent_example.py` with Ollama:
   ```bash
   bin/flink run -py python/flink_agents/examples/quickstart/react_agent_example.py
   ```
2. Wait for the job to process some records (checkpoints 1 & 2 complete successfully)
3. Trigger an LLM timeout (slow network, overloaded Ollama, etc.)
4. Job fails with `httpx.ReadTimeout`, attempts to restart from checkpoint
5. Observe repeated `NoneType` errors during restore

#### Error sequence

| Time | Event |
|----------|------------------------------------------------------------------|
| 00:02:49 | Checkpoint 1 completed ✅ |
| 00:03:49 | Checkpoint 2 completed ✅ (9.28 MB state) |
| 00:04:38 | Initial failure: `httpx.ReadTimeout: timed out` |
| 00:04:49 | Checkpoint 3 failed - "Checkpoint Coordinator is suspending" |
| 00:06:38 | Job restarts from checkpoint 2 |
| 00:06:41 | Crash: `AttributeError: 'NoneType' object has no attribute 'send'` |

#### Stack trace

```
java.lang.RuntimeException: ActionTaskExecutionException: Failed to execute action task
...
Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'NoneType' object has no attribute 'send'
    at flink_agents/plan/function.call_python_awaitable(function.py:358)
    at PythonActionExecutor.callPythonAwaitable(PythonActionExecutor.java:170)
    ...
    at StreamTask.restoreInternal(StreamTask.java:815)
```

#### Root cause analysis

Python coroutines cannot be serialized:

```python
import pickle

async def my_coroutine():
    await something()

coro = my_coroutine()
pickle.dumps(coro)  # TypeError: cannot pickle 'coroutine' object
```

What happens:

1. `chat_model_action.py` calls `await ctx.durable_execute_async(chat_model.chat, messages)`
2. This creates a coroutine that yields while waiting for the LLM
3. LLM times out → exception raised
4. Checkpoint 3 fails, job restarts from checkpoint 2
5. Flink tries to resume the awaitable via `PythonActionExecutor.callPythonAwaitable()`
6. The awaitable reference was saved, but the coroutine object is gone → `None`
7. `call_python_awaitable()` calls `awaitable.send(None)` → crash

What's saved vs. not saved in checkpoint:

| Data | Saved? |
|---------------------------------------------------|--------------------------|
| `ActionState.callResults` (durable execution cache) | ✅ Yes |
| `ActionState.outputEvents` | ✅ Yes |
| Memory updates | ✅ Yes |
| Python coroutine object | ❌ No (not serializable) |

#### Code locations

Java side - No null check before invoking Python:

```java
// PythonActionExecutor.java:167-170
public boolean callPythonAwaitable(String pythonAwaitableRef) {
    Object pythonAwaitable = interpreter.get(pythonAwaitableRef); // Can be null!
    Object invokeResult = interpreter.invoke(CALL_PYTHON_AWAITABLE, pythonAwaitable);
    ...
}
```

Python side - No `None` check:

```python
# function.py:358
def call_python_awaitable(awaitable: Any) -> Tuple[bool, Any]:
    result = awaitable.send(None)  # Crashes if awaitable is None
```

#### Suggested fix

The durable execution mechanism already caches completed call results. The fix should detect the missing awaitable and re-execute the action from the beginning, allowing durable execution to skip already-completed calls.
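To make the intended behavior concrete, here is a minimal, self-contained sketch of "detect the lost awaitable, restart the action, and let cached results skip completed calls". Everything in it is hypothetical: `SketchContext`, `resume_or_restart`, `fetch_context`, and `chat` are illustration-only names, not the flink-agents API. It also assumes cached results are matched to calls by call order, which may not be how `ActionState.callResults` is keyed, and it drives the coroutine with `asyncio.run` rather than the `send()` stepping used by `call_python_awaitable`.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


class SketchContext:
    """Hypothetical stand-in for the action context (not the flink-agents API)."""

    def __init__(self, call_results: Optional[List[Any]] = None) -> None:
        # Results of durable calls that completed before the checkpoint.
        self.call_results: List[Any] = list(call_results or [])
        self._next_call = 0            # position of the next durable call in this run
        self.executed: List[str] = []  # calls that really ran (for the demo only)

    async def durable_execute_async(
        self, fn: Callable[..., Awaitable[Any]], *args: Any
    ) -> Any:
        index = self._next_call
        self._next_call += 1
        if index < len(self.call_results):
            # Assumption: results are replayed in call order.
            return self.call_results[index]
        result = await fn(*args)
        self.call_results.append(result)
        self.executed.append(fn.__name__)
        return result


async def fetch_context(query: str) -> str:
    return f"context for {query}"


async def chat(prompt: str) -> str:
    return f"answer to {prompt}"


async def action(ctx: SketchContext, query: str) -> str:
    context = await ctx.durable_execute_async(fetch_context, query)
    return await ctx.durable_execute_async(chat, context)


def resume_or_restart(awaitable: Optional[Any], ctx: SketchContext, query: str) -> str:
    """Restart the action from the beginning if the awaitable was lost on restore."""
    if awaitable is None:
        awaitable = action(ctx, query)  # fresh coroutine instead of awaitable.send(None)
    return asyncio.run(awaitable)


# Simulated restore: the first call finished before the checkpoint,
# and the coroutine object itself is gone (None).
restored = SketchContext(call_results=["context for q1"])
answer = resume_or_restart(None, restored, "q1")
print(answer)
print(restored.executed)
```

In this sketch only `chat` runs again after the simulated restore, because the result of `fetch_context` is replayed from the restored cache. This relies on the action issuing its durable calls in the same order on every run.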
**Option 1: Detect and re-execute in Java**

```java
// PythonActionExecutor.java
public boolean callPythonAwaitable(String pythonAwaitableRef) {
    Object pythonAwaitable = interpreter.get(pythonAwaitableRef);
    if (pythonAwaitable == null) {
        // Awaitable lost during restore - signal that action needs re-execution
        throw new AwaitableLostException(
            "Python awaitable not found: " + pythonAwaitableRef +
            ". Action will be re-executed from beginning.");
    }
    ...
}
```

Then in `ActionExecutionOperator`, catch this exception and re-execute the action from scratch.

**Option 2: Clear incomplete action state on restore**

If an action has an awaitable reference but `completed=false`, clear its state and re-execute from the beginning during restore.

**Option 3: Block checkpoint during async execution**

Prevent checkpointing while an awaitable is in progress (increases checkpoint latency but avoids the issue).

[jobmanager_log (7).txt](https://github.com/user-attachments/files/24949310/jobmanager_log.7.txt)
[taskmanager_log.txt](https://github.com/user-attachments/files/24949332/taskmanager_log.txt)

### Version and environment

- Flink Agents version: 0.3-SNAPSHOT (also affects 0.2)
- Flink version: 1.20.3
- Python version: 3.11
- OS: macOS

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
