weiqingy opened a new issue, #508:
URL: https://github.com/apache/flink-agents/issues/508

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.
   
   ### Description
   
   When a Python async action fails mid-execution (e.g., LLM API timeout), the job attempts to restart from the last checkpoint. During restore, the Python awaitable (coroutine) reference is `None`, causing:

       AttributeError: 'NoneType' object has no attribute 'send'

   This happens because Python coroutines/generators **cannot be serialized**. The checkpoint saves a reference to the awaitable, but not the coroutine object itself. On restore, the reference exists but points to `None`.
           
                                                            
   
   ### How to reproduce
   
   1. Run `react_agent_example.py` with Ollama:

      ```bash
      bin/flink run -py python/flink_agents/examples/quickstart/react_agent_example.py
      ```

   2. Wait for the job to process some records (checkpoints 1 and 2 complete successfully).
   3. Trigger an LLM timeout (slow network, overloaded Ollama, etc.).
   4. The job fails with `httpx.ReadTimeout` and attempts to restart from the checkpoint.
   5. Observe repeated `NoneType` errors during restore.
           
                                                                                
           
   #### Error sequence

   | Time     | Event |
   |----------|-------|
   | 00:02:49 | Checkpoint 1 completed ✅ |
   | 00:03:49 | Checkpoint 2 completed ✅ (9.28 MB state) |
   | 00:04:38 | Initial failure: `httpx.ReadTimeout: timed out` |
   | 00:04:49 | Checkpoint 3 failed - "Checkpoint Coordinator is suspending" |
   | 00:06:38 | Job restarts from checkpoint 2 |
   | 00:06:41 | Crash: `AttributeError: 'NoneType' object has no attribute 'send'` |
      
   #### Stack trace

       java.lang.RuntimeException: ActionTaskExecutionException: Failed to execute action task
           ...
       Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'NoneType' object has no attribute 'send'
           at flink_agents/plan/function.call_python_awaitable(function.py:358)
           at PythonActionExecutor.callPythonAwaitable(PythonActionExecutor.java:170)
           ...
           at StreamTask.restoreInternal(StreamTask.java:815)
           
                                                                                
           
   #### Root cause analysis

   Python coroutines cannot be serialized:

       import pickle

       async def my_coroutine():
           await something()  # placeholder for any awaited call

       coro = my_coroutine()
       pickle.dumps(coro)  # TypeError: cannot pickle 'coroutine' object
           
                                                                                
           
   What happens:

   1. `chat_model_action.py` calls `await ctx.durable_execute_async(chat_model.chat, messages)`.
   2. This creates a coroutine that yields while waiting for the LLM.
   3. The LLM times out and an exception is raised.
   4. Checkpoint 3 fails, and the job restarts from checkpoint 2.
   5. Flink tries to resume the awaitable via `PythonActionExecutor.callPythonAwaitable()`.
   6. The awaitable reference was saved, but the coroutine object is gone, so the lookup returns `None`.
   7. `call_python_awaitable()` calls `awaitable.send(None)` and crashes.
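
The sequence above can be imitated outside Flink with a few lines of plain Python. This is only a sketch under stated assumptions: the `awaitables` dictionary, the `chat` coroutine, and the reference string `"awaitable-1"` are hypothetical stand-ins for however the embedded interpreter maps references to live coroutine objects. They are not the actual flink-agents data structures.

```python
import asyncio

# Hypothetical registry standing in for the interpreter's table of live objects.
awaitables = {}


async def chat():
    await asyncio.sleep(0)  # stand-in for the LLM call
    return "response"


# Before the failure: the coroutine is alive and registered under a reference.
ref = "awaitable-1"
awaitables[ref] = chat()

# A checkpoint can persist the reference (a plain string), not the coroutine.
checkpointed_ref = ref

# Simulate the restart: the in-memory Python state is gone.
awaitables[ref].close()  # tidy up so Python does not warn about an unawaited coroutine
awaitables = {}

# After restore: the reference is still known, but it resolves to nothing.
restored = awaitables.get(checkpointed_ref)

try:
    restored.send(None)
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)
```

Running this prints `'NoneType' object has no attribute 'send'`, the same message as in the stack trace.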
           
                                                                                
           
   What's saved vs. not saved in checkpoint:

   | Data                                                | Saved? |
   |-----------------------------------------------------|--------|
   | `ActionState.callResults` (durable execution cache) | ✅ Yes |
   | `ActionState.outputEvents`                          | ✅ Yes |
   | Memory updates                                      | ✅ Yes |
   | Python coroutine object                             | ❌ No (not serializable) |
      
   #### Code locations

   Java side - No null check before invoking Python:

       // PythonActionExecutor.java:167-170
       public boolean callPythonAwaitable(String pythonAwaitableRef) {
           Object pythonAwaitable = interpreter.get(pythonAwaitableRef);  // Can be null!
           Object invokeResult = interpreter.invoke(CALL_PYTHON_AWAITABLE, pythonAwaitable);
           ...
       }

   Python side - No None check:

       # function.py:358
       def call_python_awaitable(awaitable: Any) -> Tuple[bool, Any]:
           result = awaitable.send(None)  # Crashes if awaitable is None
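
For illustration, a guarded variant of the Python side might look like the sketch below. Only the signature and the `send(None)` call come from the snippet above. The `AwaitableLostError` class, the `(done, value)` meaning of the returned tuple, and the `_YieldOnce` helper are assumptions made for this example, not the real `function.py` implementation.

```python
from typing import Any, Tuple


class AwaitableLostError(RuntimeError):
    """Hypothetical error: the awaitable did not survive a restore."""


def call_python_awaitable(awaitable: Any) -> Tuple[bool, Any]:
    """Advance the awaitable by one step.

    Assumed convention for this sketch: (True, result) once the coroutine
    has finished, (False, yielded_value) while it is still pending.
    """
    if awaitable is None:
        raise AwaitableLostError(
            "Python awaitable is missing (likely lost during restore); "
            "the action must be re-executed from the beginning."
        )
    try:
        yielded = awaitable.send(None)
    except StopIteration as stop:
        return True, stop.value
    return False, yielded


class _YieldOnce:
    """Hypothetical awaitable that suspends exactly once."""

    def __await__(self):
        yield "pending"


async def action():
    await _YieldOnce()
    return 42


coro = action()
first_step = call_python_awaitable(coro)   # coroutine suspends at the await
second_step = call_python_awaitable(coro)  # coroutine runs to completion
print(first_step, second_step)
```

With this guard, a lost awaitable produces an explicit, catchable error instead of an `AttributeError` raised from inside the interpreter bridge.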
           
                                                                                
           
   #### Suggested fix

   The durable execution mechanism already caches completed call results. The fix should detect the missing awaitable and re-execute the action from the beginning, allowing durable execution to skip already-completed calls.
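
The replay idea can be sketched in a few lines of Python. This is a simplified model, not the flink-agents implementation: `DurableContext`, `durable_execute`, `step`, `flaky`, and `saved_results` are hypothetical names, and the cache here is a plain list playing the role of the checkpointed call results.

```python
from typing import Any, Callable, List


class DurableContext:
    """Hypothetical, simplified stand-in for the durable execution cache."""

    def __init__(self, call_results: List[Any]) -> None:
        self.call_results = call_results  # survives restarts in this sketch
        self._next_call = 0               # position within the current run

    def durable_execute(self, fn: Callable[..., Any], *args: Any) -> Any:
        index = self._next_call
        self._next_call += 1
        if index < len(self.call_results):
            return self.call_results[index]  # replay: skip the real call
        result = fn(*args)
        self.call_results.append(result)
        return result


executed: List[str] = []  # records which calls actually ran


def step(name: str) -> str:
    executed.append(name)
    return name.upper()


def flaky(name: str, fail: bool) -> str:
    if fail:
        raise TimeoutError("LLM timed out")
    return step(name)


def action(ctx: DurableContext, fail_second: bool) -> List[str]:
    first = ctx.durable_execute(step, "a")
    second = ctx.durable_execute(flaky, "b", fail_second)
    return [first, second]


saved_results: List[Any] = []  # plays the role of checkpointed state

try:
    action(DurableContext(saved_results), fail_second=True)
except TimeoutError:
    pass  # the first attempt fails after "a" has been cached

# Re-execute from the beginning with a fresh context over the same cache.
output = action(DurableContext(saved_results), fail_second=False)
print(output, executed)
```

The second run returns `['A', 'B']` while `executed` is `['a', 'b']`: call `"a"` ran only once, because the re-executed action took its result from the cache.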
           
                                                                                
           
   ##### Option 1: Detect and re-execute in Java

       // PythonActionExecutor.java
       public boolean callPythonAwaitable(String pythonAwaitableRef) {
           Object pythonAwaitable = interpreter.get(pythonAwaitableRef);
           if (pythonAwaitable == null) {
               // Awaitable lost during restore - signal that action needs re-execution
               throw new AwaitableLostException(
                   "Python awaitable not found: " + pythonAwaitableRef +
                   ". Action will be re-executed from beginning.");
           }
           ...
       }

   Then in `ActionExecutionOperator`, catch this exception and re-execute the action from scratch.
           
                                                                                
           
   ##### Option 2: Clear incomplete action state on restore

   If an action has an awaitable reference but `completed=false`, clear its state and re-execute from the beginning during restore.
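
A minimal sketch of such a restore-time pass, written in Python for brevity. The `ActionState` dataclass below is a hypothetical, simplified view with only the fields this example needs, and `reset_incomplete_actions` is an invented helper name. The real state class and restore hook may look quite different.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ActionState:
    """Hypothetical, simplified view of per-action checkpointed state."""

    awaitable_ref: Optional[str] = None
    completed: bool = False
    call_results: List[Any] = field(default_factory=list)


def reset_incomplete_actions(states: Dict[str, ActionState]) -> List[str]:
    """Drop dangling awaitable references and return the actions to re-run."""
    to_rerun = []
    for action_id, state in states.items():
        if state.awaitable_ref is not None and not state.completed:
            state.awaitable_ref = None  # the coroutine cannot be recovered
            to_rerun.append(action_id)
    return to_rerun


restored_states = {
    "chat": ActionState(awaitable_ref="awaitable-1", call_results=["A"]),
    "done": ActionState(awaitable_ref="awaitable-0", completed=True),
    "sync": ActionState(),
}
rerun = reset_incomplete_actions(restored_states)
print(rerun)
```

Here only `"chat"` is selected for re-execution. The sketch deliberately keeps `call_results` so that a replay can still skip completed calls. Clearing them as well would be a stricter reading of "clear its state", at the cost of repeating those calls.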
           
                                                                                
           
   ##### Option 3: Block checkpoint during async execution

   Prevent checkpointing while an awaitable is in progress (increases checkpoint latency but avoids the issue).
           
                               
   [jobmanager_log (7).txt](https://github.com/user-attachments/files/24949310/jobmanager_log.7.txt)

   [taskmanager_log.txt](https://github.com/user-attachments/files/24949332/taskmanager_log.txt)
   
   ### Version and environment
   
   - Flink Agents version: 0.3-SNAPSHOT (also affects 0.2)
   - Flink version: 1.20.3
   - Python version: 3.11
   - OS: macOS
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

