This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 5d9fb77c [runtime] Fix Python awaitable lost during checkpoint restore (#509)
5d9fb77c is described below
commit 5d9fb77c0cb35de9112a63ac67e1fe305052459c
Author: Weiqing Yang <[email protected]>
AuthorDate: Fri Jan 30 00:42:54 2026 -0800
[runtime] Fix Python awaitable lost during checkpoint restore (#509)
[runtime] Fix Python awaitable lost during checkpoint restore
Co-authored-by: sxnan <[email protected]>
---
.../runtime/operator/ActionExecutionOperator.java | 18 +++++++++++++++++
.../python/context/PythonRunnerContextImpl.java | 15 ++++++++++++++
.../runtime/python/operator/PythonActionTask.java | 5 +++--
.../python/operator/PythonGeneratorActionTask.java | 23 +++++++++++++++++-----
.../runtime/python/utils/PythonActionExecutor.java | 4 ++++
5 files changed, 58 insertions(+), 7 deletions(-)
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index c83af977..3bc2912b 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -203,6 +203,8 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
private final transient Map<ActionTask, ContinuationContext>
continuationContexts;
+ private final transient Map<ActionTask, String> pythonAwaitableRefs;
+
// Each job can only have one identifier and this identifier must be
consistent across restarts.
// We cannot use job id as the identifier here because user may change job
id by
// creating a savepoint, stop the job and then resume from savepoint.
@@ -229,6 +231,7 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
this.actionTaskMemoryContexts = new HashMap<>();
this.actionTaskDurableContexts = new HashMap<>();
this.continuationContexts = new HashMap<>();
+ this.pythonAwaitableRefs = new HashMap<>();
OperatorUtils.setChainStrategy(this, ChainingStrategy.ALWAYS);
}
@@ -510,6 +513,7 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
actionTaskMemoryContexts.remove(actionTask);
actionTaskDurableContexts.remove(actionTask);
continuationContexts.remove(actionTask);
+ pythonAwaitableRefs.remove(actionTask);
maybePersistTaskResult(
key,
sequenceNumber,
@@ -557,6 +561,14 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
((JavaRunnerContextImpl) actionTask.getRunnerContext())
.getContinuationContext());
}
+ if (actionTask.getRunnerContext() instanceof
PythonRunnerContextImpl) {
+ String awaitableRef =
+ ((PythonRunnerContextImpl)
actionTask.getRunnerContext())
+ .getPythonAwaitableRef();
+ if (awaitableRef != null) {
+ pythonAwaitableRefs.put(generatedActionTask, awaitableRef);
+ }
+ }
actionTasksKState.add(generatedActionTask);
}
@@ -886,6 +898,12 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
}
((JavaRunnerContextImpl)
runnerContext).setContinuationContext(continuationContext);
}
+ if (runnerContext instanceof PythonRunnerContextImpl) {
+ // Get the awaitable ref from the transient map. After checkpoint restore, this will be
+ // null, signaling that the awaitable was lost and needs re-execution.
+ String awaitableRef = pythonAwaitableRefs.get(actionTask);
+ ((PythonRunnerContextImpl)
runnerContext).setPythonAwaitableRef(awaitableRef);
+ }
actionTask.setRunnerContext(runnerContext);
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
index 3d37b05a..7bfea4ad 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
@@ -31,6 +31,13 @@ import javax.annotation.concurrent.NotThreadSafe;
/** A specialized {@link RunnerContext} that is specifically used when
executing Python actions. */
@NotThreadSafe
public class PythonRunnerContextImpl extends RunnerContextImpl {
+
+ /**
+ * Reference to the Python awaitable object in the interpreter. This is set when a Python action
+ * yields an awaitable and is used by PythonGeneratorActionTask to resume execution.
+ */
+ private String pythonAwaitableRef;
+
public PythonRunnerContextImpl(
FlinkAgentsMetricGroupImpl agentMetricGroup,
Runnable mailboxThreadChecker,
@@ -50,4 +57,12 @@ public class PythonRunnerContextImpl extends
RunnerContextImpl {
// this method will be invoked by PythonActionExecutor's python
interpreter.
sendEvent(new PythonEvent(event, type, eventJsonStr));
}
+
+ public String getPythonAwaitableRef() {
+ return pythonAwaitableRef;
+ }
+
+ public void setPythonAwaitableRef(String pythonAwaitableRef) {
+ this.pythonAwaitableRef = pythonAwaitableRef;
+ }
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
index 779297a4..11a502bb 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
@@ -21,6 +21,7 @@ import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.plan.PythonFunction;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.runtime.operator.ActionTask;
+import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
import org.apache.flink.agents.runtime.python.event.PythonEvent;
import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
@@ -60,8 +61,8 @@ public class PythonActionTask extends ActionTask {
if (pythonAwaitableRef != null) {
// The Python action generates an awaitable. We need to execute it
once, which will
// submit an asynchronous task and return whether the action has
been completed.
- ActionTask tempGeneratedActionTask =
- new PythonGeneratorActionTask(key, event, action,
pythonAwaitableRef);
+ ((PythonRunnerContextImpl)
runnerContext).setPythonAwaitableRef(pythonAwaitableRef);
+ ActionTask tempGeneratedActionTask = new
PythonGeneratorActionTask(key, event, action);
tempGeneratedActionTask.setRunnerContext(runnerContext);
return tempGeneratedActionTask.invoke(userCodeClassLoader,
executor);
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
index c6834367..35296c1c 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
@@ -20,25 +20,38 @@ package org.apache.flink.agents.runtime.python.operator;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.runtime.operator.ActionTask;
+import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
/** An {@link ActionTask} wrapper a Python awaitable to represent a code block
in Python action. */
public class PythonGeneratorActionTask extends PythonActionTask {
- private final String pythonAwaitableRef;
- public PythonGeneratorActionTask(
- Object key, Event event, Action action, String pythonAwaitableRef)
{
+ public PythonGeneratorActionTask(Object key, Event event, Action action) {
super(key, event, action);
- this.pythonAwaitableRef = pythonAwaitableRef;
}
@Override
- public ActionTaskResult invoke(ClassLoader userCodeClassLoader,
PythonActionExecutor executor) {
+ public ActionTaskResult invoke(ClassLoader userCodeClassLoader,
PythonActionExecutor executor)
+ throws Exception {
LOG.debug(
"Try execute python awaitable action {} for event {} with key
{}.",
action.getName(),
event,
key);
+
+ String pythonAwaitableRef =
+ ((PythonRunnerContextImpl)
runnerContext).getPythonAwaitableRef();
+
+ if (pythonAwaitableRef == null) {
+ LOG.info(
+ "Python awaitable ref is null for action {} (likely
restored from checkpoint), "
+ + "re-executing from beginning.",
+ action.getName());
+ PythonActionTask freshTask = new PythonActionTask(key, event,
action);
+ freshTask.setRunnerContext(runnerContext);
+ return freshTask.invoke(userCodeClassLoader, executor);
+ }
+
boolean finished = executor.callPythonAwaitable(pythonAwaitableRef);
ActionTask generatedActionTask = finished ? null : this;
return new ActionTaskResult(
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 89306eec..659c48a2 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -176,6 +176,10 @@ public class PythonActionExecutor {
public boolean callPythonAwaitable(String pythonAwaitableRef) {
// Calling awaitable.send(None) in Python returns a tuple of
(finished, output).
Object pythonAwaitable = interpreter.get(pythonAwaitableRef);
+ checkState(
+ pythonAwaitable != null,
+ "Python awaitable '%s' not found in interpreter. ",
+ pythonAwaitableRef);
Object invokeResult = interpreter.invoke(CALL_PYTHON_AWAITABLE,
pythonAwaitable);
checkState(invokeResult.getClass().isArray() && ((Object[])
invokeResult).length == 2);
return (boolean) ((Object[]) invokeResult)[0];