This is an automated email from the ASF dual-hosted git repository.

sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 36e0731d [runtime] Fix Python awaitable lost during checkpoint restore (#509)
36e0731d is described below

commit 36e0731dace365784ea190594d051698dcf66808
Author: Weiqing Yang <[email protected]>
AuthorDate: Fri Jan 30 00:42:54 2026 -0800

    [runtime] Fix Python awaitable lost during checkpoint restore (#509)
    
    [runtime] Fix Python awaitable lost during checkpoint restore
    
    Co-authored-by: sxnan <[email protected]>
---
 .../runtime/operator/ActionExecutionOperator.java  | 18 +++++++++++++++++
 .../python/context/PythonRunnerContextImpl.java    | 15 ++++++++++++++
 .../runtime/python/operator/PythonActionTask.java  |  5 +++--
 .../python/operator/PythonGeneratorActionTask.java | 23 +++++++++++++++++-----
 .../runtime/python/utils/PythonActionExecutor.java |  4 ++++
 5 files changed, 58 insertions(+), 7 deletions(-)

diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index c83af977..3bc2912b 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -203,6 +203,8 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
 
     private final transient Map<ActionTask, ContinuationContext> 
continuationContexts;
 
+    private final transient Map<ActionTask, String> pythonAwaitableRefs;
+
     // Each job can only have one identifier and this identifier must be 
consistent across restarts.
     // We cannot use job id as the identifier here because user may change job 
id by
     // creating a savepoint, stop the job and then resume from savepoint.
@@ -229,6 +231,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         this.actionTaskMemoryContexts = new HashMap<>();
         this.actionTaskDurableContexts = new HashMap<>();
         this.continuationContexts = new HashMap<>();
+        this.pythonAwaitableRefs = new HashMap<>();
         OperatorUtils.setChainStrategy(this, ChainingStrategy.ALWAYS);
     }
 
@@ -510,6 +513,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
             actionTaskMemoryContexts.remove(actionTask);
             actionTaskDurableContexts.remove(actionTask);
             continuationContexts.remove(actionTask);
+            pythonAwaitableRefs.remove(actionTask);
             maybePersistTaskResult(
                     key,
                     sequenceNumber,
@@ -557,6 +561,14 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                         ((JavaRunnerContextImpl) actionTask.getRunnerContext())
                                 .getContinuationContext());
             }
+            if (actionTask.getRunnerContext() instanceof 
PythonRunnerContextImpl) {
+                String awaitableRef =
+                        ((PythonRunnerContextImpl) 
actionTask.getRunnerContext())
+                                .getPythonAwaitableRef();
+                if (awaitableRef != null) {
+                    pythonAwaitableRefs.put(generatedActionTask, awaitableRef);
+                }
+            }
 
             actionTasksKState.add(generatedActionTask);
         }
@@ -886,6 +898,12 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
             }
             ((JavaRunnerContextImpl) 
runnerContext).setContinuationContext(continuationContext);
         }
+        if (runnerContext instanceof PythonRunnerContextImpl) {
+            // Get the awaitable ref from the transient map. After checkpoint 
restore, this will be
+            // null, signaling that the awaitable was lost and needs 
re-execution.
+            String awaitableRef = pythonAwaitableRefs.get(actionTask);
+            ((PythonRunnerContextImpl) 
runnerContext).setPythonAwaitableRef(awaitableRef);
+        }
         actionTask.setRunnerContext(runnerContext);
     }
 
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
index 3d37b05a..7bfea4ad 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
@@ -31,6 +31,13 @@ import javax.annotation.concurrent.NotThreadSafe;
 /** A specialized {@link RunnerContext} that is specifically used when 
executing Python actions. */
 @NotThreadSafe
 public class PythonRunnerContextImpl extends RunnerContextImpl {
+
+    /**
+     * Reference to the Python awaitable object in the interpreter. This is 
set when a Python action
+     * yields an awaitable and is used by PythonGeneratorActionTask to resume 
execution.
+     */
+    private String pythonAwaitableRef;
+
     public PythonRunnerContextImpl(
             FlinkAgentsMetricGroupImpl agentMetricGroup,
             Runnable mailboxThreadChecker,
@@ -50,4 +57,12 @@ public class PythonRunnerContextImpl extends 
RunnerContextImpl {
         // this method will be invoked by PythonActionExecutor's python 
interpreter.
         sendEvent(new PythonEvent(event, type, eventJsonStr));
     }
+
+    public String getPythonAwaitableRef() {
+        return pythonAwaitableRef;
+    }
+
+    public void setPythonAwaitableRef(String pythonAwaitableRef) {
+        this.pythonAwaitableRef = pythonAwaitableRef;
+    }
 }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
index 779297a4..11a502bb 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
@@ -21,6 +21,7 @@ import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.plan.PythonFunction;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.runtime.operator.ActionTask;
+import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
 import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
 
@@ -60,8 +61,8 @@ public class PythonActionTask extends ActionTask {
         if (pythonAwaitableRef != null) {
             // The Python action generates an awaitable. We need to execute it 
once, which will
             // submit an asynchronous task and return whether the action has 
been completed.
-            ActionTask tempGeneratedActionTask =
-                    new PythonGeneratorActionTask(key, event, action, 
pythonAwaitableRef);
+            ((PythonRunnerContextImpl) 
runnerContext).setPythonAwaitableRef(pythonAwaitableRef);
+            ActionTask tempGeneratedActionTask = new 
PythonGeneratorActionTask(key, event, action);
             tempGeneratedActionTask.setRunnerContext(runnerContext);
             return tempGeneratedActionTask.invoke(userCodeClassLoader, 
executor);
         }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
index c6834367..35296c1c 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
@@ -20,25 +20,38 @@ package org.apache.flink.agents.runtime.python.operator;
 import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.runtime.operator.ActionTask;
+import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
 import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
 
 /** An {@link ActionTask} wrapper a Python awaitable to represent a code block 
in Python action. */
 public class PythonGeneratorActionTask extends PythonActionTask {
-    private final String pythonAwaitableRef;
 
-    public PythonGeneratorActionTask(
-            Object key, Event event, Action action, String pythonAwaitableRef) 
{
+    public PythonGeneratorActionTask(Object key, Event event, Action action) {
         super(key, event, action);
-        this.pythonAwaitableRef = pythonAwaitableRef;
     }
 
     @Override
-    public ActionTaskResult invoke(ClassLoader userCodeClassLoader, 
PythonActionExecutor executor) {
+    public ActionTaskResult invoke(ClassLoader userCodeClassLoader, 
PythonActionExecutor executor)
+            throws Exception {
         LOG.debug(
                 "Try execute python awaitable action {} for event {} with key 
{}.",
                 action.getName(),
                 event,
                 key);
+
+        String pythonAwaitableRef =
+                ((PythonRunnerContextImpl) 
runnerContext).getPythonAwaitableRef();
+
+        if (pythonAwaitableRef == null) {
+            LOG.info(
+                    "Python awaitable ref is null for action {} (likely 
restored from checkpoint), "
+                            + "re-executing from beginning.",
+                    action.getName());
+            PythonActionTask freshTask = new PythonActionTask(key, event, 
action);
+            freshTask.setRunnerContext(runnerContext);
+            return freshTask.invoke(userCodeClassLoader, executor);
+        }
+
         boolean finished = executor.callPythonAwaitable(pythonAwaitableRef);
         ActionTask generatedActionTask = finished ? null : this;
         return new ActionTaskResult(
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 89306eec..659c48a2 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -176,6 +176,10 @@ public class PythonActionExecutor {
     public boolean callPythonAwaitable(String pythonAwaitableRef) {
         // Calling awaitable.send(None) in Python returns a tuple of 
(finished, output).
         Object pythonAwaitable = interpreter.get(pythonAwaitableRef);
+        checkState(
+                pythonAwaitable != null,
+                "Python awaitable '%s' not found in interpreter. ",
+                pythonAwaitableRef);
         Object invokeResult = interpreter.invoke(CALL_PYTHON_AWAITABLE, 
pythonAwaitable);
         checkState(invokeResult.getClass().isArray() && ((Object[]) 
invokeResult).length == 2);
         return (boolean) ((Object[]) invokeResult)[0];

Reply via email to