This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit 01362a6534ed2e73c7b730cc62c7c87018e12348
Author: youjin <[email protected]>
AuthorDate: Mon Jan 12 14:03:51 2026 +0800

    [feature] Support Java resource retrieval through the Python resource using the get_resource function
---
 .../pom.xml                                        |  5 +++
 .../resource/test/ChatModelCrossLanguageAgent.java | 39 ++++++++++++++++++----
 python/flink_agents/runtime/python_java_utils.py   | 24 ++++++++++++-
 .../runtime/operator/ActionExecutionOperator.java  |  3 +-
 .../python/utils/PythonResourceAdapterImpl.java    | 35 +++++++++++++++----
 .../utils/PythonResourceAdapterImplTest.java       | 11 +++---
 6 files changed, 99 insertions(+), 18 deletions(-)

diff --git 
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
index c93c37f..f06b302 100644
--- a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
@@ -25,6 +25,11 @@
             <artifactId>flink-agents-runtime</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-agents-integrations-chat-models-ollama</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java
 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java
index 251492e..6712ee5 100644
--- 
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java
+++ 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java
@@ -35,6 +35,7 @@ import org.apache.flink.agents.api.event.ChatRequestEvent;
 import org.apache.flink.agents.api.event.ChatResponseEvent;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
+import 
org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection;
 
 import java.util.Collections;
 import java.util.List;
@@ -64,7 +65,15 @@ public class ChatModelCrossLanguageAgent extends Agent {
     public static final String OLLAMA_MODEL = "qwen3:0.6b";
 
     @ChatModelConnection
-    public static ResourceDescriptor chatModelConnection() {
+    public static ResourceDescriptor javaChatModelConnection() {
+        return 
ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName())
+                .addInitialArgument("endpoint", "http://localhost:11434")
+                .addInitialArgument("requestTimeout", 240)
+                .build();
+    }
+
+    @ChatModelConnection
+    public static ResourceDescriptor pythonChatModelConnection() {
         return 
ResourceDescriptor.Builder.newBuilder(PythonChatModelConnection.class.getName())
                 .addInitialArgument(
                         "module", 
"flink_agents.integrations.chat_models.ollama_chat_model")
@@ -73,16 +82,27 @@ public class ChatModelCrossLanguageAgent extends Agent {
     }
 
     @ChatModelSetup
-    public static ResourceDescriptor chatModel() {
+    public static ResourceDescriptor temperatureChatModel() {
         return 
ResourceDescriptor.Builder.newBuilder(PythonChatModelSetup.class.getName())
                 .addInitialArgument(
                         "module", 
"flink_agents.integrations.chat_models.ollama_chat_model")
                 .addInitialArgument("clazz", "OllamaChatModelSetup")
-                .addInitialArgument("connection", "chatModelConnection")
+                .addInitialArgument("connection", "javaChatModelConnection")
                 .addInitialArgument("model", OLLAMA_MODEL)
+                .addInitialArgument("tools", List.of("convertTemperature"))
+                .addInitialArgument("extract_reasoning", "true")
+                .build();
+    }
+
+    @ChatModelSetup
+    public static ResourceDescriptor chatModel() {
+        return 
ResourceDescriptor.Builder.newBuilder(PythonChatModelSetup.class.getName())
                 .addInitialArgument(
-                        "tools",
-                        List.of("calculateBMI", "convertTemperature", 
"createRandomNumber"))
+                        "module", 
"flink_agents.integrations.chat_models.ollama_chat_model")
+                .addInitialArgument("clazz", "OllamaChatModelSetup")
+                .addInitialArgument("connection", "pythonChatModelConnection")
+                .addInitialArgument("model", OLLAMA_MODEL)
+                .addInitialArgument("tools", List.of("calculateBMI", 
"createRandomNumber"))
                 .addInitialArgument("extract_reasoning", "true")
                 .build();
     }
@@ -133,9 +153,16 @@ public class ChatModelCrossLanguageAgent extends Agent {
 
     @Action(listenEvents = {InputEvent.class})
     public static void process(InputEvent event, RunnerContext ctx) throws 
Exception {
+        String model;
+        if (event.getInput().toString().contains("temperature")
+                || event.getInput().toString().contains("degree")) {
+            model = "temperatureChatModel";
+        } else {
+            model = "chatModel";
+        }
         ctx.sendEvent(
                 new ChatRequestEvent(
-                        "chatModel",
+                        model,
                         Collections.singletonList(
                                 new ChatMessage(MessageRole.USER, (String) 
event.getInput()))));
     }
diff --git a/python/flink_agents/runtime/python_java_utils.py 
b/python/flink_agents/runtime/python_java_utils.py
index 9debb3c..6a3a84e 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -23,9 +23,10 @@ from pemja import findClass
 
 from flink_agents.api.chat_message import ChatMessage, MessageRole
 from flink_agents.api.events.event import InputEvent
-from flink_agents.api.resource import Resource
+from flink_agents.api.resource import Resource, ResourceType, get_resource_class
 from flink_agents.api.tools.tool import ToolMetadata
 from flink_agents.api.tools.utils import create_model_from_java_tool_schema_str
+from flink_agents.plan.resource_provider import JAVA_RESOURCE_MAPPING
 from flink_agents.runtime.java.java_resource_wrapper import (
     JavaGetResourceWrapper,
     JavaPrompt,
@@ -106,6 +107,27 @@ def from_java_prompt(j_prompt: Any) -> JavaPrompt:
     """
     return JavaPrompt(j_prompt=j_prompt)
 
+def from_java_resource(type_name: str, kwargs: Dict[str, Any]) -> Resource:
+    """Convert a Java resource object to a Python Resource instance.
+    This function is used to convert a Java resource object to a Python Resource
+    instance.
+
+    Args:
+        type_name: Java resource type name
+        kwargs: Keyword arguments
+    Returns:
+        Resource: Python wrapper for the Java resource
+    """
+    class_path = JAVA_RESOURCE_MAPPING.get(ResourceType(type_name))
+    if not class_path:
+        err_msg = f"No Java resource mapping found for {type_name}"
+        raise ValueError(err_msg)
+
+    module_path, class_name = class_path.rsplit(".", 1)
+    cls = get_resource_class(module_path, class_name)
+
+    return cls(**kwargs)
+
 def normalize_tool_call_id(tool_call: Dict[str, Any]) -> Dict[str, Any]:
     """Normalize tool call by converting the ID field to string format while 
preserving
     all other fields.
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 6b12a6d..084ad1f 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -620,7 +620,8 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                                 throw new RuntimeException(e);
                             }
                         },
-                        pythonInterpreter);
+                        pythonInterpreter,
+                        javaResourceAdapter);
         pythonResourceAdapter.open();
         agentPlan.setPythonResourceAdapter(pythonResourceAdapter);
     }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
index cd9b935..b167e19 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
@@ -18,6 +18,7 @@
 package org.apache.flink.agents.runtime.python.utils;
 
 import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
 import org.apache.flink.agents.api.prompt.Prompt;
 import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceType;
@@ -27,6 +28,7 @@ import org.apache.flink.agents.api.tools.Tool;
 import pemja.core.PythonInterpreter;
 import pemja.core.object.PyObject;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.function.BiFunction;
 
@@ -34,6 +36,10 @@ public class PythonResourceAdapterImpl implements 
PythonResourceAdapter {
 
     static final String PYTHON_IMPORTS = "from flink_agents.runtime import 
python_java_utils";
 
+    static final String JAVA_RESOURCE = "j_resource";
+
+    static final String JAVA_RESOURCE_ADAPTER = "j_resource_adapter";
+
     static final String GET_RESOURCE_KEY = "get_resource";
 
     static final String PYTHON_MODULE_PREFIX = "python_java_utils.";
@@ -44,22 +50,28 @@ public class PythonResourceAdapterImpl implements 
PythonResourceAdapter {
 
     static final String CREATE_RESOURCE = PYTHON_MODULE_PREFIX + 
"create_resource";
 
+    static final String FROM_JAVA_RESOURCE = PYTHON_MODULE_PREFIX + 
"from_java_resource";
+
     static final String FROM_JAVA_TOOL = PYTHON_MODULE_PREFIX + 
"from_java_tool";
 
     static final String FROM_JAVA_PROMPT = PYTHON_MODULE_PREFIX + 
"from_java_prompt";
 
     static final String FROM_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX + 
"from_java_chat_message";
 
-    static final String TO_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX + 
"to_java_chat_message";
+    static final String TO_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX + 
"update_java_chat_message";
 
     private final BiFunction<String, ResourceType, Resource> getResource;
     private final PythonInterpreter interpreter;
+    private final JavaResourceAdapter javaResourceAdapter;
     private Object pythonGetResourceFunction;
 
     public PythonResourceAdapterImpl(
-            BiFunction<String, ResourceType, Resource> getResource, 
PythonInterpreter interpreter) {
+            BiFunction<String, ResourceType, Resource> getResource,
+            PythonInterpreter interpreter,
+            JavaResourceAdapter javaResourceAdapter) {
         this.getResource = getResource;
         this.interpreter = interpreter;
+        this.javaResourceAdapter = javaResourceAdapter;
     }
 
     public void open() {
@@ -80,7 +92,15 @@ public class PythonResourceAdapterImpl implements 
PythonResourceAdapter {
         if (resource instanceof Prompt) {
             return convertToPythonPrompt((Prompt) resource);
         }
-        return resource;
+        return toPythonResource(resourceType, resource);
+    }
+
+    private Object toPythonResource(String resourceType, Resource resource) {
+        Map<String, Object> kwargs = new HashMap<>();
+        kwargs.put(JAVA_RESOURCE, resource);
+        kwargs.put(JAVA_RESOURCE_ADAPTER, javaResourceAdapter);
+        kwargs.put(GET_RESOURCE_KEY, pythonGetResourceFunction);
+        return interpreter.invoke(FROM_JAVA_RESOURCE, resourceType, kwargs);
     }
 
     @Override
@@ -96,10 +116,13 @@ public class PythonResourceAdapterImpl implements 
PythonResourceAdapter {
 
     @Override
     public ChatMessage fromPythonChatMessage(Object pythonChatMessage) {
-        ChatMessage message =
-                (ChatMessage) interpreter.invoke(TO_JAVA_CHAT_MESSAGE, 
pythonChatMessage);
+        // TODO: Update this method after the pemja findClass method is fixed.
+        ChatMessage chatMessage = new ChatMessage();
 
-        return message;
+        String roleValue =
+                (String) interpreter.invoke(TO_JAVA_CHAT_MESSAGE, 
pythonChatMessage, chatMessage);
+        chatMessage.setRole(MessageRole.fromValue(roleValue));
+        return chatMessage;
     }
 
     @Override
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java
index 18ee02e..282c2ae 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java
@@ -49,7 +49,7 @@ public class PythonResourceAdapterImplTest {
     @BeforeEach
     void setUp() throws Exception {
         mocks = MockitoAnnotations.openMocks(this);
-        pythonResourceAdapter = new PythonResourceAdapterImpl(getResource, 
mockInterpreter);
+        pythonResourceAdapter = new PythonResourceAdapterImpl(getResource, 
mockInterpreter, null);
     }
 
     @AfterEach
@@ -152,10 +152,13 @@ public class PythonResourceAdapterImplTest {
 
         when(getResource.apply(resourceName, 
ResourceType.CHAT_MODEL)).thenReturn(mockResource);
 
-        Object result = pythonResourceAdapter.getResource(resourceName, 
resourceType);
+        pythonResourceAdapter.getResource(resourceName, resourceType);
 
-        assertThat(result).isEqualTo(mockResource);
-        verify(getResource).apply(resourceName, ResourceType.CHAT_MODEL);
+        verify(mockInterpreter)
+                .invoke(
+                        eq(PythonResourceAdapterImpl.FROM_JAVA_RESOURCE),
+                        eq(resourceType),
+                        any(Map.class));
     }
 
     @Test

Reply via email to