This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit e9b56540f14f6a6f1ad2f63b40a5a554b5c98b41 Author: WenjinXie <wenjin...@gmail.com> AuthorDate: Thu Aug 28 12:08:49 2025 +0800 [api][python] Introduce built-in ReAct Agent. --- .../flink/agents/api/resource/ResourceType.java | 2 + python/flink_agents/api/agents/__init__.py | 17 ++ python/flink_agents/api/agents/react_agent.py | 278 +++++++++++++++++++++ python/flink_agents/api/agents/tests/__init__.py | 17 ++ .../api/agents/tests/test_row_schema.py | 35 +++ python/flink_agents/api/execution_environment.py | 6 + python/flink_agents/api/tests/test_prompt.py | 8 +- python/flink_agents/examples/common_tools.py | 53 ++++ .../integrate_table_with_react_agent_example.py | 136 ++++++++++ .../flink_agents/examples/react_agent_example.py | 88 +++++++ .../runtime/remote_execution_environment.py | 11 +- 11 files changed, 642 insertions(+), 9 deletions(-) diff --git a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceType.java b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceType.java index 3b9f9d1..86f313d 100644 --- a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceType.java +++ b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceType.java @@ -25,6 +25,8 @@ package org.apache.flink.agents.api.resource; */ public enum ResourceType { CHAT_MODEL("chat_model"), + CHAT_MODEL_CONNECTION("chat_model_connection"), + PROMPT("prompt"), TOOL("tool"); private final String value; diff --git a/python/flink_agents/api/agents/__init__.py b/python/flink_agents/api/agents/__init__.py new file mode 100644 index 0000000..e154fad --- /dev/null +++ b/python/flink_agents/api/agents/__init__.py @@ -0,0 +1,17 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# diff --git a/python/flink_agents/api/agents/react_agent.py b/python/flink_agents/api/agents/react_agent.py new file mode 100644 index 0000000..7bbf3c6 --- /dev/null +++ b/python/flink_agents/api/agents/react_agent.py @@ -0,0 +1,278 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import importlib +import json +from typing import Any, List, Optional, Union, cast + +from pydantic import BaseModel, ConfigDict, model_serializer, model_validator +from pyflink.common import Row +from pyflink.common.typeinfo import BasicType, BasicTypeInfo, RowTypeInfo + +from flink_agents.api.agent import Agent +from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.chat_models.chat_model import BaseChatModelSetup +from flink_agents.api.decorators import action +from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent +from flink_agents.api.events.event import InputEvent, OutputEvent +from flink_agents.api.prompts.prompt import Prompt +from flink_agents.api.resource import ResourceType +from flink_agents.api.runner_context import RunnerContext + +_DEFAULT_CHAT_MODEL = "_default_chat_model" +_DEFAULT_SCHEMA_PROMPT = "_default_schema_prompt" +_DEFAULT_USER_PROMPT = "_default_user_prompt" +_OUTPUT_SCHEMA = "_output_schema" + + +class OutputSchema(BaseModel): + """Util class to help serialize and deserialize output schema json.""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + output_schema: Union[type[BaseModel], RowTypeInfo] + + @model_serializer + def __custom_serializer(self) -> dict[str, Any]: + if isinstance(self.output_schema, RowTypeInfo): + data = { + "output_schema": { + "names": self.output_schema.get_field_names(), + "types": [ + type._basic_type.value + for type in self.output_schema.get_field_types() + ], + }, + } + else: + data = { + "output_schema": { + "module": self.output_schema.__module__, + "class": self.output_schema.__name__, + } + } + return data + + @model_validator(mode="before") + def __custom_deserialize(self) -> "OutputSchema": + output_schema = self["output_schema"] + if isinstance(output_schema, dict): + if "names" in output_schema: + self["output_schema"] = RowTypeInfo( + field_types=[ + BasicTypeInfo(BasicType(type)) + for type in output_schema["types"] + ], + field_names=output_schema["names"], + ) + else: + module = importlib.import_module(output_schema["module"]) + self["output_schema"] = getattr(module, output_schema["class"]) + return self + + +class ReActAgent(Agent): + """Built-in implementation of ReAct agent which is based on the function + call ability of llm. + + This implementation is not based on the foundational ReAct paper which uses + prompt to force llm output contain <Thought>, <Action> and <Observation> and + extract tool calls by text parsing. For a more robust and feature-rich + implementation we use the tool/function call ability of current llm, and get + the tool calls from response directly. + + + Example: + :: + + class OutputData(BaseModel): + result: int + + + env = AgentsExecutionEnvironment.get_execution_environment() + + # register resource to execution environment + ( + env.add_chat_model_connection( + name="ollama", connection=OllamaChatModelConnection, model=model + ) + .add_tool("add", add) + .add_tool("multiply", multiply) + ) + + # prepare prompt + prompt = Prompt.from_messages( + name="prompt", + messages=[ + ChatMessage( + role=MessageRole.SYSTEM, + content='An example of output is {"result": 30.32}.', + ), + ChatMessage( + role=MessageRole.USER, content="What is ({a} + {b}) * {c}" + ), + ], + ) + + # create ReAct agent. + agent = ReActAgent( + chat_model=OllamaChatModelSetup, + connection="ollama", + prompt=prompt, + tools=["add", "multiply"], + output_schema=OutputData, + ) + """ + + def __init__( + self, + *, + chat_model_setup: type[BaseChatModelSetup], + connection: str, + prompt: Optional[Prompt] = None, + tools: Optional[List[str]] = None, + output_schema: Optional[Union[type[BaseModel], RowTypeInfo]] = None, + **kwargs: Any, + ) -> None: + """Init method of ReActAgent. + + Parameters + ---------- + chat_model_setup : BaseChatModelSetup + The type of the chat model setup used in this ReAct agent. + connection: str + The name of the chat model connection used in chat model setup. The + connection should be registered in environment. + prompt : Optional[Prompt] = None + Prompt to instruct the llm, could include input and output example, + task and so on. + tools : Optional[List[str]] + Tools names can be used in this ReAct agent. The tools should be registered + in environment. + output_schema : Optional[Union[type[BaseModel], RowTypeInfo]] = None + The schema should be RowTypeInfo or subclass of BaseModel. When user + provide output schema, ReAct agent will add system prompt to instruct + response format of llm, and add output parser according to the schema. + **kwargs: Any + The initialize arguments of chat_model_setup. + """ + super().__init__() + settings = { + "name": _DEFAULT_CHAT_MODEL, + "connection": connection, + "tools": tools, + } + settings.update(kwargs) + self._resources[ResourceType.CHAT_MODEL][_DEFAULT_CHAT_MODEL] = ( + chat_model_setup, + settings, + ) + + if output_schema: + if isinstance(output_schema, type) and issubclass(output_schema, BaseModel): + json_schema = output_schema.model_json_schema() + elif isinstance(output_schema, RowTypeInfo): + json_schema = str(output_schema) + else: + err_msg = f"Output schema {output_schema.__class__} is not supported." + raise TypeError(err_msg) + schema_prompt = f"The final response should be json format, and match the schema {json_schema}." + self._resources[ResourceType.PROMPT][_DEFAULT_SCHEMA_PROMPT] = ( + Prompt.from_text(name="output_schema", text=schema_prompt) + ) + + if prompt: + self._resources[ResourceType.PROMPT][_DEFAULT_USER_PROMPT] = prompt + + self.add_action( + name="stop_action", + events=[ChatResponseEvent], + func=self.stop_action, + output_schema=OutputSchema(output_schema=output_schema), + ) + + @action(InputEvent) + @staticmethod + def start_action(event: InputEvent, ctx: RunnerContext) -> None: + """Start action to format user input and send chat request event.""" + usr_input = event.input + + try: + prompt = cast( + "Prompt", ctx.get_resource(_DEFAULT_USER_PROMPT, ResourceType.PROMPT) + ) + except KeyError: + prompt = None + + if isinstance(usr_input, (bool, str, int, float, type(None))): + usr_input = str(usr_input) + if prompt: + usr_msgs = prompt.format_messages( + role=MessageRole.USER, input=usr_input + ) + else: + usr_msgs = [ChatMessage(role=MessageRole.USER, content=usr_input)] + else: + if not prompt: + err_msg = ( + f"Input type is {usr_input.__class__}, which is not primitive types. " + f"User should provide prompt to help convert it to ChatMessage." + ) + raise RuntimeError(err_msg) + if isinstance(usr_input, Row): + usr_input = usr_input.as_dict(recursive=True) + else: # regard as pojo + usr_input = usr_input.__dict__ + usr_msgs = prompt.format_messages(role=MessageRole.USER, **usr_input) + + try: + schema_prompt = cast( + "Prompt", ctx.get_resource(_DEFAULT_SCHEMA_PROMPT, ResourceType.PROMPT) + ) + except KeyError: + schema_prompt = None + + if schema_prompt: + instruct = schema_prompt.format_messages() + usr_msgs = instruct + usr_msgs + + ctx.send_event( + ChatRequestEvent( + model=_DEFAULT_CHAT_MODEL, + messages=usr_msgs, + ) + ) + + @staticmethod + def stop_action(event: ChatResponseEvent, ctx: RunnerContext) -> None: + """Stop action to output result.""" + output = event.response.content + + # parse llm response to target schema. + # TODO: config error handle strategy by configuration. + output_schema = ctx.get_action_config_value(key="output_schema") + if output_schema: + output_schema = output_schema.output_schema + output = json.loads(output.strip()) + if isinstance(output_schema, type) and issubclass(output_schema, BaseModel): + output = output_schema.model_validate(output) + elif isinstance(output_schema, RowTypeInfo): + field_names = output_schema.get_field_names() + values = {} + for field_name in field_names: + values[field_name] = output[field_name] + output = Row(**values) + ctx.send_event(OutputEvent(output=output)) diff --git a/python/flink_agents/api/agents/tests/__init__.py b/python/flink_agents/api/agents/tests/__init__.py new file mode 100644 index 0000000..e154fad --- /dev/null +++ b/python/flink_agents/api/agents/tests/__init__.py @@ -0,0 +1,17 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# diff --git a/python/flink_agents/api/agents/tests/test_row_schema.py b/python/flink_agents/api/agents/tests/test_row_schema.py new file mode 100644 index 0000000..b3a663e --- /dev/null +++ b/python/flink_agents/api/agents/tests/test_row_schema.py @@ -0,0 +1,35 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# + +from pyflink.common.typeinfo import BasicTypeInfo, RowTypeInfo + +from flink_agents.api.agents.react_agent import OutputSchema + + +def test_output_schema_serializable() -> None: # noqa: D103 + schema = OutputSchema( + output_schema=RowTypeInfo( + [BasicTypeInfo.INT_TYPE_INFO()], + ["result"], + ) + ) + + json_data = schema.model_dump_json() + + deserialize_schema = OutputSchema.model_validate_json(json_data) + assert schema == deserialize_schema diff --git a/python/flink_agents/api/execution_environment.py b/python/flink_agents/api/execution_environment.py index e4d6eae..a942e96 100644 --- a/python/flink_agents/api/execution_environment.py +++ b/python/flink_agents/api/execution_environment.py @@ -24,7 +24,13 @@ from pyflink.datastream import DataStream, KeySelector, StreamExecutionEnvironme from pyflink.table import Schema, StreamTableEnvironment, Table from flink_agents.api.agent import Agent +from flink_agents.api.chat_models.chat_model import ( + BaseChatModelConnection, + BaseChatModelSetup, +) from flink_agents.api.configuration import Configuration +from flink_agents.api.prompts.prompt import Prompt +from flink_agents.api.resource import ResourceType class AgentBuilder(ABC): diff --git a/python/flink_agents/api/tests/test_prompt.py b/python/flink_agents/api/tests/test_prompt.py index 011a20a..38342f9 100644 --- a/python/flink_agents/api/tests/test_prompt.py +++ b/python/flink_agents/api/tests/test_prompt.py @@ -122,6 +122,10 @@ def test_prompt_lack_one_argument(text_prompt: Prompt) -> None: # noqa: D103 "and user review is 'The headphones broke after one week of use. Very poor quality'." ) -def test_prompt_contain_json_schema() -> None: # noqa: D103 - prompt = Prompt.from_text(name="prompt", text = f"The json schema is {Prompt.model_json_schema(mode='serialization')}") + +def test_prompt_contain_json_schema() -> None: # noqa: D103 + prompt = Prompt.from_text( + name="prompt", + text=f"The json schema is {Prompt.model_json_schema(mode='serialization')}", + ) prompt.format_string() diff --git a/python/flink_agents/examples/common_tools.py b/python/flink_agents/examples/common_tools.py new file mode 100644 index 0000000..a165d9e --- /dev/null +++ b/python/flink_agents/examples/common_tools.py @@ -0,0 +1,53 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# + + +def add(a: int, b: int) -> int: + """Calculate the sum of a and b. + + Parameters + ---------- + a : int + The first operand + b : int + The second operand + + Returns: + ------- + int: + The sum of a and b + """ + return a + b + + +def multiply(a: int, b: int) -> int: + """Useful function to multiply two numbers. + + Parameters + ---------- + a : int + The first operand + b : int + The second operand + + Returns: + ------- + int: + The product of a and b + """ + return a * b diff --git a/python/flink_agents/examples/integrate_table_with_react_agent_example.py b/python/flink_agents/examples/integrate_table_with_react_agent_example.py new file mode 100644 index 0000000..1467b4f --- /dev/null +++ b/python/flink_agents/examples/integrate_table_with_react_agent_example.py @@ -0,0 +1,136 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import os +from pathlib import Path + +from pyflink.common import Row +from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo +from pyflink.datastream import ( + KeySelector, + StreamExecutionEnvironment, +) +from pyflink.table import DataTypes, Schema, StreamTableEnvironment, TableDescriptor + +from flink_agents.api.agents.react_agent import ReActAgent +from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.prompts.prompt import Prompt +from flink_agents.examples.common_tools import add, multiply +from flink_agents.integrations.chat_models.ollama_chat_model import ( + OllamaChatModelConnection, + OllamaChatModelSetup, +) + +model = os.environ.get("OLLAMA_CHAT_MODEL", "qwen2.5:7b") + + +class MyKeySelector(KeySelector): + """KeySelector for extracting key.""" + + def get_key(self, value: Row) -> int: + """Extract key from Row.""" + return value[0] + + +current_dir = Path(__file__).parent + +#TODO: Currently, this example may cause core dump when being executed, the root cause +# may be known issue of pemja incorrect reference counting: +# https://github.com/apache/flink-agents/issues/83 +if __name__ == "__main__": + stream_env = StreamExecutionEnvironment.get_execution_environment() + + # should compile flink-agents jars before run this example. + stream_env.add_jars( + f"file:///{current_dir}/../../../runtime/target/flink-agents-runtime-0.1-SNAPSHOT.jar" + ) + stream_env.add_jars( + f"file:///{current_dir}/../../../plan/target/flink-agents-plan-0.1-SNAPSHOT.jar" + ) + stream_env.add_jars( + f"file:///{current_dir}/../../../api/target/flink-agents-api-0.1-SNAPSHOT.jar" + ) + + stream_env.set_parallelism(1) + + t_env = StreamTableEnvironment.create(stream_execution_environment=stream_env) + + table = t_env.from_elements( + elements=[(1, 2, 3)], + schema=DataTypes.ROW( + [ + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.INT()), + DataTypes.FIELD("c", DataTypes.INT()), + ] + ), + ) + + env = AgentsExecutionEnvironment.get_execution_environment(env=stream_env) + + # register resource to execution environment + ( + env.add_chat_model_connection( + name="ollama", connection=OllamaChatModelConnection, model=model + ) + .add_tool("add", add) + .add_tool("multiply", multiply) + ) + + # prepare prompt + prompt = Prompt.from_messages( + name="prompt", + messages=[ + ChatMessage( + role=MessageRole.SYSTEM, + content='An example of output is {"result": 30.32}.', + ), + ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * {c}"), + ], + ) + + output_type_info = RowTypeInfo( + [BasicTypeInfo.INT_TYPE_INFO()], + ["result"], + ) + + # create ReAct agent. + agent = ReActAgent( + chat_model_setup=OllamaChatModelSetup, + connection="ollama", + prompt=prompt, + tools=["add", "multiply"], + output_schema=output_type_info, + ) + + output_type = ExternalTypeInfo(output_type_info) + + schema = (Schema.new_builder().column("result", DataTypes.INT())).build() + + output_table = ( + env.from_table(input=table, t_env=t_env, key_selector=MyKeySelector()) + .apply(agent) + .to_table(schema=schema, output_type=output_type) + ) + + t_env.create_temporary_table( + "sink", + TableDescriptor.for_connector("print").schema(schema).build(), + ) + + output_table.execute_insert("sink").wait() diff --git a/python/flink_agents/examples/react_agent_example.py b/python/flink_agents/examples/react_agent_example.py new file mode 100644 index 0000000..7c603c7 --- /dev/null +++ b/python/flink_agents/examples/react_agent_example.py @@ -0,0 +1,88 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import os + +from pydantic import BaseModel + +from flink_agents.api.agents.react_agent import ReActAgent +from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.prompts.prompt import Prompt +from flink_agents.examples.common_tools import add, multiply +from flink_agents.integrations.chat_models.ollama_chat_model import ( + OllamaChatModelConnection, + OllamaChatModelSetup, +) + +model = os.environ.get("OLLAMA_CHAT_MODEL", "qwen2.5:7b") + + +class InputData(BaseModel): # noqa: D101 + a: int + b: int + c: int + + +class OutputData(BaseModel): # noqa: D101 + result: int + + +if __name__ == "__main__": + env = AgentsExecutionEnvironment.get_execution_environment() + + # register resource to execution environment + ( + env.add_chat_model_connection( + name="ollama", connection=OllamaChatModelConnection, model=model + ) + .add_tool("add", add) + .add_tool("multiply", multiply) + ) + + # prepare prompt + prompt = Prompt.from_messages( + name="prompt", + messages=[ + ChatMessage( + role=MessageRole.SYSTEM, + content='An example of output is {"result": 30.32}.', + ), + ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * {c}"), + ], + ) + + # create ReAct agent. + agent = ReActAgent( + chat_model_setup=OllamaChatModelSetup, + connection="ollama", + prompt=prompt, + tools=["add", "multiply"], + output_schema=OutputData, + ) + + # execute agent + input_list = [] + + output_list = env.from_list(input_list).apply(agent).to_list() + input_list.append({"key": "0001", "value": InputData(a=2123, b=2321, c=312)}) + + env.execute() + + for output in output_list: + for key, value in output.items(): + print(f"{key}: {value}") diff --git a/python/flink_agents/runtime/remote_execution_environment.py b/python/flink_agents/runtime/remote_execution_environment.py index a998bb4..ead1508 100644 --- a/python/flink_agents/runtime/remote_execution_environment.py +++ b/python/flink_agents/runtime/remote_execution_environment.py @@ -74,16 +74,13 @@ class RemoteAgentBuilder(AgentBuilder): if self.__agent_plan is not None: err_msg = "RemoteAgentBuilder doesn't support apply multiple agents yet." raise RuntimeError(err_msg) - self.__agent_plan = AgentPlan.from_agent(agent, self.__config) # inspect refer actions and resources from env to agent. - for type, names in agent._resource_names.items(): - if type not in agent.resources: - agent.resources[type] = {} - for name in names: - agent.resources[type][name] = self.__resources[type][name] + for type, name_to_resource in self.__resources.items(): + agent.resources[type] = name_to_resource | agent.resources[type] + + self.__agent_plan = AgentPlan.from_agent(agent, self.__config) - self.__agent_plan = AgentPlan.from_agent(agent) return self def to_datastream(