This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/release-0.1 by this push:
     new 2a7c2cb  [runtime] Remove userCodeClassLoader field for ActionTask to 
fix can't be serialized to state. (#379)
2a7c2cb is described below

commit 2a7c2cb55fcd961655966cbe6b2014469ba31082
Author: Wenjin Xie <[email protected]>
AuthorDate: Wed Dec 17 11:29:33 2025 +0800

    [runtime] Remove userCodeClassLoader field for ActionTask to fix can't be 
serialized to state. (#379)
---
 .../flink/agents/runtime/operator/ActionExecutionOperator.java    | 6 +++---
 .../java/org/apache/flink/agents/runtime/operator/ActionTask.java | 2 +-
 .../org/apache/flink/agents/runtime/operator/JavaActionTask.java  | 8 ++------
 .../flink/agents/runtime/python/operator/PythonActionTask.java    | 4 ++--
 .../agents/runtime/python/operator/PythonGeneratorActionTask.java | 2 +-
 .../agents/runtime/{RescalingITCase.java => RescalingTest.java}   | 4 ++--
 6 files changed, 11 insertions(+), 15 deletions(-)

diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 95b991b..d00e8d8 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -405,7 +405,8 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
             }
         } else {
             maybeInitActionState(key, sequenceNumber, actionTask.action, 
actionTask.event);
-            ActionTask.ActionTaskResult actionTaskResult = actionTask.invoke();
+            ActionTask.ActionTaskResult actionTaskResult =
+                    
actionTask.invoke(getRuntimeContext().getUserCodeClassLoader());
 
             // We remove the RunnerContext of the action task from the map 
after it is finished. The
             // RunnerContext will be added later if the action task has a 
generated action task,
@@ -638,8 +639,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
 
     private ActionTask createActionTask(Object key, Action action, Event 
event) {
         if (action.getExec() instanceof JavaFunction) {
-            return new JavaActionTask(
-                    key, event, action, 
getRuntimeContext().getUserCodeClassLoader());
+            return new JavaActionTask(key, event, action);
         } else if (action.getExec() instanceof PythonFunction) {
             return new PythonActionTask(key, event, action);
         } else {
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
index d8a30b3..34f7850 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
@@ -87,7 +87,7 @@ public abstract class ActionTask {
     }
 
     /** Invokes the action task. */
-    public abstract ActionTaskResult invoke() throws Exception;
+    public abstract ActionTaskResult invoke(ClassLoader userCodeClassLoader) 
throws Exception;
 
     public class ActionTaskResult {
         private final boolean finished;
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
index 5898758..9fc641b 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
@@ -30,17 +30,13 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * action task will be invoked only once.
  */
 public class JavaActionTask extends ActionTask {
-
-    private final ClassLoader userCodeClassLoader;
-
-    public JavaActionTask(Object key, Event event, Action action, ClassLoader 
userCodeClassLoader) {
+    public JavaActionTask(Object key, Event event, Action action) {
         super(key, event, action);
         checkState(action.getExec() instanceof JavaFunction);
-        this.userCodeClassLoader = userCodeClassLoader;
     }
 
     @Override
-    public ActionTaskResult invoke() throws Exception {
+    public ActionTaskResult invoke(ClassLoader userCodeClassLoader) throws 
Exception {
         LOG.debug(
                 "Try execute java action {} for event {} with key {}.",
                 action.getName(),
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
index c3e1f84..89b39c1 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
@@ -43,7 +43,7 @@ public class PythonActionTask extends ActionTask {
                 "Python action only accept python event, but got " + event);
     }
 
-    public ActionTaskResult invoke() throws Exception {
+    public ActionTaskResult invoke(ClassLoader userCodeClassLoader) throws 
Exception {
         LOG.debug(
                 "Try execute python action {} for event {} with key {}.",
                 action.getName(),
@@ -64,7 +64,7 @@ public class PythonActionTask extends ActionTask {
             ActionTask tempGeneratedActionTask =
                     new PythonGeneratorActionTask(key, event, action, 
pythonGeneratorRef);
             tempGeneratedActionTask.setRunnerContext(runnerContext);
-            return tempGeneratedActionTask.invoke();
+            return tempGeneratedActionTask.invoke(userCodeClassLoader);
         }
         return new ActionTaskResult(
                 true, runnerContext.drainEvents(event.getSourceTimestamp()), 
null);
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
index 4119fcd..96afa19 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
@@ -32,7 +32,7 @@ public class PythonGeneratorActionTask extends 
PythonActionTask {
     }
 
     @Override
-    public ActionTaskResult invoke() {
+    public ActionTaskResult invoke(ClassLoader userCodeClassLoader) {
         LOG.debug(
                 "Try execute python generator action {} for event {} with key 
{}.",
                 action.getName(),
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingITCase.java 
b/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java
similarity index 99%
rename from 
runtime/src/test/java/org/apache/flink/agents/runtime/RescalingITCase.java
rename to 
runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java
index cb2e255..dca3ed2 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingITCase.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java
@@ -89,7 +89,7 @@ import static org.junit.Assert.assertTrue;
 
 /** This test case is derived from an existing test in Flink. */
 @RunWith(Parameterized.class)
-public class RescalingITCase extends TestLogger {
+public class RescalingTest extends TestLogger {
 
     @ClassRule
     public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
@@ -104,7 +104,7 @@ public class RescalingITCase extends TestLogger {
         return Arrays.asList(new Object[][] {{"hashmap"}, {"rocksdb"}});
     }
 
-    public RescalingITCase(String backend) {
+    public RescalingTest(String backend) {
         this.backend = backend;
     }
 

Reply via email to