This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/release-0.1 by this push:
     new 226164d  [runtime] Execute action within user code class loader context. (#277)
226164d is described below

commit 226164d0b1c45a6245adfda3c147a25c3577d270
Author: Wenjin Xie <[email protected]>
AuthorDate: Tue Oct 21 16:44:18 2025 +0800

    [runtime] Execute action within user code class loader context. (#277)
---
 docs/content/docs/get-started/quickstart/react_agent.md     |  8 ++------
 docs/content/docs/get-started/quickstart/workflow_agent.md  |  6 +-----
 .../java/org/apache/flink/agents/plan/JavaFunction.java     |  6 +++---
 .../agents/runtime/operator/ActionExecutionOperator.java    |  4 ++--
 .../flink/agents/runtime/operator/JavaActionTask.java       | 13 +++++++++++--
 5 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/docs/content/docs/get-started/quickstart/react_agent.md b/docs/content/docs/get-started/quickstart/react_agent.md
index 142da1d..b58a586 100644
--- a/docs/content/docs/get-started/quickstart/react_agent.md
+++ b/docs/content/docs/get-started/quickstart/react_agent.md
@@ -243,12 +243,8 @@ export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["pu
 {{< /tab >}}
 
 {{< tab "Java" >}}
-1. Build Flink Agents from source to generate example jar. See 
[installation]({{< ref "docs/get-started/installation" >}}) for more details. 
-2. Copy the Flink Agents example jar to Flink lib directory
-    ```bash
-    cp flink-agents/examples/target/flink-agents-examples-$VERSION.jar 
./flink-1.20.3/lib/
-    ```
-3. Start the Flink cluster
+1. Build Flink Agents from source to generate example jar. See 
[installation]({{< ref "docs/get-started/installation" >}}) for more details.
+2. Start the Flink cluster
     ```bash
     ./flink-1.20.3/bin/start-cluster.sh
     ```
diff --git a/docs/content/docs/get-started/quickstart/workflow_agent.md b/docs/content/docs/get-started/quickstart/workflow_agent.md
index 0c22fb9..f7c5000 100644
--- a/docs/content/docs/get-started/quickstart/workflow_agent.md
+++ b/docs/content/docs/get-started/quickstart/workflow_agent.md
@@ -368,11 +368,7 @@ export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["pu
 
 {{< tab "Java" >}}
 1. Build Flink Agents from source to generate example jar. See 
[installation]({{< ref "docs/get-started/installation" >}}) for more details.
-2. Copy the Flink Agents example jar to Flink lib directory
-    ```bash
-    cp flink-agents/examples/target/flink-agents-examples-$VERSION.jar 
./flink-1.20.3/lib/
-    ```
-3. Start the Flink cluster
+2. Start the Flink cluster
     ```bash
     ./flink-1.20.3/bin/start-cluster.sh
     ```
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java b/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java
index e007646..ad5279c 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java
@@ -49,8 +49,6 @@ public class JavaFunction implements Function {
         this.qualName = qualName;
         this.methodName = methodName;
         this.parameterTypes = parameterTypes;
-        // TODO: support get method loaded by user code classloader.
-        this.method = Class.forName(qualName).getMethod(methodName, 
parameterTypes);
     }
 
     public JavaFunction(Class<?> clazz, String methodName, Class<?>[] 
parameterTypes)
@@ -75,7 +73,9 @@ public class JavaFunction implements Function {
 
     public Method getMethod() throws ClassNotFoundException, 
NoSuchMethodException {
         if (method == null) {
-            this.method = Class.forName(qualName).getMethod(methodName, 
parameterTypes);
+            this.method =
+                    Class.forName(qualName, true, 
Thread.currentThread().getContextClassLoader())
+                            .getMethod(methodName, parameterTypes);
         }
         return method;
     }
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 2543dee..a17299e 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -181,7 +181,6 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
     @Override
     public void open() throws Exception {
         super.open();
-
         reusedStreamRecord = new StreamRecord<>(null);
 
         // init shortTermMemState
@@ -639,7 +638,8 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
 
     private ActionTask createActionTask(Object key, Action action, Event 
event) {
         if (action.getExec() instanceof JavaFunction) {
-            return new JavaActionTask(key, event, action);
+            return new JavaActionTask(
+                    key, event, action, 
getRuntimeContext().getUserCodeClassLoader());
         } else if (action.getExec() instanceof PythonFunction) {
             return new PythonActionTask(key, event, action, 
pythonActionExecutor);
         } else {
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
index 1565bd0..5898758 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
@@ -31,9 +31,12 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public class JavaActionTask extends ActionTask {
 
-    public JavaActionTask(Object key, Event event, Action action) {
+    private final ClassLoader userCodeClassLoader;
+
+    public JavaActionTask(Object key, Event event, Action action, ClassLoader 
userCodeClassLoader) {
         super(key, event, action);
         checkState(action.getExec() instanceof JavaFunction);
+        this.userCodeClassLoader = userCodeClassLoader;
     }
 
     @Override
@@ -44,7 +47,13 @@ public class JavaActionTask extends ActionTask {
                 event,
                 key);
         runnerContext.checkNoPendingEvents();
-        action.getExec().call(event, runnerContext);
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(userCodeClassLoader);
+            action.getExec().call(event, runnerContext);
+        } finally {
+            Thread.currentThread().setContextClassLoader(cl);
+        }
         return new ActionTaskResult(
                 true, runnerContext.drainEvents(event.getSourceTimestamp()), 
null);
     }

Reply via email to