This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit 300d052550ff43d4923563590e6babf84731e0aa Author: WenjinXie <wenjin...@gmail.com> AuthorDate: Thu Aug 28 12:02:57 2025 +0800 [api][plan][runtime] Support register resources in execution envrionment. --- python/flink_agents/api/agent.py | 51 ++++++---- python/flink_agents/api/execution_environment.py | 110 ++++++++++++++++++++- python/flink_agents/plan/agent_plan.py | 8 +- .../runtime/local_execution_environment.py | 4 + .../runtime/remote_execution_environment.py | 20 +++- 5 files changed, 166 insertions(+), 27 deletions(-) diff --git a/python/flink_agents/api/agent.py b/python/flink_agents/api/agent.py index 7e9af23..5a06542 100644 --- a/python/flink_agents/api/agent.py +++ b/python/flink_agents/api/agent.py @@ -24,6 +24,7 @@ from flink_agents.api.chat_models.chat_model import ( ) from flink_agents.api.events.event import Event from flink_agents.api.prompts.prompt import Prompt +from flink_agents.api.resource import ResourceType class Agent(ABC): @@ -71,20 +72,24 @@ class Agent(ABC): """ _actions: Dict[str, Tuple[List[Type[Event]], Callable]] - _prompts: Dict[str, Prompt] - _tools: Dict[str, Callable] - _chat_model_connections: Dict[ - str, Tuple[Type[BaseChatModelConnection], Dict[str, Any]] - ] - _chat_model_setups: Dict[str, Tuple[Type[BaseChatModelSetup], Dict[str, Any]]] + _resources: Dict[ResourceType, Dict[str, Any]] def __init__(self) -> None: """Init method.""" self._actions = {} - self._prompts = {} - self._tools = {} - self._chat_model_connections = {} - self._chat_model_setups = {} + self._resources = {} + for type in ResourceType: + self._resources[type] = {} + + @property + def actions(self) -> Dict[str, Tuple[List[Type[Event]], Callable]]: + """Get added actions.""" + return self._actions + + @property + def resources(self) -> Dict[ResourceType, Dict[str, Any]]: + """Get added resources.""" + return self._resources def add_action( self, name: str, events: List[Type[Event]], func: Callable @@ -126,10 +131,12 @@ class Agent(ABC): Agent The modified Agent instance. """ - if name in self._prompts: + if ResourceType.PROMPT not in self._resources: + self._resources[ResourceType.PROMPT] = {} + if name in self._resources[ResourceType.PROMPT]: msg = f"Prompt {name} already defined" raise ValueError(msg) - self._prompts[name] = prompt + self._resources[ResourceType.PROMPT][name] = prompt return self def add_tool(self, name: str, func: Callable) -> "Agent": @@ -147,10 +154,12 @@ class Agent(ABC): Agent The modified Agent instance. """ - if name in self._tools: + if ResourceType.TOOL not in self._resources: + self._resources[ResourceType.TOOL] = {} + if name in self._resources[ResourceType.TOOL]: msg = f"Function tool {name} already defined" raise ValueError(msg) - self._tools[name] = func + self._resources[ResourceType.TOOL][name] = func return self def add_chat_model_connection( @@ -172,11 +181,13 @@ class Agent(ABC): Agent The modified Agent instance. """ - if name in self._chat_model_connections: + if ResourceType.CHAT_MODEL_CONNECTION not in self._resources: + self._resources[ResourceType.CHAT_MODEL_CONNECTION] = {} + if name in self._resources[ResourceType.CHAT_MODEL_CONNECTION]: msg = f"Chat model connection {name} already defined" raise ValueError(msg) kwargs["name"] = name - self._chat_model_connections[name] = (connection, kwargs) + self._resources[ResourceType.CHAT_MODEL_CONNECTION][name] = (connection, kwargs) return self def add_chat_model_setup( @@ -191,16 +202,18 @@ class Agent(ABC): chat_model: Type[BaseChatModel] The type of chat model. **kwargs: Any - Initialize keyword arguments passed to the chat model. + Initialize keyword arguments passed to the chat model setup. Returns: ------- Agent The modified Agent instance. """ - if name in self._chat_model_setups: + if ResourceType.CHAT_MODEL not in self._resources: + self._resources[ResourceType.CHAT_MODEL] = {} + if name in self._resources[ResourceType.CHAT_MODEL]: msg = f"Chat model setup {name} already defined" raise ValueError(msg) kwargs["name"] = name - self._chat_model_setups[name] = (chat_model, kwargs) + self._resources[ResourceType.CHAT_MODEL][name] = (chat_model, kwargs) return self diff --git a/python/flink_agents/api/execution_environment.py b/python/flink_agents/api/execution_environment.py index b2eef8c..e4d6eae 100644 --- a/python/flink_agents/api/execution_environment.py +++ b/python/flink_agents/api/execution_environment.py @@ -17,7 +17,7 @@ ################################################################################# import importlib from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional, Type from pyflink.common import TypeInformation from pyflink.datastream import DataStream, KeySelector, StreamExecutionEnvironment @@ -84,6 +84,20 @@ class AgentBuilder(ABC): class AgentsExecutionEnvironment(ABC): """Base class for agent execution environment.""" + _resources: Dict[ResourceType, Dict[str, Any]] + + def __init__(self) -> None: + """Init method.""" + self._actions = {} + self._resources = {} + for type in ResourceType: + self._resources[type] = {} + + @property + def resources(self) -> Dict[ResourceType, Dict[str, Any]]: + """Get registered resources.""" + return self._resources + @staticmethod def get_execution_environment( env: Optional[StreamExecutionEnvironment] = None, **kwargs: Dict[str, Any] @@ -181,3 +195,97 @@ class AgentsExecutionEnvironment(ABC): @abstractmethod def execute(self) -> None: """Execute agent individually.""" + + def add_prompt(self, name: str, prompt: Prompt) -> "AgentsExecutionEnvironment": + """Register prompt to agent execution environment. + + Parameters + ---------- + name : str + The name of the prompt, should be unique in the same Agent. + prompt: Prompt + The prompt to be used in the agent. + + Returns: + ------- + AgentsExecutionEnvironment + The environment contains registered prompt. + """ + if name in self._resources[ResourceType.PROMPT]: + msg = f"Prompt {name} already defined" + raise ValueError(msg) + self._resources[ResourceType.PROMPT][name] = prompt + return self + + def add_tool(self, name: str, func: Callable) -> "AgentsExecutionEnvironment": + """Register function tool to agent execution environment. + + Parameters + ---------- + name : str + The name of the tool, should be unique in the same Agent. + func: Callable + The execution function of the tool. + + Returns: + ------- + AgentsExecutionEnvironment + The environment contains registered tool. + """ + if name in self._resources[ResourceType.TOOL]: + msg = f"Function tool {name} already defined" + raise ValueError(msg) + self._resources[ResourceType.TOOL][name] = func + return self + + def add_chat_model_connection( + self, name: str, connection: Type[BaseChatModelConnection], **kwargs: Any + ) -> "AgentsExecutionEnvironment": + """Register chat model connection to agent execution environment. + + Parameters + ---------- + name : str + The name of the chat model connection, should be unique in the same Agent. + connection: Type[BaseChatModelConnection] + The type of chat model connection. + **kwargs: Any + Initialize keyword arguments passed to the chat model connection. + + Returns: + ------- + AgentsExecutionEnvironment + The environment contains registered chat model connection. + """ + if name in self._resources[ResourceType.CHAT_MODEL_CONNECTION]: + msg = f"Chat model connection {name} already defined" + raise ValueError(msg) + kwargs["name"] = name + self._resources[ResourceType.CHAT_MODEL_CONNECTION][name] = (connection, kwargs) + return self + + def add_chat_model_setup( + self, name: str, chat_model: Type[BaseChatModelSetup], **kwargs: Any + ) -> "AgentsExecutionEnvironment": + """Register chat model setup to agent execution environment. + + Parameters + ---------- + name : str + The name of the chat model, should be unique in the same Agent. + chat_model: Type[BaseChatModel] + The type of chat model. + **kwargs: Any + Initialize keyword arguments passed to the chat model. + + Returns: + ------- + AgentsExecutionEnvironment + The environment contains registered chat model setup. + """ + if name in self._resources[ResourceType.CHAT_MODEL]: + msg = f"Chat model setup {name} already defined" + raise ValueError(msg) + kwargs["name"] = name + self._resources[ResourceType.CHAT_MODEL][name] = (chat_model, kwargs) + return self diff --git a/python/flink_agents/plan/agent_plan.py b/python/flink_agents/plan/agent_plan.py index 7118ecd..de411e4 100644 --- a/python/flink_agents/plan/agent_plan.py +++ b/python/flink_agents/plan/agent_plan.py @@ -285,18 +285,18 @@ def _get_resource_providers(agent: Agent) -> List[ResourceProvider]: ) ) - for name, prompt in agent._prompts.items(): + for name, prompt in agent.resources[ResourceType.PROMPT].items(): resource_providers.append( PythonSerializableResourceProvider.from_resource(name=name, resource=prompt) ) - for name, func in agent._tools.items(): + for name, func in agent.resources[ResourceType.TOOL].items(): tool = from_callable(name=name, func=func) resource_providers.append( PythonSerializableResourceProvider.from_resource(name=name, resource=tool) ) - for name, chat_model in agent._chat_model_setups.items(): + for name, chat_model in agent.resources[ResourceType.CHAT_MODEL].items(): clazz, kwargs = chat_model provider = PythonResourceProvider( name=name, @@ -307,7 +307,7 @@ def _get_resource_providers(agent: Agent) -> List[ResourceProvider]: ) resource_providers.append(provider) - for name, connection in agent._chat_model_connections.items(): + for name, connection in agent.resources[ResourceType.CHAT_MODEL_CONNECTION].items(): clazz, kwargs = connection provider = PythonResourceProvider( name=name, diff --git a/python/flink_agents/runtime/local_execution_environment.py b/python/flink_agents/runtime/local_execution_environment.py index aa5605f..ca0a06e 100644 --- a/python/flink_agents/runtime/local_execution_environment.py +++ b/python/flink_agents/runtime/local_execution_environment.py @@ -56,6 +56,10 @@ class LocalAgentBuilder(AgentBuilder): if self.__runner is not None: err_msg = "LocalAgentBuilder doesn't support apply multiple agents." raise RuntimeError(err_msg) + # inspect resources from environment to agent instance. + registered_resources = self.__env.resources + for type, name_to_resource in registered_resources.items(): + agent.resources[type] = name_to_resource | agent.resources[type] self.__runner = LocalRunner(agent, self.__config) self.__env.set_agent(self.__input, self.__output, self.__runner) return self diff --git a/python/flink_agents/runtime/remote_execution_environment.py b/python/flink_agents/runtime/remote_execution_environment.py index 6faf3b0..a998bb4 100644 --- a/python/flink_agents/runtime/remote_execution_environment.py +++ b/python/flink_agents/runtime/remote_execution_environment.py @@ -38,6 +38,7 @@ from flink_agents.api.execution_environment import ( AgentBuilder, AgentsExecutionEnvironment, ) +from flink_agents.api.resource import ResourceType from flink_agents.plan.agent_plan import AgentPlan from flink_agents.plan.configuration import AgentConfiguration @@ -50,14 +51,17 @@ class RemoteAgentBuilder(AgentBuilder): __output: DataStream = None __t_env: StreamTableEnvironment __config: AgentConfiguration + __resources: Dict[ResourceType, Dict[str, Any]] = None def __init__( - self, input: DataStream, config: AgentConfiguration, t_env: Optional[StreamTableEnvironment] = None + self, input: DataStream, config: AgentConfiguration, t_env: Optional[StreamTableEnvironment] = None, + resources: Optional[Dict[ResourceType, Dict[str, Any]]] = None, ) -> None: """Init method of RemoteAgentBuilder.""" self.__input = input self.__t_env = t_env self.__config = config + self.__resources = resources def apply(self, agent: Agent) -> "AgentBuilder": """Set agent of execution environment. @@ -71,6 +75,15 @@ class RemoteAgentBuilder(AgentBuilder): err_msg = "RemoteAgentBuilder doesn't support apply multiple agents yet." raise RuntimeError(err_msg) self.__agent_plan = AgentPlan.from_agent(agent, self.__config) + + # inspect refer actions and resources from env to agent. + for type, names in agent._resource_names.items(): + if type not in agent.resources: + agent.resources[type] = {} + for name in names: + agent.resources[type][name] = self.__resources[type][name] + + self.__agent_plan = AgentPlan.from_agent(agent) return self def to_datastream( @@ -142,6 +155,7 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment): def __init__(self, env: StreamExecutionEnvironment) -> None: """Init method of RemoteExecutionEnvironment.""" + super().__init__() self.__env = env self.__config = AgentConfiguration() flink_conf_dir = os.environ.get("FLINK_CONF_DIR") @@ -187,7 +201,7 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment): """ input = self.__process_input_datastream(input, key_selector) - return RemoteAgentBuilder(input=input, config=self.__config) + return RemoteAgentBuilder(input=input, config=self.__config, resources=self.resources) def from_table( self, @@ -211,7 +225,7 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment): input = input.map(lambda x: x, output_type=PickledBytesTypeInfo()) input = self.__process_input_datastream(input, key_selector) - return RemoteAgentBuilder(input=input, config=self.__config, t_env=t_env) + return RemoteAgentBuilder(input=input, config=self.__config, t_env=t_env, resources=self.resources) def from_list(self, input: List[Dict[str, Any]]) -> "AgentsExecutionEnvironment": """Set input list of agent execution.