This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit e9b56540f14f6a6f1ad2f63b40a5a554b5c98b41
Author: WenjinXie <wenjin...@gmail.com>
AuthorDate: Thu Aug 28 12:08:49 2025 +0800

    [api][python] Introduce built-in ReAct Agent.
---
 .../flink/agents/api/resource/ResourceType.java    |   2 +
 python/flink_agents/api/agents/__init__.py         |  17 ++
 python/flink_agents/api/agents/react_agent.py      | 278 +++++++++++++++++++++
 python/flink_agents/api/agents/tests/__init__.py   |  17 ++
 .../api/agents/tests/test_row_schema.py            |  35 +++
 python/flink_agents/api/execution_environment.py   |   6 +
 python/flink_agents/api/tests/test_prompt.py       |   8 +-
 python/flink_agents/examples/common_tools.py       |  53 ++++
 .../integrate_table_with_react_agent_example.py    | 136 ++++++++++
 .../flink_agents/examples/react_agent_example.py   |  88 +++++++
 .../runtime/remote_execution_environment.py        |  11 +-
 11 files changed, 642 insertions(+), 9 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceType.java 
b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceType.java
index 3b9f9d1..86f313d 100644
--- a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceType.java
+++ b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceType.java
@@ -25,6 +25,8 @@ package org.apache.flink.agents.api.resource;
  */
 public enum ResourceType {
     CHAT_MODEL("chat_model"),
+    CHAT_MODEL_CONNECTION("chat_model_connection"),
+    PROMPT("prompt"),
     TOOL("tool");
 
     private final String value;
diff --git a/python/flink_agents/api/agents/__init__.py 
b/python/flink_agents/api/agents/__init__.py
new file mode 100644
index 0000000..e154fad
--- /dev/null
+++ b/python/flink_agents/api/agents/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
diff --git a/python/flink_agents/api/agents/react_agent.py 
b/python/flink_agents/api/agents/react_agent.py
new file mode 100644
index 0000000..7bbf3c6
--- /dev/null
+++ b/python/flink_agents/api/agents/react_agent.py
@@ -0,0 +1,278 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import importlib
+import json
+from typing import Any, List, Optional, Union, cast
+
+from pydantic import BaseModel, ConfigDict, model_serializer, model_validator
+from pyflink.common import Row
+from pyflink.common.typeinfo import BasicType, BasicTypeInfo, RowTypeInfo
+
+from flink_agents.api.agent import Agent
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.chat_models.chat_model import BaseChatModelSetup
+from flink_agents.api.decorators import action
+from flink_agents.api.events.chat_event import ChatRequestEvent, 
ChatResponseEvent
+from flink_agents.api.events.event import InputEvent, OutputEvent
+from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.api.resource import ResourceType
+from flink_agents.api.runner_context import RunnerContext
+
+_DEFAULT_CHAT_MODEL = "_default_chat_model"
+_DEFAULT_SCHEMA_PROMPT = "_default_schema_prompt"
+_DEFAULT_USER_PROMPT = "_default_user_prompt"
+_OUTPUT_SCHEMA = "_output_schema"
+
+
+class OutputSchema(BaseModel):
+    """Util class to help serialize and deserialize output schema json."""
+
+    model_config = ConfigDict(arbitrary_types_allowed=True)
+    output_schema: Union[type[BaseModel], RowTypeInfo]
+
+    @model_serializer
+    def __custom_serializer(self) -> dict[str, Any]:
+        if isinstance(self.output_schema, RowTypeInfo):
+            data = {
+                "output_schema": {
+                    "names": self.output_schema.get_field_names(),
+                    "types": [
+                        type._basic_type.value
+                        for type in self.output_schema.get_field_types()
+                    ],
+                },
+            }
+        else:
+            data = {
+                "output_schema": {
+                    "module": self.output_schema.__module__,
+                    "class": self.output_schema.__name__,
+                }
+            }
+        return data
+
+    @model_validator(mode="before")
+    def __custom_deserialize(self) -> "OutputSchema":
+        output_schema = self["output_schema"]
+        if isinstance(output_schema, dict):
+            if "names" in output_schema:
+                self["output_schema"] = RowTypeInfo(
+                    field_types=[
+                        BasicTypeInfo(BasicType(type))
+                        for type in output_schema["types"]
+                    ],
+                    field_names=output_schema["names"],
+                )
+            else:
+                module = importlib.import_module(output_schema["module"])
+                self["output_schema"] = getattr(module, output_schema["class"])
+        return self
+
+
+class ReActAgent(Agent):
+    """Built-in implementation of ReAct agent which is based on the function
+    call ability of llm.
+
+    This implementation is not based on the foundational ReAct paper, which uses
+    prompts to force the llm output to contain <Thought>, <Action> and
+    <Observation> and extracts tool calls by text parsing. For a more robust and
+    feature-rich implementation, we use the tool/function call ability of current
+    llms and get the tool calls from the response directly.
+
+
+    Example:
+        ::
+
+            class OutputData(BaseModel):
+                result: int
+
+
+            env = AgentsExecutionEnvironment.get_execution_environment()
+
+            # register resource to execution environment
+            (
+                env.add_chat_model_connection(
+                    name="ollama", connection=OllamaChatModelConnection, model=model
+                )
+                .add_tool("add", add)
+                .add_tool("multiply", multiply)
+            )
+
+            # prepare prompt
+            prompt = Prompt.from_messages(
+                name="prompt",
+                messages=[
+                    ChatMessage(
+                        role=MessageRole.SYSTEM,
+                        content='An example of output is {"result": 30.32}.',
+                    ),
+                    ChatMessage(
+                        role=MessageRole.USER, content="What is ({a} + {b}) * {c}"
+                    ),
+                ],
+            )
+
+            # create ReAct agent.
+            agent = ReActAgent(
+                chat_model_setup=OllamaChatModelSetup,
+                connection="ollama",
+                prompt=prompt,
+                tools=["add", "multiply"],
+                output_schema=OutputData,
+            )
+    """
+
+    def __init__(
+        self,
+        *,
+        chat_model_setup: type[BaseChatModelSetup],
+        connection: str,
+        prompt: Optional[Prompt] = None,
+        tools: Optional[List[str]] = None,
+        output_schema: Optional[Union[type[BaseModel], RowTypeInfo]] = None,
+        **kwargs: Any,
+    ) -> None:
+        """Init method of ReActAgent.
+
+        Parameters
+        ----------
+        chat_model_setup : type[BaseChatModelSetup]
+            The type of the chat model setup used in this ReAct agent.
+        connection: str
+            The name of the chat model connection used in chat model setup. The
+            connection should be registered in environment.
+        prompt : Optional[Prompt] = None
+            Prompt to instruct the llm, could include input and output example,
+            task and so on.
+        tools : Optional[List[str]]
+            Names of the tools that can be used in this ReAct agent. The tools
+            should be registered in the environment.
+        output_schema : Optional[Union[type[BaseModel], RowTypeInfo]] = None
+            A RowTypeInfo or a subclass of BaseModel. When provided, the ReAct agent
+            adds a system prompt instructing the llm on the response format, and
+            parses the llm response according to the schema.
+        **kwargs: Any
+            The initialize arguments of chat_model_setup.
+        """
+        super().__init__()
+        settings = {
+            "name": _DEFAULT_CHAT_MODEL,
+            "connection": connection,
+            "tools": tools,
+        }
+        settings.update(kwargs)
+        self._resources[ResourceType.CHAT_MODEL][_DEFAULT_CHAT_MODEL] = (
+            chat_model_setup,
+            settings,
+        )
+
+        if output_schema:
+            if isinstance(output_schema, type) and issubclass(output_schema, 
BaseModel):
+                json_schema = output_schema.model_json_schema()
+            elif isinstance(output_schema, RowTypeInfo):
+                json_schema = str(output_schema)
+            else:
+                err_msg = f"Output schema {output_schema.__class__} is not 
supported."
+                raise TypeError(err_msg)
+            schema_prompt = f"The final response should be json format, and 
match the schema {json_schema}."
+            self._resources[ResourceType.PROMPT][_DEFAULT_SCHEMA_PROMPT] = (
+                Prompt.from_text(name="output_schema", text=schema_prompt)
+            )
+
+        if prompt:
+            self._resources[ResourceType.PROMPT][_DEFAULT_USER_PROMPT] = prompt
+
+        self.add_action(
+            name="stop_action",
+            events=[ChatResponseEvent],
+            func=self.stop_action,
+            output_schema=OutputSchema(output_schema=output_schema),
+        )
+
+    @action(InputEvent)
+    @staticmethod
+    def start_action(event: InputEvent, ctx: RunnerContext) -> None:
+        """Start action to format user input and send chat request event."""
+        usr_input = event.input
+
+        try:
+            prompt = cast(
+                "Prompt", ctx.get_resource(_DEFAULT_USER_PROMPT, 
ResourceType.PROMPT)
+            )
+        except KeyError:
+            prompt = None
+
+        if isinstance(usr_input, (bool, str, int, float, type(None))):
+            usr_input = str(usr_input)
+            if prompt:
+                usr_msgs = prompt.format_messages(
+                    role=MessageRole.USER, input=usr_input
+                )
+            else:
+                usr_msgs = [ChatMessage(role=MessageRole.USER, 
content=usr_input)]
+        else:
+            if not prompt:
+                err_msg = (
+                    f"Input type is {usr_input.__class__}, which is not 
primitive types. "
+                    f"User should provide prompt to help convert it to 
ChatMessage."
+                )
+                raise RuntimeError(err_msg)
+            if isinstance(usr_input, Row):
+                usr_input = usr_input.as_dict(recursive=True)
+            else:  # regard as pojo
+                usr_input = usr_input.__dict__
+            usr_msgs = prompt.format_messages(role=MessageRole.USER, 
**usr_input)
+
+        try:
+            schema_prompt = cast(
+                "Prompt", ctx.get_resource(_DEFAULT_SCHEMA_PROMPT, 
ResourceType.PROMPT)
+            )
+        except KeyError:
+            schema_prompt = None
+
+        if schema_prompt:
+            instruct = schema_prompt.format_messages()
+            usr_msgs = instruct + usr_msgs
+
+        ctx.send_event(
+            ChatRequestEvent(
+                model=_DEFAULT_CHAT_MODEL,
+                messages=usr_msgs,
+            )
+        )
+
+    @staticmethod
+    def stop_action(event: ChatResponseEvent, ctx: RunnerContext) -> None:
+        """Stop action to output result."""
+        output = event.response.content
+
+        # parse llm response to target schema.
+        # TODO: config error handle strategy by configuration.
+        output_schema = ctx.get_action_config_value(key="output_schema")
+        if output_schema:
+            output_schema = output_schema.output_schema
+            output = json.loads(output.strip())
+            if isinstance(output_schema, type) and issubclass(output_schema, 
BaseModel):
+                output = output_schema.model_validate(output)
+            elif isinstance(output_schema, RowTypeInfo):
+                field_names = output_schema.get_field_names()
+                values = {}
+                for field_name in field_names:
+                    values[field_name] = output[field_name]
+                output = Row(**values)
+        ctx.send_event(OutputEvent(output=output))
diff --git a/python/flink_agents/api/agents/tests/__init__.py 
b/python/flink_agents/api/agents/tests/__init__.py
new file mode 100644
index 0000000..e154fad
--- /dev/null
+++ b/python/flink_agents/api/agents/tests/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
diff --git a/python/flink_agents/api/agents/tests/test_row_schema.py 
b/python/flink_agents/api/agents/tests/test_row_schema.py
new file mode 100644
index 0000000..b3a663e
--- /dev/null
+++ b/python/flink_agents/api/agents/tests/test_row_schema.py
@@ -0,0 +1,35 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+from pyflink.common.typeinfo import BasicTypeInfo, RowTypeInfo
+
+from flink_agents.api.agents.react_agent import OutputSchema
+
+
+def test_output_schema_serializable() -> None:  # noqa: D103
+    schema = OutputSchema(
+        output_schema=RowTypeInfo(
+            [BasicTypeInfo.INT_TYPE_INFO()],
+            ["result"],
+        )
+    )
+
+    json_data = schema.model_dump_json()
+
+    deserialize_schema = OutputSchema.model_validate_json(json_data)
+    assert schema == deserialize_schema
diff --git a/python/flink_agents/api/execution_environment.py 
b/python/flink_agents/api/execution_environment.py
index e4d6eae..a942e96 100644
--- a/python/flink_agents/api/execution_environment.py
+++ b/python/flink_agents/api/execution_environment.py
@@ -24,7 +24,13 @@ from pyflink.datastream import DataStream, KeySelector, 
StreamExecutionEnvironme
 from pyflink.table import Schema, StreamTableEnvironment, Table
 
 from flink_agents.api.agent import Agent
+from flink_agents.api.chat_models.chat_model import (
+    BaseChatModelConnection,
+    BaseChatModelSetup,
+)
 from flink_agents.api.configuration import Configuration
+from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.api.resource import ResourceType
 
 
 class AgentBuilder(ABC):
diff --git a/python/flink_agents/api/tests/test_prompt.py 
b/python/flink_agents/api/tests/test_prompt.py
index 011a20a..38342f9 100644
--- a/python/flink_agents/api/tests/test_prompt.py
+++ b/python/flink_agents/api/tests/test_prompt.py
@@ -122,6 +122,10 @@ def test_prompt_lack_one_argument(text_prompt: Prompt) -> 
None:  # noqa: D103
         "and user review is 'The headphones broke after one week of use. Very 
poor quality'."
     )
 
-def test_prompt_contain_json_schema() -> None: # noqa: D103
-    prompt = Prompt.from_text(name="prompt", text = f"The json schema is 
{Prompt.model_json_schema(mode='serialization')}")
+
+def test_prompt_contain_json_schema() -> None:  # noqa: D103
+    prompt = Prompt.from_text(
+        name="prompt",
+        text=f"The json schema is 
{Prompt.model_json_schema(mode='serialization')}",
+    )
     prompt.format_string()
diff --git a/python/flink_agents/examples/common_tools.py 
b/python/flink_agents/examples/common_tools.py
new file mode 100644
index 0000000..a165d9e
--- /dev/null
+++ b/python/flink_agents/examples/common_tools.py
@@ -0,0 +1,53 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+
+def add(a: int, b: int) -> int:
+    """Calculate the sum of a and b.
+
+    Parameters
+    ----------
+    a : int
+        The first operand
+    b : int
+        The second operand
+
+    Returns:
+    -------
+    int:
+        The sum of a and b
+    """
+    return a + b
+
+
+def multiply(a: int, b: int) -> int:
+    """Useful function to multiply two numbers.
+
+    Parameters
+    ----------
+    a : int
+        The first operand
+    b : int
+        The second operand
+
+    Returns:
+    -------
+    int:
+        The product of a and b
+    """
+    return a * b
diff --git 
a/python/flink_agents/examples/integrate_table_with_react_agent_example.py 
b/python/flink_agents/examples/integrate_table_with_react_agent_example.py
new file mode 100644
index 0000000..1467b4f
--- /dev/null
+++ b/python/flink_agents/examples/integrate_table_with_react_agent_example.py
@@ -0,0 +1,136 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import os
+from pathlib import Path
+
+from pyflink.common import Row
+from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, 
RowTypeInfo
+from pyflink.datastream import (
+    KeySelector,
+    StreamExecutionEnvironment,
+)
+from pyflink.table import DataTypes, Schema, StreamTableEnvironment, 
TableDescriptor
+
+from flink_agents.api.agents.react_agent import ReActAgent
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.examples.common_tools import add, multiply
+from flink_agents.integrations.chat_models.ollama_chat_model import (
+    OllamaChatModelConnection,
+    OllamaChatModelSetup,
+)
+
+model = os.environ.get("OLLAMA_CHAT_MODEL", "qwen2.5:7b")
+
+
+class MyKeySelector(KeySelector):
+    """KeySelector for extracting key."""
+
+    def get_key(self, value: Row) -> int:
+        """Extract key from Row."""
+        return value[0]
+
+
+current_dir = Path(__file__).parent
+
+# TODO: Currently, this example may cause a core dump when executed. The root cause
+# may be a known pemja issue with incorrect reference counting:
+# https://github.com/apache/flink-agents/issues/83
+if __name__ == "__main__":
+    stream_env = StreamExecutionEnvironment.get_execution_environment()
+
+    # should compile flink-agents jars before run this example.
+    stream_env.add_jars(
+        
f"file:///{current_dir}/../../../runtime/target/flink-agents-runtime-0.1-SNAPSHOT.jar"
+    )
+    stream_env.add_jars(
+        
f"file:///{current_dir}/../../../plan/target/flink-agents-plan-0.1-SNAPSHOT.jar"
+    )
+    stream_env.add_jars(
+        
f"file:///{current_dir}/../../../api/target/flink-agents-api-0.1-SNAPSHOT.jar"
+    )
+
+    stream_env.set_parallelism(1)
+
+    t_env = 
StreamTableEnvironment.create(stream_execution_environment=stream_env)
+
+    table = t_env.from_elements(
+        elements=[(1, 2, 3)],
+        schema=DataTypes.ROW(
+            [
+                DataTypes.FIELD("a", DataTypes.INT()),
+                DataTypes.FIELD("b", DataTypes.INT()),
+                DataTypes.FIELD("c", DataTypes.INT()),
+            ]
+        ),
+    )
+
+    env = AgentsExecutionEnvironment.get_execution_environment(env=stream_env)
+
+    # register resource to execution environment
+    (
+        env.add_chat_model_connection(
+            name="ollama", connection=OllamaChatModelConnection, model=model
+        )
+        .add_tool("add", add)
+        .add_tool("multiply", multiply)
+    )
+
+    # prepare prompt
+    prompt = Prompt.from_messages(
+        name="prompt",
+        messages=[
+            ChatMessage(
+                role=MessageRole.SYSTEM,
+                content='An example of output is {"result": 30.32}.',
+            ),
+            ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * 
{c}"),
+        ],
+    )
+
+    output_type_info = RowTypeInfo(
+        [BasicTypeInfo.INT_TYPE_INFO()],
+        ["result"],
+    )
+
+    # create ReAct agent.
+    agent = ReActAgent(
+        chat_model_setup=OllamaChatModelSetup,
+        connection="ollama",
+        prompt=prompt,
+        tools=["add", "multiply"],
+        output_schema=output_type_info,
+    )
+
+    output_type = ExternalTypeInfo(output_type_info)
+
+    schema = (Schema.new_builder().column("result", DataTypes.INT())).build()
+
+    output_table = (
+        env.from_table(input=table, t_env=t_env, key_selector=MyKeySelector())
+        .apply(agent)
+        .to_table(schema=schema, output_type=output_type)
+    )
+
+    t_env.create_temporary_table(
+        "sink",
+        TableDescriptor.for_connector("print").schema(schema).build(),
+    )
+
+    output_table.execute_insert("sink").wait()
diff --git a/python/flink_agents/examples/react_agent_example.py 
b/python/flink_agents/examples/react_agent_example.py
new file mode 100644
index 0000000..7c603c7
--- /dev/null
+++ b/python/flink_agents/examples/react_agent_example.py
@@ -0,0 +1,88 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import os
+
+from pydantic import BaseModel
+
+from flink_agents.api.agents.react_agent import ReActAgent
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.examples.common_tools import add, multiply
+from flink_agents.integrations.chat_models.ollama_chat_model import (
+    OllamaChatModelConnection,
+    OllamaChatModelSetup,
+)
+
+model = os.environ.get("OLLAMA_CHAT_MODEL", "qwen2.5:7b")
+
+
+class InputData(BaseModel):  # noqa: D101
+    a: int
+    b: int
+    c: int
+
+
+class OutputData(BaseModel):  # noqa: D101
+    result: int
+
+
+if __name__ == "__main__":
+    env = AgentsExecutionEnvironment.get_execution_environment()
+
+    # register resource to execution environment
+    (
+        env.add_chat_model_connection(
+            name="ollama", connection=OllamaChatModelConnection, model=model
+        )
+        .add_tool("add", add)
+        .add_tool("multiply", multiply)
+    )
+
+    # prepare prompt
+    prompt = Prompt.from_messages(
+        name="prompt",
+        messages=[
+            ChatMessage(
+                role=MessageRole.SYSTEM,
+                content='An example of output is {"result": 30.32}.',
+            ),
+            ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * 
{c}"),
+        ],
+    )
+
+    # create ReAct agent.
+    agent = ReActAgent(
+        chat_model_setup=OllamaChatModelSetup,
+        connection="ollama",
+        prompt=prompt,
+        tools=["add", "multiply"],
+        output_schema=OutputData,
+    )
+
+    # execute agent
+    input_list = []
+
+    output_list = env.from_list(input_list).apply(agent).to_list()
+    input_list.append({"key": "0001", "value": InputData(a=2123, b=2321, 
c=312)})
+
+    env.execute()
+
+    for output in output_list:
+        for key, value in output.items():
+            print(f"{key}: {value}")
diff --git a/python/flink_agents/runtime/remote_execution_environment.py 
b/python/flink_agents/runtime/remote_execution_environment.py
index a998bb4..ead1508 100644
--- a/python/flink_agents/runtime/remote_execution_environment.py
+++ b/python/flink_agents/runtime/remote_execution_environment.py
@@ -74,16 +74,13 @@ class RemoteAgentBuilder(AgentBuilder):
         if self.__agent_plan is not None:
             err_msg = "RemoteAgentBuilder doesn't support apply multiple 
agents yet."
             raise RuntimeError(err_msg)
-        self.__agent_plan = AgentPlan.from_agent(agent, self.__config)
 
         # inspect refer actions and resources from env to agent.
-        for type, names in agent._resource_names.items():
-            if type not in agent.resources:
-                agent.resources[type] = {}
-            for name in names:
-                agent.resources[type][name] = self.__resources[type][name]
+        for type, name_to_resource in self.__resources.items():
+            agent.resources[type] = name_to_resource | agent.resources[type]
+
+        self.__agent_plan = AgentPlan.from_agent(agent, self.__config)
 
-        self.__agent_plan = AgentPlan.from_agent(agent)
         return self
 
     def to_datastream(

Reply via email to