This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.1 by this push:
new 2a7c2cb [runtime] Remove userCodeClassLoader field from ActionTask so
that it can be serialized to state. (#379)
2a7c2cb is described below
commit 2a7c2cb55fcd961655966cbe6b2014469ba31082
Author: Wenjin Xie <[email protected]>
AuthorDate: Wed Dec 17 11:29:33 2025 +0800
[runtime] Remove userCodeClassLoader field from ActionTask so that it can be
serialized to state. (#379)
---
.../flink/agents/runtime/operator/ActionExecutionOperator.java | 6 +++---
.../java/org/apache/flink/agents/runtime/operator/ActionTask.java | 2 +-
.../org/apache/flink/agents/runtime/operator/JavaActionTask.java | 8 ++------
.../flink/agents/runtime/python/operator/PythonActionTask.java | 4 ++--
.../agents/runtime/python/operator/PythonGeneratorActionTask.java | 2 +-
.../agents/runtime/{RescalingITCase.java => RescalingTest.java} | 4 ++--
6 files changed, 11 insertions(+), 15 deletions(-)
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 95b991b..d00e8d8 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -405,7 +405,8 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
}
} else {
maybeInitActionState(key, sequenceNumber, actionTask.action,
actionTask.event);
- ActionTask.ActionTaskResult actionTaskResult = actionTask.invoke();
+ ActionTask.ActionTaskResult actionTaskResult =
+
actionTask.invoke(getRuntimeContext().getUserCodeClassLoader());
// We remove the RunnerContext of the action task from the map
after it is finished. The
// RunnerContext will be added later if the action task has a
generated action task,
@@ -638,8 +639,7 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
private ActionTask createActionTask(Object key, Action action, Event
event) {
if (action.getExec() instanceof JavaFunction) {
- return new JavaActionTask(
- key, event, action,
getRuntimeContext().getUserCodeClassLoader());
+ return new JavaActionTask(key, event, action);
} else if (action.getExec() instanceof PythonFunction) {
return new PythonActionTask(key, event, action);
} else {
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
index d8a30b3..34f7850 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
@@ -87,7 +87,7 @@ public abstract class ActionTask {
}
/** Invokes the action task. */
- public abstract ActionTaskResult invoke() throws Exception;
+ public abstract ActionTaskResult invoke(ClassLoader userCodeClassLoader)
throws Exception;
public class ActionTaskResult {
private final boolean finished;
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
index 5898758..9fc641b 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
@@ -30,17 +30,13 @@ import static
org.apache.flink.util.Preconditions.checkState;
* action task will be invoked only once.
*/
public class JavaActionTask extends ActionTask {
-
- private final ClassLoader userCodeClassLoader;
-
- public JavaActionTask(Object key, Event event, Action action, ClassLoader
userCodeClassLoader) {
+ public JavaActionTask(Object key, Event event, Action action) {
super(key, event, action);
checkState(action.getExec() instanceof JavaFunction);
- this.userCodeClassLoader = userCodeClassLoader;
}
@Override
- public ActionTaskResult invoke() throws Exception {
+ public ActionTaskResult invoke(ClassLoader userCodeClassLoader) throws
Exception {
LOG.debug(
"Try execute java action {} for event {} with key {}.",
action.getName(),
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
index c3e1f84..89b39c1 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
@@ -43,7 +43,7 @@ public class PythonActionTask extends ActionTask {
"Python action only accept python event, but got " + event);
}
- public ActionTaskResult invoke() throws Exception {
+ public ActionTaskResult invoke(ClassLoader userCodeClassLoader) throws
Exception {
LOG.debug(
"Try execute python action {} for event {} with key {}.",
action.getName(),
@@ -64,7 +64,7 @@ public class PythonActionTask extends ActionTask {
ActionTask tempGeneratedActionTask =
new PythonGeneratorActionTask(key, event, action,
pythonGeneratorRef);
tempGeneratedActionTask.setRunnerContext(runnerContext);
- return tempGeneratedActionTask.invoke();
+ return tempGeneratedActionTask.invoke(userCodeClassLoader);
}
return new ActionTaskResult(
true, runnerContext.drainEvents(event.getSourceTimestamp()),
null);
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
index 4119fcd..96afa19 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
@@ -32,7 +32,7 @@ public class PythonGeneratorActionTask extends
PythonActionTask {
}
@Override
- public ActionTaskResult invoke() {
+ public ActionTaskResult invoke(ClassLoader userCodeClassLoader) {
LOG.debug(
"Try execute python generator action {} for event {} with key
{}.",
action.getName(),
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingITCase.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java
similarity index 99%
rename from
runtime/src/test/java/org/apache/flink/agents/runtime/RescalingITCase.java
rename to
runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java
index cb2e255..dca3ed2 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingITCase.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java
@@ -89,7 +89,7 @@ import static org.junit.Assert.assertTrue;
/** This test case is derived from an existing test in Flink. */
@RunWith(Parameterized.class)
-public class RescalingITCase extends TestLogger {
+public class RescalingTest extends TestLogger {
@ClassRule
public static final TestExecutorResource<ScheduledExecutorService>
EXECUTOR_RESOURCE =
@@ -104,7 +104,7 @@ public class RescalingITCase extends TestLogger {
return Arrays.asList(new Object[][] {{"hashmap"}, {"rocksdb"}});
}
- public RescalingITCase(String backend) {
+ public RescalingTest(String backend) {
this.backend = backend;
}