This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new b918d83  [Feature][runtime] Support the use of Python ChatModel in 
Java (#314)
b918d83 is described below

commit b918d83e3932e58450f4d8474143434864734f16
Author: Eugene <[email protected]>
AuthorDate: Fri Dec 12 14:41:53 2025 +0800

    [Feature][runtime] Support the use of Python ChatModel in Java (#314)
---
 .github/workflows/ci.yml                           |   6 +-
 api/pom.xml                                        |   6 +
 .../agents/api/chat/messages/ChatMessage.java      |   6 -
 .../model/python/PythonChatModelConnection.java    |  90 ++++++++++
 .../chat/model/python/PythonChatModelSetup.java    |  88 ++++++++++
 .../api/resource/python/PythonResourceAdapter.java |  95 ++++++++++
 .../api/resource/python/PythonResourceWrapper.java |  32 ++++
 .../python/PythonChatModelConnectionTest.java      | 158 +++++++++++++++++
 .../model/python/PythonChatModelSetupTest.java     | 161 +++++++++++++++++
 .../pom.xml                                        |  40 +----
 .../resource/test/ChatModelCrossLanguageAgent.java | 147 ++++++++++++++++
 .../resource/test/ChatModelCrossLanguageTest.java  | 126 ++++++++++++++
 .../src/test/resources/ollama_pull_model.sh        |  30 +---
 e2e-test/pom.xml                                   |   1 +
 .../test-scripts/test_resource_cross_language.sh   |  41 +++++
 examples/pom.xml                                   |   2 +
 .../org/apache/flink/agents/plan/AgentPlan.java    |  34 +++-
 .../resourceprovider/PythonResourceProvider.java   |  50 +++++-
 .../ResourceProviderJsonDeserializer.java          |   9 +
 .../serializer/ResourceProviderJsonSerializer.java |   4 +
 .../apache/flink/agents/plan/AgentPlanTest.java    | 123 ++++++++++++-
 python/flink_agents/api/tools/utils.py             |  14 ++
 .../{python_java_utils.py => java/__init__.py}     |  25 ---
 .../runtime/java/java_resource_wrapper.py          |  75 ++++++++
 python/flink_agents/runtime/python_java_utils.py   | 132 +++++++++++++-
 python/pyproject.toml                              |   1 +
 .../runtime/operator/ActionExecutionOperator.java  |  77 +++++++--
 .../runtime/python/utils/PythonActionExecutor.java |  18 +-
 .../python/utils/PythonResourceAdapterImpl.java    | 123 +++++++++++++
 .../utils/PythonResourceAdapterImplTest.java       | 192 +++++++++++++++++++++
 tools/e2e.sh                                       |  15 +-
 tools/ut.sh                                        |   2 +-
 32 files changed, 1791 insertions(+), 132 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index b60288a..f70936c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -170,5 +170,9 @@ jobs:
           version: "latest"
       - name: Build flink-agents
         run: bash tools/build.sh
+      - name: Install ollama
+        run: bash tools/start_ollama_server.sh
       - name: Run e2e tests
-        run: bash tools/e2e.sh
\ No newline at end of file
+        run: |
+          export PYTHONPATH="${{ github.workspace 
}}/python/.venv/lib/python${{ matrix.python-version 
}}/site-packages:$PYTHONPATH"
+          tools/e2e.sh
\ No newline at end of file
diff --git a/api/pom.xml b/api/pom.xml
index fb4123e..a540361 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -57,6 +57,12 @@ under the License.
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>pemja</artifactId>
+            <version>${pemja.version}</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/chat/messages/ChatMessage.java 
b/api/src/main/java/org/apache/flink/agents/api/chat/messages/ChatMessage.java
index 7ebc787..6e844e7 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/chat/messages/ChatMessage.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/chat/messages/ChatMessage.java
@@ -32,9 +32,6 @@ import java.util.Objects;
  */
 public class ChatMessage {
 
-    /** The key for the message type in the metadata. */
-    public static final String MESSAGE_TYPE = "messageType";
-
     private MessageRole role;
     private String content;
     private List<Map<String, Object>> toolCalls;
@@ -68,7 +65,6 @@ public class ChatMessage {
         this.content = content != null ? content : "";
         this.toolCalls = toolCalls != null ? toolCalls : new ArrayList<>();
         this.extraArgs = extraArgs != null ? new HashMap<>(extraArgs) : new 
HashMap<>();
-        this.extraArgs.put(MESSAGE_TYPE, this.role);
     }
 
     public MessageRole getRole() {
@@ -77,7 +73,6 @@ public class ChatMessage {
 
     public void setRole(MessageRole role) {
         this.role = role;
-        this.extraArgs.put(MESSAGE_TYPE, this.role);
     }
 
     public String getContent() {
@@ -102,7 +97,6 @@ public class ChatMessage {
 
     public void setExtraArgs(Map<String, Object> extraArgs) {
         this.extraArgs = extraArgs != null ? extraArgs : new HashMap<>();
-        this.extraArgs.put(MESSAGE_TYPE, this.role);
     }
 
     @JsonIgnore
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
 
b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
new file mode 100644
index 0000000..7cae38b
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.chat.model.python;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import org.apache.flink.agents.api.tools.Tool;
+import pemja.core.object.PyObject;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * Python-based implementation of ChatModelConnection that wraps a Python chat 
model object. This
+ * class serves as a bridge between Java and Python chat model environments, 
but unlike {@link
+ * PythonChatModelSetup}, it does not provide direct chat functionality in 
Java.
+ */
+public class PythonChatModelConnection extends BaseChatModelConnection
+        implements PythonResourceWrapper {
+    private final PyObject chatModel;
+    private final PythonResourceAdapter adapter;
+
+    /**
+     * Creates a new PythonChatModelConnection.
+     *
+     * @param adapter The Python resource adapter (required by 
PythonResourceProvider's
+     *     reflection-based instantiation but not used directly in this 
implementation)
+     * @param chatModel The Python chat model object
+     * @param descriptor The resource descriptor
+     * @param getResource Function to retrieve resources by name and type
+     */
+    public PythonChatModelConnection(
+            PythonResourceAdapter adapter,
+            PyObject chatModel,
+            ResourceDescriptor descriptor,
+            BiFunction<String, ResourceType, Resource> getResource) {
+        super(descriptor, getResource);
+        this.chatModel = chatModel;
+        this.adapter = adapter;
+    }
+
+    @Override
+    public Object getPythonResource() {
+        return chatModel;
+    }
+
+    @Override
+    public ChatMessage chat(
+            List<ChatMessage> messages, List<Tool> tools, Map<String, Object> 
arguments) {
+        Map<String, Object> kwargs = new HashMap<>(arguments);
+
+        List<Object> pythonMessages = new ArrayList<>();
+        for (ChatMessage message : messages) {
+            pythonMessages.add(adapter.toPythonChatMessage(message));
+        }
+        kwargs.put("messages", pythonMessages);
+
+        List<Object> pythonTools = new ArrayList<>();
+        for (Tool tool : tools) {
+            pythonTools.add(adapter.convertToPythonTool(tool));
+        }
+        kwargs.put("tools", pythonTools);
+
+        Object pythonMessageResponse = adapter.callMethod(chatModel, "chat", 
kwargs);
+        return adapter.fromPythonChatMessage(pythonMessageResponse);
+    }
+}
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetup.java
 
b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetup.java
new file mode 100644
index 0000000..7a5cf43
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetup.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.chat.model.python;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.BaseChatModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import pemja.core.object.PyObject;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Python-based implementation of ChatModelSetup that bridges Java and Python 
chat model
+ * functionality. This class wraps a Python chat model setup object and 
provides Java interface
+ * compatibility while delegating actual chat operations to the underlying 
Python implementation.
+ */
+public class PythonChatModelSetup extends BaseChatModelSetup implements 
PythonResourceWrapper {
+    static final String FROM_JAVA_CHAT_MESSAGE = 
"python_java_utils.from_java_chat_message";
+
+    static final String TO_JAVA_CHAT_MESSAGE = 
"python_java_utils.to_java_chat_message";
+
+    private final PyObject chatModelSetup;
+    private final PythonResourceAdapter adapter;
+
+    public PythonChatModelSetup(
+            PythonResourceAdapter adapter,
+            PyObject chatModelSetup,
+            ResourceDescriptor descriptor,
+            BiFunction<String, ResourceType, Resource> getResource) {
+        super(descriptor, getResource);
+        this.chatModelSetup = chatModelSetup;
+        this.adapter = adapter;
+    }
+
+    @Override
+    public ChatMessage chat(List<ChatMessage> messages, Map<String, Object> 
parameters) {
+        checkState(
+                chatModelSetup != null,
+                "ChatModelSetup is not initialized. Cannot perform chat 
operation.");
+
+        Map<String, Object> kwargs = new HashMap<>(parameters);
+
+        List<Object> pythonMessages = new ArrayList<>();
+        for (ChatMessage message : messages) {
+            pythonMessages.add(adapter.toPythonChatMessage(message));
+        }
+
+        kwargs.put("messages", pythonMessages);
+
+        Object pythonMessageResponse = adapter.callMethod(chatModelSetup, 
"chat", kwargs);
+        return adapter.fromPythonChatMessage(pythonMessageResponse);
+    }
+
+    @Override
+    public Object getPythonResource() {
+        return chatModelSetup;
+    }
+
+    @Override
+    public Map<String, Object> getParameters() {
+        return Map.of();
+    }
+}
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
 
b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
new file mode 100644
index 0000000..89652ae
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.resource.python;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.tools.Tool;
+import pemja.core.object.PyObject;
+
+import java.util.Map;
+
+/**
+ * Adapter interface for managing Python resources and facilitating 
Java-Python interoperability.
+ * This interface provides methods to interact with Python objects, invoke 
Python methods, and
+ * handle data conversion between Java and Python environments.
+ */
+public interface PythonResourceAdapter {
+
+    /**
+     * Retrieves a Python resource by name and type.
+     *
+     * @param resourceName the name of the resource to retrieve
+     * @param resourceType the type of the resource
+     * @return the retrieved resource object
+     */
+    Object getResource(String resourceName, String resourceType);
+
+    /**
+     * Initializes a Python resource instance from the specified module and 
class.
+     *
+     * @param module the Python module containing the target class
+     * @param clazz the Python class name to instantiate
+     * @param kwargs keyword arguments to pass to the Python class constructor
+     * @return a PyObject representing the initialized Python resource
+     */
+    PyObject initPythonResource(String module, String clazz, Map<String, 
Object> kwargs);
+
+    /**
+     * Converts a Java ChatMessage object to its Python equivalent.
+     *
+     * @param message the Java ChatMessage to convert
+     * @return the Python representation of the chat message
+     */
+    Object toPythonChatMessage(ChatMessage message);
+
+    /**
+     * Converts a Python chat message object back to a Java ChatMessage.
+     *
+     * @param pythonChatMessage the Python chat message object to convert
+     * @return the Java ChatMessage representation
+     */
+    ChatMessage fromPythonChatMessage(Object pythonChatMessage);
+
+    /**
+     * Converts a Java Tool object to its Python equivalent.
+     *
+     * @param tool the Java Tool to convert
+     * @return the Python representation of the tool
+     */
+    Object convertToPythonTool(Tool tool);
+
+    /**
+     * Invokes a method on a Python object with the specified parameters.
+     *
+     * @param obj the Python object on which to call the method
+     * @param methodName the name of the method to invoke
+     * @param kwargs keyword arguments to pass to the method
+     * @return the result of the method invocation
+     */
+    Object callMethod(Object obj, String methodName, Map<String, Object> 
kwargs);
+
+    /**
+     * Invokes a method with the specified name and arguments.
+     *
+     * @param name the name of the method to invoke
+     * @param args the arguments to pass to the method
+     * @return the result of the method invocation
+     */
+    Object invoke(String name, Object... args);
+}
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceWrapper.java
 
b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceWrapper.java
new file mode 100644
index 0000000..c69cf59
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.resource.python;
+
+/**
+ * Wrapper interface for Python resource objects. This interface provides a 
unified way to access
+ * the underlying Python resource from Java objects that encapsulate Python 
functionality.
+ */
+public interface PythonResourceWrapper {
+
+    /**
+     * Retrieves the underlying Python resource object.
+     *
+     * @return the wrapped Python resource object
+     */
+    Object getPythonResource();
+}
diff --git 
a/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnectionTest.java
 
b/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnectionTest.java
new file mode 100644
index 0000000..79069d7
--- /dev/null
+++ 
b/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnectionTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.chat.model.python;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import org.apache.flink.agents.api.tools.Tool;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import pemja.core.object.PyObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.*;
+
+public class PythonChatModelConnectionTest {
+    @Mock private PythonResourceAdapter mockAdapter;
+
+    @Mock private PyObject mockChatModel;
+
+    @Mock private ResourceDescriptor mockDescriptor;
+
+    @Mock private BiFunction<String, ResourceType, Resource> mockGetResource;
+
+    private PythonChatModelConnection pythonChatModelConnection;
+    private AutoCloseable mocks;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        mocks = MockitoAnnotations.openMocks(this);
+        pythonChatModelConnection =
+                new PythonChatModelConnection(
+                        mockAdapter, mockChatModel, mockDescriptor, 
mockGetResource);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (mocks != null) {
+            mocks.close();
+        }
+    }
+
+    @Test
+    void testConstructor() {
+        assertThat(pythonChatModelConnection).isNotNull();
+        
assertThat(pythonChatModelConnection.getPythonResource()).isEqualTo(mockChatModel);
+    }
+
+    @Test
+    void testGetPythonResourceWithNullChatModel() {
+        PythonChatModelConnection connectionWithNullModel =
+                new PythonChatModelConnection(mockAdapter, null, 
mockDescriptor, mockGetResource);
+
+        Object result = connectionWithNullModel.getPythonResource();
+
+        assertThat(result).isNull();
+    }
+
+    @Test
+    void testChat() {
+        ChatMessage inputMessage = mock(ChatMessage.class);
+        ChatMessage outputMessage = mock(ChatMessage.class);
+        Tool mockTool = mock(Tool.class);
+        List<ChatMessage> messages = Collections.singletonList(inputMessage);
+        List<Tool> tools = Collections.singletonList(mockTool);
+        Map<String, Object> arguments = new HashMap<>();
+        arguments.put("temperature", 0.7);
+        arguments.put("max_tokens", 100);
+
+        Object pythonInputMessage = new Object();
+        Object pythonOutputMessage = new Object();
+        Object pythonTool = new Object();
+
+        
when(mockAdapter.toPythonChatMessage(inputMessage)).thenReturn(pythonInputMessage);
+        when(mockAdapter.convertToPythonTool(mockTool)).thenReturn(pythonTool);
+        when(mockAdapter.callMethod(eq(mockChatModel), eq("chat"), 
any(Map.class)))
+                .thenReturn(pythonOutputMessage);
+        
when(mockAdapter.fromPythonChatMessage(pythonOutputMessage)).thenReturn(outputMessage);
+
+        ChatMessage result = pythonChatModelConnection.chat(messages, tools, 
arguments);
+
+        assertThat(result).isEqualTo(outputMessage);
+
+        verify(mockAdapter).toPythonChatMessage(inputMessage);
+        verify(mockAdapter).convertToPythonTool(mockTool);
+        verify(mockAdapter)
+                .callMethod(
+                        eq(mockChatModel),
+                        eq("chat"),
+                        argThat(
+                                kwargs -> {
+                                    assertThat(kwargs).containsKey("messages");
+                                    assertThat(kwargs).containsKey("tools");
+                                    
assertThat(kwargs).containsKey("temperature");
+                                    
assertThat(kwargs).containsKey("max_tokens");
+                                    
assertThat(kwargs.get("temperature")).isEqualTo(0.7);
+                                    
assertThat(kwargs.get("max_tokens")).isEqualTo(100);
+
+                                    List<?> pythonMessages = (List<?>) 
kwargs.get("messages");
+                                    assertThat(pythonMessages).hasSize(1);
+                                    
assertThat(pythonMessages.get(0)).isEqualTo(pythonInputMessage);
+
+                                    List<?> pythonTools = (List<?>) 
kwargs.get("tools");
+                                    assertThat(pythonTools).hasSize(1);
+                                    
assertThat(pythonTools.get(0)).isEqualTo(pythonTool);
+
+                                    return true;
+                                }));
+        verify(mockAdapter).fromPythonChatMessage(pythonOutputMessage);
+    }
+
+    @Test
+    void testInheritanceFromBaseChatModelConnection() {
+        
assertThat(pythonChatModelConnection).isInstanceOf(BaseChatModelConnection.class);
+    }
+
+    @Test
+    void testImplementsPythonResourceWrapper() {
+        
assertThat(pythonChatModelConnection).isInstanceOf(PythonResourceWrapper.class);
+    }
+
+    @Test
+    void testConstructorWithAllNullParameters() {
+        PythonChatModelConnection connectionWithNulls =
+                new PythonChatModelConnection(null, null, null, null);
+
+        assertThat(connectionWithNulls).isNotNull();
+        assertThat(connectionWithNulls.getPythonResource()).isNull();
+    }
+}
diff --git 
a/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetupTest.java
 
b/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetupTest.java
new file mode 100644
index 0000000..274ddfe
--- /dev/null
+++ 
b/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetupTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.chat.model.python;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import pemja.core.object.PyObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.*;
+
+public class PythonChatModelSetupTest {
+    @Mock private PythonResourceAdapter mockAdapter;
+
+    @Mock private PyObject mockChatModelSetup;
+
+    @Mock private ResourceDescriptor mockDescriptor;
+
+    @Mock private BiFunction<String, ResourceType, Resource> mockGetResource;
+
+    private PythonChatModelSetup pythonChatModelSetup;
+    private AutoCloseable mocks;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        mocks = MockitoAnnotations.openMocks(this);
+        pythonChatModelSetup =
+                new PythonChatModelSetup(
+                        mockAdapter, mockChatModelSetup, mockDescriptor, 
mockGetResource);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (mocks != null) {
+            mocks.close();
+        }
+    }
+
+    @Test
+    void testConstructor() {
+        assertThat(pythonChatModelSetup).isNotNull();
+        
assertThat(pythonChatModelSetup.getPythonResource()).isEqualTo(mockChatModelSetup);
+    }
+
+    @Test
+    void testGetPythonResourceWithNullChatModelSetup() {
+        PythonChatModelSetup setupWithNullModel =
+                new PythonChatModelSetup(mockAdapter, null, mockDescriptor, 
mockGetResource);
+
+        Object result = setupWithNullModel.getPythonResource();
+
+        assertThat(result).isNull();
+    }
+
+    @Test
+    void testGetParameters() {
+        Map<String, Object> result = pythonChatModelSetup.getParameters();
+
+        assertThat(result).isNotNull();
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    void testChat() {
+        ChatMessage inputMessage = mock(ChatMessage.class);
+        ChatMessage outputMessage = mock(ChatMessage.class);
+        List<ChatMessage> messages = Collections.singletonList(inputMessage);
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("temperature", 0.7);
+        parameters.put("max_tokens", 100);
+
+        Object pythonInputMessage = new Object();
+        Object pythonOutputMessage = new Object();
+
+        
when(mockAdapter.toPythonChatMessage(inputMessage)).thenReturn(pythonInputMessage);
+        when(mockAdapter.callMethod(eq(mockChatModelSetup), eq("chat"), 
any(Map.class)))
+                .thenReturn(pythonOutputMessage);
+        
when(mockAdapter.fromPythonChatMessage(pythonOutputMessage)).thenReturn(outputMessage);
+
+        ChatMessage result = pythonChatModelSetup.chat(messages, parameters);
+
+        assertThat(result).isEqualTo(outputMessage);
+
+        verify(mockAdapter).toPythonChatMessage(inputMessage);
+        verify(mockAdapter)
+                .callMethod(
+                        eq(mockChatModelSetup),
+                        eq("chat"),
+                        argThat(
+                                kwargs -> {
+                                    assertThat(kwargs).containsKey("messages");
+                                    
assertThat(kwargs).containsKey("temperature");
+                                    
assertThat(kwargs).containsKey("max_tokens");
+                                    
assertThat(kwargs.get("temperature")).isEqualTo(0.7);
+                                    
assertThat(kwargs.get("max_tokens")).isEqualTo(100);
+                                    List<?> pythonMessages = (List<?>) 
kwargs.get("messages");
+                                    assertThat(pythonMessages).hasSize(1);
+                                    
assertThat(pythonMessages.get(0)).isEqualTo(pythonInputMessage);
+                                    return true;
+                                }));
+        verify(mockAdapter).fromPythonChatMessage(pythonOutputMessage);
+    }
+
+    @Test
+    void testChatWithNullChatModelSetupThrowsException() {
+        PythonChatModelSetup setupWithNullModel =
+                new PythonChatModelSetup(mockAdapter, null, mockDescriptor, 
mockGetResource);
+
+        ChatMessage inputMessage = mock(ChatMessage.class);
+        List<ChatMessage> messages = Collections.singletonList(inputMessage);
+        Map<String, Object> parameters = new HashMap<>();
+
+        assertThatThrownBy(() -> setupWithNullModel.chat(messages, parameters))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("ChatModelSetup is not initialized")
+                .hasMessageContaining("Cannot perform chat operation");
+    }
+
+    @Test
+    void testInheritanceFromBaseChatModelSetup() {
+        assertThat(pythonChatModelSetup)
+                
.isInstanceOf(org.apache.flink.agents.api.chat.model.BaseChatModelSetup.class);
+    }
+
+    @Test
+    void testImplementsPythonResourceWrapper() {
+        assertThat(pythonChatModelSetup)
+                .isInstanceOf(
+                        
org.apache.flink.agents.api.resource.python.PythonResourceWrapper.class);
+    }
+}
diff --git a/examples/pom.xml 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
similarity index 53%
copy from examples/pom.xml
copy to e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
index 1b8d543..c93c37f 100644
--- a/examples/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
@@ -1,32 +1,16 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
 <project xmlns="http://maven.apache.org/POM/4.0.0";
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.flink</groupId>
-        <artifactId>flink-agents</artifactId>
+        <artifactId>flink-agents-e2e-tests</artifactId>
         <version>0.2-SNAPSHOT</version>
     </parent>
 
-    <artifactId>flink-agents-examples</artifactId>
-    <name>Flink Agents : Examples</name>
+    
<artifactId>flink-agents-end-to-end-tests-resource-cross-language</artifactId>
+    <name>Flink Agents : E2E Tests: Resource Cross-Language</name>
 
     <dependencies>
         <dependency>
@@ -34,16 +18,13 @@ under the License.
             <artifactId>flink-agents-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <!-- Dependencies required for running agents in minicluster -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-agents-runtime</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-files</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
@@ -54,11 +35,6 @@ under the License.
             <artifactId>flink-table-api-java-bridge</artifactId>
             <version>${flink.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_2.12</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-clients</artifactId>
@@ -66,10 +42,8 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-agents-integrations-chat-models-ollama</artifactId>
-            <version>${project.version}</version>
+            <artifactId>flink-python</artifactId>
+            <version>${flink.version}</version>
         </dependency>
-
     </dependencies>
-
 </project>
\ No newline at end of file
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java
 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java
new file mode 100644
index 0000000..4511044
--- /dev/null
+++ 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.resource.test;
+
+import org.apache.flink.agents.api.Agent;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.annotation.ChatModelConnection;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.annotation.Tool;
+import org.apache.flink.agents.api.annotation.ToolParam;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.chat.model.python.PythonChatModelConnection;
+import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Agent example that integrates an external Ollama chat model into Flink 
Agents.
+ *
+ * <p>This class demonstrates how to:
+ *
+ * <ul>
+ *   <li>Declare a chat model connection using {@link ChatModelConnection} 
metadata pointing to
+ *       {@link PythonChatModelConnection}
+ *   <li>Declare a chat model setup using {@link ChatModelSetup} metadata 
pointing to {@link
+ *       PythonChatModelSetup}
+ *   <li>Expose callable tools via {@link Tool} annotated static methods 
(temperature conversion,
+ *       BMI, random number)
+ *   <li>Fetch a chat model from the {@link RunnerContext} and perform a 
single-turn chat
+ *   <li>Emit the model response as an {@link OutputEvent}
+ * </ul>
+ *
+ * <p>The {@code ollamaChatModel()} method publishes a resource with type 
{@link
+ * ResourceType#CHAT_MODEL} so it can be retrieved at runtime inside the 
{@code process} action. The
+ * resource is configured with the connection name, the model name and the 
list of tool names that
+ * the model is allowed to call.
+ */
+public class ChatModelCrossLanguageAgent extends Agent {
+    public static final String OLLAMA_MODEL = "qwen3:0.6b";
+
+    @ChatModelConnection
+    public static ResourceDescriptor chatModelConnection() {
+        return 
ResourceDescriptor.Builder.newBuilder(PythonChatModelConnection.class.getName())
+                .addInitialArgument(
+                        "module", 
"flink_agents.integrations.chat_models.ollama_chat_model")
+                .addInitialArgument("clazz", "OllamaChatModelConnection")
+                .build();
+    }
+
+    @ChatModelSetup
+    public static ResourceDescriptor chatModel() {
+        return 
ResourceDescriptor.Builder.newBuilder(PythonChatModelSetup.class.getName())
+                .addInitialArgument("connection", "chatModelConnection")
+                .addInitialArgument(
+                        "module", 
"flink_agents.integrations.chat_models.ollama_chat_model")
+                .addInitialArgument("clazz", "OllamaChatModelSetup")
+                .addInitialArgument("model", OLLAMA_MODEL)
+                .addInitialArgument(
+                        "tools",
+                        List.of("calculateBMI", "convertTemperature", 
"createRandomNumber"))
+                .addInitialArgument("extract_reasoning", "true")
+                .build();
+    }
+
+    @Tool(description = "Converts temperature between Celsius and Fahrenheit")
+    public static double convertTemperature(
+            @ToolParam(name = "value", description = "Temperature value to 
convert") Double value,
+            @ToolParam(
+                            name = "fromUnit",
+                            description = "Source unit ('C' for Celsius or 'F' 
for Fahrenheit)")
+                    String fromUnit,
+            @ToolParam(
+                            name = "toUnit",
+                            description = "Target unit ('C' for Celsius or 'F' 
for Fahrenheit)")
+                    String toUnit) {
+
+        fromUnit = fromUnit.toUpperCase();
+        toUnit = toUnit.toUpperCase();
+
+        if (fromUnit.equals(toUnit)) {
+            return value;
+        }
+
+        if (fromUnit.equals("C") && toUnit.equals("F")) {
+            return (value * 9 / 5) + 32;
+        } else if (fromUnit.equals("F") && toUnit.equals("C")) {
+            return (value - 32) * 5 / 9;
+        } else {
+            throw new IllegalArgumentException("Invalid temperature units. Use 
'C' or 'F'");
+        }
+    }
+
+    @Tool(description = "Calculates Body Mass Index (BMI)")
+    public static double calculateBMI(
+            @ToolParam(name = "weightKg", description = "Weight in kilograms") 
Double weightKg,
+            @ToolParam(name = "heightM", description = "Height in meters") 
Double heightM) {
+
+        if (weightKg <= 0 || heightM <= 0) {
+            throw new IllegalArgumentException("Weight and height must be 
positive values");
+        }
+        return weightKg / (heightM * heightM);
+    }
+
+    @Tool(description = "Create a random number")
+    public static double createRandomNumber() {
+        return Math.random();
+    }
+
+    @Action(listenEvents = {InputEvent.class})
+    public static void process(InputEvent event, RunnerContext ctx) throws 
Exception {
+        ctx.sendEvent(
+                new ChatRequestEvent(
+                        "chatModel",
+                        Collections.singletonList(
+                                new ChatMessage(MessageRole.USER, (String) 
event.getInput()))));
+    }
+
+    @Action(listenEvents = {ChatResponseEvent.class})
+    public static void processChatResponse(ChatResponseEvent event, 
RunnerContext ctx) {
+        ctx.sendEvent(new OutputEvent(event.getResponse().getContent()));
+    }
+}
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageTest.java
 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageTest.java
new file mode 100644
index 0000000..562a891
--- /dev/null
+++ 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.resource.test;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.agents.resource.test.ChatModelCrossLanguageAgent.OLLAMA_MODEL;
+
+public class ChatModelCrossLanguageTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChatModelCrossLanguageTest.class);
+
+    private final boolean ollamaReady;
+
+    public ChatModelCrossLanguageTest() throws IOException {
+        ollamaReady = pullModel(OLLAMA_MODEL);
+    }
+
+    @Test
+    public void testChatModeIntegration() throws Exception {
+        Assumptions.assumeTrue(ollamaReady, "Ollama Server information is not 
provided");
+
+        // Create the execution environment
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        // Use prompts that trigger different tool calls in the agent
+        DataStream<String> inputStream =
+                env.fromData(
+                        "Convert 25 degrees Celsius to Fahrenheit",
+                        "What is 98.6 Fahrenheit in Celsius?",
+                        "Change 32 degrees Celsius to Fahrenheit",
+                        "If it's 75 degrees Fahrenheit, what would that be in 
Celsius?",
+                        "Convert room temperature of 20C to F",
+                        "Calculate BMI for someone who is 1.75 meters tall and 
weighs 70 kg",
+                        "What's the BMI for a person weighing 85 kg with 
height 1.80 meters?",
+                        "Can you tell me the BMI if I'm 1.65m tall and weigh 
60kg?",
+                        "Find BMI for 75kg weight and 1.78m height",
+                        "Create me a random number please");
+
+        // Create agents execution environment
+        AgentsExecutionEnvironment agentsEnv =
+                AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+        // Apply agent to the DataStream and use the prompt itself as the key
+        DataStream<Object> outputStream =
+                agentsEnv
+                        .fromDataStream(
+                                inputStream, (KeySelector<String, String>) 
value -> "orderKey")
+                        .apply(new ChatModelCrossLanguageAgent())
+                        .toDataStream();
+
+        // Collect the results
+        CloseableIterator<Object> results = outputStream.collectAsync();
+
+        // Execute the pipeline
+        agentsEnv.execute();
+
+        checkResult(results);
+    }
+
+    public void checkResult(CloseableIterator<Object> results) {
+        List<String> expectedWords =
+                List.of("77", "37", "89", "23", "68", "22", "26", "22", "23", 
"");
+        for (String expected : expectedWords) {
+            Assertions.assertTrue(
+                    results.hasNext(), "Output messages count %s is less than 
expected.");
+            String res = (String) results.next();
+            if (res.contains("error") || res.contains("parameters")) {
+                LOG.warn(res);
+            } else {
+                Assertions.assertTrue(
+                        res.contains(expected),
+                        String.format(
+                                "Groud truth %s is not contained in answer 
{%s}", expected, res));
+            }
+        }
+    }
+
+    public static boolean pullModel(String model) throws IOException {
+        String path =
+                Objects.requireNonNull(
+                                ChatModelCrossLanguageTest.class
+                                        .getClassLoader()
+                                        .getResource("ollama_pull_model.sh"))
+                        .getPath();
+        ProcessBuilder builder = new ProcessBuilder("bash", path, model);
+        Process process = builder.start();
+        try {
+            process.waitFor(120, TimeUnit.SECONDS);
+            return process.exitValue() == 0;
+        } catch (Exception e) {
+            LOG.warn("Pull {} failed, will skip test", model);
+        }
+        return false;
+    }
+}
diff --git a/python/flink_agents/runtime/python_java_utils.py 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/ollama_pull_model.sh
old mode 100644
new mode 100755
similarity index 54%
copy from python/flink_agents/runtime/python_java_utils.py
copy to 
e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/ollama_pull_model.sh
index 586e6cb..7277e12
--- a/python/flink_agents/runtime/python_java_utils.py
+++ 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/ollama_pull_model.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
 
################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
@@ -14,29 +15,6 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 # limitations under the License.
-#################################################################################
-from typing import Any
-
-import cloudpickle
-
-from flink_agents.api.events.event import InputEvent
-
-
-def convert_to_python_object(bytesObject: bytes) -> Any:
-    """Used for deserializing Python objects."""
-    return cloudpickle.loads(bytesObject)
-
-
-def wrap_to_input_event(bytesObject: bytes) -> tuple[bytes, str]:
-    """Wrap data to python input event and serialize.
-
-    Returns:
-        A tuple of (serialized_event_bytes, event_string_representation)
-    """
-    event = InputEvent(input=cloudpickle.loads(bytesObject))
-    return (cloudpickle.dumps(event), str(event))
-
-
-def get_output_from_output_event(bytesObject: bytes) -> Any:
-    """Get output data from OutputEvent and serialize."""
-    return cloudpickle.dumps(convert_to_python_object(bytesObject).output)
+################################################################################
+echo "ollama pull $1"
+ollama pull $1
\ No newline at end of file
diff --git a/e2e-test/pom.xml b/e2e-test/pom.xml
index 5d5683d..bf938fe 100644
--- a/e2e-test/pom.xml
+++ b/e2e-test/pom.xml
@@ -31,5 +31,6 @@ under the License.
     <modules>
         <module>flink-agents-end-to-end-tests-agent-plan-compatibility</module>
         <module>flink-agents-end-to-end-tests-integration</module>
+        <module>flink-agents-end-to-end-tests-resource-cross-language</module>
     </modules>
 </project>
\ No newline at end of file
diff --git a/e2e-test/test-scripts/test_resource_cross_language.sh 
b/e2e-test/test-scripts/test_resource_cross_language.sh
new file mode 100755
index 0000000..8fb7fc2
--- /dev/null
+++ b/e2e-test/test-scripts/test_resource_cross_language.sh
@@ -0,0 +1,41 @@
+#
+#   Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+root_dir=$(pwd)
+
+# If we're running from the python subdirectory, adjust the root_dir
+if [[ "$(basename "$root_dir")" == "python" ]]; then
+    root_dir=$(dirname "$root_dir")
+fi
+
+echo "Root directory: $root_dir"
+
+# Run all tests in the resource-cross-language module using Maven
+cd "$root_dir/e2e-test/flink-agents-end-to-end-tests-resource-cross-language"
+
+echo "Running all tests in resource-cross-language module..."
+mvn -T16 --batch-mode --no-transfer-progress test
+
+ret=$?
+if [ "$ret" != "0" ]; then
+    echo "Resource cross-language tests failed with exit code $ret"
+    exit $ret
+fi
+
+echo "All resource cross-language tests passed successfully!"
diff --git a/examples/pom.xml b/examples/pom.xml
index 1b8d543..1e1b229 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -34,6 +34,8 @@ under the License.
             <artifactId>flink-agents-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <!-- Dependencies required for running agents in minicluster -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-agents-runtime</artifactId>
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java 
b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index 7cb8138..cc8b8a7 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -30,12 +30,15 @@ import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.resource.SerializableResource;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
 import org.apache.flink.agents.api.tools.ToolMetadata;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.plan.actions.ChatModelAction;
 import org.apache.flink.agents.plan.actions.ToolCallAction;
 import org.apache.flink.agents.plan.resourceprovider.JavaResourceProvider;
 import 
org.apache.flink.agents.plan.resourceprovider.JavaSerializableResourceProvider;
+import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
 import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
 import org.apache.flink.agents.plan.serializer.AgentPlanJsonDeserializer;
 import org.apache.flink.agents.plan.serializer.AgentPlanJsonSerializer;
@@ -76,6 +79,8 @@ public class AgentPlan implements Serializable {
 
     private AgentConfiguration config;
 
+    private transient PythonResourceAdapter pythonResourceAdapter;
+
     /** Cache for instantiated resources. */
     private transient Map<ResourceType, Map<String, Resource>> resourceCache;
 
@@ -128,6 +133,10 @@ public class AgentPlan implements Serializable {
         this.config = config;
     }
 
+    public void setPythonResourceAdapter(PythonResourceAdapter adapter) {
+        this.pythonResourceAdapter = adapter;
+    }
+
     public Map<String, Action> getActions() {
         return actions;
     }
@@ -174,6 +183,10 @@ public class AgentPlan implements Serializable {
 
         ResourceProvider provider = resourceProviders.get(type).get(name);
 
+        if (pythonResourceAdapter != null && provider instanceof 
PythonResourceProvider) {
+            ((PythonResourceProvider) 
provider).setPythonResourceAdapter(pythonResourceAdapter);
+        }
+
         // Create resource using provider
         Resource resource =
                 provider.provide(
@@ -279,8 +292,13 @@ public class AgentPlan implements Serializable {
 
     private void extractResource(ResourceType type, Method method) throws 
Exception {
         String name = method.getName();
+        ResourceProvider provider;
         ResourceDescriptor descriptor = (ResourceDescriptor) 
method.invoke(null);
-        JavaResourceProvider provider = new JavaResourceProvider(name, type, 
descriptor);
+        if 
(PythonResourceWrapper.class.isAssignableFrom(Class.forName(descriptor.getClazz())))
 {
+            provider = new PythonResourceProvider(name, type, descriptor);
+        } else {
+            provider = new JavaResourceProvider(name, type, descriptor);
+        }
         addResourceProvider(provider);
     }
 
@@ -385,9 +403,17 @@ public class AgentPlan implements Serializable {
             ResourceType type = entry.getKey();
             if (type == ResourceType.CHAT_MODEL || type == 
ResourceType.CHAT_MODEL_CONNECTION) {
                 for (Map.Entry<String, Object> kv : 
entry.getValue().entrySet()) {
-                    JavaResourceProvider provider =
-                            new JavaResourceProvider(
-                                    kv.getKey(), type, (ResourceDescriptor) 
kv.getValue());
+                    ResourceProvider provider;
+                    if (PythonResourceWrapper.class.isAssignableFrom(
+                            Class.forName(((ResourceDescriptor) 
kv.getValue()).getClazz()))) {
+                        provider =
+                                new PythonResourceProvider(
+                                        kv.getKey(), type, 
(ResourceDescriptor) kv.getValue());
+                    } else {
+                        provider =
+                                new JavaResourceProvider(
+                                        kv.getKey(), type, 
(ResourceDescriptor) kv.getValue());
+                    }
                     addResourceProvider(provider);
                 }
             } else if (type == ResourceType.PROMPT) {
diff --git 
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
 
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
index 329a2db..ce621a4 100644
--- 
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
+++ 
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
@@ -19,12 +19,19 @@
 package org.apache.flink.agents.plan.resourceprovider;
 
 import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import pemja.core.object.PyObject;
 
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.BiFunction;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Python Resource provider that carries resource metadata to create Resource 
objects at runtime.
  *
@@ -35,6 +42,9 @@ public class PythonResourceProvider extends ResourceProvider {
     private final String module;
     private final String clazz;
     private final Map<String, Object> kwargs;
+    private final ResourceDescriptor descriptor;
+
+    protected PythonResourceAdapter pythonResourceAdapter;
 
     public PythonResourceProvider(
             String name,
@@ -46,6 +56,25 @@ public class PythonResourceProvider extends ResourceProvider 
{
         this.module = module;
         this.clazz = clazz;
         this.kwargs = kwargs;
+        this.descriptor = null;
+    }
+
+    public PythonResourceProvider(String name, ResourceType type, 
ResourceDescriptor descriptor) {
+        super(name, type);
+        this.kwargs = new HashMap<>(descriptor.getInitialArguments());
+        module = (String) kwargs.remove("module");
+        if (module == null || module.isEmpty()) {
+            throw new IllegalArgumentException("module should not be null or 
empty.");
+        }
+        clazz = (String) kwargs.remove("clazz");
+        if (clazz == null || clazz.isEmpty()) {
+            throw new IllegalArgumentException("clazz should not be null or 
empty.");
+        }
+        this.descriptor = descriptor;
+    }
+
+    public void setPythonResourceAdapter(PythonResourceAdapter 
pythonResourceAdapter) {
+        this.pythonResourceAdapter = pythonResourceAdapter;
     }
 
     public String getModule() {
@@ -63,11 +92,18 @@ public class PythonResourceProvider extends 
ResourceProvider {
     @Override
     public Resource provide(BiFunction<String, ResourceType, Resource> 
getResource)
             throws Exception {
-        // TODO: Implement Python resource creation logic
-        // This would typically involve calling into Python runtime to create 
the
-        // resource
-        throw new UnsupportedOperationException(
-                "Python resource creation not yet implemented in Java 
runtime");
+        checkState(pythonResourceAdapter != null, "PythonResourceAdapter is 
not set");
+        Class<?> clazz = Class.forName(descriptor.getClazz());
+        PyObject pyResource =
+                pythonResourceAdapter.initPythonResource(this.module, 
this.clazz, kwargs);
+        Constructor<?> constructor =
+                clazz.getConstructor(
+                        PythonResourceAdapter.class,
+                        PyObject.class,
+                        ResourceDescriptor.class,
+                        BiFunction.class);
+        return (Resource)
+                constructor.newInstance(pythonResourceAdapter, pyResource, 
descriptor, getResource);
     }
 
     @Override
@@ -92,4 +128,8 @@ public class PythonResourceProvider extends ResourceProvider 
{
     public int hashCode() {
         return Objects.hash(this.getName(), this.getType(), module, clazz, 
kwargs);
     }
+
+    public ResourceDescriptor getDescriptor() {
+        return descriptor;
+    }
 }
diff --git 
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonDeserializer.java
 
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonDeserializer.java
index 6dbd870..df44e27 100644
--- 
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonDeserializer.java
+++ 
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonDeserializer.java
@@ -74,6 +74,15 @@ public class ResourceProviderJsonDeserializer extends 
StdDeserializer<ResourcePr
     private PythonResourceProvider deserializePythonResourceProvider(JsonNode 
node) {
         String name = node.get("name").asText();
         String type = node.get("type").asText();
+        try {
+            if (node.has("descriptor")) {
+                ResourceDescriptor descriptor =
+                        mapper.treeToValue(node.get("descriptor"), 
ResourceDescriptor.class);
+                return new PythonResourceProvider(name, 
ResourceType.fromValue(type), descriptor);
+            }
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
         String module = node.get("module").asText();
         String clazz = node.get("clazz").asText();
 
diff --git 
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonSerializer.java
 
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonSerializer.java
index f11ff05..4b61c83 100644
--- 
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonSerializer.java
+++ 
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonSerializer.java
@@ -71,6 +71,10 @@ public class ResourceProviderJsonSerializer extends 
StdSerializer<ResourceProvid
         gen.writeStringField("module", provider.getModule());
         gen.writeStringField("clazz", provider.getClazz());
 
+        if (provider.getDescriptor() != null) {
+            gen.writeObjectField("descriptor", provider.getDescriptor());
+        }
+
         gen.writeFieldName("kwargs");
         gen.writeStartObject();
         provider.getKwargs()
diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
index e61dbb9..7d532a2 100644
--- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
+++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
@@ -24,20 +24,28 @@ import org.apache.flink.agents.api.InputEvent;
 import org.apache.flink.agents.api.OutputEvent;
 import org.apache.flink.agents.api.annotation.ChatModelSetup;
 import org.apache.flink.agents.api.annotation.Tool;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
 import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.resource.SerializableResource;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
 import org.apache.flink.agents.plan.actions.Action;
 import 
org.apache.flink.agents.plan.resourceprovider.JavaSerializableResourceProvider;
+import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
 import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import pemja.core.object.PyObject;
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link AgentPlan} constructor that takes an Agent. */
 public class AgentPlanTest {
@@ -91,6 +99,27 @@ public class AgentPlanTest {
         }
     }
 
+    public static class TestPythonResource extends Resource implements 
PythonResourceWrapper {
+
+        public TestPythonResource(
+                PythonResourceAdapter adapter,
+                PyObject chatModel,
+                ResourceDescriptor descriptor,
+                BiFunction<String, ResourceType, Resource> getResource) {
+            super(descriptor, getResource);
+        }
+
+        @Override
+        public ResourceType getResourceType() {
+            return ResourceType.CHAT_MODEL;
+        }
+
+        @Override
+        public Object getPythonResource() {
+            return null;
+        }
+    }
+
     /** Test agent class with annotated methods. */
     public static class TestAgent extends Agent {
 
@@ -120,6 +149,14 @@ public class AgentPlanTest {
         private TestSerializableChatModel chatModel =
                 new TestSerializableChatModel("defaultChatModel");
 
+        @ChatModelSetup
+        public static ResourceDescriptor pythonChatModel() {
+            return 
ResourceDescriptor.Builder.newBuilder(TestPythonResource.class.getName())
+                    .addInitialArgument("module", "test.module")
+                    .addInitialArgument("clazz", "TestClazz")
+                    .build();
+        }
+
         @Tool private TestTool anotherTool = new TestTool("anotherTool");
 
         @org.apache.flink.agents.api.annotation.Action(listenEvents = 
{InputEvent.class})
@@ -128,6 +165,54 @@ public class AgentPlanTest {
         }
     }
 
+    /** Test agent class with illegal python resource. */
+    public static class TestAgentWithIllegalPythonResource extends Agent {
+        @ChatModelSetup
+        public static ResourceDescriptor reviewAnalysisModel() {
+            return 
ResourceDescriptor.Builder.newBuilder(TestPythonResource.class.getName())
+                    .build();
+        }
+    }
+
+    public static class TestPythonResourceAdapter implements 
PythonResourceAdapter {
+
+        @Override
+        public Object getResource(String resourceName, String resourceType) {
+            return null;
+        }
+
+        @Override
+        public PyObject initPythonResource(
+                String module, String clazz, Map<String, Object> kwargs) {
+            return null;
+        }
+
+        @Override
+        public Object toPythonChatMessage(ChatMessage message) {
+            return null;
+        }
+
+        @Override
+        public ChatMessage fromPythonChatMessage(Object pythonChatMessage) {
+            return null;
+        }
+
+        @Override
+        public Object 
convertToPythonTool(org.apache.flink.agents.api.tools.Tool tool) {
+            return null;
+        }
+
+        @Override
+        public Object callMethod(Object obj, String methodName, Map<String, 
Object> kwargs) {
+            return null;
+        }
+
+        @Override
+        public Object invoke(String name, Object... args) {
+            return null;
+        }
+    }
+
     @Test
     public void testConstructorWithAgent() throws Exception {
         // Create an agent instance
@@ -303,8 +388,9 @@ public class AgentPlanTest {
         Map<String, ResourceProvider> chatModelProviders =
                 resourceProviders.get(ResourceType.CHAT_MODEL);
         assertThat(chatModelProviders).isNotNull();
-        assertThat(chatModelProviders).hasSize(1); // defaultChatModel (field 
name used as default)
+        assertThat(chatModelProviders).hasSize(2); // defaultChatModel (field 
name used as default)
         assertThat(chatModelProviders).containsKey("chatModel");
+        assertThat(chatModelProviders).containsKey("pythonChatModel");
 
         // Verify that chat model provider is JavaSerializableResourceProvider
         // (serializable)
@@ -319,6 +405,19 @@ public class AgentPlanTest {
         assertThat(serializableProvider.getModule())
                 
.isEqualTo(TestAgentWithResources.class.getPackage().getName());
         
assertThat(serializableProvider.getClazz()).contains("TestSerializableChatModel");
+
+        // Verify that python chat model provider is PythonResourceProvider
+        // (serializable)
+        ResourceProvider pythonChatModelProvider = 
chatModelProviders.get("pythonChatModel");
+        
assertThat(pythonChatModelProvider).isInstanceOf(PythonResourceProvider.class);
+        
assertThat(pythonChatModelProvider.getName()).isEqualTo("pythonChatModel");
+        
assertThat(pythonChatModelProvider.getType()).isEqualTo(ResourceType.CHAT_MODEL);
+
+        // Test PythonResourceProvider specific methods
+        PythonResourceProvider pythonResourceProvider =
+                (PythonResourceProvider) pythonChatModelProvider;
+        assertThat(pythonResourceProvider.getClazz()).isEqualTo("TestClazz");
+        
assertThat(pythonResourceProvider.getModule()).isEqualTo("test.module");
     }
 
     @Test
@@ -339,8 +438,30 @@ public class AgentPlanTest {
         assertThat(chatModel).isInstanceOf(TestSerializableChatModel.class);
         
assertThat(chatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
 
+        assertThatThrownBy(() -> agentPlan.getResource("pythonChatModel", 
ResourceType.CHAT_MODEL))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("PythonResourceAdapter is not set");
+
+        agentPlan.setPythonResourceAdapter(new TestPythonResourceAdapter());
+        Resource pythonChatModel =
+                agentPlan.getResource("pythonChatModel", 
ResourceType.CHAT_MODEL);
+        assertThat(pythonChatModel).isNotNull();
+        assertThat(pythonChatModel).isInstanceOf(TestPythonResource.class);
+        
assertThat(pythonChatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
+
         // Test that resources are cached (should be the same instance)
         Resource myToolAgain = agentPlan.getResource("myTool", 
ResourceType.TOOL);
         assertThat(myTool).isSameAs(myToolAgain);
     }
+
+    @Test
+    public void testExtractIllegalResourceProviderFromAgent() throws Exception 
{
+        // Create an agent with resource annotations
+        TestAgentWithIllegalPythonResource agent = new 
TestAgentWithIllegalPythonResource();
+
+        // Expect IllegalArgumentException when creating AgentPlan with 
illegal resource
+        assertThatThrownBy(() -> new AgentPlan(agent))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("module should not be null or empty");
+    }
 }
diff --git a/python/flink_agents/api/tools/utils.py 
b/python/flink_agents/api/tools/utils.py
index 81784d4..976fcde 100644
--- a/python/flink_agents/api/tools/utils.py
+++ b/python/flink_agents/api/tools/utils.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
+import json
 import typing
 from inspect import signature
 from typing import Any, Callable, Dict, Optional, Type, Union
@@ -178,6 +179,19 @@ def create_model_from_schema(name: str, schema: dict) -> 
type[BaseModel]:
 
     return create_model(name, **main_fields, __doc__=schema.get("description", 
""))
 
+def create_model_from_java_tool_schema_str(name: str, schema_str: str) -> 
type[BaseModel]:
+    """Create Pydantic model from a java tool input schema."""
+    json_schema = json.loads(schema_str)
+    properties = json_schema["properties"]
+
+    fields = {}
+    for param_name in properties:
+        description = properties[param_name]["description"]
+        if description is None:
+            description = f"Parameter: {param_name}"
+        type = TYPE_MAPPING.get(properties[param_name]["type"])
+        fields[param_name] = (type, FieldInfo(description=description))
+    return create_model(name, **fields)
 
 def extract_mcp_content_item(content_item: Any) -> Dict[str, Any] | str:
     """Extract and normalize a single MCP content item.
diff --git a/python/flink_agents/runtime/python_java_utils.py 
b/python/flink_agents/runtime/java/__init__.py
similarity index 55%
copy from python/flink_agents/runtime/python_java_utils.py
copy to python/flink_agents/runtime/java/__init__.py
index 586e6cb..e154fad 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/java/__init__.py
@@ -15,28 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-from typing import Any
-
-import cloudpickle
-
-from flink_agents.api.events.event import InputEvent
-
-
-def convert_to_python_object(bytesObject: bytes) -> Any:
-    """Used for deserializing Python objects."""
-    return cloudpickle.loads(bytesObject)
-
-
-def wrap_to_input_event(bytesObject: bytes) -> tuple[bytes, str]:
-    """Wrap data to python input event and serialize.
-
-    Returns:
-        A tuple of (serialized_event_bytes, event_string_representation)
-    """
-    event = InputEvent(input=cloudpickle.loads(bytesObject))
-    return (cloudpickle.dumps(event), str(event))
-
-
-def get_output_from_output_event(bytesObject: bytes) -> Any:
-    """Get output data from OutputEvent and serialize."""
-    return cloudpickle.dumps(convert_to_python_object(bytesObject).output)
diff --git a/python/flink_agents/runtime/java/java_resource_wrapper.py 
b/python/flink_agents/runtime/java/java_resource_wrapper.py
new file mode 100644
index 0000000..496aab3
--- /dev/null
+++ b/python/flink_agents/runtime/java/java_resource_wrapper.py
@@ -0,0 +1,75 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any, List
+
+from pemja import findClass
+from pydantic import Field
+from typing_extensions import override
+
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.api.resource import Resource, ResourceType
+from flink_agents.api.tools.tool import Tool, ToolType
+
+
+class JavaTool(Tool):
+    """Java Tool that carries tool metadata and can be recognized by 
PythonChatModel."""
+
+    @classmethod
+    @override
+    def tool_type(cls) -> ToolType:
+        """Get the tool type."""
+        return ToolType.REMOTE_FUNCTION
+
+    @override
+    def call(self, *args: Any, **kwargs: Any) -> Any:
+        err_msg = "Java tool is defined in Java and needs to be executed 
through the Java runtime."
+        raise NotImplementedError(err_msg)
+
+class JavaPrompt(Prompt):
+    """Python wrapper for Java's Prompt."""
+
+    j_prompt: Any= Field(exclude=True)
+
+    @override
+    def format_string(self, **kwargs: str) -> str:
+        return self.j_prompt.formatString(kwargs)
+
+    @override
+    def format_messages(
+        self, role: MessageRole = MessageRole.SYSTEM, **kwargs: str
+    ) -> List[ChatMessage]:
+        j_MessageRole = 
findClass("org.apache.flink.agents.api.chat.messages.MessageRole")
+        j_chat_messages = 
self.j_prompt.formatMessages(j_MessageRole.fromValue(role.value), kwargs)
+        chatMessages = 
[ChatMessage(role=MessageRole(j_chat_message.getRole().getValue()),
+                                            
content=j_chat_message.getContent(),
+                                            tool_calls= 
j_chat_message.getToolCalls(),
+                                            
extra_args=j_chat_message.getExtraArgs()) for j_chat_message in j_chat_messages]
+        return chatMessages
+
+class JavaGetResourceWrapper:
+    """Python wrapper for Java ResourceAdapter."""
+
+    def __init__(self, j_resource_adapter: Any) -> None:
+        """Initialize with a Java ResourceAdapter."""
+        self._j_resource_adapter = j_resource_adapter
+
+
+    def get_resource(self, name: str, type: ResourceType) -> Resource:
+        """Get a resource by name and type."""
+        return self._j_resource_adapter.getResource(name, type.value)
diff --git a/python/flink_agents/runtime/python_java_utils.py 
b/python/flink_agents/runtime/python_java_utils.py
index 586e6cb..3c38985 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -15,11 +15,22 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-from typing import Any
+import importlib
+from typing import Any, Callable, Dict
 
 import cloudpickle
+from pemja import findClass
 
+from flink_agents.api.chat_message import ChatMessage, MessageRole
 from flink_agents.api.events.event import InputEvent
+from flink_agents.api.resource import Resource
+from flink_agents.api.tools.tool import ToolMetadata
+from flink_agents.api.tools.utils import create_model_from_java_tool_schema_str
+from flink_agents.runtime.java.java_resource_wrapper import (
+    JavaGetResourceWrapper,
+    JavaPrompt,
+    JavaTool,
+)
 
 
 def convert_to_python_object(bytesObject: bytes) -> Any:
@@ -40,3 +51,122 @@ def wrap_to_input_event(bytesObject: bytes) -> tuple[bytes, 
str]:
 def get_output_from_output_event(bytesObject: bytes) -> Any:
     """Get output data from OutputEvent and serialize."""
     return cloudpickle.dumps(convert_to_python_object(bytesObject).output)
+
+def create_resource(resource_module: str, resource_clazz: str, func_kwargs: 
Dict[str, Any]) -> Resource:
+    """Dynamically create a resource instance from module and class name.
+
+    Args:
+        resource_module: The module path containing the resource class
+        resource_clazz: The class name to instantiate
+        func_kwargs: Keyword arguments to pass to the class constructor
+
+    Returns:
+        Resource: An instance of the specified resource class
+    """
+    module = importlib.import_module(resource_module)
+    cls = getattr(module, resource_clazz)
+    return cls(**func_kwargs)
+
+def get_resource_function(j_resource_adapter: Any) -> Callable:
+    """Create a callable wrapper for Java resource adapter.
+
+    Args:
+        j_resource_adapter: Java resource adapter object
+
+    Returns:
+        Callable: A Python callable that wraps the Java resource adapter
+    """
+    return JavaGetResourceWrapper(j_resource_adapter).get_resource
+
+def from_java_tool(j_tool: Any) -> JavaTool:
+    """Convert a Java tool object to a Python JavaTool instance.
+
+    Args:
+        j_tool: Java tool object
+
+    Returns:
+        JavaTool: Python wrapper for the Java tool with extracted metadata
+    """
+    name = j_tool.getName()
+    metadata = ToolMetadata(
+        name=name,
+        description=j_tool.getDescription(),
+        args_schema=create_model_from_java_tool_schema_str(name, 
j_tool.getMetadata().getInputSchema()),
+    )
+    return JavaTool(metadata=metadata)
+
+def from_java_prompt(j_prompt: Any) -> JavaPrompt:
+    """Convert a Java prompt object to a Python JavaPrompt instance.
+
+    Args:
+        j_prompt: Java prompt object to be wrapped
+
+    Returns:
+        JavaPrompt: Python wrapper for the Java prompt
+    """
+    return JavaPrompt(j_prompt=j_prompt)
+
+def normalize_tool_call_id(tool_call: Dict[str, Any]) -> Dict[str, Any]:
+    """Normalize tool call by converting the ID field to string format while 
preserving
+    all other fields.
+
+    This function ensures that the tool call ID is consistently represented as 
a string,
+    which is required for compatibility with certain systems that expect 
string IDs.
+
+    Args:
+        tool_call: Dictionary containing tool call information. The dictionary 
may
+                   contain any number of fields, but typically includes:
+                  - id: Tool call identifier (will be converted to string)
+                  - type: Tool call type (preserved as-is)
+                  - function: Function details (preserved as-is)
+                  - Any other fields (preserved as-is)
+    """
+    normalized_call = tool_call.copy()
+
+    normalized_call["id"] = str(tool_call.get("id", ""))
+
+    return normalized_call
+
+def from_java_chat_message(j_chat_message: Any) -> ChatMessage:
+    """Convert a chat message to a python chat message."""
+    return ChatMessage(role=MessageRole(j_chat_message.getRole().getValue()),
+                       content=j_chat_message.getContent(),
+                       tool_calls=[normalize_tool_call_id(tool_call) for 
tool_call in j_chat_message.getToolCalls()],
+                       extra_args=j_chat_message.getExtraArgs())
+
+
+def to_java_chat_message(chat_message: ChatMessage) -> Any:
+    """Convert a chat message to a java chat message."""
+    j_ChatMessage = 
findClass("org.apache.flink.agents.api.chat.messages.ChatMessage")
+    j_chat_message = j_ChatMessage()
+
+    j_MessageRole = 
findClass("org.apache.flink.agents.api.chat.messages.MessageRole")
+    j_chat_message.setRole(j_MessageRole.fromValue(chat_message.role.value))
+    j_chat_message.setContent(chat_message.content)
+    j_chat_message.setExtraArgs(chat_message.extra_args)
+    if chat_message.tool_calls:
+        tool_calls = [normalize_tool_call_id(tool_call) for tool_call in 
chat_message.tool_calls]
+        j_chat_message.setToolCalls(tool_calls)
+
+    return j_chat_message
+
+def call_method(obj: Any, method_name: str, kwargs: Dict[str, Any]) -> Any:
+    """Calls a method on `obj` by name and passes in positional and keyword 
arguments.
+
+    Parameters:
+        obj: Any Python object
+        method_name: A string representing the name of the method to call
+        kwargs: Keyword arguments to pass to the method
+
+    Returns:
+        The return value of the method
+
+    Raises:
+        AttributeError: If the object does not have the specified method
+    """
+    if not hasattr(obj, method_name):
+        err_msg = f"Object {obj} has no attribute '{method_name}'"
+        raise AttributeError(err_msg)
+
+    method = getattr(obj, method_name)
+    return method(**kwargs)
diff --git a/python/pyproject.toml b/python/pyproject.toml
index bcf577b..ee282d6 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -48,6 +48,7 @@ dependencies = [
     "pyyaml==6.0.2",
     "mcp>=1.8.0",
     "setuptools>=75.3",
+    "find_libpython",
     #TODO: Seperate integration dependencies from project
     "ollama==0.4.8",
     "dashscope~=1.24.2",
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 2a54563..7712917 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -27,14 +27,17 @@ import org.apache.flink.agents.api.logger.EventLogger;
 import org.apache.flink.agents.api.logger.EventLoggerConfig;
 import org.apache.flink.agents.api.logger.EventLoggerFactory;
 import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.plan.JavaFunction;
 import org.apache.flink.agents.plan.PythonFunction;
 import org.apache.flink.agents.plan.actions.Action;
+import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
 import org.apache.flink.agents.runtime.actionstate.ActionState;
 import org.apache.flink.agents.runtime.actionstate.ActionStateStore;
 import org.apache.flink.agents.runtime.actionstate.KafkaActionStateStore;
 import org.apache.flink.agents.runtime.context.RunnerContextImpl;
+import org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment;
 import org.apache.flink.agents.runtime.env.PythonEnvironmentManager;
 import org.apache.flink.agents.runtime.memory.CachedMemoryStore;
 import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
@@ -45,6 +48,7 @@ import 
org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
 import org.apache.flink.agents.runtime.python.operator.PythonActionTask;
 import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
+import org.apache.flink.agents.runtime.python.utils.PythonResourceAdapterImpl;
 import org.apache.flink.agents.runtime.utils.EventUtil;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.operators.MailboxExecutor;
@@ -75,6 +79,7 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import pemja.core.PythonInterpreter;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -119,9 +124,16 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
 
     private transient MapState<String, MemoryObjectImpl.MemoryItem> 
shortTermMemState;
 
+    private transient PythonEnvironmentManager pythonEnvironmentManager;
+
+    private transient PythonInterpreter pythonInterpreter;
+
     // PythonActionExecutor for Python actions
     private transient PythonActionExecutor pythonActionExecutor;
 
+    // PythonResourceAdapter for Python resources in Java actions
+    private transient PythonResourceAdapterImpl pythonResourceAdapter;
+
     private transient FlinkAgentsMetricGroupImpl metricGroup;
 
     private transient BuiltInMetrics builtInMetrics;
@@ -251,8 +263,8 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                                 new ListStateDescriptor<>(
                                         "currentProcessingKeys", 
TypeInformation.of(Object.class)));
 
-        // init PythonActionExecutor
-        initPythonActionExecutor();
+        // init PythonActionExecutor and PythonResourceAdapter
+        initPythonEnvironment();
 
         mailboxProcessor = getMailboxProcessor();
 
@@ -496,17 +508,29 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         }
     }
 
-    private void initPythonActionExecutor() throws Exception {
+    private void initPythonEnvironment() throws Exception {
         boolean containPythonAction =
                 agentPlan.getActions().values().stream()
                         .anyMatch(action -> action.getExec() instanceof 
PythonFunction);
-        if (containPythonAction) {
-            LOG.debug("Begin initialize PythonActionExecutor.");
+
+        boolean containPythonResource =
+                agentPlan.getResourceProviders().values().stream()
+                        .anyMatch(
+                                resourceProviderMap ->
+                                        resourceProviderMap.values().stream()
+                                                .anyMatch(
+                                                        resourceProvider ->
+                                                                
resourceProvider
+                                                                        
instanceof
+                                                                        
PythonResourceProvider));
+
+        if (containPythonAction || containPythonResource) {
+            LOG.debug("Begin initialize PythonEnvironmentManager.");
             PythonDependencyInfo dependencyInfo =
                     PythonDependencyInfo.create(
                             getExecutionConfig().toConfiguration(),
                             getRuntimeContext().getDistributedCache());
-            PythonEnvironmentManager pythonEnvironmentManager =
+            pythonEnvironmentManager =
                     new PythonEnvironmentManager(
                             dependencyInfo,
                             getContainingTask()
@@ -515,14 +539,39 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                                     .getTmpDirectories(),
                             new HashMap<>(System.getenv()),
                             getRuntimeContext().getJobInfo().getJobId());
-            pythonActionExecutor =
-                    new PythonActionExecutor(
-                            pythonEnvironmentManager,
-                            new ObjectMapper().writeValueAsString(agentPlan));
-            pythonActionExecutor.open();
+            pythonEnvironmentManager.open();
+            EmbeddedPythonEnvironment env = 
pythonEnvironmentManager.createEnvironment();
+            pythonInterpreter = env.getInterpreter();
+            if (containPythonAction) {
+                initPythonActionExecutor();
+            } else {
+                initPythonResourceAdapter();
+            }
         }
     }
 
+    private void initPythonActionExecutor() throws Exception {
+        pythonActionExecutor =
+                new PythonActionExecutor(
+                        pythonInterpreter, new 
ObjectMapper().writeValueAsString(agentPlan));
+        pythonActionExecutor.open();
+    }
+
+    private void initPythonResourceAdapter() throws Exception {
+        pythonResourceAdapter =
+                new PythonResourceAdapterImpl(
+                        (String anotherName, ResourceType anotherType) -> {
+                            try {
+                                return agentPlan.getResource(anotherName, 
anotherType);
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        },
+                        pythonInterpreter);
+        pythonResourceAdapter.open();
+        agentPlan.setPythonResourceAdapter(pythonResourceAdapter);
+    }
+
     @Override
     public void endInput() throws Exception {
         waitInFlightEventsFinished();
@@ -540,6 +589,12 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         if (pythonActionExecutor != null) {
             pythonActionExecutor.close();
         }
+        if (pythonInterpreter != null) {
+            pythonInterpreter.close();
+        }
+        if (pythonEnvironmentManager != null) {
+            pythonEnvironmentManager.close();
+        }
         if (eventLogger != null) {
             eventLogger.close();
         }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index ded0140..9b8c7c4 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -19,8 +19,6 @@ package org.apache.flink.agents.runtime.python.utils;
 
 import org.apache.flink.agents.plan.PythonFunction;
 import org.apache.flink.agents.runtime.context.RunnerContextImpl;
-import org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment;
-import org.apache.flink.agents.runtime.env.PythonEnvironmentManager;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
 import org.apache.flink.agents.runtime.utils.EventUtil;
 import pemja.core.PythonInterpreter;
@@ -59,21 +57,16 @@ public class PythonActionExecutor {
     private static final String GET_OUTPUT_FROM_OUTPUT_EVENT =
             "python_java_utils.get_output_from_output_event";
 
-    private final PythonEnvironmentManager environmentManager;
+    private final PythonInterpreter interpreter;
     private final String agentPlanJson;
-    private PythonInterpreter interpreter;
     private Object pythonAsyncThreadPool;
 
-    public PythonActionExecutor(PythonEnvironmentManager environmentManager, 
String agentPlanJson) {
-        this.environmentManager = environmentManager;
+    public PythonActionExecutor(PythonInterpreter interpreter, String 
agentPlanJson) {
+        this.interpreter = interpreter;
         this.agentPlanJson = agentPlanJson;
     }
 
     public void open() throws Exception {
-        environmentManager.open();
-        EmbeddedPythonEnvironment env = environmentManager.createEnvironment();
-
-        interpreter = env.getInterpreter();
         interpreter.exec(PYTHON_IMPORTS);
 
         pythonAsyncThreadPool = interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
@@ -160,11 +153,6 @@ public class PythonActionExecutor {
             if (pythonAsyncThreadPool != null) {
                 interpreter.invoke(CLOSE_ASYNC_THREAD_POOL, 
pythonAsyncThreadPool);
             }
-            interpreter.close();
-        }
-
-        if (environmentManager != null) {
-            environmentManager.close();
         }
     }
 
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
new file mode 100644
index 0000000..cd9b935
--- /dev/null
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.python.utils;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.prompt.Prompt;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import org.apache.flink.agents.api.tools.Tool;
+import pemja.core.PythonInterpreter;
+import pemja.core.object.PyObject;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+
+public class PythonResourceAdapterImpl implements PythonResourceAdapter {
+
+    static final String PYTHON_IMPORTS = "from flink_agents.runtime import 
python_java_utils";
+
+    static final String GET_RESOURCE_KEY = "get_resource";
+
+    static final String PYTHON_MODULE_PREFIX = "python_java_utils.";
+
+    static final String GET_RESOURCE_FUNCTION = PYTHON_MODULE_PREFIX + 
"get_resource_function";
+
+    static final String CALL_METHOD = PYTHON_MODULE_PREFIX + "call_method";
+
+    static final String CREATE_RESOURCE = PYTHON_MODULE_PREFIX + 
"create_resource";
+
+    static final String FROM_JAVA_TOOL = PYTHON_MODULE_PREFIX + 
"from_java_tool";
+
+    static final String FROM_JAVA_PROMPT = PYTHON_MODULE_PREFIX + 
"from_java_prompt";
+
+    static final String FROM_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX + 
"from_java_chat_message";
+
+    static final String TO_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX + 
"to_java_chat_message";
+
+    private final BiFunction<String, ResourceType, Resource> getResource;
+    private final PythonInterpreter interpreter;
+    private Object pythonGetResourceFunction;
+
+    public PythonResourceAdapterImpl(
+            BiFunction<String, ResourceType, Resource> getResource, 
PythonInterpreter interpreter) {
+        this.getResource = getResource;
+        this.interpreter = interpreter;
+    }
+
+    public void open() {
+        interpreter.exec(PYTHON_IMPORTS);
+        pythonGetResourceFunction = interpreter.invoke(GET_RESOURCE_FUNCTION, 
this);
+    }
+
+    public Object getResource(String resourceName, String resourceType) {
+        Resource resource =
+                this.getResource.apply(resourceName, 
ResourceType.fromValue(resourceType));
+        if (resource instanceof PythonResourceWrapper) {
+            PythonResourceWrapper pythonResource = (PythonResourceWrapper) 
resource;
+            return pythonResource.getPythonResource();
+        }
+        if (resource instanceof Tool) {
+            return convertToPythonTool((Tool) resource);
+        }
+        if (resource instanceof Prompt) {
+            return convertToPythonPrompt((Prompt) resource);
+        }
+        return resource;
+    }
+
+    @Override
+    public PyObject initPythonResource(String module, String clazz, 
Map<String, Object> kwargs) {
+        kwargs.put(GET_RESOURCE_KEY, pythonGetResourceFunction);
+        return (PyObject) interpreter.invoke(CREATE_RESOURCE, module, clazz, 
kwargs);
+    }
+
+    @Override
+    public Object toPythonChatMessage(ChatMessage message) {
+        return interpreter.invoke(FROM_JAVA_CHAT_MESSAGE, message);
+    }
+
+    @Override
+    public ChatMessage fromPythonChatMessage(Object pythonChatMessage) {
+        ChatMessage message =
+                (ChatMessage) interpreter.invoke(TO_JAVA_CHAT_MESSAGE, 
pythonChatMessage);
+
+        return message;
+    }
+
+    @Override
+    public Object convertToPythonTool(Tool tool) {
+        return interpreter.invoke(FROM_JAVA_TOOL, tool);
+    }
+
+    private Object convertToPythonPrompt(Prompt prompt) {
+        return interpreter.invoke(FROM_JAVA_PROMPT, prompt);
+    }
+
+    @Override
+    public Object callMethod(Object obj, String methodName, Map<String, 
Object> kwargs) {
+        return interpreter.invoke(CALL_METHOD, obj, methodName, kwargs);
+    }
+
+    @Override
+    public Object invoke(String name, Object... args) {
+        return interpreter.invoke(name, args);
+    }
+}
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java
new file mode 100644
index 0000000..18ee02e
--- /dev/null
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.python.utils;
+
+import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup;
+import org.apache.flink.agents.api.prompt.Prompt;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import org.apache.flink.agents.api.tools.Tool;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import pemja.core.PythonInterpreter;
+import pemja.core.object.PyObject;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.*;
+
+public class PythonResourceAdapterImplTest {
+    @Mock private PythonInterpreter mockInterpreter;
+
+    @Mock private BiFunction<String, ResourceType, Resource> getResource;
+
+    private PythonResourceAdapterImpl pythonResourceAdapter;
+    private AutoCloseable mocks;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        mocks = MockitoAnnotations.openMocks(this);
+        pythonResourceAdapter = new PythonResourceAdapterImpl(getResource, 
mockInterpreter);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (mocks != null) {
+            mocks.close();
+        }
+    }
+
+    @Test
+    void testInitPythonResource() {
+        String module = "test_module";
+        String clazz = "TestClass";
+        Map<String, Object> kwargs = new HashMap<>();
+        kwargs.put("param1", "value1");
+        kwargs.put("param2", 42);
+
+        PyObject expectedResult = mock(PyObject.class);
+        when(mockInterpreter.invoke(
+                        PythonResourceAdapterImpl.CREATE_RESOURCE, module, 
clazz, kwargs))
+                .thenReturn(expectedResult);
+
+        PyObject result = pythonResourceAdapter.initPythonResource(module, 
clazz, kwargs);
+
+        assertThat(result).isEqualTo(expectedResult);
+        
assertThat(kwargs).containsKey(PythonResourceAdapterImpl.GET_RESOURCE_KEY);
+        verify(mockInterpreter)
+                .invoke(PythonResourceAdapterImpl.CREATE_RESOURCE, module, 
clazz, kwargs);
+    }
+
+    @Test
+    void testOpen() {
+
+        pythonResourceAdapter.open();
+
+        verify(mockInterpreter).exec(PythonResourceAdapterImpl.PYTHON_IMPORTS);
+        verify(mockInterpreter)
+                .invoke(PythonResourceAdapterImpl.GET_RESOURCE_FUNCTION, 
pythonResourceAdapter);
+    }
+
+    @Test
+    void testGetResourceWithPythonResourceWrapper() {
+        String resourceName = "test_resource";
+        String resourceType = "chat_model";
+        PythonResourceWrapper mockPythonChatModelSetup = 
mock(PythonChatModelSetup.class);
+        Object expectedPythonResource = new Object();
+
+        when(getResource.apply(resourceName, ResourceType.CHAT_MODEL))
+                .thenReturn((Resource) mockPythonChatModelSetup);
+        
when(mockPythonChatModelSetup.getPythonResource()).thenReturn(expectedPythonResource);
+
+        Object result = pythonResourceAdapter.getResource(resourceName, 
resourceType);
+
+        assertThat(result).isEqualTo(expectedPythonResource);
+        verify(getResource).apply(resourceName, ResourceType.CHAT_MODEL);
+        verify(mockPythonChatModelSetup).getPythonResource();
+    }
+
+    @Test
+    void testGetResourceWithTool() {
+        String resourceName = "test_tool";
+        String resourceType = "tool";
+        Tool mockTool = mock(Tool.class);
+        Object expectedPythonTool = new Object();
+
+        when(getResource.apply(resourceName, 
ResourceType.TOOL)).thenReturn(mockTool);
+        when(mockInterpreter.invoke(PythonResourceAdapterImpl.FROM_JAVA_TOOL, 
mockTool))
+                .thenReturn(expectedPythonTool);
+
+        Object result = pythonResourceAdapter.getResource(resourceName, 
resourceType);
+
+        assertThat(result).isEqualTo(expectedPythonTool);
+        verify(getResource).apply(resourceName, ResourceType.TOOL);
+        
verify(mockInterpreter).invoke(PythonResourceAdapterImpl.FROM_JAVA_TOOL, 
mockTool);
+    }
+
+    @Test
+    void testGetResourceWithPrompt() {
+        String resourceName = "test_prompt";
+        String resourceType = "prompt";
+        Prompt mockPrompt = mock(Prompt.class);
+        Object expectedPythonPrompt = new Object();
+
+        when(getResource.apply(resourceName, 
ResourceType.PROMPT)).thenReturn(mockPrompt);
+        
when(mockInterpreter.invoke(PythonResourceAdapterImpl.FROM_JAVA_PROMPT, 
mockPrompt))
+                .thenReturn(expectedPythonPrompt);
+
+        Object result = pythonResourceAdapter.getResource(resourceName, 
resourceType);
+
+        assertThat(result).isEqualTo(expectedPythonPrompt);
+        verify(getResource).apply(resourceName, ResourceType.PROMPT);
+        
verify(mockInterpreter).invoke(PythonResourceAdapterImpl.FROM_JAVA_PROMPT, 
mockPrompt);
+    }
+
+    @Test
+    void testGetResourceWithRegularResource() {
+        String resourceName = "test_resource";
+        String resourceType = "chat_model";
+        Resource mockResource = mock(Resource.class);
+
+        when(getResource.apply(resourceName, 
ResourceType.CHAT_MODEL)).thenReturn(mockResource);
+
+        Object result = pythonResourceAdapter.getResource(resourceName, 
resourceType);
+
+        assertThat(result).isEqualTo(mockResource);
+        verify(getResource).apply(resourceName, ResourceType.CHAT_MODEL);
+    }
+
+    @Test
+    void testCallMethod() {
+        // Arrange
+        Object obj = new Object();
+        String methodName = "test_method";
+        Map<String, Object> kwargs = Map.of("param", "value");
+        Object expectedResult = "method_result";
+
+        when(mockInterpreter.invoke(PythonResourceAdapterImpl.CALL_METHOD, 
obj, methodName, kwargs))
+                .thenReturn(expectedResult);
+
+        Object result = pythonResourceAdapter.callMethod(obj, methodName, 
kwargs);
+
+        assertThat(result).isEqualTo(expectedResult);
+        verify(mockInterpreter)
+                .invoke(PythonResourceAdapterImpl.CALL_METHOD, obj, 
methodName, kwargs);
+    }
+
+    @Test
+    void testInvoke() {
+        String name = "test_function";
+        Object[] args = {"arg1", 42, true};
+        Object expectedResult = "invoke_result";
+
+        when(mockInterpreter.invoke(name, args)).thenReturn(expectedResult);
+
+        Object result = pythonResourceAdapter.invoke(name, args);
+
+        assertThat(result).isEqualTo(expectedResult);
+        verify(mockInterpreter).invoke(name, args);
+    }
+}
diff --git a/tools/e2e.sh b/tools/e2e.sh
index 103d024..f244bc0 100755
--- a/tools/e2e.sh
+++ b/tools/e2e.sh
@@ -66,6 +66,18 @@ function run_cross_language_config_test {
   cd "$python_dir" && uv run bash 
../e2e-test/test-scripts/test_java_config_in_python.sh
 }
 
+function run_resource_cross_language_test {
+  # This test only runs Maven, no need for Python environment
+  # Ensure we're in the project root directory
+  cd "$project_root"
+  uv run bash e2e-test/test-scripts/test_resource_cross_language.sh
+}
+
+export TOTAL=0
+export PASSED=0
+
+run_test "Resource Cross-Language end-to-end test" 
"run_resource_cross_language_test"
+
 if [[ ! -d "e2e-test/target" ]]; then
   echo "Build flink-agents before run e2e tests."
   bash tools/build.sh
@@ -94,9 +106,6 @@ uv sync --extra dev
 uv pip install -e .
 cd ..
 
-export TOTAL=0
-export PASSED=0
-
 # Create temporary directory with better cross-platform compatibility
 if command -v mktemp >/dev/null 2>&1; then
   tempdir=$(mktemp -d)
diff --git a/tools/ut.sh b/tools/ut.sh
index 18d459e..e172b0c 100755
--- a/tools/ut.sh
+++ b/tools/ut.sh
@@ -98,7 +98,7 @@ java_tests() {
     if $run_e2e; then
         mvn -T16 --batch-mode --no-transfer-progress test -pl 
'e2e-test/flink-agents-end-to-end-tests-integration'
     else
-        mvn -T16 --batch-mode --no-transfer-progress test -pl 
'!e2e-test/flink-agents-end-to-end-tests-integration'
+        mvn -T16 --batch-mode --no-transfer-progress test -pl 
'!e2e-test/flink-agents-end-to-end-tests-integration,!e2e-test/flink-agents-end-to-end-tests-resource-cross-language'
     fi
     testcode=$?
     case $testcode in

Reply via email to