This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit c9905019a4e0e08f924aba90a1405d4bf3926d84 Author: youjin <[email protected]> AuthorDate: Mon Jan 12 19:12:24 2026 +0800 [feature] Support Python resource retrieval through the Java resource using the getResource function --- .../resourceprovider/PythonResourceProvider.java | 39 +++++++++++--- .../apache/flink/agents/plan/AgentPlanTest.java | 3 +- .../chat_model_cross_language_agent.py | 17 ++++-- .../runtime/operator/ActionExecutionOperator.java | 21 ++++++-- .../runtime/python/utils/JavaResourceAdapter.java | 61 +++------------------- 5 files changed, 71 insertions(+), 70 deletions(-) diff --git a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java index f70833f..9f77676 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java @@ -18,6 +18,10 @@ package org.apache.flink.agents.plan.resourceprovider; +import org.apache.flink.agents.api.chat.model.python.PythonChatModelConnection; +import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup; +import org.apache.flink.agents.api.embedding.model.python.PythonEmbeddingModelConnection; +import org.apache.flink.agents.api.embedding.model.python.PythonEmbeddingModelSetup; import org.apache.flink.agents.api.resource.Resource; import org.apache.flink.agents.api.resource.ResourceDescriptor; import org.apache.flink.agents.api.resource.ResourceType; @@ -26,6 +30,7 @@ import pemja.core.object.PyObject; import java.lang.reflect.Constructor; import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.function.BiFunction; @@ -40,6 +45,13 @@ import static org.apache.flink.util.Preconditions.checkState; public class PythonResourceProvider extends ResourceProvider { private final ResourceDescriptor descriptor; + private static final Map<ResourceType, Class<?>> RESOURCE_TYPE_TO_CLASS = + Map.of( + ResourceType.CHAT_MODEL, PythonChatModelSetup.class, + ResourceType.CHAT_MODEL_CONNECTION, PythonChatModelConnection.class, + ResourceType.EMBEDDING_MODEL, PythonEmbeddingModelSetup.class, + ResourceType.EMBEDDING_MODEL_CONNECTION, PythonEmbeddingModelConnection.class); + protected PythonResourceAdapter pythonResourceAdapter; public PythonResourceProvider(String name, ResourceType type, ResourceDescriptor descriptor) { @@ -59,17 +71,30 @@ public class PythonResourceProvider extends ResourceProvider { public Resource provide(BiFunction<String, ResourceType, Resource> getResource) throws Exception { checkState(pythonResourceAdapter != null, "PythonResourceAdapter is not set"); - Class<?> clazz = Class.forName(descriptor.getClazz()); + + Class<?> clazz = RESOURCE_TYPE_TO_CLASS.get(getType()); + if (clazz == null) { + throw new UnsupportedOperationException( + "Unsupported python resource type: " + getType()); + } HashMap<String, Object> kwargs = new HashMap<>(descriptor.getInitialArguments()); - String pyModule = (String) kwargs.remove("module"); + String pyModule = descriptor.getModule(); + String pyClazz = descriptor.getClazz(); + + // Extract module and class from kwargs if not provided in descriptor if (pyModule == null || pyModule.isEmpty()) { - throw new IllegalArgumentException("module should not be null or empty."); - } - String pyClazz = (String) kwargs.remove("clazz"); - if (pyClazz == null || pyClazz.isEmpty()) { - throw new IllegalArgumentException("clazz should not be null or empty."); + pyModule = (String) kwargs.remove("module"); + if (pyModule == null || pyModule.isEmpty()) { + throw new IllegalArgumentException("module should not be null or empty."); + } + + pyClazz = (String) kwargs.remove("clazz"); + if (pyClazz == null || pyClazz.isEmpty()) { + throw new IllegalArgumentException("clazz should not be null or empty."); + } } + PyObject pyResource = pythonResourceAdapter.initPythonResource(pyModule, pyClazz, kwargs); Constructor<?> constructor = clazz.getConstructor( diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java index 9179ea2..ae95f40 100644 --- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java +++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java @@ -25,6 +25,7 @@ import org.apache.flink.agents.api.agents.Agent; import org.apache.flink.agents.api.annotation.ChatModelSetup; import org.apache.flink.agents.api.annotation.Tool; import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup; import org.apache.flink.agents.api.context.RunnerContext; import org.apache.flink.agents.api.resource.Resource; import org.apache.flink.agents.api.resource.ResourceDescriptor; @@ -440,7 +441,7 @@ public class AgentPlanTest { Resource pythonChatModel = agentPlan.getResource("pythonChatModel", ResourceType.CHAT_MODEL); assertThat(pythonChatModel).isNotNull(); - assertThat(pythonChatModel).isInstanceOf(TestPythonResource.class); + assertThat(pythonChatModel).isInstanceOf(PythonChatModelSetup.class); assertThat(pythonChatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL); // Test that resources are cached (should be the same instance) diff --git a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_agent.py b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_agent.py index 796ab80..929500f 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_agent.py +++ b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_agent.py @@ -35,6 +35,9 @@ from flink_agents.api.events.event import InputEvent, OutputEvent from flink_agents.api.prompts.prompt import Prompt from flink_agents.api.resource import ResourceDescriptor from flink_agents.api.runner_context import RunnerContext +from flink_agents.integrations.chat_models.ollama_chat_model import ( + OllamaChatModelConnection, +) class ChatModelCrossLanguageAgent(Agent): @@ -67,7 +70,15 @@ class ChatModelCrossLanguageAgent(Agent): @chat_model_connection @staticmethod - def ollama_connection() -> ResourceDescriptor: + def ollama_connection_python() -> ResourceDescriptor: + """ChatModelConnection responsible for ollama model service connection.""" + return ResourceDescriptor( + clazz=OllamaChatModelConnection, request_timeout=240.0 + ) + + @chat_model_connection + @staticmethod + def ollama_connection_java() -> ResourceDescriptor: """ChatModelConnection responsible for ollama model service connection.""" return ResourceDescriptor( clazz=JavaChatModelConnection, @@ -82,7 +93,7 @@ class ChatModelCrossLanguageAgent(Agent): """ChatModel which focus on math, and reuse ChatModelConnection.""" return ResourceDescriptor( clazz=JavaChatModelSetup, - connection="ollama_connection", + connection="ollama_connection_python", java_clazz="org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup", model=os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b"), prompt="from_messages_prompt", @@ -96,7 +107,7 @@ class ChatModelCrossLanguageAgent(Agent): """ChatModel which focus on text generate, and reuse ChatModelConnection.""" return ResourceDescriptor( clazz=JavaChatModelSetup, - connection="ollama_connection", + connection="ollama_connection_java", java_clazz="org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup", model=os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b"), prompt="from_text_prompt", diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index d44d0b8..6b12a6d 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.agents.api.logger.EventLogger; import org.apache.flink.agents.api.logger.EventLoggerConfig; import org.apache.flink.agents.api.logger.EventLoggerFactory; import org.apache.flink.agents.api.logger.EventLoggerOpenParams; +import org.apache.flink.agents.api.resource.Resource; import org.apache.flink.agents.api.resource.ResourceType; import org.apache.flink.agents.plan.AgentPlan; import org.apache.flink.agents.plan.JavaFunction; @@ -142,6 +143,9 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT // PythonResourceAdapter for Python resources in Java actions private transient PythonResourceAdapterImpl pythonResourceAdapter; + // PythonResourceAdapter for Java resources in Python actions or Python resources + private transient JavaResourceAdapter javaResourceAdapter; + private transient FlinkAgentsMetricGroupImpl metricGroup; private transient BuiltInMetrics builtInMetrics; @@ -539,6 +543,14 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT } } + private Resource getResource(String name, ResourceType type) { + try { + return agentPlan.getResource(name, type); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private void initPythonEnvironment() throws Exception { boolean containPythonAction = agentPlan.getActions().values().stream() @@ -576,17 +588,18 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT pythonRunnerContext = new PythonRunnerContextImpl( this.metricGroup, this::checkMailboxThread, this.agentPlan); + + javaResourceAdapter = new JavaResourceAdapter(this::getResource, pythonInterpreter); + if (containPythonResource) { + initPythonResourceAdapter(); + } if (containPythonAction) { initPythonActionExecutor(); - } else { - initPythonResourceAdapter(); } } } private void initPythonActionExecutor() throws Exception { - JavaResourceAdapter javaResourceAdapter = - new JavaResourceAdapter(agentPlan, pythonInterpreter); pythonActionExecutor = new PythonActionExecutor( pythonInterpreter, diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java index fcdc6dc..7b6c009 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java @@ -21,29 +21,20 @@ import org.apache.flink.agents.api.chat.messages.ChatMessage; import org.apache.flink.agents.api.chat.messages.MessageRole; import org.apache.flink.agents.api.resource.Resource; import org.apache.flink.agents.api.resource.ResourceType; -import org.apache.flink.agents.plan.AgentPlan; -import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider; -import org.apache.flink.agents.plan.resourceprovider.ResourceProvider; import pemja.core.PythonInterpreter; -import javax.naming.OperationNotSupportedException; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; /** Adapter for managing Java resources and facilitating Python-Java interoperability. */ public class JavaResourceAdapter { - private final Map<ResourceType, Map<String, ResourceProvider>> resourceProviders; + private final BiFunction<String, ResourceType, Resource> getResource; private final transient PythonInterpreter interpreter; - /** Cache for instantiated resources. */ - private final transient Map<ResourceType, Map<String, Resource>> resourceCache; - - public JavaResourceAdapter(AgentPlan agentPlan, PythonInterpreter interpreter) { - this.resourceProviders = agentPlan.getResourceProviders(); + public JavaResourceAdapter( + BiFunction<String, ResourceType, Resource> getResource, PythonInterpreter interpreter) { + this.getResource = getResource; this.interpreter = interpreter; - this.resourceCache = new ConcurrentHashMap<>(); } /** @@ -56,47 +47,7 @@ public class JavaResourceAdapter { * @throws Exception if the resource cannot be retrieved */ public Resource getResource(String name, String typeValue) throws Exception { - return getResource(name, ResourceType.fromValue(typeValue)); - } - - /** - * Retrieves a Java resource by name and type. - * - * @param name the name of the resource to retrieve - * @param type the type of the resource - * @return the resource - * @throws Exception if the resource cannot be retrieved - */ - public Resource getResource(String name, ResourceType type) throws Exception { - if (resourceCache.containsKey(type) && resourceCache.get(type).containsKey(name)) { - return resourceCache.get(type).get(name); - } - - if (!resourceProviders.containsKey(type) - || !resourceProviders.get(type).containsKey(name)) { - throw new IllegalArgumentException("Resource not found: " + name + " of type " + type); - } - - ResourceProvider provider = resourceProviders.get(type).get(name); - if (provider instanceof PythonResourceProvider) { - // TODO: Support getting resources from PythonResourceProvider in JavaResourceAdapter. - throw new OperationNotSupportedException("PythonResourceProvider is not supported."); - } - - Resource resource = - provider.provide( - (String anotherName, ResourceType anotherType) -> { - try { - return this.getResource(anotherName, anotherType); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - - // Cache the resource - resourceCache.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(name, resource); - - return resource; + return getResource.apply(name, ResourceType.fromValue(typeValue)); } /**
