This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.1 by this push:
new 226164d [runtime] Execute action within user code class loader
context. (#277)
226164d is described below
commit 226164d0b1c45a6245adfda3c147a25c3577d270
Author: Wenjin Xie <[email protected]>
AuthorDate: Tue Oct 21 16:44:18 2025 +0800
[runtime] Execute action within user code class loader context. (#277)
---
docs/content/docs/get-started/quickstart/react_agent.md | 8 ++------
docs/content/docs/get-started/quickstart/workflow_agent.md | 6 +-----
.../java/org/apache/flink/agents/plan/JavaFunction.java | 6 +++---
.../agents/runtime/operator/ActionExecutionOperator.java | 4 ++--
.../flink/agents/runtime/operator/JavaActionTask.java | 13 +++++++++++--
5 files changed, 19 insertions(+), 18 deletions(-)
diff --git a/docs/content/docs/get-started/quickstart/react_agent.md
b/docs/content/docs/get-started/quickstart/react_agent.md
index 142da1d..b58a586 100644
--- a/docs/content/docs/get-started/quickstart/react_agent.md
+++ b/docs/content/docs/get-started/quickstart/react_agent.md
@@ -243,12 +243,8 @@ export PYTHONPATH=$(python -c 'import sysconfig;
print(sysconfig.get_paths()["pu
{{< /tab >}}
{{< tab "Java" >}}
-1. Build Flink Agents from source to generate example jar. See
[installation]({{< ref "docs/get-started/installation" >}}) for more details.
-2. Copy the Flink Agents example jar to Flink lib directory
- ```bash
- cp flink-agents/examples/target/flink-agents-examples-$VERSION.jar
./flink-1.20.3/lib/
- ```
-3. Start the Flink cluster
+1. Build Flink Agents from source to generate example jar. See
[installation]({{< ref "docs/get-started/installation" >}}) for more details.
+2. Start the Flink cluster
```bash
./flink-1.20.3/bin/start-cluster.sh
```
diff --git a/docs/content/docs/get-started/quickstart/workflow_agent.md
b/docs/content/docs/get-started/quickstart/workflow_agent.md
index 0c22fb9..f7c5000 100644
--- a/docs/content/docs/get-started/quickstart/workflow_agent.md
+++ b/docs/content/docs/get-started/quickstart/workflow_agent.md
@@ -368,11 +368,7 @@ export PYTHONPATH=$(python -c 'import sysconfig;
print(sysconfig.get_paths()["pu
{{< tab "Java" >}}
1. Build Flink Agents from source to generate example jar. See
[installation]({{< ref "docs/get-started/installation" >}}) for more details.
-2. Copy the Flink Agents example jar to Flink lib directory
- ```bash
- cp flink-agents/examples/target/flink-agents-examples-$VERSION.jar
./flink-1.20.3/lib/
- ```
-3. Start the Flink cluster
+2. Start the Flink cluster
```bash
./flink-1.20.3/bin/start-cluster.sh
```
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java
b/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java
index e007646..ad5279c 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java
@@ -49,8 +49,6 @@ public class JavaFunction implements Function {
this.qualName = qualName;
this.methodName = methodName;
this.parameterTypes = parameterTypes;
- // TODO: support get method loaded by user code classloader.
- this.method = Class.forName(qualName).getMethod(methodName,
parameterTypes);
}
public JavaFunction(Class<?> clazz, String methodName, Class<?>[]
parameterTypes)
@@ -75,7 +73,9 @@ public class JavaFunction implements Function {
public Method getMethod() throws ClassNotFoundException,
NoSuchMethodException {
if (method == null) {
- this.method = Class.forName(qualName).getMethod(methodName,
parameterTypes);
+ this.method =
+ Class.forName(qualName, true,
Thread.currentThread().getContextClassLoader())
+ .getMethod(methodName, parameterTypes);
}
return method;
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 2543dee..a17299e 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -181,7 +181,6 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
@Override
public void open() throws Exception {
super.open();
-
reusedStreamRecord = new StreamRecord<>(null);
// init shortTermMemState
@@ -639,7 +638,8 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
private ActionTask createActionTask(Object key, Action action, Event
event) {
if (action.getExec() instanceof JavaFunction) {
- return new JavaActionTask(key, event, action);
+ return new JavaActionTask(
+ key, event, action,
getRuntimeContext().getUserCodeClassLoader());
} else if (action.getExec() instanceof PythonFunction) {
return new PythonActionTask(key, event, action,
pythonActionExecutor);
} else {
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
index 1565bd0..5898758 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
@@ -31,9 +31,12 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
public class JavaActionTask extends ActionTask {
- public JavaActionTask(Object key, Event event, Action action) {
+ private final ClassLoader userCodeClassLoader;
+
+ public JavaActionTask(Object key, Event event, Action action, ClassLoader
userCodeClassLoader) {
super(key, event, action);
checkState(action.getExec() instanceof JavaFunction);
+ this.userCodeClassLoader = userCodeClassLoader;
}
@Override
@@ -44,7 +47,13 @@ public class JavaActionTask extends ActionTask {
event,
key);
runnerContext.checkNoPendingEvents();
- action.getExec().call(event, runnerContext);
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(userCodeClassLoader);
+ action.getExec().call(event, runnerContext);
+ } finally {
+ Thread.currentThread().setContextClassLoader(cl);
+ }
return new ActionTaskResult(
true, runnerContext.drainEvents(event.getSourceTimestamp()),
null);
}