This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit b2667352fb6da84352127523fc118db393d5a493
Author: WenjinXie <[email protected]>
AuthorDate: Thu Nov 13 15:56:18 2025 +0800

    [test][python] Clean up and refactor python e2e tests.
---
 ..._example.py => chat_model_integration_agent.py} | 120 ++++------
 .../e2e_tests/chat_model_integration_test.py       |  84 +++++++
 .../{my_agent.py => flink_integration_agent.py}    |   9 +
 .../e2e_tests/flink_intergration_test.py           | 247 +++++++++++++++++++++
 .../e2e_tests/from_datastream_to_table.py          | 108 ---------
 .../integrate_datastream_with_agent_example.py     |  81 -------
 .../integrate_table_with_agent_example.py          | 108 ---------
 .../integrate_table_with_react_agent_example.py    | 127 -----------
 .../flink_agents/e2e_tests/mcp_test/mcp_server.py  |   1 +
 .../mcp_test/{mcp_example.py => mcp_test.py}       |  17 +-
 .../flink_agents/e2e_tests/react_agent_example.py  |  90 --------
 python/flink_agents/e2e_tests/react_agent_test.py  | 232 +++++++++++++++++++
 .../{common_tools.py => react_agent_tools.py}      |   0
 .../test_from_datastream_to_datastream.txt         |  10 +
 .../ground_truth/test_from_table_to_table.txt      |  10 +
 .../resources/ground_truth/test_workflow.txt       |  10 +
 .../e2e_tests/resources/{ => input}/input_data.txt |   0
 .../ollama_pull_model.sh}                          |  40 +---
 python/flink_agents/e2e_tests/test_utils.py        |  68 ++++++
 .../{agent_example.py => workflow_test.py}         |  34 ++-
 .../start_ollama_server.sh                         |  45 +---
 21 files changed, 766 insertions(+), 675 deletions(-)

diff --git a/python/flink_agents/e2e_tests/chat_model_example.py 
b/python/flink_agents/e2e_tests/chat_model_integration_agent.py
similarity index 69%
rename from python/flink_agents/e2e_tests/chat_model_example.py
rename to python/flink_agents/e2e_tests/chat_model_integration_agent.py
index 69c8813..0eaa9fc 100644
--- a/python/flink_agents/e2e_tests/chat_model_example.py
+++ b/python/flink_agents/e2e_tests/chat_model_integration_agent.py
@@ -16,7 +16,6 @@
 # limitations under the License.
 
#################################################################################
 import os
-from typing import List
 
 from flink_agents.api.agent import Agent
 from flink_agents.api.chat_message import ChatMessage, MessageRole
@@ -27,11 +26,7 @@ from flink_agents.api.decorators import (
     tool,
 )
 from flink_agents.api.events.chat_event import ChatRequestEvent, 
ChatResponseEvent
-from flink_agents.api.events.event import (
-    InputEvent,
-    OutputEvent,
-)
-from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.events.event import InputEvent, OutputEvent
 from flink_agents.api.resource import ResourceDescriptor
 from flink_agents.api.runner_context import RunnerContext
 from flink_agents.integrations.chat_models.ollama_chat_model import (
@@ -47,15 +42,18 @@ from 
flink_agents.integrations.chat_models.tongyi_chat_model import (
     TongyiChatModelSetup,
 )
 
-TONGYI_MODEL = os.environ.get("TONGYI_CHAT_MODEL", "qwen-plus")
-OLLAMA_MODEL = os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:0.6b")
-OPENAI_MODEL = os.environ.get("OPENAI_CHAT_MODEL", "gpt-3.5-turbo")
-BACKENDS_TO_RUN: List[str] = ["Tongyi", "OpenAI", "Ollama"]
-
 
-class MyAgent(Agent):
+class ChatModelTestAgent(Agent):
     """Example agent demonstrating the new ChatModel architecture."""
 
+    @chat_model_connection
+    @staticmethod
+    def openai_connection() -> ResourceDescriptor:
+        """ChatModelConnection responsible for openai model service 
connection."""
+        return ResourceDescriptor(
+            clazz=OpenAIChatModelConnection, 
api_key=os.environ.get("OPENAI_API_KEY")
+        )
+
     @chat_model_connection
     @staticmethod
     def tongyi_connection() -> ResourceDescriptor:
@@ -66,68 +64,68 @@ class MyAgent(Agent):
     @staticmethod
     def ollama_connection() -> ResourceDescriptor:
         """ChatModelConnection responsible for ollama model service 
connection."""
-        return ResourceDescriptor(clazz=OllamaChatModelConnection)
-
-    @chat_model_connection
-    @staticmethod
-    def openai_connection() -> ResourceDescriptor:
-        """ChatModelConnection responsible for openai model service 
connection."""
         return ResourceDescriptor(
-            clazz=OpenAIChatModelConnection,
-            api_key=os.environ.get("OPENAI_API_KEY")
+            clazz=OllamaChatModelConnection, request_timeout=240.0
         )
 
     @chat_model_setup
     @staticmethod
     def math_chat_model() -> ResourceDescriptor:
         """ChatModel which focus on math, and reuse ChatModelConnection."""
-        if CURRENT_BACKEND == "Tongyi":
+        model_provider = os.environ.get("MODEL_PROVIDER")
+        if model_provider == "Tongyi":
             return ResourceDescriptor(
                 clazz=TongyiChatModelSetup,
                 connection="tongyi_connection",
-                model=TONGYI_MODEL,
+                model=os.environ.get("TONGYI_CHAT_MODEL", "qwen-plus"),
                 tools=["add"],
             )
-        elif CURRENT_BACKEND == "OpenAI":
-            return ResourceDescriptor(
-                clazz=OpenAIChatModelSetup,
-                connection="openai_connection",
-                model=OPENAI_MODEL,
-                tools=["add"]
-            )
-        else:
+        elif model_provider == "Ollama":
             return ResourceDescriptor(
                 clazz=OllamaChatModelSetup,
                 connection="ollama_connection",
-                model=OLLAMA_MODEL,
+                model=os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b"),
                 tools=["add"],
                 extract_reasoning=True,
             )
+        elif model_provider == "OpenAI":
+            return ResourceDescriptor(
+                clazz=OpenAIChatModelSetup,
+                connection="openai_connection",
+                model=os.environ.get("OPENAI_CHAT_MODEL", "gpt-3.5-turbo"),
+                tools=["add"],
+            )
+        else:
+            err_msg = f"Unknown model_provider {model_provider}"
+            raise RuntimeError(err_msg)
 
     @chat_model_setup
     @staticmethod
     def creative_chat_model() -> ResourceDescriptor:
         """ChatModel which focus on text generate, and reuse 
ChatModelConnection."""
-        if CURRENT_BACKEND == "Tongyi":
+        model_provider = os.environ.get("MODEL_PROVIDER")
+        if model_provider == "Tongyi":
             return ResourceDescriptor(
                 clazz=TongyiChatModelSetup,
                 connection="tongyi_connection",
-                model=TONGYI_MODEL,
-            )
-        elif CURRENT_BACKEND == "OpenAI":
-            return ResourceDescriptor(
-                clazz=OpenAIChatModelSetup,
-                connection="openai_connection",
-                model=OPENAI_MODEL,
-                tools=["add"]
+                model=os.environ.get("TONGYI_CHAT_MODEL", "qwen-plus"),
             )
-        else:
+        elif model_provider == "Ollama":
             return ResourceDescriptor(
                 clazz=TongyiChatModelSetup,
                 connection="ollama_connection",
-                model=OLLAMA_MODEL,
+                model=os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b"),
                 extract_reasoning=True,
             )
+        elif model_provider == "OpenAI":
+            return ResourceDescriptor(
+                clazz=OpenAIChatModelSetup,
+                connection="openai_connection",
+                model=os.environ.get("OPENAI_CHAT_MODEL", "gpt-3.5-turbo"),
+            )
+        else:
+            err_msg = f"Unknown model_provider {model_provider}"
+            raise RuntimeError(err_msg)
 
     @tool
     @staticmethod
@@ -175,41 +173,3 @@ class MyAgent(Agent):
         input = event.response
         if event.response and input.content:
             ctx.send_event(OutputEvent(output=input.content))
-
-
-if __name__ == "__main__":
-    for backend in BACKENDS_TO_RUN:
-        CURRENT_BACKEND = backend
-        if backend == "Tongyi":
-            CURRENT_MODEL = TONGYI_MODEL
-        elif backend == "OpenAI":
-            CURRENT_MODEL = OPENAI_MODEL
-        else:
-            CURRENT_MODEL = OLLAMA_MODEL
-
-        if backend == "Tongyi" and not os.environ.get("DASHSCOPE_API_KEY"):
-            print("[SKIP] TongyiChatModel because DASHSCOPE_API_KEY is not 
set.")
-            continue
-
-        if backend == "OpenAI" and not os.environ.get("OPENAI_API_KEY"):
-            print("[SKIP] OpenAIChatModel because OPENAI_API_KEY is not set.")
-            continue
-
-        print(
-            f"\nRunning {backend}ChatModel while the using model is 
{CURRENT_MODEL}..."
-        )
-
-        env = AgentsExecutionEnvironment.get_execution_environment()
-        input_list = []
-        agent = MyAgent()
-
-        output_list = env.from_list(input_list).apply(agent).to_list()
-
-        input_list.append({"key": "0001", "value": "calculate the sum of 1 and 
2."})
-        input_list.append({"key": "0002", "value": "Tell me a joke about 
cats."})
-
-        env.execute()
-
-        for output in output_list:
-            for key, value in output.items():
-                print(f"{key}: {value}")
diff --git a/python/flink_agents/e2e_tests/chat_model_integration_test.py 
b/python/flink_agents/e2e_tests/chat_model_integration_test.py
new file mode 100644
index 0000000..b582402
--- /dev/null
+++ b/python/flink_agents/e2e_tests/chat_model_integration_test.py
@@ -0,0 +1,84 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import os
+from pathlib import Path
+
+import pytest
+
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.e2e_tests.chat_model_integration_agent import 
ChatModelTestAgent
+from flink_agents.e2e_tests.test_utils import pull_model
+
+current_dir = Path(__file__).parent
+
+TONGYI_MODEL = os.environ.get("TONGYI_CHAT_MODEL", "qwen-plus")
+os.environ["TONGYI_CHAT_MODEL"] = TONGYI_MODEL
+OLLAMA_MODEL = os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b")
+os.environ["OLLAMA_CHAT_MODEL"] = OLLAMA_MODEL
+OPENAI_MODEL = os.environ.get("OPENAI_CHAT_MODEL", "gpt-3.5-turbo")
+os.environ["OPENAI_CHAT_MODEL"] = OPENAI_MODEL
+
+DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY")
+OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
+
+client = pull_model(OLLAMA_MODEL)
+
+
[email protected](
+    "model_provider",
+    [
+        pytest.param(
+            "Ollama",
+            marks=pytest.mark.skipif(
+                client is None,
+                reason="Ollama client is not available or test model is 
missing.",
+            ),
+        ),
+        pytest.param(
+            "Tongyi",
+            marks=pytest.mark.skipif(
+                DASHSCOPE_API_KEY is None, reason="Tongyi api key is not set."
+            ),
+        ),
+        pytest.param(
+            "OpenAI",
+            marks=pytest.mark.skipif(
+                OPENAI_API_KEY is None, reason="OpenAI api key is not set."
+            ),
+        ),
+    ],
+)
+def test_chat_model_integration(model_provider: str) -> None:  # noqa: D103
+    os.environ["MODEL_PROVIDER"] = model_provider
+    env = AgentsExecutionEnvironment.get_execution_environment()
+    input_list = []
+    agent = ChatModelTestAgent()
+
+    output_list = env.from_list(input_list).apply(agent).to_list()
+
+    input_list.append({"key": "0001", "value": "calculate the sum of 1 and 
2."})
+    input_list.append({"key": "0002", "value": "Tell me a joke about cats."})
+
+    env.execute()
+
+    for output in output_list:
+        for key, value in output.items():
+            print(f"{key}: {value}")
+
+    assert "3" in output_list[0]["0001"]
+    assert "cat" in output_list[1]["0002"]
diff --git a/python/flink_agents/e2e_tests/my_agent.py 
b/python/flink_agents/e2e_tests/flink_integration_agent.py
similarity index 95%
rename from python/flink_agents/e2e_tests/my_agent.py
rename to python/flink_agents/e2e_tests/flink_integration_agent.py
index d671ae7..4086a17 100644
--- a/python/flink_agents/e2e_tests/my_agent.py
+++ b/python/flink_agents/e2e_tests/flink_integration_agent.py
@@ -22,6 +22,7 @@ from typing import Any
 
 from pydantic import BaseModel
 from pyflink.common import Row
+from pyflink.datastream import KeySelector
 
 from flink_agents.api.agent import Agent
 from flink_agents.api.decorators import action, tool
@@ -53,6 +54,14 @@ class MyEvent(Event):  # noqa D101
     value: Any
 
 
+class MyKeySelector(KeySelector):
+    """KeySelector for extracting key."""
+
+    def get_key(self, value: ItemData) -> int:
+        """Extract key from ItemData."""
+        return value.id
+
+
 class DataStreamAgent(Agent):
     """Agent used for explaining integrating agents with DataStream.
 
diff --git a/python/flink_agents/e2e_tests/flink_intergration_test.py 
b/python/flink_agents/e2e_tests/flink_intergration_test.py
new file mode 100644
index 0000000..4f537ed
--- /dev/null
+++ b/python/flink_agents/e2e_tests/flink_intergration_test.py
@@ -0,0 +1,247 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import os
+import sysconfig
+from pathlib import Path
+
+from pyflink.common import Configuration, Encoder, WatermarkStrategy
+from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, 
RowTypeInfo, Types
+from pyflink.datastream import (
+    RuntimeExecutionMode,
+    StreamExecutionEnvironment,
+)
+from pyflink.datastream.connectors.file_system import (
+    FileSource,
+    StreamFormat,
+    StreamingFileSink,
+)
+from pyflink.table import DataTypes, Schema, StreamTableEnvironment, 
TableDescriptor
+
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.e2e_tests.flink_integration_agent import (
+    DataStreamAgent,
+    DataStreamToTableAgent,
+    ItemData,
+    MyKeySelector,
+    TableAgent,
+)
+from flink_agents.e2e_tests.test_utils import check_result
+
+current_dir = Path(__file__).parent
+
+os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
+
+
+def test_from_datastream_to_datastream(tmp_path: Path) -> None:  # noqa: D103
+    config = Configuration()
+    # config.set_string("state.backend.type", "rocksdb")
+    # config.set_string("checkpointing.interval", "1s")
+    # config.set_string("restart-strategy.type", "disable")
+    env = StreamExecutionEnvironment.get_execution_environment(config)
+    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+    env.set_parallelism(1)
+
+    # currently, bounded source is not supported due to runtime 
implementation, so
+    # we use continuous file source here.
+    input_datastream = env.from_source(
+        source=FileSource.for_record_stream_format(
+            StreamFormat.text_line_format(), 
f"file:///{current_dir}/resources/input"
+        ).build(),
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="streaming_agent_example",
+    )
+
+    deserialize_datastream = input_datastream.map(
+        lambda x: ItemData.model_validate_json(x)
+    )
+
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+    output_datastream = (
+        agents_env.from_datastream(
+            input=deserialize_datastream, key_selector=MyKeySelector()
+        )
+        .apply(DataStreamAgent())
+        .to_datastream()
+    )
+
+    result_dir = tmp_path / "results"
+    result_dir.mkdir(parents=True, exist_ok=True)
+
+    output_datastream.map(lambda x: x.model_dump_json(), 
Types.STRING()).add_sink(
+        StreamingFileSink.for_row_format(
+            base_path=str(result_dir.absolute()),
+            encoder=Encoder.simple_string_encoder(),
+        ).build()
+    )
+
+    agents_env.execute()
+
+    check_result(
+        result_dir=result_dir,
+        groud_truth_dir=Path(
+            
f"{current_dir}/resources/ground_truth/test_from_datastream_to_datastream.txt"
+        ),
+    )
+
+
+def test_from_table_to_table(tmp_path: Path) -> None:  # noqa: D103
+    env = StreamExecutionEnvironment.get_execution_environment()
+
+    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+    env.set_parallelism(1)
+
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.create_temporary_table(
+        "source",
+        TableDescriptor.for_connector("filesystem")
+        .schema(
+            Schema.new_builder()
+            .column("id", DataTypes.BIGINT())
+            .column("review", DataTypes.STRING())
+            .column("review_score", DataTypes.FLOAT())
+            .build()
+        )
+        .option("format", "json")
+        .option("path", f"file:///{current_dir}/resources/input")
+        .build(),
+    )
+
+    table = t_env.from_path("source")
+
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(
+        env=env, t_env=t_env
+    )
+
+    output_type = ExternalTypeInfo(
+        RowTypeInfo(
+            [
+                BasicTypeInfo.LONG_TYPE_INFO(),
+                BasicTypeInfo.STRING_TYPE_INFO(),
+                BasicTypeInfo.FLOAT_TYPE_INFO(),
+            ],
+            ["id", "review", "review_score"],
+        )
+    )
+
+    schema = (
+        Schema.new_builder()
+        .column("id", DataTypes.BIGINT())
+        .column("review", DataTypes.STRING())
+        .column("review_score", DataTypes.FLOAT())
+    ).build()
+
+    output_table = (
+        agents_env.from_table(input=table, key_selector=MyKeySelector())
+        .apply(TableAgent())
+        .to_table(schema=schema, output_type=output_type)
+    )
+
+    result_dir = tmp_path / "results"
+    result_dir.mkdir(parents=True, exist_ok=True)
+
+    t_env.create_temporary_table(
+        "sink",
+        TableDescriptor.for_connector("filesystem")
+        .option("path", str(result_dir.absolute()))
+        .format("json")
+        .schema(schema)
+        .build(),
+    )
+
+    output_table.execute_insert("sink").wait()
+
+    check_result(
+        result_dir=result_dir,
+        groud_truth_dir=Path(
+            
f"{current_dir}/resources/ground_truth/test_from_table_to_table.txt"
+        ),
+    )
+
+
+def test_from_datastream_to_table(tmp_path: Path) -> None:  # noqa: D103
+    env = StreamExecutionEnvironment.get_execution_environment()
+
+    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+    env.set_parallelism(1)
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    # currently, bounded source is not supported due to runtime 
implementation, so
+    # we use continuous file source here.
+    input_datastream = env.from_source(
+        source=FileSource.for_record_stream_format(
+            StreamFormat.text_line_format(), 
f"file:///{current_dir}/resources/input"
+        ).build(),
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="streaming_agent_example",
+    )
+
+    deserialize_datastream = input_datastream.map(
+        lambda x: ItemData.model_validate_json(x)
+    )
+
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(
+        env=env, t_env=t_env
+    )
+
+    output_type = ExternalTypeInfo(
+        RowTypeInfo(
+            [
+                BasicTypeInfo.LONG_TYPE_INFO(),
+                BasicTypeInfo.STRING_TYPE_INFO(),
+                BasicTypeInfo.FLOAT_TYPE_INFO(),
+            ],
+            ["id", "review", "review_score"],
+        )
+    )
+
+    schema = (
+        Schema.new_builder()
+        .column("id", DataTypes.BIGINT())
+        .column("review", DataTypes.STRING())
+        .column("review_score", DataTypes.FLOAT())
+    ).build()
+
+    output_table = (
+        agents_env.from_datastream(
+            input=deserialize_datastream, key_selector=MyKeySelector()
+        )
+        .apply(DataStreamToTableAgent())
+        .to_table(schema=schema, output_type=output_type)
+    )
+
+    result_dir = tmp_path / "results"
+    result_dir.mkdir(parents=True, exist_ok=True)
+
+    t_env.create_temporary_table(
+        "sink",
+        TableDescriptor.for_connector("filesystem")
+        .option("path", str(result_dir.absolute()))
+        .format("json")
+        .schema(schema)
+        .build(),
+    )
+
+    output_table.execute_insert("sink").wait()
+
+    check_result(
+        result_dir=result_dir,
+        groud_truth_dir=Path(
+            
f"{current_dir}/resources/ground_truth/test_from_table_to_table.txt"
+        ),
+    )
diff --git a/python/flink_agents/e2e_tests/from_datastream_to_table.py 
b/python/flink_agents/e2e_tests/from_datastream_to_table.py
deleted file mode 100644
index 02d7151..0000000
--- a/python/flink_agents/e2e_tests/from_datastream_to_table.py
+++ /dev/null
@@ -1,108 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-#################################################################################
-from pathlib import Path
-
-from pyflink.common import Duration, WatermarkStrategy
-from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, 
RowTypeInfo
-from pyflink.datastream import (
-    KeySelector,
-    RuntimeExecutionMode,
-    StreamExecutionEnvironment,
-)
-from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
-from pyflink.table import DataTypes, Schema, StreamTableEnvironment, 
TableDescriptor
-
-from flink_agents.api.execution_environment import AgentsExecutionEnvironment
-from flink_agents.e2e_tests.my_agent import (
-    DataStreamToTableAgent,
-    ItemData,
-)
-
-
-class MyKeySelector(KeySelector):
-    """KeySelector for extracting key."""
-
-    def get_key(self, value: ItemData) -> int:
-        """Extract key from ItemData."""
-        return value.id
-
-
-current_dir = Path(__file__).parent
-
-# if this example raises exception "No module named 'flink_agents'", you could 
set
-# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/
-# site-packages) in this file.
-if __name__ == "__main__":
-    env = StreamExecutionEnvironment.get_execution_environment()
-
-    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
-    env.set_parallelism(1)
-    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
-
-    # currently, bounded source is not supported due to runtime 
implementation, so
-    # we use continuous file source here.
-    input_datastream = env.from_source(
-        source=FileSource.for_record_stream_format(
-            StreamFormat.text_line_format(), f"file:///{current_dir}/resources"
-        )
-        .monitor_continuously(Duration.of_minutes(1))
-        .build(),
-        watermark_strategy=WatermarkStrategy.no_watermarks(),
-        source_name="streaming_agent_example",
-    )
-
-    deserialize_datastream = input_datastream.map(
-        lambda x: ItemData.model_validate_json(x)
-    )
-
-    agents_env = AgentsExecutionEnvironment.get_execution_environment(
-        env=env, t_env=t_env
-    )
-
-    output_type = ExternalTypeInfo(
-        RowTypeInfo(
-            [
-                BasicTypeInfo.LONG_TYPE_INFO(),
-                BasicTypeInfo.STRING_TYPE_INFO(),
-                BasicTypeInfo.FLOAT_TYPE_INFO(),
-            ],
-            ["id", "review", "review_score"],
-        )
-    )
-
-    schema = (
-        Schema.new_builder()
-        .column("id", DataTypes.BIGINT())
-        .column("review", DataTypes.STRING())
-        .column("review_score", DataTypes.FLOAT())
-    ).build()
-
-    output_table = (
-        agents_env.from_datastream(
-            input=deserialize_datastream, key_selector=MyKeySelector()
-        )
-        .apply(DataStreamToTableAgent())
-        .to_table(schema=schema, output_type=output_type)
-    )
-
-    t_env.create_temporary_table(
-        "sink",
-        TableDescriptor.for_connector("print").schema(schema).build(),
-    )
-
-    output_table.execute_insert("sink").wait()
diff --git 
a/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py 
b/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py
deleted file mode 100644
index b88e96f..0000000
--- a/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py
+++ /dev/null
@@ -1,81 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-#################################################################################
-from pathlib import Path
-
-from pyflink.common import Configuration, Duration, WatermarkStrategy
-from pyflink.datastream import (
-    KeySelector,
-    RuntimeExecutionMode,
-    StreamExecutionEnvironment,
-)
-from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
-
-from flink_agents.api.execution_environment import AgentsExecutionEnvironment
-from flink_agents.e2e_tests.my_agent import DataStreamAgent, ItemData
-
-
-class MyKeySelector(KeySelector):
-    """KeySelector for extracting key."""
-
-    def get_key(self, value: ItemData) -> int:
-        """Extract key from ItemData."""
-        return value.id
-
-
-current_dir = Path(__file__).parent
-
-# if this example raises exception "No module named 'flink_agents'", you could 
set
-# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/
-# site-packages) in this file.
-if __name__ == "__main__":
-    config = Configuration()
-    config.set_string("state.backend.type", "rocksdb")
-    config.set_string("checkpointing.interval", "1s")
-    config.set_string("restart-strategy.type", "disable")
-    env = StreamExecutionEnvironment.get_execution_environment(config)
-    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
-    env.set_parallelism(1)
-
-    # currently, bounded source is not supported due to runtime 
implementation, so
-    # we use continuous file source here.
-    input_datastream = env.from_source(
-        source=FileSource.for_record_stream_format(
-            StreamFormat.text_line_format(), f"file:///{current_dir}/resources"
-        )
-        .monitor_continuously(Duration.of_minutes(1))
-        .build(),
-        watermark_strategy=WatermarkStrategy.no_watermarks(),
-        source_name="streaming_agent_example",
-    )
-
-    deserialize_datastream = input_datastream.map(
-        lambda x: ItemData.model_validate_json(x)
-    )
-
-    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
-    output_datastream = (
-        agents_env.from_datastream(
-            input=deserialize_datastream, key_selector=MyKeySelector()
-        )
-        .apply(DataStreamAgent())
-        .to_datastream()
-    )
-
-    output_datastream.print()
-
-    agents_env.execute()
diff --git 
a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py 
b/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py
deleted file mode 100644
index 5983ae7..0000000
--- a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py
+++ /dev/null
@@ -1,108 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-#################################################################################
-from pathlib import Path
-
-from pyflink.common import Row
-from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, 
RowTypeInfo
-from pyflink.datastream import (
-    KeySelector,
-    RuntimeExecutionMode,
-    StreamExecutionEnvironment,
-)
-from pyflink.table import (
-    DataTypes,
-    Schema,
-    StreamTableEnvironment,
-    TableDescriptor,
-)
-
-from flink_agents.api.execution_environment import AgentsExecutionEnvironment
-from flink_agents.e2e_tests.my_agent import TableAgent
-
-current_dir = Path(__file__).parent
-
-
-class MyKeySelector(KeySelector):
-    """KeySelector for extracting key."""
-
-    def get_key(self, value: Row) -> int:
-        """Extract key from Row."""
-        return value[0]
-
-
-# if this example raises exception "No module named 'flink_agents'", you could 
set
-# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/
-# site-packages) in this file.
-if __name__ == "__main__":
-    env = StreamExecutionEnvironment.get_execution_environment()
-
-    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
-    env.set_parallelism(1)
-
-    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
-
-    t_env.create_temporary_table(
-        "source",
-        TableDescriptor.for_connector("filesystem")
-        .schema(
-            Schema.new_builder()
-            .column("id", DataTypes.BIGINT())
-            .column("review", DataTypes.STRING())
-            .column("review_score", DataTypes.FLOAT())
-            .build()
-        )
-        .option("format", "json")
-        .option("path", f"file:///{current_dir}/resources")
-        .option("source.monitor-interval", "60s")
-        .build(),
-    )
-
-    table = t_env.from_path("source")
-
-    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env, 
t_env=t_env)
-
-    output_type = ExternalTypeInfo(
-        RowTypeInfo(
-            [
-                BasicTypeInfo.LONG_TYPE_INFO(),
-                BasicTypeInfo.STRING_TYPE_INFO(),
-                BasicTypeInfo.FLOAT_TYPE_INFO(),
-            ],
-            ["id", "review", "review_score"],
-        )
-    )
-
-    schema = (
-        Schema.new_builder()
-        .column("id", DataTypes.BIGINT())
-        .column("review", DataTypes.STRING())
-        .column("review_score", DataTypes.FLOAT())
-    ).build()
-
-    output_table = (
-        agents_env.from_table(input=table, key_selector=MyKeySelector())
-        .apply(TableAgent())
-        .to_table(schema=schema, output_type=output_type)
-    )
-
-    t_env.create_temporary_table(
-        "sink",
-        TableDescriptor.for_connector("print").schema(schema).build(),
-    )
-
-    output_table.execute_insert("sink").wait()
diff --git 
a/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py 
b/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py
deleted file mode 100644
index 30aaeb3..0000000
--- a/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py
+++ /dev/null
@@ -1,127 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-#################################################################################
-import os
-from pathlib import Path
-
-from pyflink.common import Row
-from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, 
RowTypeInfo
-from pyflink.datastream import (
-    KeySelector,
-    StreamExecutionEnvironment,
-)
-from pyflink.table import DataTypes, Schema, StreamTableEnvironment, 
TableDescriptor
-
-from flink_agents.api.agents.react_agent import ReActAgent
-from flink_agents.api.chat_message import ChatMessage, MessageRole
-from flink_agents.api.execution_environment import AgentsExecutionEnvironment
-from flink_agents.api.prompts.prompt import Prompt
-from flink_agents.api.resource import ResourceDescriptor
-from flink_agents.api.tools.tool import Tool
-from flink_agents.e2e_tests.common_tools import add, multiply
-from flink_agents.integrations.chat_models.ollama_chat_model import (
-    OllamaChatModelConnection,
-    OllamaChatModelSetup,
-)
-
-model = os.environ.get("OLLAMA_CHAT_MODEL", "qwen2.5:7b")
-
-
-class MyKeySelector(KeySelector):
-    """KeySelector for extracting key."""
-
-    def get_key(self, value: Row) -> int:
-        """Extract key from Row."""
-        return value[0]
-
-
-current_dir = Path(__file__).parent
-
-# TODO: Currently, this example may cause core dump when being executed, the 
root cause
-# may be known issue of pemja incorrect reference counting:
-# https://github.com/apache/flink-agents/issues/83
-if __name__ == "__main__":
-    stream_env = StreamExecutionEnvironment.get_execution_environment()
-
-    stream_env.set_parallelism(1)
-
-    t_env = 
StreamTableEnvironment.create(stream_execution_environment=stream_env)
-
-    table = t_env.from_elements(
-        elements=[(1, 2, 3)],
-        schema=DataTypes.ROW(
-            [
-                DataTypes.FIELD("a", DataTypes.INT()),
-                DataTypes.FIELD("b", DataTypes.INT()),
-                DataTypes.FIELD("c", DataTypes.INT()),
-            ]
-        ),
-    )
-
-    env = AgentsExecutionEnvironment.get_execution_environment(env=stream_env, 
t_env=t_env)
-
-    # register resource to execution environment
-    (
-        env.add_resource("ollama", 
ResourceDescriptor(clazz=OllamaChatModelConnection))
-        .add_resource("add", Tool.from_callable(add))
-        .add_resource("multiply", Tool.from_callable(multiply))
-    )
-
-    # prepare prompt
-    prompt = Prompt.from_messages(
-        messages=[
-            ChatMessage(
-                role=MessageRole.SYSTEM,
-                content='An example of output is {"result": 30.32}.',
-            ),
-            ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * 
{c}"),
-        ],
-    )
-
-    output_type_info = RowTypeInfo(
-        [BasicTypeInfo.INT_TYPE_INFO()],
-        ["result"],
-    )
-
-    # create ReAct agent.
-    agent = ReActAgent(
-        chat_model=ResourceDescriptor(
-            clazz=OllamaChatModelSetup,
-            connection="ollama",
-            model=model,
-            tools=["add", "multiply"],
-        ),
-        prompt=prompt,
-        output_schema=output_type_info,
-    )
-
-    output_type = ExternalTypeInfo(output_type_info)
-
-    schema = (Schema.new_builder().column("result", DataTypes.INT())).build()
-
-    output_table = (
-        env.from_table(input=table, key_selector=MyKeySelector())
-        .apply(agent)
-        .to_table(schema=schema, output_type=output_type)
-    )
-
-    t_env.create_temporary_table(
-        "sink",
-        TableDescriptor.for_connector("print").schema(schema).build(),
-    )
-
-    output_table.execute_insert("sink").wait()
diff --git a/python/flink_agents/e2e_tests/mcp_test/mcp_server.py 
b/python/flink_agents/e2e_tests/mcp_test/mcp_server.py
index 2ec0e12..7bfbc0a 100644
--- a/python/flink_agents/e2e_tests/mcp_test/mcp_server.py
+++ b/python/flink_agents/e2e_tests/mcp_test/mcp_server.py
@@ -16,6 +16,7 @@
 #  limitations under the License.
 
################################################################################
 """MCP server providing prompts and tools for calculation tasks."""
+
 import dotenv
 from mcp.server.fastmcp import FastMCP
 
diff --git a/python/flink_agents/e2e_tests/mcp_test/mcp_example.py 
b/python/flink_agents/e2e_tests/mcp_test/mcp_test.py
similarity index 92%
rename from python/flink_agents/e2e_tests/mcp_test/mcp_example.py
rename to python/flink_agents/e2e_tests/mcp_test/mcp_test.py
index 6c68762..ea54c40 100644
--- a/python/flink_agents/e2e_tests/mcp_test/mcp_example.py
+++ b/python/flink_agents/e2e_tests/mcp_test/mcp_test.py
@@ -25,12 +25,14 @@ This example shows how to:
 Prerequisites:
 - Run the MCP server first: mcp_server.py
 """
+
 import multiprocessing
 import os
 import runpy
 import time
 from pathlib import Path
 
+import pytest
 from pydantic import BaseModel
 
 from flink_agents.api.agent import Agent
@@ -47,12 +49,13 @@ from flink_agents.api.execution_environment import 
AgentsExecutionEnvironment
 from flink_agents.api.resource import ResourceDescriptor
 from flink_agents.api.runner_context import RunnerContext
 from flink_agents.api.tools.mcp import MCPServer
+from flink_agents.e2e_tests.test_utils import pull_model
 from flink_agents.integrations.chat_models.ollama_chat_model import (
     OllamaChatModelConnection,
     OllamaChatModelSetup,
 )
 
-OLLAMA_MODEL = os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:8b")
+OLLAMA_MODEL = os.environ.get("MCP_OLLAMA_CHAT_MODEL", "qwen3:1.7b")
 MCP_SERVER_ENDPOINT = "http://127.0.0.1:8000/mcp";
 
 
@@ -76,7 +79,9 @@ class MyMCPAgent(Agent):
     @staticmethod
     def ollama_connection() -> ResourceDescriptor:
         """ChatModelConnection for Ollama."""
-        return ResourceDescriptor(clazz=OllamaChatModelConnection)
+        return ResourceDescriptor(
+            clazz=OllamaChatModelConnection, request_timeout=240.0
+        )
 
     @chat_model_setup
     @staticmethod
@@ -125,7 +130,13 @@ def run_mcp_server() -> None:
 
 current_dir = Path(__file__).parent
 
-if __name__ == "__main__":
+client = pull_model(OLLAMA_MODEL)
+
+
[email protected](
+    client is None, reason="Ollama client is not available or test model is 
missing"
+)
+def test_mcp() -> None:  # noqa:D103
     # Start MCP server in background
     print("Starting MCP server...")
     server_process = multiprocessing.Process(target=run_mcp_server)
diff --git a/python/flink_agents/e2e_tests/react_agent_example.py 
b/python/flink_agents/e2e_tests/react_agent_example.py
deleted file mode 100644
index 6cfbfa4..0000000
--- a/python/flink_agents/e2e_tests/react_agent_example.py
+++ /dev/null
@@ -1,90 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-#################################################################################
-import os
-
-from pydantic import BaseModel
-
-from flink_agents.api.agents.react_agent import ReActAgent
-from flink_agents.api.chat_message import ChatMessage, MessageRole
-from flink_agents.api.execution_environment import AgentsExecutionEnvironment
-from flink_agents.api.prompts.prompt import Prompt
-from flink_agents.api.resource import ResourceDescriptor
-from flink_agents.api.tools.tool import Tool
-from flink_agents.e2e_tests.common_tools import add, multiply
-from flink_agents.integrations.chat_models.ollama_chat_model import (
-    OllamaChatModelConnection,
-    OllamaChatModelSetup,
-)
-
-model = os.environ.get("OLLAMA_CHAT_MODEL", "qwen2.5:7b")
-
-
-class InputData(BaseModel):  # noqa: D101
-    a: int
-    b: int
-    c: int
-
-
-class OutputData(BaseModel):  # noqa: D101
-    result: int
-
-
-if __name__ == "__main__":
-    env = AgentsExecutionEnvironment.get_execution_environment()
-
-    # register resource to execution environment
-    (
-        env.add_resource("ollama", 
ResourceDescriptor(clazz=OllamaChatModelConnection))
-        .add_resource("add", Tool.from_callable(add))
-        .add_resource("multiply", Tool.from_callable(multiply))
-    )
-
-    # prepare prompt
-    prompt = Prompt.from_messages(
-        messages=[
-            ChatMessage(
-                role=MessageRole.SYSTEM,
-                content='An example of output is {"result": 30.32}.',
-            ),
-            ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * 
{c}"),
-        ],
-    )
-
-    # create ReAct agent.
-    agent = ReActAgent(
-        chat_model=ResourceDescriptor(
-            clazz=OllamaChatModelSetup,
-            connection="ollama",
-            model=model,
-            tools=["add", "multiply"],
-        ),
-        prompt=prompt,
-        output_schema=OutputData,
-    )
-
-    # execute agent
-    input_list = []
-
-    output_list = env.from_list(input_list).apply(agent).to_list()
-    input_list.append({"key": "0001", "value": InputData(a=2123, b=2321, 
c=312)})
-
-    env.execute()
-
-    for output in output_list:
-        for key, value in output.items():
-            print(f"{key}: {value}")
diff --git a/python/flink_agents/e2e_tests/react_agent_test.py 
b/python/flink_agents/e2e_tests/react_agent_test.py
new file mode 100644
index 0000000..61d0154
--- /dev/null
+++ b/python/flink_agents/e2e_tests/react_agent_test.py
@@ -0,0 +1,232 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+import os
+import sysconfig
+from pathlib import Path
+
+import pytest
+from pydantic import BaseModel
+from pyflink.common import Row
+from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, 
RowTypeInfo
+from pyflink.datastream import KeySelector, StreamExecutionEnvironment
+from pyflink.table import DataTypes, Schema, StreamTableEnvironment, 
TableDescriptor
+
+from flink_agents.api.agents.react_agent import (
+    ErrorHandlingStrategy,
+    ReActAgent,
+    ReActAgentOptions,
+)
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.api.resource import ResourceDescriptor
+from flink_agents.api.tools.tool import Tool
+from flink_agents.e2e_tests.react_agent_tools import add, multiply
+from flink_agents.e2e_tests.test_utils import pull_model
+from flink_agents.integrations.chat_models.ollama_chat_model import (
+    OllamaChatModelConnection,
+    OllamaChatModelSetup,
+)
+
+current_dir = Path(__file__).parent
+
+os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
+
+OLLAMA_MODEL = os.environ.get("REACT_OLLAMA_MODEL", "qwen3:1.7b")
+os.environ["OLLAMA_CHAT_MODEL"] = OLLAMA_MODEL
+
+
+class InputData(BaseModel):  # noqa: D101
+    a: int
+    b: int
+    c: int
+
+
+class OutputData(BaseModel):  # noqa: D101
+    result: int
+
+
+class MyKeySelector(KeySelector):
+    """KeySelector for extracting key."""
+
+    def get_key(self, value: Row) -> int:
+        """Extract key from Row."""
+        return value[0]
+
+
+client = pull_model(OLLAMA_MODEL)
+
+
[email protected](
+    client is None, reason="Ollama client is not available or test model is 
missing"
+)
+def test_react_agent_on_local_runner() -> None:  # noqa: D103
+    env = AgentsExecutionEnvironment.get_execution_environment()
+    env.get_config().set(
+        ReActAgentOptions.ERROR_HANDLING_STRATEGY, ErrorHandlingStrategy.IGNORE
+    )
+
+    # register resource to execution environment
+    (
+        env.add_resource(
+            "ollama",
+            ResourceDescriptor(clazz=OllamaChatModelConnection, 
request_timeout=240.0),
+        )
+        .add_resource("add", Tool.from_callable(add))
+        .add_resource("multiply", Tool.from_callable(multiply))
+    )
+
+    # prepare prompt
+    prompt = Prompt.from_messages(
+        messages=[
+            ChatMessage(
+                role=MessageRole.SYSTEM,
+                content='An example of output is {"result": 30.32}.',
+            ),
+            ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * 
{c}"),
+        ],
+    )
+
+    # create ReAct agent.
+    agent = ReActAgent(
+        chat_model=ResourceDescriptor(
+            clazz=OllamaChatModelSetup,
+            connection="ollama",
+            model=OLLAMA_MODEL,
+            tools=["add", "multiply"],
+        ),
+        prompt=prompt,
+        output_schema=OutputData,
+    )
+
+    # execute agent
+    input_list = []
+
+    output_list = env.from_list(input_list).apply(agent).to_list()
+    input_list.append({"key": "0001", "value": InputData(a=2123, b=2321, 
c=312)})
+
+    env.execute()
+
+    assert len(output_list) == 1, (
+        "This may be caused by the LLM response does not match the output 
schema, you can rerun this case."
+    )
+    assert output_list[0]["0001"].result == 1386528
+
+
[email protected](
+    client is None, reason="Ollama client is not available or test model is 
missing"
+)
+def test_react_agent_on_remote_runner(tmp_path: Path) -> None:  # noqa: D103
+    stream_env = StreamExecutionEnvironment.get_execution_environment()
+
+    stream_env.set_parallelism(1)
+
+    t_env = 
StreamTableEnvironment.create(stream_execution_environment=stream_env)
+
+    table = t_env.from_elements(
+        elements=[(1, 2, 3)],
+        schema=DataTypes.ROW(
+            [
+                DataTypes.FIELD("a", DataTypes.INT()),
+                DataTypes.FIELD("b", DataTypes.INT()),
+                DataTypes.FIELD("c", DataTypes.INT()),
+            ]
+        ),
+    )
+
+    env = AgentsExecutionEnvironment.get_execution_environment(
+        env=stream_env, t_env=t_env
+    )
+
+    env.get_config().set(
+        ReActAgentOptions.ERROR_HANDLING_STRATEGY, ErrorHandlingStrategy.IGNORE
+    )
+
+    # register resource to execution environment
+    (
+        env.add_resource(
+            "ollama",
+            ResourceDescriptor(clazz=OllamaChatModelConnection, 
request_timeout=240.0),
+        )
+        .add_resource("add", Tool.from_callable(add))
+        .add_resource("multiply", Tool.from_callable(multiply))
+    )
+
+    # prepare prompt
+    prompt = Prompt.from_messages(
+        messages=[
+            ChatMessage(
+                role=MessageRole.SYSTEM,
+                content='An example of output is {"result": 30.32}.',
+            ),
+            ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * 
{c}"),
+        ],
+    )
+
+    output_type_info = RowTypeInfo(
+        [BasicTypeInfo.INT_TYPE_INFO()],
+        ["result"],
+    )
+
+    # create ReAct agent.
+    agent = ReActAgent(
+        chat_model=ResourceDescriptor(
+            clazz=OllamaChatModelSetup,
+            connection="ollama",
+            model=OLLAMA_MODEL,
+            tools=["add", "multiply"],
+        ),
+        prompt=prompt,
+        output_schema=output_type_info,
+    )
+
+    output_type = ExternalTypeInfo(output_type_info)
+
+    schema = (Schema.new_builder().column("result", DataTypes.INT())).build()
+
+    output_table = (
+        env.from_table(input=table, key_selector=MyKeySelector())
+        .apply(agent)
+        .to_table(schema=schema, output_type=output_type)
+    )
+
+    result_dir = tmp_path / "results"
+    result_dir.mkdir(parents=True, exist_ok=True)
+
+    t_env.create_temporary_table(
+        "sink",
+        TableDescriptor.for_connector("filesystem")
+        .option("path", str(result_dir.absolute()))
+        .format("json")
+        .schema(schema)
+        .build(),
+    )
+
+    output_table.execute_insert("sink").wait()
+
+    actual_result = []
+    for file in result_dir.iterdir():
+        if file.is_file():
+            with file.open() as f:
+                actual_result.extend(f.readlines())
+
+    assert len(actual_result) == 1, (
+        "This may be caused by the LLM response does not match the output 
schema, you can rerun this case."
+    )
+    assert "result" in json.loads(actual_result[0].strip())
diff --git a/python/flink_agents/e2e_tests/common_tools.py 
b/python/flink_agents/e2e_tests/react_agent_tools.py
similarity index 100%
copy from python/flink_agents/e2e_tests/common_tools.py
copy to python/flink_agents/e2e_tests/react_agent_tools.py
diff --git 
a/python/flink_agents/e2e_tests/resources/ground_truth/test_from_datastream_to_datastream.txt
 
b/python/flink_agents/e2e_tests/resources/ground_truth/test_from_datastream_to_datastream.txt
new file mode 100644
index 0000000..f94deee
--- /dev/null
+++ 
b/python/flink_agents/e2e_tests/resources/ground_truth/test_from_datastream_to_datastream.txt
@@ -0,0 +1,10 @@
+{"id":3,"review":"Highly satisfied with the performance and value for money. 
first action, log success=True, second action call my 
tool","review_score":7.0,"memory_info":{"total_reviews":1}}
+{"id":1,"review":"Great product! Works perfectly and lasts a long time. first 
action, log success=True, second action call my 
tool","review_score":3.0,"memory_info":{"total_reviews":1}}
+{"id":2,"review":"The item arrived damaged, and the packaging was poor. first 
action, log success=True, second action call my 
tool","review_score":5.0,"memory_info":{"total_reviews":1}}
+{"id":2,"review":"Too complicated to set up. Instructions were unclear. first 
action, log success=True, second action call my 
tool","review_score":8.0,"memory_info":{"total_reviews":2}}
+{"id":1,"review":"Fast shipping and excellent customer service. Would buy 
again! first action, log success=True, second action call my 
tool","review_score":8.0,"memory_info":{"total_reviews":2}}
+{"id":1,"review":"Exactly what I needed. Easy to use and very reliable. first 
action, log success=True, second action call my 
tool","review_score":8.0,"memory_info":{"total_reviews":3}}
+{"id":2,"review":"Good quality, but overview_scored for what it does. first 
action, log success=True, second action call my 
tool","review_score":8.0,"memory_info":{"total_reviews":3}}
+{"id":3,"review":"Not as good as expected. It stopped working after a week. 
first action, log success=True, second action call my 
tool","review_score":8.0,"memory_info":{"total_reviews":2}}
+{"id":2,"review":"Worst purchase ever. Waste of money and time. first action, 
log success=True, second action call my 
tool","review_score":8.0,"memory_info":{"total_reviews":4}}
+{"id":2,"review":"Looks nice and functions well, but could be more durable. 
first action, log success=True, second action call my 
tool","review_score":8.0,"memory_info":{"total_reviews":5}}
\ No newline at end of file
diff --git 
a/python/flink_agents/e2e_tests/resources/ground_truth/test_from_table_to_table.txt
 
b/python/flink_agents/e2e_tests/resources/ground_truth/test_from_table_to_table.txt
new file mode 100644
index 0000000..1a5290e
--- /dev/null
+++ 
b/python/flink_agents/e2e_tests/resources/ground_truth/test_from_table_to_table.txt
@@ -0,0 +1,10 @@
+'{"id":1,"review":"Great product! Works perfectly and lasts a long time. first 
action second action","review_score":3.0}
+'{"id":2,"review":"The item arrived damaged, and the packaging was poor. first 
action second action","review_score":5.0}
+'{"id":3,"review":"Highly satisfied with the performance and value for money. 
first action second action","review_score":7.0}
+'{"id":3,"review":"Not as good as expected. It stopped working after a week. 
first action second action","review_score":8.0}
+'{"id":1,"review":"Fast shipping and excellent customer service. Would buy 
again! first action second action","review_score":8.0}
+'{"id":2,"review":"Too complicated to set up. Instructions were unclear. first 
action second action","review_score":8.0}
+'{"id":2,"review":"Good quality, but overview_scored for what it does. first 
action second action","review_score":8.0}
+'{"id":1,"review":"Exactly what I needed. Easy to use and very reliable. first 
action second action","review_score":8.0}
+'{"id":2,"review":"Worst purchase ever. Waste of money and time. first action 
second action","review_score":8.0}
+'{"id":2,"review":"Looks nice and functions well, but could be more durable. 
first action second action","review_score":8.0}
diff --git 
a/python/flink_agents/e2e_tests/resources/ground_truth/test_workflow.txt 
b/python/flink_agents/e2e_tests/resources/ground_truth/test_workflow.txt
new file mode 100644
index 0000000..3282c31
--- /dev/null
+++ b/python/flink_agents/e2e_tests/resources/ground_truth/test_workflow.txt
@@ -0,0 +1,10 @@
+{"bob": {"(visit 1 times)": "The message from bob -> 
processed_by_first_action"}}
+{"bob": {"(visit 2 times)": "The message from bob -> processed by 
second_action"}}
+{"john": {"(visit 1 times)": "The message from john -> 
processed_by_first_action"}}
+{"john": {"(visit 2 times)": "The message from john -> processed by 
second_action"}}
+{"john": {"(visit 3 times)": "Second message from john -> 
processed_by_first_action"}}
+{"john": {"(visit 4 times)": "Second message from john -> processed by 
second_action"}}
+{"bob": {"(visit 3 times)": "Second message from bob -> 
processed_by_first_action"}}
+{"bob": {"(visit 4 times)": "Second message from bob -> processed by 
second_action"}}
+{"unknown": {"(visit 1 times)": "Message from unknown -> 
processed_by_first_action"}}
+{"unknown": {"(visit 2 times)": "Message from unknown -> processed by 
second_action"}}
\ No newline at end of file
diff --git a/python/flink_agents/e2e_tests/resources/input_data.txt 
b/python/flink_agents/e2e_tests/resources/input/input_data.txt
similarity index 100%
rename from python/flink_agents/e2e_tests/resources/input_data.txt
rename to python/flink_agents/e2e_tests/resources/input/input_data.txt
diff --git a/python/flink_agents/e2e_tests/common_tools.py 
b/python/flink_agents/e2e_tests/scripts/ollama_pull_model.sh
similarity index 62%
copy from python/flink_agents/e2e_tests/common_tools.py
copy to python/flink_agents/e2e_tests/scripts/ollama_pull_model.sh
index a165d9e..d0c3c5e 100644
--- a/python/flink_agents/e2e_tests/common_tools.py
+++ b/python/flink_agents/e2e_tests/scripts/ollama_pull_model.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
 
################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
@@ -14,40 +15,7 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 # limitations under the License.
-#################################################################################
-
-
-def add(a: int, b: int) -> int:
-    """Calculate the sum of a and b.
-
-    Parameters
-    ----------
-    a : int
-        The first operand
-    b : int
-        The second operand
-
-    Returns:
-    -------
-    int:
-        The sum of a and b
-    """
-    return a + b
-
-
-def multiply(a: int, b: int) -> int:
-    """Useful function to multiply two numbers.
-
-    Parameters
-    ----------
-    a : int
-        The first operand
-    b : int
-        The second operand
+################################################################################
 
-    Returns:
-    -------
-    int:
-        The product of a and b
-    """
-    return a * b
+echo "ollama pull $1"
+ollama pull $1
\ No newline at end of file
diff --git a/python/flink_agents/e2e_tests/test_utils.py 
b/python/flink_agents/e2e_tests/test_utils.py
new file mode 100644
index 0000000..eab34a0
--- /dev/null
+++ b/python/flink_agents/e2e_tests/test_utils.py
@@ -0,0 +1,68 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import subprocess
+from pathlib import Path
+
+from ollama import Client
+
+current_dir = Path(__file__).parent
+
+
+def pull_model(ollama_model: str) -> Client:
+    """Run ollama pull ollama_model."""
+    try:
+        # prepare ollama server
+        subprocess.run(
+            ["bash", f"{current_dir}/scripts/ollama_pull_model.sh", 
ollama_model],
+            timeout=120,
+            check=True,
+        )
+        client = Client()
+        models = client.list()
+
+        model_found = False
+        for model in models["models"]:
+            if model.model == ollama_model:
+                model_found = True
+                break
+
+        if not model_found:
+            client = None  # type: ignore
+    except Exception:
+        client = None  # type: ignore
+
+    return client
+
+
+def check_result(*, result_dir: Path, groud_truth_dir: Path) -> None:
+    """Util function for checking flink job execution result."""
+    actual_result = []
+    for file in result_dir.iterdir():
+        if file.is_dir():
+            for child in file.iterdir():
+                with child.open() as f:
+                    actual_result.extend(f.readlines())
+        if file.is_file():
+            with file.open() as f:
+                actual_result.extend(f.readlines())
+
+    with groud_truth_dir.open() as f:
+        expected = f.readlines().sort()
+
+    actual_result = actual_result.sort()
+    assert actual_result == expected
diff --git a/python/flink_agents/e2e_tests/agent_example.py 
b/python/flink_agents/e2e_tests/workflow_test.py
similarity index 79%
rename from python/flink_agents/e2e_tests/agent_example.py
rename to python/flink_agents/e2e_tests/workflow_test.py
index 1152393..5e8a729 100644
--- a/python/flink_agents/e2e_tests/agent_example.py
+++ b/python/flink_agents/e2e_tests/workflow_test.py
@@ -15,6 +15,8 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
+import json
+from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
 from pydantic import BaseModel
@@ -28,11 +30,14 @@ from flink_agents.api.runner_context import RunnerContext
 if TYPE_CHECKING:
     from flink_agents.api.memory_reference import MemoryRef
 
+current_dir = Path(__file__).parent
+
 
 class ProcessedData(BaseModel):  # noqa D101
     content: str
     visit_count: int
 
+
 class MyEvent(Event):  # noqa D101
     value: Any
 
@@ -57,15 +62,14 @@ class MyAgent(Agent):
         current_count = previous_data.visit_count if previous_data else 0
         new_count = current_count + 1
 
-        data_to_store = 
ProcessedData(content=input_message,visit_count=new_count)
+        data_to_store = ProcessedData(content=input_message, 
visit_count=new_count)
         data_ref = memory.set(data_path, data_to_store)
 
         ctx.send_event(MyEvent(value=data_ref))
 
         processed_content = f"{input_message} -> processed_by_first_action"
         key_with_count = f"(visit {new_count} times)"
-        ctx.send_event(OutputEvent(output={key_with_count:processed_content}))
-
+        ctx.send_event(OutputEvent(output={key_with_count: processed_content}))
 
     @action(MyEvent)
     @staticmethod
@@ -79,7 +83,9 @@ class MyAgent(Agent):
         current_count = processed_data.visit_count
         new_count = current_count + 1
 
-        updated_data_to_store = ProcessedData(content=base_message, 
visit_count=new_count)
+        updated_data_to_store = ProcessedData(
+            content=base_message, visit_count=new_count
+        )
         memory.set(content_ref.path, updated_data_to_store)
 
         final_content = f"{base_message} -> processed by second_action"
@@ -87,7 +93,7 @@ class MyAgent(Agent):
         ctx.send_event(OutputEvent(output={key_with_count: final_content}))
 
 
-if __name__ == "__main__":
+def test_workflow() -> None:  # noqa: D103
     env = AgentsExecutionEnvironment.get_execution_environment()
 
     input_list = []
@@ -105,5 +111,19 @@ if __name__ == "__main__":
 
     env.execute()
 
-    for output in output_list:
-        print(output)
+    expected_output = []
+    with Path.open(
+        Path(f"{current_dir}/resources/ground_truth/test_workflow.txt")
+    ) as f:
+        for line in f:
+            expected_output.append(json.loads(line))  # noqa:PERF401
+
+    assert output_list[:8] == expected_output[:8]
+    assert (
+        output_list[8][next(iter(output_list[8].keys()))]
+        == expected_output[8][next(iter(expected_output[8].keys()))]
+    )
+    assert (
+        output_list[9][next(iter(output_list[9].keys()))]
+        == expected_output[9][next(iter(expected_output[9].keys()))]
+    )
diff --git a/python/flink_agents/e2e_tests/common_tools.py 
b/tools/start_ollama_server.sh
similarity index 63%
rename from python/flink_agents/e2e_tests/common_tools.py
rename to tools/start_ollama_server.sh
index a165d9e..a80afcf 100644
--- a/python/flink_agents/e2e_tests/common_tools.py
+++ b/tools/start_ollama_server.sh
@@ -16,38 +16,13 @@
 # limitations under the License.
 
#################################################################################
 
-
-def add(a: int, b: int) -> int:
-    """Calculate the sum of a and b.
-
-    Parameters
-    ----------
-    a : int
-        The first operand
-    b : int
-        The second operand
-
-    Returns:
-    -------
-    int:
-        The sum of a and b
-    """
-    return a + b
-
-
-def multiply(a: int, b: int) -> int:
-    """Useful function to multiply two numbers.
-
-    Parameters
-    ----------
-    a : int
-        The first operand
-    b : int
-        The second operand
-
-    Returns:
-    -------
-    int:
-        The product of a and b
-    """
-    return a * b
+# only works on linux
+os=$(uname -s)
+echo $os
+
+curl -fsSL https://ollama.com/install.sh | sh
+ret=$?
+if [ "$ret" != "0" ]
+then
+  exit $ret
+fi
\ No newline at end of file

Reply via email to