This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new b918d83 [Feature][runtime] Support the use of Python ChatModel in
Java (#314)
b918d83 is described below
commit b918d83e3932e58450f4d8474143434864734f16
Author: Eugene <[email protected]>
AuthorDate: Fri Dec 12 14:41:53 2025 +0800
[Feature][runtime] Support the use of Python ChatModel in Java (#314)
---
.github/workflows/ci.yml | 6 +-
api/pom.xml | 6 +
.../agents/api/chat/messages/ChatMessage.java | 6 -
.../model/python/PythonChatModelConnection.java | 90 ++++++++++
.../chat/model/python/PythonChatModelSetup.java | 88 ++++++++++
.../api/resource/python/PythonResourceAdapter.java | 95 ++++++++++
.../api/resource/python/PythonResourceWrapper.java | 32 ++++
.../python/PythonChatModelConnectionTest.java | 158 +++++++++++++++++
.../model/python/PythonChatModelSetupTest.java | 161 +++++++++++++++++
.../pom.xml | 40 +----
.../resource/test/ChatModelCrossLanguageAgent.java | 147 ++++++++++++++++
.../resource/test/ChatModelCrossLanguageTest.java | 126 ++++++++++++++
.../src/test/resources/ollama_pull_model.sh | 30 +---
e2e-test/pom.xml | 1 +
.../test-scripts/test_resource_cross_language.sh | 41 +++++
examples/pom.xml | 2 +
.../org/apache/flink/agents/plan/AgentPlan.java | 34 +++-
.../resourceprovider/PythonResourceProvider.java | 50 +++++-
.../ResourceProviderJsonDeserializer.java | 9 +
.../serializer/ResourceProviderJsonSerializer.java | 4 +
.../apache/flink/agents/plan/AgentPlanTest.java | 123 ++++++++++++-
python/flink_agents/api/tools/utils.py | 14 ++
.../{python_java_utils.py => java/__init__.py} | 25 ---
.../runtime/java/java_resource_wrapper.py | 75 ++++++++
python/flink_agents/runtime/python_java_utils.py | 132 +++++++++++++-
python/pyproject.toml | 1 +
.../runtime/operator/ActionExecutionOperator.java | 77 +++++++--
.../runtime/python/utils/PythonActionExecutor.java | 18 +-
.../python/utils/PythonResourceAdapterImpl.java | 123 +++++++++++++
.../utils/PythonResourceAdapterImplTest.java | 192 +++++++++++++++++++++
tools/e2e.sh | 15 +-
tools/ut.sh | 2 +-
32 files changed, 1791 insertions(+), 132 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index b60288a..f70936c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -170,5 +170,9 @@ jobs:
version: "latest"
- name: Build flink-agents
run: bash tools/build.sh
+ - name: Install ollama
+ run: bash tools/start_ollama_server.sh
- name: Run e2e tests
- run: bash tools/e2e.sh
\ No newline at end of file
+ run: |
+ export PYTHONPATH="${{ github.workspace
}}/python/.venv/lib/python${{ matrix.python-version
}}/site-packages:$PYTHONPATH"
+ tools/e2e.sh
\ No newline at end of file
diff --git a/api/pom.xml b/api/pom.xml
index fb4123e..a540361 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -57,6 +57,12 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>pemja</artifactId>
+ <version>${pemja.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/api/src/main/java/org/apache/flink/agents/api/chat/messages/ChatMessage.java
b/api/src/main/java/org/apache/flink/agents/api/chat/messages/ChatMessage.java
index 7ebc787..6e844e7 100644
---
a/api/src/main/java/org/apache/flink/agents/api/chat/messages/ChatMessage.java
+++
b/api/src/main/java/org/apache/flink/agents/api/chat/messages/ChatMessage.java
@@ -32,9 +32,6 @@ import java.util.Objects;
*/
public class ChatMessage {
- /** The key for the message type in the metadata. */
- public static final String MESSAGE_TYPE = "messageType";
-
private MessageRole role;
private String content;
private List<Map<String, Object>> toolCalls;
@@ -68,7 +65,6 @@ public class ChatMessage {
this.content = content != null ? content : "";
this.toolCalls = toolCalls != null ? toolCalls : new ArrayList<>();
this.extraArgs = extraArgs != null ? new HashMap<>(extraArgs) : new
HashMap<>();
- this.extraArgs.put(MESSAGE_TYPE, this.role);
}
public MessageRole getRole() {
@@ -77,7 +73,6 @@ public class ChatMessage {
public void setRole(MessageRole role) {
this.role = role;
- this.extraArgs.put(MESSAGE_TYPE, this.role);
}
public String getContent() {
@@ -102,7 +97,6 @@ public class ChatMessage {
public void setExtraArgs(Map<String, Object> extraArgs) {
this.extraArgs = extraArgs != null ? extraArgs : new HashMap<>();
- this.extraArgs.put(MESSAGE_TYPE, this.role);
}
@JsonIgnore
diff --git
a/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
new file mode 100644
index 0000000..7cae38b
--- /dev/null
+++
b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.chat.model.python;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import org.apache.flink.agents.api.tools.Tool;
+import pemja.core.object.PyObject;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * Python-based implementation of ChatModelConnection that wraps a Python chat
model object. This
+ * class serves as a bridge between Java and Python chat model environments,
but unlike {@link
+ * PythonChatModelSetup}, it does not provide direct chat functionality in
Java.
+ */
+public class PythonChatModelConnection extends BaseChatModelConnection
+ implements PythonResourceWrapper {
+ private final PyObject chatModel;
+ private final PythonResourceAdapter adapter;
+
+ /**
+ * Creates a new PythonChatModelConnection.
+ *
+ * @param adapter The Python resource adapter (required by
PythonResourceProvider's
+ * reflection-based instantiation but not used directly in this
implementation)
+ * @param chatModel The Python chat model object
+ * @param descriptor The resource descriptor
+ * @param getResource Function to retrieve resources by name and type
+ */
+ public PythonChatModelConnection(
+ PythonResourceAdapter adapter,
+ PyObject chatModel,
+ ResourceDescriptor descriptor,
+ BiFunction<String, ResourceType, Resource> getResource) {
+ super(descriptor, getResource);
+ this.chatModel = chatModel;
+ this.adapter = adapter;
+ }
+
+ @Override
+ public Object getPythonResource() {
+ return chatModel;
+ }
+
+ @Override
+ public ChatMessage chat(
+ List<ChatMessage> messages, List<Tool> tools, Map<String, Object>
arguments) {
+ Map<String, Object> kwargs = new HashMap<>(arguments);
+
+ List<Object> pythonMessages = new ArrayList<>();
+ for (ChatMessage message : messages) {
+ pythonMessages.add(adapter.toPythonChatMessage(message));
+ }
+ kwargs.put("messages", pythonMessages);
+
+ List<Object> pythonTools = new ArrayList<>();
+ for (Tool tool : tools) {
+ pythonTools.add(adapter.convertToPythonTool(tool));
+ }
+ kwargs.put("tools", pythonTools);
+
+ Object pythonMessageResponse = adapter.callMethod(chatModel, "chat",
kwargs);
+ return adapter.fromPythonChatMessage(pythonMessageResponse);
+ }
+}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetup.java
b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetup.java
new file mode 100644
index 0000000..7a5cf43
--- /dev/null
+++
b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetup.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.chat.model.python;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.BaseChatModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import pemja.core.object.PyObject;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Python-based implementation of ChatModelSetup that bridges Java and Python
chat model
+ * functionality. This class wraps a Python chat model setup object and
provides Java interface
+ * compatibility while delegating actual chat operations to the underlying
Python implementation.
+ */
+public class PythonChatModelSetup extends BaseChatModelSetup implements
PythonResourceWrapper {
+ static final String FROM_JAVA_CHAT_MESSAGE =
"python_java_utils.from_java_chat_message";
+
+ static final String TO_JAVA_CHAT_MESSAGE =
"python_java_utils.to_java_chat_message";
+
+ private final PyObject chatModelSetup;
+ private final PythonResourceAdapter adapter;
+
+ public PythonChatModelSetup(
+ PythonResourceAdapter adapter,
+ PyObject chatModelSetup,
+ ResourceDescriptor descriptor,
+ BiFunction<String, ResourceType, Resource> getResource) {
+ super(descriptor, getResource);
+ this.chatModelSetup = chatModelSetup;
+ this.adapter = adapter;
+ }
+
+ @Override
+ public ChatMessage chat(List<ChatMessage> messages, Map<String, Object>
parameters) {
+ checkState(
+ chatModelSetup != null,
+ "ChatModelSetup is not initialized. Cannot perform chat
operation.");
+
+ Map<String, Object> kwargs = new HashMap<>(parameters);
+
+ List<Object> pythonMessages = new ArrayList<>();
+ for (ChatMessage message : messages) {
+ pythonMessages.add(adapter.toPythonChatMessage(message));
+ }
+
+ kwargs.put("messages", pythonMessages);
+
+ Object pythonMessageResponse = adapter.callMethod(chatModelSetup,
"chat", kwargs);
+ return adapter.fromPythonChatMessage(pythonMessageResponse);
+ }
+
+ @Override
+ public Object getPythonResource() {
+ return chatModelSetup;
+ }
+
+ @Override
+ public Map<String, Object> getParameters() {
+ return Map.of();
+ }
+}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
new file mode 100644
index 0000000..89652ae
--- /dev/null
+++
b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.resource.python;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.tools.Tool;
+import pemja.core.object.PyObject;
+
+import java.util.Map;
+
+/**
+ * Adapter interface for managing Python resources and facilitating
Java-Python interoperability.
+ * This interface provides methods to interact with Python objects, invoke
Python methods, and
+ * handle data conversion between Java and Python environments.
+ */
+public interface PythonResourceAdapter {
+
+ /**
+ * Retrieves a Python resource by name and type.
+ *
+ * @param resourceName the name of the resource to retrieve
+ * @param resourceType the type of the resource
+ * @return the retrieved resource object
+ */
+ Object getResource(String resourceName, String resourceType);
+
+ /**
+ * Initializes a Python resource instance from the specified module and
class.
+ *
+ * @param module the Python module containing the target class
+ * @param clazz the Python class name to instantiate
+ * @param kwargs keyword arguments to pass to the Python class constructor
+ * @return a PyObject representing the initialized Python resource
+ */
+ PyObject initPythonResource(String module, String clazz, Map<String,
Object> kwargs);
+
+ /**
+ * Converts a Java ChatMessage object to its Python equivalent.
+ *
+ * @param message the Java ChatMessage to convert
+ * @return the Python representation of the chat message
+ */
+ Object toPythonChatMessage(ChatMessage message);
+
+ /**
+ * Converts a Python chat message object back to a Java ChatMessage.
+ *
+ * @param pythonChatMessage the Python chat message object to convert
+ * @return the Java ChatMessage representation
+ */
+ ChatMessage fromPythonChatMessage(Object pythonChatMessage);
+
+ /**
+ * Converts a Java Tool object to its Python equivalent.
+ *
+ * @param tool the Java Tool to convert
+ * @return the Python representation of the tool
+ */
+ Object convertToPythonTool(Tool tool);
+
+ /**
+ * Invokes a method on a Python object with the specified parameters.
+ *
+ * @param obj the Python object on which to call the method
+ * @param methodName the name of the method to invoke
+ * @param kwargs keyword arguments to pass to the method
+ * @return the result of the method invocation
+ */
+ Object callMethod(Object obj, String methodName, Map<String, Object>
kwargs);
+
+ /**
+ * Invokes a method with the specified name and arguments.
+ *
+ * @param name the name of the method to invoke
+ * @param args the arguments to pass to the method
+ * @return the result of the method invocation
+ */
+ Object invoke(String name, Object... args);
+}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceWrapper.java
b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceWrapper.java
new file mode 100644
index 0000000..c69cf59
--- /dev/null
+++
b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.resource.python;
+
+/**
+ * Wrapper interface for Python resource objects. This interface provides a
unified way to access
+ * the underlying Python resource from Java objects that encapsulate Python
functionality.
+ */
+public interface PythonResourceWrapper {
+
+ /**
+ * Retrieves the underlying Python resource object.
+ *
+ * @return the wrapped Python resource object
+ */
+ Object getPythonResource();
+}
diff --git
a/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnectionTest.java
b/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnectionTest.java
new file mode 100644
index 0000000..79069d7
--- /dev/null
+++
b/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnectionTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.chat.model.python;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import org.apache.flink.agents.api.tools.Tool;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import pemja.core.object.PyObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.*;
+
+public class PythonChatModelConnectionTest {
+ @Mock private PythonResourceAdapter mockAdapter;
+
+ @Mock private PyObject mockChatModel;
+
+ @Mock private ResourceDescriptor mockDescriptor;
+
+ @Mock private BiFunction<String, ResourceType, Resource> mockGetResource;
+
+ private PythonChatModelConnection pythonChatModelConnection;
+ private AutoCloseable mocks;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ mocks = MockitoAnnotations.openMocks(this);
+ pythonChatModelConnection =
+ new PythonChatModelConnection(
+ mockAdapter, mockChatModel, mockDescriptor,
mockGetResource);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (mocks != null) {
+ mocks.close();
+ }
+ }
+
+ @Test
+ void testConstructor() {
+ assertThat(pythonChatModelConnection).isNotNull();
+
assertThat(pythonChatModelConnection.getPythonResource()).isEqualTo(mockChatModel);
+ }
+
+ @Test
+ void testGetPythonResourceWithNullChatModel() {
+ PythonChatModelConnection connectionWithNullModel =
+ new PythonChatModelConnection(mockAdapter, null,
mockDescriptor, mockGetResource);
+
+ Object result = connectionWithNullModel.getPythonResource();
+
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void testChat() {
+ ChatMessage inputMessage = mock(ChatMessage.class);
+ ChatMessage outputMessage = mock(ChatMessage.class);
+ Tool mockTool = mock(Tool.class);
+ List<ChatMessage> messages = Collections.singletonList(inputMessage);
+ List<Tool> tools = Collections.singletonList(mockTool);
+ Map<String, Object> arguments = new HashMap<>();
+ arguments.put("temperature", 0.7);
+ arguments.put("max_tokens", 100);
+
+ Object pythonInputMessage = new Object();
+ Object pythonOutputMessage = new Object();
+ Object pythonTool = new Object();
+
+
when(mockAdapter.toPythonChatMessage(inputMessage)).thenReturn(pythonInputMessage);
+ when(mockAdapter.convertToPythonTool(mockTool)).thenReturn(pythonTool);
+ when(mockAdapter.callMethod(eq(mockChatModel), eq("chat"),
any(Map.class)))
+ .thenReturn(pythonOutputMessage);
+
when(mockAdapter.fromPythonChatMessage(pythonOutputMessage)).thenReturn(outputMessage);
+
+ ChatMessage result = pythonChatModelConnection.chat(messages, tools,
arguments);
+
+ assertThat(result).isEqualTo(outputMessage);
+
+ verify(mockAdapter).toPythonChatMessage(inputMessage);
+ verify(mockAdapter).convertToPythonTool(mockTool);
+ verify(mockAdapter)
+ .callMethod(
+ eq(mockChatModel),
+ eq("chat"),
+ argThat(
+ kwargs -> {
+ assertThat(kwargs).containsKey("messages");
+ assertThat(kwargs).containsKey("tools");
+
assertThat(kwargs).containsKey("temperature");
+
assertThat(kwargs).containsKey("max_tokens");
+
assertThat(kwargs.get("temperature")).isEqualTo(0.7);
+
assertThat(kwargs.get("max_tokens")).isEqualTo(100);
+
+ List<?> pythonMessages = (List<?>)
kwargs.get("messages");
+ assertThat(pythonMessages).hasSize(1);
+
assertThat(pythonMessages.get(0)).isEqualTo(pythonInputMessage);
+
+ List<?> pythonTools = (List<?>)
kwargs.get("tools");
+ assertThat(pythonTools).hasSize(1);
+
assertThat(pythonTools.get(0)).isEqualTo(pythonTool);
+
+ return true;
+ }));
+ verify(mockAdapter).fromPythonChatMessage(pythonOutputMessage);
+ }
+
+ @Test
+ void testInheritanceFromBaseChatModelConnection() {
+
assertThat(pythonChatModelConnection).isInstanceOf(BaseChatModelConnection.class);
+ }
+
+ @Test
+ void testImplementsPythonResourceWrapper() {
+
assertThat(pythonChatModelConnection).isInstanceOf(PythonResourceWrapper.class);
+ }
+
+ @Test
+ void testConstructorWithAllNullParameters() {
+ PythonChatModelConnection connectionWithNulls =
+ new PythonChatModelConnection(null, null, null, null);
+
+ assertThat(connectionWithNulls).isNotNull();
+ assertThat(connectionWithNulls.getPythonResource()).isNull();
+ }
+}
diff --git
a/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetupTest.java
b/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetupTest.java
new file mode 100644
index 0000000..274ddfe
--- /dev/null
+++
b/api/src/test/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelSetupTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.chat.model.python;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import pemja.core.object.PyObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.*;
+
+public class PythonChatModelSetupTest {
+ @Mock private PythonResourceAdapter mockAdapter;
+
+ @Mock private PyObject mockChatModelSetup;
+
+ @Mock private ResourceDescriptor mockDescriptor;
+
+ @Mock private BiFunction<String, ResourceType, Resource> mockGetResource;
+
+ private PythonChatModelSetup pythonChatModelSetup;
+ private AutoCloseable mocks;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ mocks = MockitoAnnotations.openMocks(this);
+ pythonChatModelSetup =
+ new PythonChatModelSetup(
+ mockAdapter, mockChatModelSetup, mockDescriptor,
mockGetResource);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (mocks != null) {
+ mocks.close();
+ }
+ }
+
+ @Test
+ void testConstructor() {
+ assertThat(pythonChatModelSetup).isNotNull();
+
assertThat(pythonChatModelSetup.getPythonResource()).isEqualTo(mockChatModelSetup);
+ }
+
+ @Test
+ void testGetPythonResourceWithNullChatModelSetup() {
+ PythonChatModelSetup setupWithNullModel =
+ new PythonChatModelSetup(mockAdapter, null, mockDescriptor,
mockGetResource);
+
+ Object result = setupWithNullModel.getPythonResource();
+
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void testGetParameters() {
+ Map<String, Object> result = pythonChatModelSetup.getParameters();
+
+ assertThat(result).isNotNull();
+ assertThat(result).isEmpty();
+ }
+
+ @Test
+ void testChat() {
+ ChatMessage inputMessage = mock(ChatMessage.class);
+ ChatMessage outputMessage = mock(ChatMessage.class);
+ List<ChatMessage> messages = Collections.singletonList(inputMessage);
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("temperature", 0.7);
+ parameters.put("max_tokens", 100);
+
+ Object pythonInputMessage = new Object();
+ Object pythonOutputMessage = new Object();
+
+
when(mockAdapter.toPythonChatMessage(inputMessage)).thenReturn(pythonInputMessage);
+ when(mockAdapter.callMethod(eq(mockChatModelSetup), eq("chat"),
any(Map.class)))
+ .thenReturn(pythonOutputMessage);
+
when(mockAdapter.fromPythonChatMessage(pythonOutputMessage)).thenReturn(outputMessage);
+
+ ChatMessage result = pythonChatModelSetup.chat(messages, parameters);
+
+ assertThat(result).isEqualTo(outputMessage);
+
+ verify(mockAdapter).toPythonChatMessage(inputMessage);
+ verify(mockAdapter)
+ .callMethod(
+ eq(mockChatModelSetup),
+ eq("chat"),
+ argThat(
+ kwargs -> {
+ assertThat(kwargs).containsKey("messages");
+
assertThat(kwargs).containsKey("temperature");
+
assertThat(kwargs).containsKey("max_tokens");
+
assertThat(kwargs.get("temperature")).isEqualTo(0.7);
+
assertThat(kwargs.get("max_tokens")).isEqualTo(100);
+ List<?> pythonMessages = (List<?>)
kwargs.get("messages");
+ assertThat(pythonMessages).hasSize(1);
+
assertThat(pythonMessages.get(0)).isEqualTo(pythonInputMessage);
+ return true;
+ }));
+ verify(mockAdapter).fromPythonChatMessage(pythonOutputMessage);
+ }
+
+ @Test
+ void testChatWithNullChatModelSetupThrowsException() {
+ PythonChatModelSetup setupWithNullModel =
+ new PythonChatModelSetup(mockAdapter, null, mockDescriptor,
mockGetResource);
+
+ ChatMessage inputMessage = mock(ChatMessage.class);
+ List<ChatMessage> messages = Collections.singletonList(inputMessage);
+ Map<String, Object> parameters = new HashMap<>();
+
+ assertThatThrownBy(() -> setupWithNullModel.chat(messages, parameters))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("ChatModelSetup is not initialized")
+ .hasMessageContaining("Cannot perform chat operation");
+ }
+
+ @Test
+ void testInheritanceFromBaseChatModelSetup() {
+ assertThat(pythonChatModelSetup)
+
.isInstanceOf(org.apache.flink.agents.api.chat.model.BaseChatModelSetup.class);
+ }
+
+ @Test
+ void testImplementsPythonResourceWrapper() {
+ assertThat(pythonChatModelSetup)
+ .isInstanceOf(
+
org.apache.flink.agents.api.resource.python.PythonResourceWrapper.class);
+ }
+}
diff --git a/examples/pom.xml
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
similarity index 53%
copy from examples/pom.xml
copy to e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
index 1b8d543..c93c37f 100644
--- a/examples/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
@@ -1,32 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-agents</artifactId>
+ <artifactId>flink-agents-e2e-tests</artifactId>
<version>0.2-SNAPSHOT</version>
</parent>
- <artifactId>flink-agents-examples</artifactId>
- <name>Flink Agents : Examples</name>
+
<artifactId>flink-agents-end-to-end-tests-resource-cross-language</artifactId>
+ <name>Flink Agents : E2E Tests: Resource Cross-Language</name>
<dependencies>
<dependency>
@@ -34,16 +18,13 @@ under the License.
<artifactId>flink-agents-api</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!-- Dependencies required for running agents in minicluster -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-runtime</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-files</artifactId>
- <version>${flink.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
@@ -54,11 +35,6 @@ under the License.
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_2.12</artifactId>
- <version>${flink.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
@@ -66,10 +42,8 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-agents-integrations-chat-models-ollama</artifactId>
- <version>${project.version}</version>
+ <artifactId>flink-python</artifactId>
+ <version>${flink.version}</version>
</dependency>
-
</dependencies>
-
</project>
\ No newline at end of file
diff --git
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java
new file mode 100644
index 0000000..4511044
--- /dev/null
+++
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.resource.test;
+
+import org.apache.flink.agents.api.Agent;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.annotation.ChatModelConnection;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.annotation.Tool;
+import org.apache.flink.agents.api.annotation.ToolParam;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.chat.model.python.PythonChatModelConnection;
+import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Agent example that integrates an external Ollama chat model into Flink
Agents.
+ *
+ * <p>This class demonstrates how to:
+ *
+ * <ul>
+ * <li>Declare a chat model connection using {@link ChatModelConnection}
metadata pointing to
+ * {@link PythonChatModelConnection}
+ * <li>Declare a chat model setup using {@link ChatModelSetup} metadata
pointing to {@link
+ * PythonChatModelSetup}
+ * <li>Expose callable tools via {@link Tool} annotated static methods
(temperature conversion,
+ * BMI, random number)
+ * <li>Fetch a chat model from the {@link RunnerContext} and perform a
single-turn chat
+ * <li>Emit the model response as an {@link OutputEvent}
+ * </ul>
+ *
+ * <p>The {@code ollamaChatModel()} method publishes a resource with type
{@link
+ * ResourceType#CHAT_MODEL} so it can be retrieved at runtime inside the
{@code process} action. The
+ * resource is configured with the connection name, the model name and the
list of tool names that
+ * the model is allowed to call.
+ */
+public class ChatModelCrossLanguageAgent extends Agent {
+ public static final String OLLAMA_MODEL = "qwen3:0.6b";
+
+ @ChatModelConnection
+ public static ResourceDescriptor chatModelConnection() {
+ return
ResourceDescriptor.Builder.newBuilder(PythonChatModelConnection.class.getName())
+ .addInitialArgument(
+ "module",
"flink_agents.integrations.chat_models.ollama_chat_model")
+ .addInitialArgument("clazz", "OllamaChatModelConnection")
+ .build();
+ }
+
+ @ChatModelSetup
+ public static ResourceDescriptor chatModel() {
+ return
ResourceDescriptor.Builder.newBuilder(PythonChatModelSetup.class.getName())
+ .addInitialArgument("connection", "chatModelConnection")
+ .addInitialArgument(
+ "module",
"flink_agents.integrations.chat_models.ollama_chat_model")
+ .addInitialArgument("clazz", "OllamaChatModelSetup")
+ .addInitialArgument("model", OLLAMA_MODEL)
+ .addInitialArgument(
+ "tools",
+ List.of("calculateBMI", "convertTemperature",
"createRandomNumber"))
+ .addInitialArgument("extract_reasoning", "true")
+ .build();
+ }
+
+ @Tool(description = "Converts temperature between Celsius and Fahrenheit")
+ public static double convertTemperature(
+ @ToolParam(name = "value", description = "Temperature value to
convert") Double value,
+ @ToolParam(
+ name = "fromUnit",
+ description = "Source unit ('C' for Celsius or 'F'
for Fahrenheit)")
+ String fromUnit,
+ @ToolParam(
+ name = "toUnit",
+ description = "Target unit ('C' for Celsius or 'F'
for Fahrenheit)")
+ String toUnit) {
+
+ fromUnit = fromUnit.toUpperCase();
+ toUnit = toUnit.toUpperCase();
+
+ if (fromUnit.equals(toUnit)) {
+ return value;
+ }
+
+ if (fromUnit.equals("C") && toUnit.equals("F")) {
+ return (value * 9 / 5) + 32;
+ } else if (fromUnit.equals("F") && toUnit.equals("C")) {
+ return (value - 32) * 5 / 9;
+ } else {
+ throw new IllegalArgumentException("Invalid temperature units. Use
'C' or 'F'");
+ }
+ }
+
+ @Tool(description = "Calculates Body Mass Index (BMI)")
+ public static double calculateBMI(
+ @ToolParam(name = "weightKg", description = "Weight in kilograms")
Double weightKg,
+ @ToolParam(name = "heightM", description = "Height in meters")
Double heightM) {
+
+ if (weightKg <= 0 || heightM <= 0) {
+ throw new IllegalArgumentException("Weight and height must be
positive values");
+ }
+ return weightKg / (heightM * heightM);
+ }
+
+ @Tool(description = "Create a random number")
+ public static double createRandomNumber() {
+ return Math.random();
+ }
+
+ @Action(listenEvents = {InputEvent.class})
+ public static void process(InputEvent event, RunnerContext ctx) throws
Exception {
+ ctx.sendEvent(
+ new ChatRequestEvent(
+ "chatModel",
+ Collections.singletonList(
+ new ChatMessage(MessageRole.USER, (String)
event.getInput()))));
+ }
+
+ @Action(listenEvents = {ChatResponseEvent.class})
+ public static void processChatResponse(ChatResponseEvent event,
RunnerContext ctx) {
+ ctx.sendEvent(new OutputEvent(event.getResponse().getContent()));
+ }
+}
diff --git
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageTest.java
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageTest.java
new file mode 100644
index 0000000..562a891
--- /dev/null
+++
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.resource.test;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.flink.agents.resource.test.ChatModelCrossLanguageAgent.OLLAMA_MODEL;
+
+public class ChatModelCrossLanguageTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(ChatModelCrossLanguageTest.class);
+
+ private final boolean ollamaReady;
+
+ public ChatModelCrossLanguageTest() throws IOException {
+ ollamaReady = pullModel(OLLAMA_MODEL);
+ }
+
+ @Test
+ public void testChatModeIntegration() throws Exception {
+ Assumptions.assumeTrue(ollamaReady, "Ollama Server information is not
provided");
+
+ // Create the execution environment
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ // Use prompts that trigger different tool calls in the agent
+ DataStream<String> inputStream =
+ env.fromData(
+ "Convert 25 degrees Celsius to Fahrenheit",
+ "What is 98.6 Fahrenheit in Celsius?",
+ "Change 32 degrees Celsius to Fahrenheit",
+ "If it's 75 degrees Fahrenheit, what would that be in
Celsius?",
+ "Convert room temperature of 20C to F",
+ "Calculate BMI for someone who is 1.75 meters tall and
weighs 70 kg",
+ "What's the BMI for a person weighing 85 kg with
height 1.80 meters?",
+ "Can you tell me the BMI if I'm 1.65m tall and weigh
60kg?",
+ "Find BMI for 75kg weight and 1.78m height",
+ "Create me a random number please");
+
+ // Create agents execution environment
+ AgentsExecutionEnvironment agentsEnv =
+ AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+ // Apply agent to the DataStream and use the prompt itself as the key
+ DataStream<Object> outputStream =
+ agentsEnv
+ .fromDataStream(
+ inputStream, (KeySelector<String, String>)
value -> "orderKey")
+ .apply(new ChatModelCrossLanguageAgent())
+ .toDataStream();
+
+ // Collect the results
+ CloseableIterator<Object> results = outputStream.collectAsync();
+
+ // Execute the pipeline
+ agentsEnv.execute();
+
+ checkResult(results);
+ }
+
+ public void checkResult(CloseableIterator<Object> results) {
+ List<String> expectedWords =
+ List.of("77", "37", "89", "23", "68", "22", "26", "22", "23",
"");
+ for (String expected : expectedWords) {
+ Assertions.assertTrue(
+ results.hasNext(), "Output messages count %s is less than
expected.");
+ String res = (String) results.next();
+ if (res.contains("error") || res.contains("parameters")) {
+ LOG.warn(res);
+ } else {
+ Assertions.assertTrue(
+ res.contains(expected),
+ String.format(
+ "Groud truth %s is not contained in answer
{%s}", expected, res));
+ }
+ }
+ }
+
+ public static boolean pullModel(String model) throws IOException {
+ String path =
+ Objects.requireNonNull(
+ ChatModelCrossLanguageTest.class
+ .getClassLoader()
+ .getResource("ollama_pull_model.sh"))
+ .getPath();
+ ProcessBuilder builder = new ProcessBuilder("bash", path, model);
+ Process process = builder.start();
+ try {
+ process.waitFor(120, TimeUnit.SECONDS);
+ return process.exitValue() == 0;
+ } catch (Exception e) {
+ LOG.warn("Pull {} failed, will skip test", model);
+ }
+ return false;
+ }
+}
diff --git a/python/flink_agents/runtime/python_java_utils.py
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/ollama_pull_model.sh
old mode 100644
new mode 100755
similarity index 54%
copy from python/flink_agents/runtime/python_java_utils.py
copy to
e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/ollama_pull_model.sh
index 586e6cb..7277e12
--- a/python/flink_agents/runtime/python_java_utils.py
+++
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/ollama_pull_model.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -14,29 +15,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#################################################################################
-from typing import Any
-
-import cloudpickle
-
-from flink_agents.api.events.event import InputEvent
-
-
-def convert_to_python_object(bytesObject: bytes) -> Any:
- """Used for deserializing Python objects."""
- return cloudpickle.loads(bytesObject)
-
-
-def wrap_to_input_event(bytesObject: bytes) -> tuple[bytes, str]:
- """Wrap data to python input event and serialize.
-
- Returns:
- A tuple of (serialized_event_bytes, event_string_representation)
- """
- event = InputEvent(input=cloudpickle.loads(bytesObject))
- return (cloudpickle.dumps(event), str(event))
-
-
-def get_output_from_output_event(bytesObject: bytes) -> Any:
- """Get output data from OutputEvent and serialize."""
- return cloudpickle.dumps(convert_to_python_object(bytesObject).output)
+################################################################################
+echo "ollama pull $1"
+ollama pull $1
\ No newline at end of file
diff --git a/e2e-test/pom.xml b/e2e-test/pom.xml
index 5d5683d..bf938fe 100644
--- a/e2e-test/pom.xml
+++ b/e2e-test/pom.xml
@@ -31,5 +31,6 @@ under the License.
<modules>
<module>flink-agents-end-to-end-tests-agent-plan-compatibility</module>
<module>flink-agents-end-to-end-tests-integration</module>
+ <module>flink-agents-end-to-end-tests-resource-cross-language</module>
</modules>
</project>
\ No newline at end of file
diff --git a/e2e-test/test-scripts/test_resource_cross_language.sh
b/e2e-test/test-scripts/test_resource_cross_language.sh
new file mode 100755
index 0000000..8fb7fc2
--- /dev/null
+++ b/e2e-test/test-scripts/test_resource_cross_language.sh
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+root_dir=$(pwd)
+
+# If we're running from the python subdirectory, adjust the root_dir
+if [[ "$(basename "$root_dir")" == "python" ]]; then
+ root_dir=$(dirname "$root_dir")
+fi
+
+echo "Root directory: $root_dir"
+
+# Run all tests in the resource-cross-language module using Maven
+cd "$root_dir/e2e-test/flink-agents-end-to-end-tests-resource-cross-language"
+
+echo "Running all tests in resource-cross-language module..."
+mvn -T16 --batch-mode --no-transfer-progress test
+
+ret=$?
+if [ "$ret" != "0" ]; then
+ echo "Resource cross-language tests failed with exit code $ret"
+ exit $ret
+fi
+
+echo "All resource cross-language tests passed successfully!"
diff --git a/examples/pom.xml b/examples/pom.xml
index 1b8d543..1e1b229 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -34,6 +34,8 @@ under the License.
<artifactId>flink-agents-api</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!-- Dependencies required for running agents in minicluster -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-runtime</artifactId>
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index 7cb8138..cc8b8a7 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -30,12 +30,15 @@ import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.resource.SerializableResource;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
import org.apache.flink.agents.api.tools.ToolMetadata;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.plan.actions.ChatModelAction;
import org.apache.flink.agents.plan.actions.ToolCallAction;
import org.apache.flink.agents.plan.resourceprovider.JavaResourceProvider;
import
org.apache.flink.agents.plan.resourceprovider.JavaSerializableResourceProvider;
+import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
import org.apache.flink.agents.plan.serializer.AgentPlanJsonDeserializer;
import org.apache.flink.agents.plan.serializer.AgentPlanJsonSerializer;
@@ -76,6 +79,8 @@ public class AgentPlan implements Serializable {
private AgentConfiguration config;
+ private transient PythonResourceAdapter pythonResourceAdapter;
+
/** Cache for instantiated resources. */
private transient Map<ResourceType, Map<String, Resource>> resourceCache;
@@ -128,6 +133,10 @@ public class AgentPlan implements Serializable {
this.config = config;
}
+ public void setPythonResourceAdapter(PythonResourceAdapter adapter) {
+ this.pythonResourceAdapter = adapter;
+ }
+
public Map<String, Action> getActions() {
return actions;
}
@@ -174,6 +183,10 @@ public class AgentPlan implements Serializable {
ResourceProvider provider = resourceProviders.get(type).get(name);
+ if (pythonResourceAdapter != null && provider instanceof
PythonResourceProvider) {
+ ((PythonResourceProvider)
provider).setPythonResourceAdapter(pythonResourceAdapter);
+ }
+
// Create resource using provider
Resource resource =
provider.provide(
@@ -279,8 +292,13 @@ public class AgentPlan implements Serializable {
private void extractResource(ResourceType type, Method method) throws
Exception {
String name = method.getName();
+ ResourceProvider provider;
ResourceDescriptor descriptor = (ResourceDescriptor)
method.invoke(null);
- JavaResourceProvider provider = new JavaResourceProvider(name, type,
descriptor);
+ if
(PythonResourceWrapper.class.isAssignableFrom(Class.forName(descriptor.getClazz())))
{
+ provider = new PythonResourceProvider(name, type, descriptor);
+ } else {
+ provider = new JavaResourceProvider(name, type, descriptor);
+ }
addResourceProvider(provider);
}
@@ -385,9 +403,17 @@ public class AgentPlan implements Serializable {
ResourceType type = entry.getKey();
if (type == ResourceType.CHAT_MODEL || type ==
ResourceType.CHAT_MODEL_CONNECTION) {
for (Map.Entry<String, Object> kv :
entry.getValue().entrySet()) {
- JavaResourceProvider provider =
- new JavaResourceProvider(
- kv.getKey(), type, (ResourceDescriptor)
kv.getValue());
+ ResourceProvider provider;
+ if (PythonResourceWrapper.class.isAssignableFrom(
+ Class.forName(((ResourceDescriptor)
kv.getValue()).getClazz()))) {
+ provider =
+ new PythonResourceProvider(
+ kv.getKey(), type,
(ResourceDescriptor) kv.getValue());
+ } else {
+ provider =
+ new JavaResourceProvider(
+ kv.getKey(), type,
(ResourceDescriptor) kv.getValue());
+ }
addResourceProvider(provider);
}
} else if (type == ResourceType.PROMPT) {
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
index 329a2db..ce621a4 100644
---
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
+++
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
@@ -19,12 +19,19 @@
package org.apache.flink.agents.plan.resourceprovider;
import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import pemja.core.object.PyObject;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* Python Resource provider that carries resource metadata to create Resource
objects at runtime.
*
@@ -35,6 +42,9 @@ public class PythonResourceProvider extends ResourceProvider {
private final String module;
private final String clazz;
private final Map<String, Object> kwargs;
+ private final ResourceDescriptor descriptor;
+
+ protected PythonResourceAdapter pythonResourceAdapter;
public PythonResourceProvider(
String name,
@@ -46,6 +56,25 @@ public class PythonResourceProvider extends ResourceProvider
{
this.module = module;
this.clazz = clazz;
this.kwargs = kwargs;
+ this.descriptor = null;
+ }
+
+ public PythonResourceProvider(String name, ResourceType type,
ResourceDescriptor descriptor) {
+ super(name, type);
+ this.kwargs = new HashMap<>(descriptor.getInitialArguments());
+ module = (String) kwargs.remove("module");
+ if (module == null || module.isEmpty()) {
+ throw new IllegalArgumentException("module should not be null or
empty.");
+ }
+ clazz = (String) kwargs.remove("clazz");
+ if (clazz == null || clazz.isEmpty()) {
+ throw new IllegalArgumentException("clazz should not be null or
empty.");
+ }
+ this.descriptor = descriptor;
+ }
+
+ public void setPythonResourceAdapter(PythonResourceAdapter
pythonResourceAdapter) {
+ this.pythonResourceAdapter = pythonResourceAdapter;
}
public String getModule() {
@@ -63,11 +92,18 @@ public class PythonResourceProvider extends
ResourceProvider {
@Override
public Resource provide(BiFunction<String, ResourceType, Resource>
getResource)
throws Exception {
- // TODO: Implement Python resource creation logic
- // This would typically involve calling into Python runtime to create
the
- // resource
- throw new UnsupportedOperationException(
- "Python resource creation not yet implemented in Java
runtime");
+ checkState(pythonResourceAdapter != null, "PythonResourceAdapter is
not set");
+ Class<?> clazz = Class.forName(descriptor.getClazz());
+ PyObject pyResource =
+ pythonResourceAdapter.initPythonResource(this.module,
this.clazz, kwargs);
+ Constructor<?> constructor =
+ clazz.getConstructor(
+ PythonResourceAdapter.class,
+ PyObject.class,
+ ResourceDescriptor.class,
+ BiFunction.class);
+ return (Resource)
+ constructor.newInstance(pythonResourceAdapter, pyResource,
descriptor, getResource);
}
@Override
@@ -92,4 +128,8 @@ public class PythonResourceProvider extends ResourceProvider
{
public int hashCode() {
return Objects.hash(this.getName(), this.getType(), module, clazz,
kwargs);
}
+
+ public ResourceDescriptor getDescriptor() {
+ return descriptor;
+ }
}
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonDeserializer.java
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonDeserializer.java
index 6dbd870..df44e27 100644
---
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonDeserializer.java
+++
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonDeserializer.java
@@ -74,6 +74,15 @@ public class ResourceProviderJsonDeserializer extends
StdDeserializer<ResourcePr
private PythonResourceProvider deserializePythonResourceProvider(JsonNode
node) {
String name = node.get("name").asText();
String type = node.get("type").asText();
+ try {
+ if (node.has("descriptor")) {
+ ResourceDescriptor descriptor =
+ mapper.treeToValue(node.get("descriptor"),
ResourceDescriptor.class);
+ return new PythonResourceProvider(name,
ResourceType.fromValue(type), descriptor);
+ }
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
String module = node.get("module").asText();
String clazz = node.get("clazz").asText();
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonSerializer.java
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonSerializer.java
index f11ff05..4b61c83 100644
---
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonSerializer.java
+++
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ResourceProviderJsonSerializer.java
@@ -71,6 +71,10 @@ public class ResourceProviderJsonSerializer extends
StdSerializer<ResourceProvid
gen.writeStringField("module", provider.getModule());
gen.writeStringField("clazz", provider.getClazz());
+ if (provider.getDescriptor() != null) {
+ gen.writeObjectField("descriptor", provider.getDescriptor());
+ }
+
gen.writeFieldName("kwargs");
gen.writeStartObject();
provider.getKwargs()
diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
index e61dbb9..7d532a2 100644
--- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
+++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
@@ -24,20 +24,28 @@ import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.api.annotation.ChatModelSetup;
import org.apache.flink.agents.api.annotation.Tool;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
import org.apache.flink.agents.api.context.RunnerContext;
import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.resource.SerializableResource;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
import org.apache.flink.agents.plan.actions.Action;
import
org.apache.flink.agents.plan.resourceprovider.JavaSerializableResourceProvider;
+import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import pemja.core.object.PyObject;
import java.util.List;
import java.util.Map;
+import java.util.function.BiFunction;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link AgentPlan} constructor that takes an Agent. */
public class AgentPlanTest {
@@ -91,6 +99,27 @@ public class AgentPlanTest {
}
}
+ public static class TestPythonResource extends Resource implements
PythonResourceWrapper {
+
+ public TestPythonResource(
+ PythonResourceAdapter adapter,
+ PyObject chatModel,
+ ResourceDescriptor descriptor,
+ BiFunction<String, ResourceType, Resource> getResource) {
+ super(descriptor, getResource);
+ }
+
+ @Override
+ public ResourceType getResourceType() {
+ return ResourceType.CHAT_MODEL;
+ }
+
+ @Override
+ public Object getPythonResource() {
+ return null;
+ }
+ }
+
/** Test agent class with annotated methods. */
public static class TestAgent extends Agent {
@@ -120,6 +149,14 @@ public class AgentPlanTest {
private TestSerializableChatModel chatModel =
new TestSerializableChatModel("defaultChatModel");
+ @ChatModelSetup
+ public static ResourceDescriptor pythonChatModel() {
+ return
ResourceDescriptor.Builder.newBuilder(TestPythonResource.class.getName())
+ .addInitialArgument("module", "test.module")
+ .addInitialArgument("clazz", "TestClazz")
+ .build();
+ }
+
@Tool private TestTool anotherTool = new TestTool("anotherTool");
@org.apache.flink.agents.api.annotation.Action(listenEvents =
{InputEvent.class})
@@ -128,6 +165,54 @@ public class AgentPlanTest {
}
}
+ /** Test agent class with illegal python resource. */
+ public static class TestAgentWithIllegalPythonResource extends Agent {
+ @ChatModelSetup
+ public static ResourceDescriptor reviewAnalysisModel() {
+ return
ResourceDescriptor.Builder.newBuilder(TestPythonResource.class.getName())
+ .build();
+ }
+ }
+
+ public static class TestPythonResourceAdapter implements
PythonResourceAdapter {
+
+ @Override
+ public Object getResource(String resourceName, String resourceType) {
+ return null;
+ }
+
+ @Override
+ public PyObject initPythonResource(
+ String module, String clazz, Map<String, Object> kwargs) {
+ return null;
+ }
+
+ @Override
+ public Object toPythonChatMessage(ChatMessage message) {
+ return null;
+ }
+
+ @Override
+ public ChatMessage fromPythonChatMessage(Object pythonChatMessage) {
+ return null;
+ }
+
+ @Override
+ public Object
convertToPythonTool(org.apache.flink.agents.api.tools.Tool tool) {
+ return null;
+ }
+
+ @Override
+ public Object callMethod(Object obj, String methodName, Map<String,
Object> kwargs) {
+ return null;
+ }
+
+ @Override
+ public Object invoke(String name, Object... args) {
+ return null;
+ }
+ }
+
@Test
public void testConstructorWithAgent() throws Exception {
// Create an agent instance
@@ -303,8 +388,9 @@ public class AgentPlanTest {
Map<String, ResourceProvider> chatModelProviders =
resourceProviders.get(ResourceType.CHAT_MODEL);
assertThat(chatModelProviders).isNotNull();
- assertThat(chatModelProviders).hasSize(1); // defaultChatModel (field
name used as default)
+ assertThat(chatModelProviders).hasSize(2); // defaultChatModel (field
name used as default)
assertThat(chatModelProviders).containsKey("chatModel");
+ assertThat(chatModelProviders).containsKey("pythonChatModel");
// Verify that chat model provider is JavaSerializableResourceProvider
// (serializable)
@@ -319,6 +405,19 @@ public class AgentPlanTest {
assertThat(serializableProvider.getModule())
.isEqualTo(TestAgentWithResources.class.getPackage().getName());
assertThat(serializableProvider.getClazz()).contains("TestSerializableChatModel");
+
+ // Verify that python chat model provider is PythonResourceProvider
+ // (serializable)
+ ResourceProvider pythonChatModelProvider =
chatModelProviders.get("pythonChatModel");
+
assertThat(pythonChatModelProvider).isInstanceOf(PythonResourceProvider.class);
+
assertThat(pythonChatModelProvider.getName()).isEqualTo("pythonChatModel");
+
assertThat(pythonChatModelProvider.getType()).isEqualTo(ResourceType.CHAT_MODEL);
+
+ // Test PythonResourceProvider specific methods
+ PythonResourceProvider pythonResourceProvider =
+ (PythonResourceProvider) pythonChatModelProvider;
+ assertThat(pythonResourceProvider.getClazz()).isEqualTo("TestClazz");
+
assertThat(pythonResourceProvider.getModule()).isEqualTo("test.module");
}
@Test
@@ -339,8 +438,30 @@ public class AgentPlanTest {
assertThat(chatModel).isInstanceOf(TestSerializableChatModel.class);
assertThat(chatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
+ assertThatThrownBy(() -> agentPlan.getResource("pythonChatModel",
ResourceType.CHAT_MODEL))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("PythonResourceAdapter is not set");
+
+ agentPlan.setPythonResourceAdapter(new TestPythonResourceAdapter());
+ Resource pythonChatModel =
+ agentPlan.getResource("pythonChatModel",
ResourceType.CHAT_MODEL);
+ assertThat(pythonChatModel).isNotNull();
+ assertThat(pythonChatModel).isInstanceOf(TestPythonResource.class);
+
assertThat(pythonChatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
+
// Test that resources are cached (should be the same instance)
Resource myToolAgain = agentPlan.getResource("myTool",
ResourceType.TOOL);
assertThat(myTool).isSameAs(myToolAgain);
}
+
+ @Test
+ public void testExtractIllegalResourceProviderFromAgent() throws Exception
{
+ // Create an agent with resource annotations
+ TestAgentWithIllegalPythonResource agent = new
TestAgentWithIllegalPythonResource();
+
+ // Expect IllegalArgumentException when creating AgentPlan with
illegal resource
+ assertThatThrownBy(() -> new AgentPlan(agent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("module should not be null or empty");
+ }
}
diff --git a/python/flink_agents/api/tools/utils.py
b/python/flink_agents/api/tools/utils.py
index 81784d4..976fcde 100644
--- a/python/flink_agents/api/tools/utils.py
+++ b/python/flink_agents/api/tools/utils.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
+import json
import typing
from inspect import signature
from typing import Any, Callable, Dict, Optional, Type, Union
@@ -178,6 +179,19 @@ def create_model_from_schema(name: str, schema: dict) ->
type[BaseModel]:
return create_model(name, **main_fields, __doc__=schema.get("description",
""))
+def create_model_from_java_tool_schema_str(name: str, schema_str: str) ->
type[BaseModel]:
+ """Create Pydantic model from a java tool input schema."""
+ json_schema = json.loads(schema_str)
+ properties = json_schema["properties"]
+
+ fields = {}
+ for param_name in properties:
+ description = properties[param_name]["description"]
+ if description is None:
+ description = f"Parameter: {param_name}"
+ type = TYPE_MAPPING.get(properties[param_name]["type"])
+ fields[param_name] = (type, FieldInfo(description=description))
+ return create_model(name, **fields)
def extract_mcp_content_item(content_item: Any) -> Dict[str, Any] | str:
"""Extract and normalize a single MCP content item.
diff --git a/python/flink_agents/runtime/python_java_utils.py
b/python/flink_agents/runtime/java/__init__.py
similarity index 55%
copy from python/flink_agents/runtime/python_java_utils.py
copy to python/flink_agents/runtime/java/__init__.py
index 586e6cb..e154fad 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/java/__init__.py
@@ -15,28 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
-from typing import Any
-
-import cloudpickle
-
-from flink_agents.api.events.event import InputEvent
-
-
-def convert_to_python_object(bytesObject: bytes) -> Any:
- """Used for deserializing Python objects."""
- return cloudpickle.loads(bytesObject)
-
-
-def wrap_to_input_event(bytesObject: bytes) -> tuple[bytes, str]:
- """Wrap data to python input event and serialize.
-
- Returns:
- A tuple of (serialized_event_bytes, event_string_representation)
- """
- event = InputEvent(input=cloudpickle.loads(bytesObject))
- return (cloudpickle.dumps(event), str(event))
-
-
-def get_output_from_output_event(bytesObject: bytes) -> Any:
- """Get output data from OutputEvent and serialize."""
- return cloudpickle.dumps(convert_to_python_object(bytesObject).output)
diff --git a/python/flink_agents/runtime/java/java_resource_wrapper.py
b/python/flink_agents/runtime/java/java_resource_wrapper.py
new file mode 100644
index 0000000..496aab3
--- /dev/null
+++ b/python/flink_agents/runtime/java/java_resource_wrapper.py
@@ -0,0 +1,75 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any, List
+
+from pemja import findClass
+from pydantic import Field
+from typing_extensions import override
+
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.api.resource import Resource, ResourceType
+from flink_agents.api.tools.tool import Tool, ToolType
+
+
+class JavaTool(Tool):
+ """Java Tool that carries tool metadata and can be recognized by
PythonChatModel."""
+
+ @classmethod
+ @override
+ def tool_type(cls) -> ToolType:
+ """Get the tool type."""
+ return ToolType.REMOTE_FUNCTION
+
+ @override
+ def call(self, *args: Any, **kwargs: Any) -> Any:
+ err_msg = "Java tool is defined in Java and needs to be executed
through the Java runtime."
+ raise NotImplementedError(err_msg)
+
+class JavaPrompt(Prompt):
+ """Python wrapper for Java's Prompt."""
+
+ j_prompt: Any= Field(exclude=True)
+
+ @override
+ def format_string(self, **kwargs: str) -> str:
+ return self.j_prompt.formatString(kwargs)
+
+ @override
+ def format_messages(
+ self, role: MessageRole = MessageRole.SYSTEM, **kwargs: str
+ ) -> List[ChatMessage]:
+ j_MessageRole =
findClass("org.apache.flink.agents.api.chat.messages.MessageRole")
+ j_chat_messages =
self.j_prompt.formatMessages(j_MessageRole.fromValue(role.value), kwargs)
+ chatMessages =
[ChatMessage(role=MessageRole(j_chat_message.getRole().getValue()),
+
content=j_chat_message.getContent(),
+ tool_calls=
j_chat_message.getToolCalls(),
+
extra_args=j_chat_message.getExtraArgs()) for j_chat_message in j_chat_messages]
+ return chatMessages
+
+class JavaGetResourceWrapper:
+ """Python wrapper for Java ResourceAdapter."""
+
+ def __init__(self, j_resource_adapter: Any) -> None:
+ """Initialize with a Java ResourceAdapter."""
+ self._j_resource_adapter = j_resource_adapter
+
+
+ def get_resource(self, name: str, type: ResourceType) -> Resource:
+ """Get a resource by name and type."""
+ return self._j_resource_adapter.getResource(name, type.value)
diff --git a/python/flink_agents/runtime/python_java_utils.py
b/python/flink_agents/runtime/python_java_utils.py
index 586e6cb..3c38985 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -15,11 +15,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
-from typing import Any
+import importlib
+from typing import Any, Callable, Dict
import cloudpickle
+from pemja import findClass
+from flink_agents.api.chat_message import ChatMessage, MessageRole
from flink_agents.api.events.event import InputEvent
+from flink_agents.api.resource import Resource
+from flink_agents.api.tools.tool import ToolMetadata
+from flink_agents.api.tools.utils import create_model_from_java_tool_schema_str
+from flink_agents.runtime.java.java_resource_wrapper import (
+ JavaGetResourceWrapper,
+ JavaPrompt,
+ JavaTool,
+)
def convert_to_python_object(bytesObject: bytes) -> Any:
@@ -40,3 +51,122 @@ def wrap_to_input_event(bytesObject: bytes) -> tuple[bytes,
str]:
def get_output_from_output_event(bytesObject: bytes) -> Any:
"""Get output data from OutputEvent and serialize."""
return cloudpickle.dumps(convert_to_python_object(bytesObject).output)
+
+def create_resource(resource_module: str, resource_clazz: str, func_kwargs:
Dict[str, Any]) -> Resource:
+ """Dynamically create a resource instance from module and class name.
+
+ Args:
+ resource_module: The module path containing the resource class
+ resource_clazz: The class name to instantiate
+ func_kwargs: Keyword arguments to pass to the class constructor
+
+ Returns:
+ Resource: An instance of the specified resource class
+ """
+ module = importlib.import_module(resource_module)
+ cls = getattr(module, resource_clazz)
+ return cls(**func_kwargs)
+
+def get_resource_function(j_resource_adapter: Any) -> Callable:
+ """Create a callable wrapper for Java resource adapter.
+
+ Args:
+ j_resource_adapter: Java resource adapter object
+
+ Returns:
+ Callable: A Python callable that wraps the Java resource adapter
+ """
+ return JavaGetResourceWrapper(j_resource_adapter).get_resource
+
+def from_java_tool(j_tool: Any) -> JavaTool:
+ """Convert a Java tool object to a Python JavaTool instance.
+
+ Args:
+ j_tool: Java tool object
+
+ Returns:
+ JavaTool: Python wrapper for the Java tool with extracted metadata
+ """
+ name = j_tool.getName()
+ metadata = ToolMetadata(
+ name=name,
+ description=j_tool.getDescription(),
+ args_schema=create_model_from_java_tool_schema_str(name,
j_tool.getMetadata().getInputSchema()),
+ )
+ return JavaTool(metadata=metadata)
+
+def from_java_prompt(j_prompt: Any) -> JavaPrompt:
+ """Convert a Java prompt object to a Python JavaPrompt instance.
+
+ Args:
+ j_prompt: Java prompt object to be wrapped
+
+ Returns:
+ JavaPrompt: Python wrapper for the Java prompt
+ """
+ return JavaPrompt(j_prompt=j_prompt)
+
+def normalize_tool_call_id(tool_call: Dict[str, Any]) -> Dict[str, Any]:
+ """Normalize tool call by converting the ID field to string format while
preserving
+ all other fields.
+
+ This function ensures that the tool call ID is consistently represented as
a string,
+ which is required for compatibility with certain systems that expect
string IDs.
+
+ Args:
+ tool_call: Dictionary containing tool call information. The dictionary
may
+ contain any number of fields, but typically includes:
+ - id: Tool call identifier (will be converted to string)
+ - type: Tool call type (preserved as-is)
+ - function: Function details (preserved as-is)
+ - Any other fields (preserved as-is)
+ """
+ normalized_call = tool_call.copy()
+
+ normalized_call["id"] = str(tool_call.get("id", ""))
+
+ return normalized_call
+
+def from_java_chat_message(j_chat_message: Any) -> ChatMessage:
+ """Convert a chat message to a python chat message."""
+ return ChatMessage(role=MessageRole(j_chat_message.getRole().getValue()),
+ content=j_chat_message.getContent(),
+ tool_calls=[normalize_tool_call_id(tool_call) for
tool_call in j_chat_message.getToolCalls()],
+ extra_args=j_chat_message.getExtraArgs())
+
+
+def to_java_chat_message(chat_message: ChatMessage) -> Any:
+ """Convert a chat message to a java chat message."""
+ j_ChatMessage =
findClass("org.apache.flink.agents.api.chat.messages.ChatMessage")
+ j_chat_message = j_ChatMessage()
+
+ j_MessageRole =
findClass("org.apache.flink.agents.api.chat.messages.MessageRole")
+ j_chat_message.setRole(j_MessageRole.fromValue(chat_message.role.value))
+ j_chat_message.setContent(chat_message.content)
+ j_chat_message.setExtraArgs(chat_message.extra_args)
+ if chat_message.tool_calls:
+ tool_calls = [normalize_tool_call_id(tool_call) for tool_call in
chat_message.tool_calls]
+ j_chat_message.setToolCalls(tool_calls)
+
+ return j_chat_message
+
+def call_method(obj: Any, method_name: str, kwargs: Dict[str, Any]) -> Any:
+ """Calls a method on `obj` by name and passes in positional and keyword
arguments.
+
+ Parameters:
+ obj: Any Python object
+ method_name: A string representing the name of the method to call
+ kwargs: Keyword arguments to pass to the method
+
+ Returns:
+ The return value of the method
+
+ Raises:
+ AttributeError: If the object does not have the specified method
+ """
+ if not hasattr(obj, method_name):
+ err_msg = f"Object {obj} has no attribute '{method_name}'"
+ raise AttributeError(err_msg)
+
+ method = getattr(obj, method_name)
+ return method(**kwargs)
diff --git a/python/pyproject.toml b/python/pyproject.toml
index bcf577b..ee282d6 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -48,6 +48,7 @@ dependencies = [
"pyyaml==6.0.2",
"mcp>=1.8.0",
"setuptools>=75.3",
+ "find_libpython",
#TODO: Seperate integration dependencies from project
"ollama==0.4.8",
"dashscope~=1.24.2",
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 2a54563..7712917 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -27,14 +27,17 @@ import org.apache.flink.agents.api.logger.EventLogger;
import org.apache.flink.agents.api.logger.EventLoggerConfig;
import org.apache.flink.agents.api.logger.EventLoggerFactory;
import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.plan.JavaFunction;
import org.apache.flink.agents.plan.PythonFunction;
import org.apache.flink.agents.plan.actions.Action;
+import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
import org.apache.flink.agents.runtime.actionstate.ActionState;
import org.apache.flink.agents.runtime.actionstate.ActionStateStore;
import org.apache.flink.agents.runtime.actionstate.KafkaActionStateStore;
import org.apache.flink.agents.runtime.context.RunnerContextImpl;
+import org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment;
import org.apache.flink.agents.runtime.env.PythonEnvironmentManager;
import org.apache.flink.agents.runtime.memory.CachedMemoryStore;
import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
@@ -45,6 +48,7 @@ import
org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
import org.apache.flink.agents.runtime.python.event.PythonEvent;
import org.apache.flink.agents.runtime.python.operator.PythonActionTask;
import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
+import org.apache.flink.agents.runtime.python.utils.PythonResourceAdapterImpl;
import org.apache.flink.agents.runtime.utils.EventUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
@@ -75,6 +79,7 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import pemja.core.PythonInterpreter;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -119,9 +124,16 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
private transient MapState<String, MemoryObjectImpl.MemoryItem>
shortTermMemState;
+ private transient PythonEnvironmentManager pythonEnvironmentManager;
+
+ private transient PythonInterpreter pythonInterpreter;
+
// PythonActionExecutor for Python actions
private transient PythonActionExecutor pythonActionExecutor;
+ // PythonResourceAdapter for Python resources in Java actions
+ private transient PythonResourceAdapterImpl pythonResourceAdapter;
+
private transient FlinkAgentsMetricGroupImpl metricGroup;
private transient BuiltInMetrics builtInMetrics;
@@ -251,8 +263,8 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
new ListStateDescriptor<>(
"currentProcessingKeys",
TypeInformation.of(Object.class)));
- // init PythonActionExecutor
- initPythonActionExecutor();
+ // init PythonActionExecutor and PythonResourceAdapter
+ initPythonEnvironment();
mailboxProcessor = getMailboxProcessor();
@@ -496,17 +508,29 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
}
}
- private void initPythonActionExecutor() throws Exception {
+ private void initPythonEnvironment() throws Exception {
boolean containPythonAction =
agentPlan.getActions().values().stream()
.anyMatch(action -> action.getExec() instanceof
PythonFunction);
- if (containPythonAction) {
- LOG.debug("Begin initialize PythonActionExecutor.");
+
+ boolean containPythonResource =
+ agentPlan.getResourceProviders().values().stream()
+ .anyMatch(
+ resourceProviderMap ->
+ resourceProviderMap.values().stream()
+ .anyMatch(
+ resourceProvider ->
+
resourceProvider
+
instanceof
+
PythonResourceProvider));
+
+ if (containPythonAction || containPythonResource) {
+ LOG.debug("Begin initialize PythonEnvironmentManager.");
PythonDependencyInfo dependencyInfo =
PythonDependencyInfo.create(
getExecutionConfig().toConfiguration(),
getRuntimeContext().getDistributedCache());
- PythonEnvironmentManager pythonEnvironmentManager =
+ pythonEnvironmentManager =
new PythonEnvironmentManager(
dependencyInfo,
getContainingTask()
@@ -515,14 +539,39 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
.getTmpDirectories(),
new HashMap<>(System.getenv()),
getRuntimeContext().getJobInfo().getJobId());
- pythonActionExecutor =
- new PythonActionExecutor(
- pythonEnvironmentManager,
- new ObjectMapper().writeValueAsString(agentPlan));
- pythonActionExecutor.open();
+ pythonEnvironmentManager.open();
+ EmbeddedPythonEnvironment env =
pythonEnvironmentManager.createEnvironment();
+ pythonInterpreter = env.getInterpreter();
+ if (containPythonAction) {
+ initPythonActionExecutor();
+ } else {
+ initPythonResourceAdapter();
+ }
}
}
+ private void initPythonActionExecutor() throws Exception {
+ pythonActionExecutor =
+ new PythonActionExecutor(
+ pythonInterpreter, new
ObjectMapper().writeValueAsString(agentPlan));
+ pythonActionExecutor.open();
+ }
+
+ private void initPythonResourceAdapter() throws Exception {
+ pythonResourceAdapter =
+ new PythonResourceAdapterImpl(
+ (String anotherName, ResourceType anotherType) -> {
+ try {
+ return agentPlan.getResource(anotherName,
anotherType);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ },
+ pythonInterpreter);
+ pythonResourceAdapter.open();
+ agentPlan.setPythonResourceAdapter(pythonResourceAdapter);
+ }
+
@Override
public void endInput() throws Exception {
waitInFlightEventsFinished();
@@ -540,6 +589,12 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
if (pythonActionExecutor != null) {
pythonActionExecutor.close();
}
+ if (pythonInterpreter != null) {
+ pythonInterpreter.close();
+ }
+ if (pythonEnvironmentManager != null) {
+ pythonEnvironmentManager.close();
+ }
if (eventLogger != null) {
eventLogger.close();
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index ded0140..9b8c7c4 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -19,8 +19,6 @@ package org.apache.flink.agents.runtime.python.utils;
import org.apache.flink.agents.plan.PythonFunction;
import org.apache.flink.agents.runtime.context.RunnerContextImpl;
-import org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment;
-import org.apache.flink.agents.runtime.env.PythonEnvironmentManager;
import org.apache.flink.agents.runtime.python.event.PythonEvent;
import org.apache.flink.agents.runtime.utils.EventUtil;
import pemja.core.PythonInterpreter;
@@ -59,21 +57,16 @@ public class PythonActionExecutor {
private static final String GET_OUTPUT_FROM_OUTPUT_EVENT =
"python_java_utils.get_output_from_output_event";
- private final PythonEnvironmentManager environmentManager;
+ private final PythonInterpreter interpreter;
private final String agentPlanJson;
- private PythonInterpreter interpreter;
private Object pythonAsyncThreadPool;
- public PythonActionExecutor(PythonEnvironmentManager environmentManager,
String agentPlanJson) {
- this.environmentManager = environmentManager;
+ public PythonActionExecutor(PythonInterpreter interpreter, String
agentPlanJson) {
+ this.interpreter = interpreter;
this.agentPlanJson = agentPlanJson;
}
public void open() throws Exception {
- environmentManager.open();
- EmbeddedPythonEnvironment env = environmentManager.createEnvironment();
-
- interpreter = env.getInterpreter();
interpreter.exec(PYTHON_IMPORTS);
pythonAsyncThreadPool = interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
@@ -160,11 +153,6 @@ public class PythonActionExecutor {
if (pythonAsyncThreadPool != null) {
interpreter.invoke(CLOSE_ASYNC_THREAD_POOL,
pythonAsyncThreadPool);
}
- interpreter.close();
- }
-
- if (environmentManager != null) {
- environmentManager.close();
}
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
new file mode 100644
index 0000000..cd9b935
--- /dev/null
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.python.utils;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.prompt.Prompt;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import org.apache.flink.agents.api.tools.Tool;
+import pemja.core.PythonInterpreter;
+import pemja.core.object.PyObject;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+
+public class PythonResourceAdapterImpl implements PythonResourceAdapter {
+
+ static final String PYTHON_IMPORTS = "from flink_agents.runtime import
python_java_utils";
+
+ static final String GET_RESOURCE_KEY = "get_resource";
+
+ static final String PYTHON_MODULE_PREFIX = "python_java_utils.";
+
+ static final String GET_RESOURCE_FUNCTION = PYTHON_MODULE_PREFIX +
"get_resource_function";
+
+ static final String CALL_METHOD = PYTHON_MODULE_PREFIX + "call_method";
+
+ static final String CREATE_RESOURCE = PYTHON_MODULE_PREFIX +
"create_resource";
+
+ static final String FROM_JAVA_TOOL = PYTHON_MODULE_PREFIX +
"from_java_tool";
+
+ static final String FROM_JAVA_PROMPT = PYTHON_MODULE_PREFIX +
"from_java_prompt";
+
+ static final String FROM_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX +
"from_java_chat_message";
+
+ static final String TO_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX +
"to_java_chat_message";
+
+ private final BiFunction<String, ResourceType, Resource> getResource;
+ private final PythonInterpreter interpreter;
+ private Object pythonGetResourceFunction;
+
+ public PythonResourceAdapterImpl(
+ BiFunction<String, ResourceType, Resource> getResource,
PythonInterpreter interpreter) {
+ this.getResource = getResource;
+ this.interpreter = interpreter;
+ }
+
+ public void open() {
+ interpreter.exec(PYTHON_IMPORTS);
+ pythonGetResourceFunction = interpreter.invoke(GET_RESOURCE_FUNCTION,
this);
+ }
+
+ public Object getResource(String resourceName, String resourceType) {
+ Resource resource =
+ this.getResource.apply(resourceName,
ResourceType.fromValue(resourceType));
+ if (resource instanceof PythonResourceWrapper) {
+ PythonResourceWrapper pythonResource = (PythonResourceWrapper)
resource;
+ return pythonResource.getPythonResource();
+ }
+ if (resource instanceof Tool) {
+ return convertToPythonTool((Tool) resource);
+ }
+ if (resource instanceof Prompt) {
+ return convertToPythonPrompt((Prompt) resource);
+ }
+ return resource;
+ }
+
+ @Override
+ public PyObject initPythonResource(String module, String clazz,
Map<String, Object> kwargs) {
+ kwargs.put(GET_RESOURCE_KEY, pythonGetResourceFunction);
+ return (PyObject) interpreter.invoke(CREATE_RESOURCE, module, clazz,
kwargs);
+ }
+
+ @Override
+ public Object toPythonChatMessage(ChatMessage message) {
+ return interpreter.invoke(FROM_JAVA_CHAT_MESSAGE, message);
+ }
+
+ @Override
+ public ChatMessage fromPythonChatMessage(Object pythonChatMessage) {
+ ChatMessage message =
+ (ChatMessage) interpreter.invoke(TO_JAVA_CHAT_MESSAGE,
pythonChatMessage);
+
+ return message;
+ }
+
+ @Override
+ public Object convertToPythonTool(Tool tool) {
+ return interpreter.invoke(FROM_JAVA_TOOL, tool);
+ }
+
+ private Object convertToPythonPrompt(Prompt prompt) {
+ return interpreter.invoke(FROM_JAVA_PROMPT, prompt);
+ }
+
+ @Override
+ public Object callMethod(Object obj, String methodName, Map<String,
Object> kwargs) {
+ return interpreter.invoke(CALL_METHOD, obj, methodName, kwargs);
+ }
+
+ @Override
+ public Object invoke(String name, Object... args) {
+ return interpreter.invoke(name, args);
+ }
+}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java
new file mode 100644
index 0000000..18ee02e
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.python.utils;
+
+import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup;
+import org.apache.flink.agents.api.prompt.Prompt;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import org.apache.flink.agents.api.tools.Tool;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import pemja.core.PythonInterpreter;
+import pemja.core.object.PyObject;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.*;
+
+public class PythonResourceAdapterImplTest {
+ @Mock private PythonInterpreter mockInterpreter;
+
+ @Mock private BiFunction<String, ResourceType, Resource> getResource;
+
+ private PythonResourceAdapterImpl pythonResourceAdapter;
+ private AutoCloseable mocks;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ mocks = MockitoAnnotations.openMocks(this);
+ pythonResourceAdapter = new PythonResourceAdapterImpl(getResource,
mockInterpreter);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (mocks != null) {
+ mocks.close();
+ }
+ }
+
+ @Test
+ void testInitPythonResource() {
+ String module = "test_module";
+ String clazz = "TestClass";
+ Map<String, Object> kwargs = new HashMap<>();
+ kwargs.put("param1", "value1");
+ kwargs.put("param2", 42);
+
+ PyObject expectedResult = mock(PyObject.class);
+ when(mockInterpreter.invoke(
+ PythonResourceAdapterImpl.CREATE_RESOURCE, module,
clazz, kwargs))
+ .thenReturn(expectedResult);
+
+ PyObject result = pythonResourceAdapter.initPythonResource(module,
clazz, kwargs);
+
+ assertThat(result).isEqualTo(expectedResult);
+
assertThat(kwargs).containsKey(PythonResourceAdapterImpl.GET_RESOURCE_KEY);
+ verify(mockInterpreter)
+ .invoke(PythonResourceAdapterImpl.CREATE_RESOURCE, module,
clazz, kwargs);
+ }
+
+ @Test
+ void testOpen() {
+
+ pythonResourceAdapter.open();
+
+ verify(mockInterpreter).exec(PythonResourceAdapterImpl.PYTHON_IMPORTS);
+ verify(mockInterpreter)
+ .invoke(PythonResourceAdapterImpl.GET_RESOURCE_FUNCTION,
pythonResourceAdapter);
+ }
+
+ @Test
+ void testGetResourceWithPythonResourceWrapper() {
+ String resourceName = "test_resource";
+ String resourceType = "chat_model";
+ PythonResourceWrapper mockPythonChatModelSetup =
mock(PythonChatModelSetup.class);
+ Object expectedPythonResource = new Object();
+
+ when(getResource.apply(resourceName, ResourceType.CHAT_MODEL))
+ .thenReturn((Resource) mockPythonChatModelSetup);
+
when(mockPythonChatModelSetup.getPythonResource()).thenReturn(expectedPythonResource);
+
+ Object result = pythonResourceAdapter.getResource(resourceName,
resourceType);
+
+ assertThat(result).isEqualTo(expectedPythonResource);
+ verify(getResource).apply(resourceName, ResourceType.CHAT_MODEL);
+ verify(mockPythonChatModelSetup).getPythonResource();
+ }
+
+ @Test
+ void testGetResourceWithTool() {
+ String resourceName = "test_tool";
+ String resourceType = "tool";
+ Tool mockTool = mock(Tool.class);
+ Object expectedPythonTool = new Object();
+
+ when(getResource.apply(resourceName,
ResourceType.TOOL)).thenReturn(mockTool);
+ when(mockInterpreter.invoke(PythonResourceAdapterImpl.FROM_JAVA_TOOL,
mockTool))
+ .thenReturn(expectedPythonTool);
+
+ Object result = pythonResourceAdapter.getResource(resourceName,
resourceType);
+
+ assertThat(result).isEqualTo(expectedPythonTool);
+ verify(getResource).apply(resourceName, ResourceType.TOOL);
+
verify(mockInterpreter).invoke(PythonResourceAdapterImpl.FROM_JAVA_TOOL,
mockTool);
+ }
+
+ @Test
+ void testGetResourceWithPrompt() {
+ String resourceName = "test_prompt";
+ String resourceType = "prompt";
+ Prompt mockPrompt = mock(Prompt.class);
+ Object expectedPythonPrompt = new Object();
+
+ when(getResource.apply(resourceName,
ResourceType.PROMPT)).thenReturn(mockPrompt);
+
when(mockInterpreter.invoke(PythonResourceAdapterImpl.FROM_JAVA_PROMPT,
mockPrompt))
+ .thenReturn(expectedPythonPrompt);
+
+ Object result = pythonResourceAdapter.getResource(resourceName,
resourceType);
+
+ assertThat(result).isEqualTo(expectedPythonPrompt);
+ verify(getResource).apply(resourceName, ResourceType.PROMPT);
+
verify(mockInterpreter).invoke(PythonResourceAdapterImpl.FROM_JAVA_PROMPT,
mockPrompt);
+ }
+
+ @Test
+ void testGetResourceWithRegularResource() {
+ String resourceName = "test_resource";
+ String resourceType = "chat_model";
+ Resource mockResource = mock(Resource.class);
+
+ when(getResource.apply(resourceName,
ResourceType.CHAT_MODEL)).thenReturn(mockResource);
+
+ Object result = pythonResourceAdapter.getResource(resourceName,
resourceType);
+
+ assertThat(result).isEqualTo(mockResource);
+ verify(getResource).apply(resourceName, ResourceType.CHAT_MODEL);
+ }
+
+ @Test
+ void testCallMethod() {
+ // Arrange
+ Object obj = new Object();
+ String methodName = "test_method";
+ Map<String, Object> kwargs = Map.of("param", "value");
+ Object expectedResult = "method_result";
+
+ when(mockInterpreter.invoke(PythonResourceAdapterImpl.CALL_METHOD,
obj, methodName, kwargs))
+ .thenReturn(expectedResult);
+
+ Object result = pythonResourceAdapter.callMethod(obj, methodName,
kwargs);
+
+ assertThat(result).isEqualTo(expectedResult);
+ verify(mockInterpreter)
+ .invoke(PythonResourceAdapterImpl.CALL_METHOD, obj,
methodName, kwargs);
+ }
+
+ @Test
+ void testInvoke() {
+ String name = "test_function";
+ Object[] args = {"arg1", 42, true};
+ Object expectedResult = "invoke_result";
+
+ when(mockInterpreter.invoke(name, args)).thenReturn(expectedResult);
+
+ Object result = pythonResourceAdapter.invoke(name, args);
+
+ assertThat(result).isEqualTo(expectedResult);
+ verify(mockInterpreter).invoke(name, args);
+ }
+}
diff --git a/tools/e2e.sh b/tools/e2e.sh
index 103d024..f244bc0 100755
--- a/tools/e2e.sh
+++ b/tools/e2e.sh
@@ -66,6 +66,18 @@ function run_cross_language_config_test {
cd "$python_dir" && uv run bash
../e2e-test/test-scripts/test_java_config_in_python.sh
}
+function run_resource_cross_language_test {
+ # This test only runs Maven, no need for Python environment
+ # Ensure we're in the project root directory
+ cd "$project_root"
+ uv run bash e2e-test/test-scripts/test_resource_cross_language.sh
+}
+
+export TOTAL=0
+export PASSED=0
+
+run_test "Resource Cross-Language end-to-end test"
"run_resource_cross_language_test"
+
if [[ ! -d "e2e-test/target" ]]; then
echo "Build flink-agents before run e2e tests."
bash tools/build.sh
@@ -94,9 +106,6 @@ uv sync --extra dev
uv pip install -e .
cd ..
-export TOTAL=0
-export PASSED=0
-
# Create temporary directory with better cross-platform compatibility
if command -v mktemp >/dev/null 2>&1; then
tempdir=$(mktemp -d)
diff --git a/tools/ut.sh b/tools/ut.sh
index 18d459e..e172b0c 100755
--- a/tools/ut.sh
+++ b/tools/ut.sh
@@ -98,7 +98,7 @@ java_tests() {
if $run_e2e; then
mvn -T16 --batch-mode --no-transfer-progress test -pl
'e2e-test/flink-agents-end-to-end-tests-integration'
else
- mvn -T16 --batch-mode --no-transfer-progress test -pl
'!e2e-test/flink-agents-end-to-end-tests-integration'
+ mvn -T16 --batch-mode --no-transfer-progress test -pl
'!e2e-test/flink-agents-end-to-end-tests-integration,!e2e-test/flink-agents-end-to-end-tests-resource-cross-language'
fi
testcode=$?
case $testcode in