This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new b8559a3e [Bugfix] Use Thread Context ClassLoader for user class loading (#514)
b8559a3e is described below
commit b8559a3eb8b3439767cbe73877b8a8cb5407a77f
Author: Nico Duldhardt <[email protected]>
AuthorDate: Wed Feb 4 09:50:03 2026 +0100
[Bugfix] Use Thread Context ClassLoader for user class loading (#514)
---
.../main/java/org/apache/flink/agents/plan/AgentPlan.java | 11 +++++++++--
.../plan/resourceprovider/JavaResourceProvider.java | 3 ++-
.../JavaSerializableResourceProvider.java | 7 ++++++-
.../agents/plan/serializer/ActionJsonDeserializer.java | 15 +++++++++++++--
.../tools/serializer/FunctionToolJsonDeserializer.java | 6 +++++-
.../runtime/eventlog/EventLogRecordJsonDeserializer.java | 3 ++-
6 files changed, 37 insertions(+), 8 deletions(-)
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index 2cd77915..0641e819 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -375,7 +375,11 @@ public class AgentPlan implements Serializable {
descriptor =
descriptorDecorator != null ?
descriptorDecorator.apply(descriptor) : descriptor;
- if
(PythonResourceWrapper.class.isAssignableFrom(Class.forName(descriptor.getClazz())))
{
+ if (PythonResourceWrapper.class.isAssignableFrom(
+ Class.forName(
+ descriptor.getClazz(),
+ true,
+ Thread.currentThread().getContextClassLoader()))) {
provider = new PythonResourceProvider(name, type, descriptor);
} else {
provider = new JavaResourceProvider(name, type, descriptor);
@@ -561,7 +565,10 @@ public class AgentPlan implements Serializable {
for (Map.Entry<String, Object> kv :
entry.getValue().entrySet()) {
ResourceProvider provider;
if (PythonResourceWrapper.class.isAssignableFrom(
- Class.forName(((ResourceDescriptor)
kv.getValue()).getClazz()))) {
+ Class.forName(
+ ((ResourceDescriptor)
kv.getValue()).getClazz(),
+ true,
+
Thread.currentThread().getContextClassLoader()))) {
provider =
new PythonResourceProvider(
kv.getKey(), type,
(ResourceDescriptor) kv.getValue());
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/JavaResourceProvider.java
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/JavaResourceProvider.java
index 373c3c4b..ff7ab7c7 100644
---
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/JavaResourceProvider.java
+++
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/JavaResourceProvider.java
@@ -43,7 +43,8 @@ public class JavaResourceProvider extends ResourceProvider {
} else {
clazzName =
descriptor.getInitialArguments().remove("java_clazz").toString();
}
- Class<?> clazz = Class.forName(clazzName);
+ Class<?> clazz =
+ Class.forName(clazzName, true,
Thread.currentThread().getContextClassLoader());
Constructor<?> constructor =
clazz.getConstructor(ResourceDescriptor.class,
BiFunction.class);
return (Resource) constructor.newInstance(descriptor, getResource);
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/JavaSerializableResourceProvider.java
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/JavaSerializableResourceProvider.java
index 7ebc6567..bea54f3d 100644
---
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/JavaSerializableResourceProvider.java
+++
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/JavaSerializableResourceProvider.java
@@ -83,7 +83,12 @@ public class JavaSerializableResourceProvider extends
SerializableResourceProvid
if (resource == null) {
resource =
(SerializableResource)
- objectMapper.readValue(serializedResource,
Class.forName(getClazz()));
+ objectMapper.readValue(
+ serializedResource,
+ Class.forName(
+ getClazz(),
+ true,
+
Thread.currentThread().getContextClassLoader()));
}
return resource;
}
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonDeserializer.java
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonDeserializer.java
index 974dc9ef..2618721f 100644
---
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonDeserializer.java
+++
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonDeserializer.java
@@ -108,7 +108,11 @@ public class ActionJsonDeserializer extends
StdDeserializer<Action> {
for (int i = 0; i < parameterTypes.length; i++) {
try {
String parameterTypeName =
execNode.get("parameter_types").get(i).asText();
- parameterTypes[i] = Class.forName(parameterTypeName);
+ parameterTypes[i] =
+ Class.forName(
+ parameterTypeName,
+ true,
+
Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IOException("Failed to deserialize parameter type",
e);
}
@@ -141,7 +145,14 @@ public class ActionJsonDeserializer extends
StdDeserializer<Action> {
JsonNode clazzAndValue = entry.getValue();
String clazz = clazzAndValue.get("@class").asText();
JsonNode value = clazzAndValue.get("value");
- config.put(key, mapper.treeToValue(value,
Class.forName(clazz)));
+ config.put(
+ key,
+ mapper.treeToValue(
+ value,
+ Class.forName(
+ clazz,
+ true,
+
Thread.currentThread().getContextClassLoader())));
}
}
return config;
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/tools/serializer/FunctionToolJsonDeserializer.java
b/plan/src/main/java/org/apache/flink/agents/plan/tools/serializer/FunctionToolJsonDeserializer.java
index c8b3282d..e2234bd3 100644
---
a/plan/src/main/java/org/apache/flink/agents/plan/tools/serializer/FunctionToolJsonDeserializer.java
+++
b/plan/src/main/java/org/apache/flink/agents/plan/tools/serializer/FunctionToolJsonDeserializer.java
@@ -77,7 +77,11 @@ public class FunctionToolJsonDeserializer extends
StdDeserializer<FunctionTool>
for (int i = 0; i < parameterTypes.length; i++) {
try {
String parameterTypeName =
execNode.get("parameter_types").get(i).asText();
- parameterTypes[i] = Class.forName(parameterTypeName);
+ parameterTypes[i] =
+ Class.forName(
+ parameterTypeName,
+ true,
+
Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IOException("Failed to deserialize parameter type",
e);
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
index a516edf5..45c7e624 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
@@ -83,7 +83,8 @@ public class EventLogRecordJsonDeserializer extends
JsonDeserializer<EventLogRec
throws IOException {
try {
// Load the concrete event class
- Class<?> eventClass = Class.forName(eventType);
+ Class<?> eventClass =
+ Class.forName(eventType, true,
Thread.currentThread().getContextClassLoader());
// Verify it's actually an Event subclass
if (!Event.class.isAssignableFrom(eventClass)) {