This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit 300d052550ff43d4923563590e6babf84731e0aa
Author: WenjinXie <wenjin...@gmail.com>
AuthorDate: Thu Aug 28 12:02:57 2025 +0800

    [api][plan][runtime] Support register resources in execution environment.
---
 python/flink_agents/api/agent.py                   |  51 ++++++----
 python/flink_agents/api/execution_environment.py   | 110 ++++++++++++++++++++-
 python/flink_agents/plan/agent_plan.py             |   8 +-
 .../runtime/local_execution_environment.py         |   4 +
 .../runtime/remote_execution_environment.py        |  20 +++-
 5 files changed, 166 insertions(+), 27 deletions(-)

diff --git a/python/flink_agents/api/agent.py b/python/flink_agents/api/agent.py
index 7e9af23..5a06542 100644
--- a/python/flink_agents/api/agent.py
+++ b/python/flink_agents/api/agent.py
@@ -24,6 +24,7 @@ from flink_agents.api.chat_models.chat_model import (
 )
 from flink_agents.api.events.event import Event
 from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.api.resource import ResourceType
 
 
 class Agent(ABC):
@@ -71,20 +72,24 @@ class Agent(ABC):
     """
 
     _actions: Dict[str, Tuple[List[Type[Event]], Callable]]
-    _prompts: Dict[str, Prompt]
-    _tools: Dict[str, Callable]
-    _chat_model_connections: Dict[
-        str, Tuple[Type[BaseChatModelConnection], Dict[str, Any]]
-    ]
-    _chat_model_setups: Dict[str, Tuple[Type[BaseChatModelSetup], Dict[str, 
Any]]]
+    _resources: Dict[ResourceType, Dict[str, Any]]
 
     def __init__(self) -> None:
         """Init method."""
         self._actions = {}
-        self._prompts = {}
-        self._tools = {}
-        self._chat_model_connections = {}
-        self._chat_model_setups = {}
+        self._resources = {}
+        for type in ResourceType:
+            self._resources[type] = {}
+
+    @property
+    def actions(self) -> Dict[str, Tuple[List[Type[Event]], Callable]]:
+        """Get added actions."""
+        return self._actions
+
+    @property
+    def resources(self) -> Dict[ResourceType, Dict[str, Any]]:
+        """Get added resources."""
+        return self._resources
 
     def add_action(
         self, name: str, events: List[Type[Event]], func: Callable
@@ -126,10 +131,12 @@ class Agent(ABC):
         Agent
             The modified Agent instance.
         """
-        if name in self._prompts:
+        if ResourceType.PROMPT not in self._resources:
+            self._resources[ResourceType.PROMPT] = {}
+        if name in self._resources[ResourceType.PROMPT]:
             msg = f"Prompt {name} already defined"
             raise ValueError(msg)
-        self._prompts[name] = prompt
+        self._resources[ResourceType.PROMPT][name] = prompt
         return self
 
     def add_tool(self, name: str, func: Callable) -> "Agent":
@@ -147,10 +154,12 @@ class Agent(ABC):
         Agent
             The modified Agent instance.
         """
-        if name in self._tools:
+        if ResourceType.TOOL not in self._resources:
+            self._resources[ResourceType.TOOL] = {}
+        if name in self._resources[ResourceType.TOOL]:
             msg = f"Function tool {name} already defined"
             raise ValueError(msg)
-        self._tools[name] = func
+        self._resources[ResourceType.TOOL][name] = func
         return self
 
     def add_chat_model_connection(
@@ -172,11 +181,13 @@ class Agent(ABC):
         Agent
             The modified Agent instance.
         """
-        if name in self._chat_model_connections:
+        if ResourceType.CHAT_MODEL_CONNECTION not in self._resources:
+            self._resources[ResourceType.CHAT_MODEL_CONNECTION] = {}
+        if name in self._resources[ResourceType.CHAT_MODEL_CONNECTION]:
             msg = f"Chat model connection {name} already defined"
             raise ValueError(msg)
         kwargs["name"] = name
-        self._chat_model_connections[name] = (connection, kwargs)
+        self._resources[ResourceType.CHAT_MODEL_CONNECTION][name] = 
(connection, kwargs)
         return self
 
     def add_chat_model_setup(
@@ -191,16 +202,18 @@ class Agent(ABC):
         chat_model: Type[BaseChatModel]
             The type of chat model.
         **kwargs: Any
-            Initialize keyword arguments passed to the chat model.
+            Initialize keyword arguments passed to the chat model setup.
 
         Returns:
         -------
         Agent
             The modified Agent instance.
         """
-        if name in self._chat_model_setups:
+        if ResourceType.CHAT_MODEL not in self._resources:
+            self._resources[ResourceType.CHAT_MODEL] = {}
+        if name in self._resources[ResourceType.CHAT_MODEL]:
             msg = f"Chat model setup {name} already defined"
             raise ValueError(msg)
         kwargs["name"] = name
-        self._chat_model_setups[name] = (chat_model, kwargs)
+        self._resources[ResourceType.CHAT_MODEL][name] = (chat_model, kwargs)
         return self
diff --git a/python/flink_agents/api/execution_environment.py 
b/python/flink_agents/api/execution_environment.py
index b2eef8c..e4d6eae 100644
--- a/python/flink_agents/api/execution_environment.py
+++ b/python/flink_agents/api/execution_environment.py
@@ -17,7 +17,7 @@
 
#################################################################################
 import importlib
 from abc import ABC, abstractmethod
-from typing import Any, Dict, List, Optional
+from typing import Any, Callable, Dict, List, Optional, Type
 
 from pyflink.common import TypeInformation
 from pyflink.datastream import DataStream, KeySelector, 
StreamExecutionEnvironment
@@ -84,6 +84,20 @@ class AgentBuilder(ABC):
 class AgentsExecutionEnvironment(ABC):
     """Base class for agent execution environment."""
 
+    _resources: Dict[ResourceType, Dict[str, Any]]
+
+    def __init__(self) -> None:
+        """Init method."""
+        self._actions = {}
+        self._resources = {}
+        for type in ResourceType:
+            self._resources[type] = {}
+
+    @property
+    def resources(self) -> Dict[ResourceType, Dict[str, Any]]:
+        """Get registered resources."""
+        return self._resources
+
     @staticmethod
     def get_execution_environment(
         env: Optional[StreamExecutionEnvironment] = None, **kwargs: Dict[str, 
Any]
@@ -181,3 +195,97 @@ class AgentsExecutionEnvironment(ABC):
     @abstractmethod
     def execute(self) -> None:
         """Execute agent individually."""
+
+    def add_prompt(self, name: str, prompt: Prompt) -> 
"AgentsExecutionEnvironment":
+        """Register prompt to agent execution environment.
+
+        Parameters
+        ----------
+        name : str
+            The name of the prompt, should be unique in the same Agent.
+        prompt: Prompt
+            The prompt to be used in the agent.
+
+        Returns:
+        -------
+        AgentsExecutionEnvironment
+            The environment contains registered prompt.
+        """
+        if name in self._resources[ResourceType.PROMPT]:
+            msg = f"Prompt {name} already defined"
+            raise ValueError(msg)
+        self._resources[ResourceType.PROMPT][name] = prompt
+        return self
+
+    def add_tool(self, name: str, func: Callable) -> 
"AgentsExecutionEnvironment":
+        """Register function tool to agent execution environment.
+
+        Parameters
+        ----------
+        name : str
+            The name of the tool, should be unique in the same Agent.
+        func: Callable
+            The execution function of the tool.
+
+        Returns:
+        -------
+        AgentsExecutionEnvironment
+            The environment contains registered tool.
+        """
+        if name in self._resources[ResourceType.TOOL]:
+            msg = f"Function tool {name} already defined"
+            raise ValueError(msg)
+        self._resources[ResourceType.TOOL][name] = func
+        return self
+
+    def add_chat_model_connection(
+        self, name: str, connection: Type[BaseChatModelConnection], **kwargs: 
Any
+    ) -> "AgentsExecutionEnvironment":
+        """Register chat model connection to agent execution environment.
+
+        Parameters
+        ----------
+        name : str
+            The name of the chat model connection, should be unique in the 
same Agent.
+        connection: Type[BaseChatModelConnection]
+            The type of chat model connection.
+        **kwargs: Any
+            Initialize keyword arguments passed to the chat model connection.
+
+        Returns:
+        -------
+        AgentsExecutionEnvironment
+            The environment contains registered chat model connection.
+        """
+        if name in self._resources[ResourceType.CHAT_MODEL_CONNECTION]:
+            msg = f"Chat model connection {name} already defined"
+            raise ValueError(msg)
+        kwargs["name"] = name
+        self._resources[ResourceType.CHAT_MODEL_CONNECTION][name] = 
(connection, kwargs)
+        return self
+
+    def add_chat_model_setup(
+        self, name: str, chat_model: Type[BaseChatModelSetup], **kwargs: Any
+    ) -> "AgentsExecutionEnvironment":
+        """Register chat model setup to agent execution environment.
+
+        Parameters
+        ----------
+        name : str
+            The name of the chat model, should be unique in the same Agent.
+        chat_model: Type[BaseChatModel]
+            The type of chat model.
+        **kwargs: Any
+            Initialize keyword arguments passed to the chat model.
+
+        Returns:
+        -------
+        AgentsExecutionEnvironment
+            The environment contains registered chat model setup.
+        """
+        if name in self._resources[ResourceType.CHAT_MODEL]:
+            msg = f"Chat model setup {name} already defined"
+            raise ValueError(msg)
+        kwargs["name"] = name
+        self._resources[ResourceType.CHAT_MODEL][name] = (chat_model, kwargs)
+        return self
diff --git a/python/flink_agents/plan/agent_plan.py 
b/python/flink_agents/plan/agent_plan.py
index 7118ecd..de411e4 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -285,18 +285,18 @@ def _get_resource_providers(agent: Agent) -> 
List[ResourceProvider]:
                 )
             )
 
-    for name, prompt in agent._prompts.items():
+    for name, prompt in agent.resources[ResourceType.PROMPT].items():
         resource_providers.append(
             PythonSerializableResourceProvider.from_resource(name=name, 
resource=prompt)
         )
 
-    for name, func in agent._tools.items():
+    for name, func in agent.resources[ResourceType.TOOL].items():
         tool = from_callable(name=name, func=func)
         resource_providers.append(
             PythonSerializableResourceProvider.from_resource(name=name, 
resource=tool)
         )
 
-    for name, chat_model in agent._chat_model_setups.items():
+    for name, chat_model in agent.resources[ResourceType.CHAT_MODEL].items():
         clazz, kwargs = chat_model
         provider = PythonResourceProvider(
             name=name,
@@ -307,7 +307,7 @@ def _get_resource_providers(agent: Agent) -> 
List[ResourceProvider]:
         )
         resource_providers.append(provider)
 
-    for name, connection in agent._chat_model_connections.items():
+    for name, connection in 
agent.resources[ResourceType.CHAT_MODEL_CONNECTION].items():
         clazz, kwargs = connection
         provider = PythonResourceProvider(
             name=name,
diff --git a/python/flink_agents/runtime/local_execution_environment.py 
b/python/flink_agents/runtime/local_execution_environment.py
index aa5605f..ca0a06e 100644
--- a/python/flink_agents/runtime/local_execution_environment.py
+++ b/python/flink_agents/runtime/local_execution_environment.py
@@ -56,6 +56,10 @@ class LocalAgentBuilder(AgentBuilder):
         if self.__runner is not None:
             err_msg = "LocalAgentBuilder doesn't support apply multiple 
agents."
             raise RuntimeError(err_msg)
+        # inspect resources from environment to agent instance.
+        registered_resources = self.__env.resources
+        for type, name_to_resource in registered_resources.items():
+            agent.resources[type] = name_to_resource | agent.resources[type]
         self.__runner = LocalRunner(agent, self.__config)
         self.__env.set_agent(self.__input, self.__output, self.__runner)
         return self
diff --git a/python/flink_agents/runtime/remote_execution_environment.py 
b/python/flink_agents/runtime/remote_execution_environment.py
index 6faf3b0..a998bb4 100644
--- a/python/flink_agents/runtime/remote_execution_environment.py
+++ b/python/flink_agents/runtime/remote_execution_environment.py
@@ -38,6 +38,7 @@ from flink_agents.api.execution_environment import (
     AgentBuilder,
     AgentsExecutionEnvironment,
 )
+from flink_agents.api.resource import ResourceType
 from flink_agents.plan.agent_plan import AgentPlan
 from flink_agents.plan.configuration import AgentConfiguration
 
@@ -50,14 +51,17 @@ class RemoteAgentBuilder(AgentBuilder):
     __output: DataStream = None
     __t_env: StreamTableEnvironment
     __config: AgentConfiguration
+    __resources: Dict[ResourceType, Dict[str, Any]] = None
 
     def __init__(
-        self, input: DataStream, config: AgentConfiguration, t_env: 
Optional[StreamTableEnvironment] = None
+        self, input: DataStream, config: AgentConfiguration, t_env: 
Optional[StreamTableEnvironment] = None,
+            resources: Optional[Dict[ResourceType, Dict[str, Any]]] = None,
     ) -> None:
         """Init method of RemoteAgentBuilder."""
         self.__input = input
         self.__t_env = t_env
         self.__config = config
+        self.__resources = resources
 
     def apply(self, agent: Agent) -> "AgentBuilder":
         """Set agent of execution environment.
@@ -71,6 +75,15 @@ class RemoteAgentBuilder(AgentBuilder):
             err_msg = "RemoteAgentBuilder doesn't support apply multiple 
agents yet."
             raise RuntimeError(err_msg)
         self.__agent_plan = AgentPlan.from_agent(agent, self.__config)
+
+        # inspect refer actions and resources from env to agent.
+        for type, names in agent._resource_names.items():
+            if type not in agent.resources:
+                agent.resources[type] = {}
+            for name in names:
+                agent.resources[type][name] = self.__resources[type][name]
+
+        self.__agent_plan = AgentPlan.from_agent(agent)
         return self
 
     def to_datastream(
@@ -142,6 +155,7 @@ class 
RemoteExecutionEnvironment(AgentsExecutionEnvironment):
 
     def __init__(self, env: StreamExecutionEnvironment) -> None:
         """Init method of RemoteExecutionEnvironment."""
+        super().__init__()
         self.__env = env
         self.__config = AgentConfiguration()
         flink_conf_dir = os.environ.get("FLINK_CONF_DIR")
@@ -187,7 +201,7 @@ class 
RemoteExecutionEnvironment(AgentsExecutionEnvironment):
         """
         input = self.__process_input_datastream(input, key_selector)
 
-        return RemoteAgentBuilder(input=input, config=self.__config)
+        return RemoteAgentBuilder(input=input, config=self.__config, 
resources=self.resources)
 
     def from_table(
         self,
@@ -211,7 +225,7 @@ class 
RemoteExecutionEnvironment(AgentsExecutionEnvironment):
         input = input.map(lambda x: x, output_type=PickledBytesTypeInfo())
 
         input = self.__process_input_datastream(input, key_selector)
-        return RemoteAgentBuilder(input=input, config=self.__config, 
t_env=t_env)
+        return RemoteAgentBuilder(input=input, config=self.__config, 
t_env=t_env, resources=self.resources)
 
     def from_list(self, input: List[Dict[str, Any]]) -> 
"AgentsExecutionEnvironment":
         """Set input list of agent execution.

Reply via email to