This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit b2667352fb6da84352127523fc118db393d5a493 Author: WenjinXie <[email protected]> AuthorDate: Thu Nov 13 15:56:18 2025 +0800 [test][python] Clean up and refactor python e2e tests. --- ..._example.py => chat_model_integration_agent.py} | 120 ++++------ .../e2e_tests/chat_model_integration_test.py | 84 +++++++ .../{my_agent.py => flink_integration_agent.py} | 9 + .../e2e_tests/flink_intergration_test.py | 247 +++++++++++++++++++++ .../e2e_tests/from_datastream_to_table.py | 108 --------- .../integrate_datastream_with_agent_example.py | 81 ------- .../integrate_table_with_agent_example.py | 108 --------- .../integrate_table_with_react_agent_example.py | 127 ----------- .../flink_agents/e2e_tests/mcp_test/mcp_server.py | 1 + .../mcp_test/{mcp_example.py => mcp_test.py} | 17 +- .../flink_agents/e2e_tests/react_agent_example.py | 90 -------- python/flink_agents/e2e_tests/react_agent_test.py | 232 +++++++++++++++++++ .../{common_tools.py => react_agent_tools.py} | 0 .../test_from_datastream_to_datastream.txt | 10 + .../ground_truth/test_from_table_to_table.txt | 10 + .../resources/ground_truth/test_workflow.txt | 10 + .../e2e_tests/resources/{ => input}/input_data.txt | 0 .../ollama_pull_model.sh} | 40 +--- python/flink_agents/e2e_tests/test_utils.py | 68 ++++++ .../{agent_example.py => workflow_test.py} | 34 ++- .../start_ollama_server.sh | 45 +--- 21 files changed, 766 insertions(+), 675 deletions(-) diff --git a/python/flink_agents/e2e_tests/chat_model_example.py b/python/flink_agents/e2e_tests/chat_model_integration_agent.py similarity index 69% rename from python/flink_agents/e2e_tests/chat_model_example.py rename to python/flink_agents/e2e_tests/chat_model_integration_agent.py index 69c8813..0eaa9fc 100644 --- a/python/flink_agents/e2e_tests/chat_model_example.py +++ b/python/flink_agents/e2e_tests/chat_model_integration_agent.py @@ -16,7 +16,6 @@ # limitations under the License. 
################################################################################# import os -from typing import List from flink_agents.api.agent import Agent from flink_agents.api.chat_message import ChatMessage, MessageRole @@ -27,11 +26,7 @@ from flink_agents.api.decorators import ( tool, ) from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent -from flink_agents.api.events.event import ( - InputEvent, - OutputEvent, -) -from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.events.event import InputEvent, OutputEvent from flink_agents.api.resource import ResourceDescriptor from flink_agents.api.runner_context import RunnerContext from flink_agents.integrations.chat_models.ollama_chat_model import ( @@ -47,15 +42,18 @@ from flink_agents.integrations.chat_models.tongyi_chat_model import ( TongyiChatModelSetup, ) -TONGYI_MODEL = os.environ.get("TONGYI_CHAT_MODEL", "qwen-plus") -OLLAMA_MODEL = os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:0.6b") -OPENAI_MODEL = os.environ.get("OPENAI_CHAT_MODEL", "gpt-3.5-turbo") -BACKENDS_TO_RUN: List[str] = ["Tongyi", "OpenAI", "Ollama"] - -class MyAgent(Agent): +class ChatModelTestAgent(Agent): """Example agent demonstrating the new ChatModel architecture.""" + @chat_model_connection + @staticmethod + def openai_connection() -> ResourceDescriptor: + """ChatModelConnection responsible for openai model service connection.""" + return ResourceDescriptor( + clazz=OpenAIChatModelConnection, api_key=os.environ.get("OPENAI_API_KEY") + ) + @chat_model_connection @staticmethod def tongyi_connection() -> ResourceDescriptor: @@ -66,68 +64,68 @@ class MyAgent(Agent): @staticmethod def ollama_connection() -> ResourceDescriptor: """ChatModelConnection responsible for ollama model service connection.""" - return ResourceDescriptor(clazz=OllamaChatModelConnection) - - @chat_model_connection - @staticmethod - def openai_connection() -> ResourceDescriptor: - 
"""ChatModelConnection responsible for openai model service connection.""" return ResourceDescriptor( - clazz=OpenAIChatModelConnection, - api_key=os.environ.get("OPENAI_API_KEY") + clazz=OllamaChatModelConnection, request_timeout=240.0 ) @chat_model_setup @staticmethod def math_chat_model() -> ResourceDescriptor: """ChatModel which focus on math, and reuse ChatModelConnection.""" - if CURRENT_BACKEND == "Tongyi": + model_provider = os.environ.get("MODEL_PROVIDER") + if model_provider == "Tongyi": return ResourceDescriptor( clazz=TongyiChatModelSetup, connection="tongyi_connection", - model=TONGYI_MODEL, + model=os.environ.get("TONGYI_CHAT_MODEL", "qwen-plus"), tools=["add"], ) - elif CURRENT_BACKEND == "OpenAI": - return ResourceDescriptor( - clazz=OpenAIChatModelSetup, - connection="openai_connection", - model=OPENAI_MODEL, - tools=["add"] - ) - else: + elif model_provider == "Ollama": return ResourceDescriptor( clazz=OllamaChatModelSetup, connection="ollama_connection", - model=OLLAMA_MODEL, + model=os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b"), tools=["add"], extract_reasoning=True, ) + elif model_provider == "OpenAI": + return ResourceDescriptor( + clazz=OpenAIChatModelSetup, + connection="openai_connection", + model=os.environ.get("OPENAI_CHAT_MODEL", "gpt-3.5-turbo"), + tools=["add"], + ) + else: + err_msg = f"Unknown model_provider {model_provider}" + raise RuntimeError(err_msg) @chat_model_setup @staticmethod def creative_chat_model() -> ResourceDescriptor: """ChatModel which focus on text generate, and reuse ChatModelConnection.""" - if CURRENT_BACKEND == "Tongyi": + model_provider = os.environ.get("MODEL_PROVIDER") + if model_provider == "Tongyi": return ResourceDescriptor( clazz=TongyiChatModelSetup, connection="tongyi_connection", - model=TONGYI_MODEL, - ) - elif CURRENT_BACKEND == "OpenAI": - return ResourceDescriptor( - clazz=OpenAIChatModelSetup, - connection="openai_connection", - model=OPENAI_MODEL, - tools=["add"] + 
model=os.environ.get("TONGYI_CHAT_MODEL", "qwen-plus"), ) - else: + elif model_provider == "Ollama": return ResourceDescriptor( clazz=TongyiChatModelSetup, connection="ollama_connection", - model=OLLAMA_MODEL, + model=os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b"), extract_reasoning=True, ) + elif model_provider == "OpenAI": + return ResourceDescriptor( + clazz=OpenAIChatModelSetup, + connection="openai_connection", + model=os.environ.get("OPENAI_CHAT_MODEL", "gpt-3.5-turbo"), + ) + else: + err_msg = f"Unknown model_provider {model_provider}" + raise RuntimeError(err_msg) @tool @staticmethod @@ -175,41 +173,3 @@ class MyAgent(Agent): input = event.response if event.response and input.content: ctx.send_event(OutputEvent(output=input.content)) - - -if __name__ == "__main__": - for backend in BACKENDS_TO_RUN: - CURRENT_BACKEND = backend - if backend == "Tongyi": - CURRENT_MODEL = TONGYI_MODEL - elif backend == "OpenAI": - CURRENT_MODEL = OPENAI_MODEL - else: - CURRENT_MODEL = OLLAMA_MODEL - - if backend == "Tongyi" and not os.environ.get("DASHSCOPE_API_KEY"): - print("[SKIP] TongyiChatModel because DASHSCOPE_API_KEY is not set.") - continue - - if backend == "OpenAI" and not os.environ.get("OPENAI_API_KEY"): - print("[SKIP] OpenAIChatModel because OPENAI_API_KEY is not set.") - continue - - print( - f"\nRunning {backend}ChatModel while the using model is {CURRENT_MODEL}..." 
- ) - - env = AgentsExecutionEnvironment.get_execution_environment() - input_list = [] - agent = MyAgent() - - output_list = env.from_list(input_list).apply(agent).to_list() - - input_list.append({"key": "0001", "value": "calculate the sum of 1 and 2."}) - input_list.append({"key": "0002", "value": "Tell me a joke about cats."}) - - env.execute() - - for output in output_list: - for key, value in output.items(): - print(f"{key}: {value}") diff --git a/python/flink_agents/e2e_tests/chat_model_integration_test.py b/python/flink_agents/e2e_tests/chat_model_integration_test.py new file mode 100644 index 0000000..b582402 --- /dev/null +++ b/python/flink_agents/e2e_tests/chat_model_integration_test.py @@ -0,0 +1,84 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################# +import os +from pathlib import Path + +import pytest + +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.e2e_tests.chat_model_integration_agent import ChatModelTestAgent +from flink_agents.e2e_tests.test_utils import pull_model + +current_dir = Path(__file__).parent + +TONGYI_MODEL = os.environ.get("TONGYI_CHAT_MODEL", "qwen-plus") +os.environ["TONGYI_CHAT_MODEL"] = TONGYI_MODEL +OLLAMA_MODEL = os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b") +os.environ["OLLAMA_CHAT_MODEL"] = OLLAMA_MODEL +OPENAI_MODEL = os.environ.get("OPENAI_CHAT_MODEL", "gpt-3.5-turbo") +os.environ["OPENAI_CHAT_MODEL"] = OPENAI_MODEL + +DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY") +OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") + +client = pull_model(OLLAMA_MODEL) + + +@pytest.mark.parametrize( + "model_provider", + [ + pytest.param( + "Ollama", + marks=pytest.mark.skipif( + client is None, + reason="Ollama client is not available or test model is missing.", + ), + ), + pytest.param( + "Tongyi", + marks=pytest.mark.skipif( + DASHSCOPE_API_KEY is None, reason="Tongyi api key is not set." + ), + ), + pytest.param( + "OpenAI", + marks=pytest.mark.skipif( + OPENAI_API_KEY is None, reason="OpenAI api key is not set."
+ ), + ), + ], +) +def test_chat_model_integration(model_provider: str) -> None: # noqa: D103 + os.environ["MODEL_PROVIDER"] = model_provider + env = AgentsExecutionEnvironment.get_execution_environment() + input_list = [] + agent = ChatModelTestAgent() + + output_list = env.from_list(input_list).apply(agent).to_list() + + input_list.append({"key": "0001", "value": "calculate the sum of 1 and 2."}) + input_list.append({"key": "0002", "value": "Tell me a joke about cats."}) + + env.execute() + + for output in output_list: + for key, value in output.items(): + print(f"{key}: {value}") + + assert "3" in output_list[0]["0001"] + assert "cat" in output_list[1]["0002"] diff --git a/python/flink_agents/e2e_tests/my_agent.py b/python/flink_agents/e2e_tests/flink_integration_agent.py similarity index 95% rename from python/flink_agents/e2e_tests/my_agent.py rename to python/flink_agents/e2e_tests/flink_integration_agent.py index d671ae7..4086a17 100644 --- a/python/flink_agents/e2e_tests/my_agent.py +++ b/python/flink_agents/e2e_tests/flink_integration_agent.py @@ -22,6 +22,7 @@ from typing import Any from pydantic import BaseModel from pyflink.common import Row +from pyflink.datastream import KeySelector from flink_agents.api.agent import Agent from flink_agents.api.decorators import action, tool @@ -53,6 +54,14 @@ class MyEvent(Event): # noqa D101 value: Any +class MyKeySelector(KeySelector): + """KeySelector for extracting key.""" + + def get_key(self, value: ItemData) -> int: + """Extract key from ItemData.""" + return value.id + + class DataStreamAgent(Agent): """Agent used for explaining integrating agents with DataStream. 
diff --git a/python/flink_agents/e2e_tests/flink_intergration_test.py b/python/flink_agents/e2e_tests/flink_intergration_test.py new file mode 100644 index 0000000..4f537ed --- /dev/null +++ b/python/flink_agents/e2e_tests/flink_intergration_test.py @@ -0,0 +1,247 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################# +import os +import sysconfig +from pathlib import Path + +from pyflink.common import Configuration, Encoder, WatermarkStrategy +from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo, Types +from pyflink.datastream import ( + RuntimeExecutionMode, + StreamExecutionEnvironment, +) +from pyflink.datastream.connectors.file_system import ( + FileSource, + StreamFormat, + StreamingFileSink, +) +from pyflink.table import DataTypes, Schema, StreamTableEnvironment, TableDescriptor + +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.e2e_tests.flink_integration_agent import ( + DataStreamAgent, + DataStreamToTableAgent, + ItemData, + MyKeySelector, + TableAgent, +) +from flink_agents.e2e_tests.test_utils import check_result + +current_dir = Path(__file__).parent + +os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"] + + +def test_from_datastream_to_datastream(tmp_path: Path) -> None: # noqa: D103 + config = Configuration() + # config.set_string("state.backend.type", "rocksdb") + # config.set_string("checkpointing.interval", "1s") + # config.set_string("restart-strategy.type", "disable") + env = StreamExecutionEnvironment.get_execution_environment(config) + env.set_runtime_mode(RuntimeExecutionMode.STREAMING) + env.set_parallelism(1) + + # currently, bounded source is not supported due to runtime implementation, so + # we use continuous file source here. 
+ input_datastream = env.from_source( + source=FileSource.for_record_stream_format( + StreamFormat.text_line_format(), f"file:///{current_dir}/resources/input" + ).build(), + watermark_strategy=WatermarkStrategy.no_watermarks(), + source_name="streaming_agent_example", + ) + + deserialize_datastream = input_datastream.map( + lambda x: ItemData.model_validate_json(x) + ) + + agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + output_datastream = ( + agents_env.from_datastream( + input=deserialize_datastream, key_selector=MyKeySelector() + ) + .apply(DataStreamAgent()) + .to_datastream() + ) + + result_dir = tmp_path / "results" + result_dir.mkdir(parents=True, exist_ok=True) + + output_datastream.map(lambda x: x.model_dump_json(), Types.STRING()).add_sink( + StreamingFileSink.for_row_format( + base_path=str(result_dir.absolute()), + encoder=Encoder.simple_string_encoder(), + ).build() + ) + + agents_env.execute() + + check_result( + result_dir=result_dir, + groud_truth_dir=Path( + f"{current_dir}/resources/ground_truth/test_from_datastream_to_datastream.txt" + ), + ) + + +def test_from_table_to_table(tmp_path: Path) -> None: # noqa: D103 + env = StreamExecutionEnvironment.get_execution_environment() + + env.set_runtime_mode(RuntimeExecutionMode.STREAMING) + env.set_parallelism(1) + + t_env = StreamTableEnvironment.create(stream_execution_environment=env) + + t_env.create_temporary_table( + "source", + TableDescriptor.for_connector("filesystem") + .schema( + Schema.new_builder() + .column("id", DataTypes.BIGINT()) + .column("review", DataTypes.STRING()) + .column("review_score", DataTypes.FLOAT()) + .build() + ) + .option("format", "json") + .option("path", f"file:///{current_dir}/resources/input") + .build(), + ) + + table = t_env.from_path("source") + + agents_env = AgentsExecutionEnvironment.get_execution_environment( + env=env, t_env=t_env + ) + + output_type = ExternalTypeInfo( + RowTypeInfo( + [ + BasicTypeInfo.LONG_TYPE_INFO(), + 
BasicTypeInfo.STRING_TYPE_INFO(), + BasicTypeInfo.FLOAT_TYPE_INFO(), + ], + ["id", "review", "review_score"], + ) + ) + + schema = ( + Schema.new_builder() + .column("id", DataTypes.BIGINT()) + .column("review", DataTypes.STRING()) + .column("review_score", DataTypes.FLOAT()) + ).build() + + output_table = ( + agents_env.from_table(input=table, key_selector=MyKeySelector()) + .apply(TableAgent()) + .to_table(schema=schema, output_type=output_type) + ) + + result_dir = tmp_path / "results" + result_dir.mkdir(parents=True, exist_ok=True) + + t_env.create_temporary_table( + "sink", + TableDescriptor.for_connector("filesystem") + .option("path", str(result_dir.absolute())) + .format("json") + .schema(schema) + .build(), + ) + + output_table.execute_insert("sink").wait() + + check_result( + result_dir=result_dir, + groud_truth_dir=Path( + f"{current_dir}/resources/ground_truth/test_from_table_to_table.txt" + ), + ) + + +def test_from_datastream_to_table(tmp_path: Path) -> None: # noqa: D103 + env = StreamExecutionEnvironment.get_execution_environment() + + env.set_runtime_mode(RuntimeExecutionMode.STREAMING) + env.set_parallelism(1) + t_env = StreamTableEnvironment.create(stream_execution_environment=env) + + # currently, bounded source is not supported due to runtime implementation, so + # we use continuous file source here. 
+ input_datastream = env.from_source( + source=FileSource.for_record_stream_format( + StreamFormat.text_line_format(), f"file:///{current_dir}/resources/input" + ).build(), + watermark_strategy=WatermarkStrategy.no_watermarks(), + source_name="streaming_agent_example", + ) + + deserialize_datastream = input_datastream.map( + lambda x: ItemData.model_validate_json(x) + ) + + agents_env = AgentsExecutionEnvironment.get_execution_environment( + env=env, t_env=t_env + ) + + output_type = ExternalTypeInfo( + RowTypeInfo( + [ + BasicTypeInfo.LONG_TYPE_INFO(), + BasicTypeInfo.STRING_TYPE_INFO(), + BasicTypeInfo.FLOAT_TYPE_INFO(), + ], + ["id", "review", "review_score"], + ) + ) + + schema = ( + Schema.new_builder() + .column("id", DataTypes.BIGINT()) + .column("review", DataTypes.STRING()) + .column("review_score", DataTypes.FLOAT()) + ).build() + + output_table = ( + agents_env.from_datastream( + input=deserialize_datastream, key_selector=MyKeySelector() + ) + .apply(DataStreamToTableAgent()) + .to_table(schema=schema, output_type=output_type) + ) + + result_dir = tmp_path / "results" + result_dir.mkdir(parents=True, exist_ok=True) + + t_env.create_temporary_table( + "sink", + TableDescriptor.for_connector("filesystem") + .option("path", str(result_dir.absolute())) + .format("json") + .schema(schema) + .build(), + ) + + output_table.execute_insert("sink").wait() + + check_result( + result_dir=result_dir, + groud_truth_dir=Path( + f"{current_dir}/resources/ground_truth/test_from_table_to_table.txt" + ), + ) diff --git a/python/flink_agents/e2e_tests/from_datastream_to_table.py b/python/flink_agents/e2e_tests/from_datastream_to_table.py deleted file mode 100644 index 02d7151..0000000 --- a/python/flink_agents/e2e_tests/from_datastream_to_table.py +++ /dev/null @@ -1,108 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################# -from pathlib import Path - -from pyflink.common import Duration, WatermarkStrategy -from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo -from pyflink.datastream import ( - KeySelector, - RuntimeExecutionMode, - StreamExecutionEnvironment, -) -from pyflink.datastream.connectors.file_system import FileSource, StreamFormat -from pyflink.table import DataTypes, Schema, StreamTableEnvironment, TableDescriptor - -from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.e2e_tests.my_agent import ( - DataStreamToTableAgent, - ItemData, -) - - -class MyKeySelector(KeySelector): - """KeySelector for extracting key.""" - - def get_key(self, value: ItemData) -> int: - """Extract key from ItemData.""" - return value.id - - -current_dir = Path(__file__).parent - -# if this example raises exception "No module named 'flink_agents'", you could set -# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/ -# site-packages) in this file. 
-if __name__ == "__main__": - env = StreamExecutionEnvironment.get_execution_environment() - - env.set_runtime_mode(RuntimeExecutionMode.STREAMING) - env.set_parallelism(1) - t_env = StreamTableEnvironment.create(stream_execution_environment=env) - - # currently, bounded source is not supported due to runtime implementation, so - # we use continuous file source here. - input_datastream = env.from_source( - source=FileSource.for_record_stream_format( - StreamFormat.text_line_format(), f"file:///{current_dir}/resources" - ) - .monitor_continuously(Duration.of_minutes(1)) - .build(), - watermark_strategy=WatermarkStrategy.no_watermarks(), - source_name="streaming_agent_example", - ) - - deserialize_datastream = input_datastream.map( - lambda x: ItemData.model_validate_json(x) - ) - - agents_env = AgentsExecutionEnvironment.get_execution_environment( - env=env, t_env=t_env - ) - - output_type = ExternalTypeInfo( - RowTypeInfo( - [ - BasicTypeInfo.LONG_TYPE_INFO(), - BasicTypeInfo.STRING_TYPE_INFO(), - BasicTypeInfo.FLOAT_TYPE_INFO(), - ], - ["id", "review", "review_score"], - ) - ) - - schema = ( - Schema.new_builder() - .column("id", DataTypes.BIGINT()) - .column("review", DataTypes.STRING()) - .column("review_score", DataTypes.FLOAT()) - ).build() - - output_table = ( - agents_env.from_datastream( - input=deserialize_datastream, key_selector=MyKeySelector() - ) - .apply(DataStreamToTableAgent()) - .to_table(schema=schema, output_type=output_type) - ) - - t_env.create_temporary_table( - "sink", - TableDescriptor.for_connector("print").schema(schema).build(), - ) - - output_table.execute_insert("sink").wait() diff --git a/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py b/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py deleted file mode 100644 index b88e96f..0000000 --- a/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py +++ /dev/null @@ -1,81 +0,0 @@ 
-################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################# -from pathlib import Path - -from pyflink.common import Configuration, Duration, WatermarkStrategy -from pyflink.datastream import ( - KeySelector, - RuntimeExecutionMode, - StreamExecutionEnvironment, -) -from pyflink.datastream.connectors.file_system import FileSource, StreamFormat - -from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.e2e_tests.my_agent import DataStreamAgent, ItemData - - -class MyKeySelector(KeySelector): - """KeySelector for extracting key.""" - - def get_key(self, value: ItemData) -> int: - """Extract key from ItemData.""" - return value.id - - -current_dir = Path(__file__).parent - -# if this example raises exception "No module named 'flink_agents'", you could set -# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/ -# site-packages) in this file. 
-if __name__ == "__main__": - config = Configuration() - config.set_string("state.backend.type", "rocksdb") - config.set_string("checkpointing.interval", "1s") - config.set_string("restart-strategy.type", "disable") - env = StreamExecutionEnvironment.get_execution_environment(config) - env.set_runtime_mode(RuntimeExecutionMode.STREAMING) - env.set_parallelism(1) - - # currently, bounded source is not supported due to runtime implementation, so - # we use continuous file source here. - input_datastream = env.from_source( - source=FileSource.for_record_stream_format( - StreamFormat.text_line_format(), f"file:///{current_dir}/resources" - ) - .monitor_continuously(Duration.of_minutes(1)) - .build(), - watermark_strategy=WatermarkStrategy.no_watermarks(), - source_name="streaming_agent_example", - ) - - deserialize_datastream = input_datastream.map( - lambda x: ItemData.model_validate_json(x) - ) - - agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) - output_datastream = ( - agents_env.from_datastream( - input=deserialize_datastream, key_selector=MyKeySelector() - ) - .apply(DataStreamAgent()) - .to_datastream() - ) - - output_datastream.print() - - agents_env.execute() diff --git a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py b/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py deleted file mode 100644 index 5983ae7..0000000 --- a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py +++ /dev/null @@ -1,108 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################# -from pathlib import Path - -from pyflink.common import Row -from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo -from pyflink.datastream import ( - KeySelector, - RuntimeExecutionMode, - StreamExecutionEnvironment, -) -from pyflink.table import ( - DataTypes, - Schema, - StreamTableEnvironment, - TableDescriptor, -) - -from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.e2e_tests.my_agent import TableAgent - -current_dir = Path(__file__).parent - - -class MyKeySelector(KeySelector): - """KeySelector for extracting key.""" - - def get_key(self, value: Row) -> int: - """Extract key from Row.""" - return value[0] - - -# if this example raises exception "No module named 'flink_agents'", you could set -# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/ -# site-packages) in this file. 
-if __name__ == "__main__": - env = StreamExecutionEnvironment.get_execution_environment() - - env.set_runtime_mode(RuntimeExecutionMode.STREAMING) - env.set_parallelism(1) - - t_env = StreamTableEnvironment.create(stream_execution_environment=env) - - t_env.create_temporary_table( - "source", - TableDescriptor.for_connector("filesystem") - .schema( - Schema.new_builder() - .column("id", DataTypes.BIGINT()) - .column("review", DataTypes.STRING()) - .column("review_score", DataTypes.FLOAT()) - .build() - ) - .option("format", "json") - .option("path", f"file:///{current_dir}/resources") - .option("source.monitor-interval", "60s") - .build(), - ) - - table = t_env.from_path("source") - - agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env, t_env=t_env) - - output_type = ExternalTypeInfo( - RowTypeInfo( - [ - BasicTypeInfo.LONG_TYPE_INFO(), - BasicTypeInfo.STRING_TYPE_INFO(), - BasicTypeInfo.FLOAT_TYPE_INFO(), - ], - ["id", "review", "review_score"], - ) - ) - - schema = ( - Schema.new_builder() - .column("id", DataTypes.BIGINT()) - .column("review", DataTypes.STRING()) - .column("review_score", DataTypes.FLOAT()) - ).build() - - output_table = ( - agents_env.from_table(input=table, key_selector=MyKeySelector()) - .apply(TableAgent()) - .to_table(schema=schema, output_type=output_type) - ) - - t_env.create_temporary_table( - "sink", - TableDescriptor.for_connector("print").schema(schema).build(), - ) - - output_table.execute_insert("sink").wait() diff --git a/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py b/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py deleted file mode 100644 index 30aaeb3..0000000 --- a/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py +++ /dev/null @@ -1,127 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################# -import os -from pathlib import Path - -from pyflink.common import Row -from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo -from pyflink.datastream import ( - KeySelector, - StreamExecutionEnvironment, -) -from pyflink.table import DataTypes, Schema, StreamTableEnvironment, TableDescriptor - -from flink_agents.api.agents.react_agent import ReActAgent -from flink_agents.api.chat_message import ChatMessage, MessageRole -from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.api.prompts.prompt import Prompt -from flink_agents.api.resource import ResourceDescriptor -from flink_agents.api.tools.tool import Tool -from flink_agents.e2e_tests.common_tools import add, multiply -from flink_agents.integrations.chat_models.ollama_chat_model import ( - OllamaChatModelConnection, - OllamaChatModelSetup, -) - -model = os.environ.get("OLLAMA_CHAT_MODEL", "qwen2.5:7b") - - -class MyKeySelector(KeySelector): - """KeySelector for extracting key.""" - - def get_key(self, value: Row) -> int: - """Extract key from Row.""" - return value[0] - - -current_dir = Path(__file__).parent - -# TODO: Currently, this example may cause core dump when being executed, the 
root cause -# may be known issue of pemja incorrect reference counting: -# https://github.com/apache/flink-agents/issues/83 -if __name__ == "__main__": - stream_env = StreamExecutionEnvironment.get_execution_environment() - - stream_env.set_parallelism(1) - - t_env = StreamTableEnvironment.create(stream_execution_environment=stream_env) - - table = t_env.from_elements( - elements=[(1, 2, 3)], - schema=DataTypes.ROW( - [ - DataTypes.FIELD("a", DataTypes.INT()), - DataTypes.FIELD("b", DataTypes.INT()), - DataTypes.FIELD("c", DataTypes.INT()), - ] - ), - ) - - env = AgentsExecutionEnvironment.get_execution_environment(env=stream_env, t_env=t_env) - - # register resource to execution environment - ( - env.add_resource("ollama", ResourceDescriptor(clazz=OllamaChatModelConnection)) - .add_resource("add", Tool.from_callable(add)) - .add_resource("multiply", Tool.from_callable(multiply)) - ) - - # prepare prompt - prompt = Prompt.from_messages( - messages=[ - ChatMessage( - role=MessageRole.SYSTEM, - content='An example of output is {"result": 30.32}.', - ), - ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * {c}"), - ], - ) - - output_type_info = RowTypeInfo( - [BasicTypeInfo.INT_TYPE_INFO()], - ["result"], - ) - - # create ReAct agent. 
- agent = ReActAgent( - chat_model=ResourceDescriptor( - clazz=OllamaChatModelSetup, - connection="ollama", - model=model, - tools=["add", "multiply"], - ), - prompt=prompt, - output_schema=output_type_info, - ) - - output_type = ExternalTypeInfo(output_type_info) - - schema = (Schema.new_builder().column("result", DataTypes.INT())).build() - - output_table = ( - env.from_table(input=table, key_selector=MyKeySelector()) - .apply(agent) - .to_table(schema=schema, output_type=output_type) - ) - - t_env.create_temporary_table( - "sink", - TableDescriptor.for_connector("print").schema(schema).build(), - ) - - output_table.execute_insert("sink").wait() diff --git a/python/flink_agents/e2e_tests/mcp_test/mcp_server.py b/python/flink_agents/e2e_tests/mcp_test/mcp_server.py index 2ec0e12..7bfbc0a 100644 --- a/python/flink_agents/e2e_tests/mcp_test/mcp_server.py +++ b/python/flink_agents/e2e_tests/mcp_test/mcp_server.py @@ -16,6 +16,7 @@ # limitations under the License. ################################################################################ """MCP server providing prompts and tools for calculation tasks.""" + import dotenv from mcp.server.fastmcp import FastMCP diff --git a/python/flink_agents/e2e_tests/mcp_test/mcp_example.py b/python/flink_agents/e2e_tests/mcp_test/mcp_test.py similarity index 92% rename from python/flink_agents/e2e_tests/mcp_test/mcp_example.py rename to python/flink_agents/e2e_tests/mcp_test/mcp_test.py index 6c68762..ea54c40 100644 --- a/python/flink_agents/e2e_tests/mcp_test/mcp_example.py +++ b/python/flink_agents/e2e_tests/mcp_test/mcp_test.py @@ -25,12 +25,14 @@ This example shows how to: Prerequisites: - Run the MCP server first: mcp_server.py """ + import multiprocessing import os import runpy import time from pathlib import Path +import pytest from pydantic import BaseModel from flink_agents.api.agent import Agent @@ -47,12 +49,13 @@ from flink_agents.api.execution_environment import AgentsExecutionEnvironment from 
flink_agents.api.resource import ResourceDescriptor from flink_agents.api.runner_context import RunnerContext from flink_agents.api.tools.mcp import MCPServer +from flink_agents.e2e_tests.test_utils import pull_model from flink_agents.integrations.chat_models.ollama_chat_model import ( OllamaChatModelConnection, OllamaChatModelSetup, ) -OLLAMA_MODEL = os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:8b") +OLLAMA_MODEL = os.environ.get("MCP_OLLAMA_CHAT_MODEL", "qwen3:1.7b") MCP_SERVER_ENDPOINT = "http://127.0.0.1:8000/mcp" @@ -76,7 +79,9 @@ class MyMCPAgent(Agent): @staticmethod def ollama_connection() -> ResourceDescriptor: """ChatModelConnection for Ollama.""" - return ResourceDescriptor(clazz=OllamaChatModelConnection) + return ResourceDescriptor( + clazz=OllamaChatModelConnection, request_timeout=240.0 + ) @chat_model_setup @staticmethod @@ -125,7 +130,13 @@ def run_mcp_server() -> None: current_dir = Path(__file__).parent -if __name__ == "__main__": +client = pull_model(OLLAMA_MODEL) + + +@pytest.mark.skipif( + client is None, reason="Ollama client is not available or test model is missing" +) +def test_mcp() -> None: # noqa:D103 # Start MCP server in background print("Starting MCP server...") server_process = multiprocessing.Process(target=run_mcp_server) diff --git a/python/flink_agents/e2e_tests/react_agent_example.py b/python/flink_agents/e2e_tests/react_agent_example.py deleted file mode 100644 index 6cfbfa4..0000000 --- a/python/flink_agents/e2e_tests/react_agent_example.py +++ /dev/null @@ -1,90 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################# -import os - -from pydantic import BaseModel - -from flink_agents.api.agents.react_agent import ReActAgent -from flink_agents.api.chat_message import ChatMessage, MessageRole -from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.api.prompts.prompt import Prompt -from flink_agents.api.resource import ResourceDescriptor -from flink_agents.api.tools.tool import Tool -from flink_agents.e2e_tests.common_tools import add, multiply -from flink_agents.integrations.chat_models.ollama_chat_model import ( - OllamaChatModelConnection, - OllamaChatModelSetup, -) - -model = os.environ.get("OLLAMA_CHAT_MODEL", "qwen2.5:7b") - - -class InputData(BaseModel): # noqa: D101 - a: int - b: int - c: int - - -class OutputData(BaseModel): # noqa: D101 - result: int - - -if __name__ == "__main__": - env = AgentsExecutionEnvironment.get_execution_environment() - - # register resource to execution environment - ( - env.add_resource("ollama", ResourceDescriptor(clazz=OllamaChatModelConnection)) - .add_resource("add", Tool.from_callable(add)) - .add_resource("multiply", Tool.from_callable(multiply)) - ) - - # prepare prompt - prompt = Prompt.from_messages( - messages=[ - ChatMessage( - role=MessageRole.SYSTEM, - content='An example of output is {"result": 30.32}.', - ), - ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * {c}"), - ], - ) - - # create ReAct agent. 
- agent = ReActAgent( - chat_model=ResourceDescriptor( - clazz=OllamaChatModelSetup, - connection="ollama", - model=model, - tools=["add", "multiply"], - ), - prompt=prompt, - output_schema=OutputData, - ) - - # execute agent - input_list = [] - - output_list = env.from_list(input_list).apply(agent).to_list() - input_list.append({"key": "0001", "value": InputData(a=2123, b=2321, c=312)}) - - env.execute() - - for output in output_list: - for key, value in output.items(): - print(f"{key}: {value}") diff --git a/python/flink_agents/e2e_tests/react_agent_test.py b/python/flink_agents/e2e_tests/react_agent_test.py new file mode 100644 index 0000000..61d0154 --- /dev/null +++ b/python/flink_agents/e2e_tests/react_agent_test.py @@ -0,0 +1,232 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################# +import json +import os +import sysconfig +from pathlib import Path + +import pytest +from pydantic import BaseModel +from pyflink.common import Row +from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo +from pyflink.datastream import KeySelector, StreamExecutionEnvironment +from pyflink.table import DataTypes, Schema, StreamTableEnvironment, TableDescriptor + +from flink_agents.api.agents.react_agent import ( + ErrorHandlingStrategy, + ReActAgent, + ReActAgentOptions, +) +from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.prompts.prompt import Prompt +from flink_agents.api.resource import ResourceDescriptor +from flink_agents.api.tools.tool import Tool +from flink_agents.e2e_tests.react_agent_tools import add, multiply +from flink_agents.e2e_tests.test_utils import pull_model +from flink_agents.integrations.chat_models.ollama_chat_model import ( + OllamaChatModelConnection, + OllamaChatModelSetup, +) + +current_dir = Path(__file__).parent + +os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"] + +OLLAMA_MODEL = os.environ.get("REACT_OLLAMA_MODEL", "qwen3:1.7b") +os.environ["OLLAMA_CHAT_MODEL"] = OLLAMA_MODEL + + +class InputData(BaseModel): # noqa: D101 + a: int + b: int + c: int + + +class OutputData(BaseModel): # noqa: D101 + result: int + + +class MyKeySelector(KeySelector): + """KeySelector for extracting key.""" + + def get_key(self, value: Row) -> int: + """Extract key from Row.""" + return value[0] + + +client = pull_model(OLLAMA_MODEL) + + +@pytest.mark.skipif( + client is None, reason="Ollama client is not available or test model is missing" +) +def test_react_agent_on_local_runner() -> None: # noqa: D103 + env = AgentsExecutionEnvironment.get_execution_environment() + env.get_config().set( + 
ReActAgentOptions.ERROR_HANDLING_STRATEGY, ErrorHandlingStrategy.IGNORE + ) + + # register resource to execution environment + ( + env.add_resource( + "ollama", + ResourceDescriptor(clazz=OllamaChatModelConnection, request_timeout=240.0), + ) + .add_resource("add", Tool.from_callable(add)) + .add_resource("multiply", Tool.from_callable(multiply)) + ) + + # prepare prompt + prompt = Prompt.from_messages( + messages=[ + ChatMessage( + role=MessageRole.SYSTEM, + content='An example of output is {"result": 30.32}.', + ), + ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * {c}"), + ], + ) + + # create ReAct agent. + agent = ReActAgent( + chat_model=ResourceDescriptor( + clazz=OllamaChatModelSetup, + connection="ollama", + model=OLLAMA_MODEL, + tools=["add", "multiply"], + ), + prompt=prompt, + output_schema=OutputData, + ) + + # execute agent + input_list = [] + + output_list = env.from_list(input_list).apply(agent).to_list() + input_list.append({"key": "0001", "value": InputData(a=2123, b=2321, c=312)}) + + env.execute() + + assert len(output_list) == 1, ( + "This may be caused by the LLM response does not match the output schema, you can rerun this case." 
+ ) + assert output_list[0]["0001"].result == 1386528 + + +@pytest.mark.skipif( + client is None, reason="Ollama client is not available or test model is missing" +) +def test_react_agent_on_remote_runner(tmp_path: Path) -> None: # noqa: D103 + stream_env = StreamExecutionEnvironment.get_execution_environment() + + stream_env.set_parallelism(1) + + t_env = StreamTableEnvironment.create(stream_execution_environment=stream_env) + + table = t_env.from_elements( + elements=[(1, 2, 3)], + schema=DataTypes.ROW( + [ + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.INT()), + DataTypes.FIELD("c", DataTypes.INT()), + ] + ), + ) + + env = AgentsExecutionEnvironment.get_execution_environment( + env=stream_env, t_env=t_env + ) + + env.get_config().set( + ReActAgentOptions.ERROR_HANDLING_STRATEGY, ErrorHandlingStrategy.IGNORE + ) + + # register resource to execution environment + ( + env.add_resource( + "ollama", + ResourceDescriptor(clazz=OllamaChatModelConnection, request_timeout=240.0), + ) + .add_resource("add", Tool.from_callable(add)) + .add_resource("multiply", Tool.from_callable(multiply)) + ) + + # prepare prompt + prompt = Prompt.from_messages( + messages=[ + ChatMessage( + role=MessageRole.SYSTEM, + content='An example of output is {"result": 30.32}.', + ), + ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) * {c}"), + ], + ) + + output_type_info = RowTypeInfo( + [BasicTypeInfo.INT_TYPE_INFO()], + ["result"], + ) + + # create ReAct agent. 
+ agent = ReActAgent( + chat_model=ResourceDescriptor( + clazz=OllamaChatModelSetup, + connection="ollama", + model=OLLAMA_MODEL, + tools=["add", "multiply"], + ), + prompt=prompt, + output_schema=output_type_info, + ) + + output_type = ExternalTypeInfo(output_type_info) + + schema = (Schema.new_builder().column("result", DataTypes.INT())).build() + + output_table = ( + env.from_table(input=table, key_selector=MyKeySelector()) + .apply(agent) + .to_table(schema=schema, output_type=output_type) + ) + + result_dir = tmp_path / "results" + result_dir.mkdir(parents=True, exist_ok=True) + + t_env.create_temporary_table( + "sink", + TableDescriptor.for_connector("filesystem") + .option("path", str(result_dir.absolute())) + .format("json") + .schema(schema) + .build(), + ) + + output_table.execute_insert("sink").wait() + + actual_result = [] + for file in result_dir.iterdir(): + if file.is_file(): + with file.open() as f: + actual_result.extend(f.readlines()) + + assert len(actual_result) == 1, ( + "This may be caused by the LLM response does not match the output schema, you can rerun this case." + ) + assert "result" in json.loads(actual_result[0].strip()) diff --git a/python/flink_agents/e2e_tests/common_tools.py b/python/flink_agents/e2e_tests/react_agent_tools.py similarity index 100% copy from python/flink_agents/e2e_tests/common_tools.py copy to python/flink_agents/e2e_tests/react_agent_tools.py diff --git a/python/flink_agents/e2e_tests/resources/ground_truth/test_from_datastream_to_datastream.txt b/python/flink_agents/e2e_tests/resources/ground_truth/test_from_datastream_to_datastream.txt new file mode 100644 index 0000000..f94deee --- /dev/null +++ b/python/flink_agents/e2e_tests/resources/ground_truth/test_from_datastream_to_datastream.txt @@ -0,0 +1,10 @@ +{"id":3,"review":"Highly satisfied with the performance and value for money. 
first action, log success=True, second action call my tool","review_score":7.0,"memory_info":{"total_reviews":1}} +{"id":1,"review":"Great product! Works perfectly and lasts a long time. first action, log success=True, second action call my tool","review_score":3.0,"memory_info":{"total_reviews":1}} +{"id":2,"review":"The item arrived damaged, and the packaging was poor. first action, log success=True, second action call my tool","review_score":5.0,"memory_info":{"total_reviews":1}} +{"id":2,"review":"Too complicated to set up. Instructions were unclear. first action, log success=True, second action call my tool","review_score":8.0,"memory_info":{"total_reviews":2}} +{"id":1,"review":"Fast shipping and excellent customer service. Would buy again! first action, log success=True, second action call my tool","review_score":8.0,"memory_info":{"total_reviews":2}} +{"id":1,"review":"Exactly what I needed. Easy to use and very reliable. first action, log success=True, second action call my tool","review_score":8.0,"memory_info":{"total_reviews":3}} +{"id":2,"review":"Good quality, but overview_scored for what it does. first action, log success=True, second action call my tool","review_score":8.0,"memory_info":{"total_reviews":3}} +{"id":3,"review":"Not as good as expected. It stopped working after a week. first action, log success=True, second action call my tool","review_score":8.0,"memory_info":{"total_reviews":2}} +{"id":2,"review":"Worst purchase ever. Waste of money and time. first action, log success=True, second action call my tool","review_score":8.0,"memory_info":{"total_reviews":4}} +{"id":2,"review":"Looks nice and functions well, but could be more durable. 
first action, log success=True, second action call my tool","review_score":8.0,"memory_info":{"total_reviews":5}} \ No newline at end of file diff --git a/python/flink_agents/e2e_tests/resources/ground_truth/test_from_table_to_table.txt b/python/flink_agents/e2e_tests/resources/ground_truth/test_from_table_to_table.txt new file mode 100644 index 0000000..1a5290e --- /dev/null +++ b/python/flink_agents/e2e_tests/resources/ground_truth/test_from_table_to_table.txt @@ -0,0 +1,10 @@ +'{"id":1,"review":"Great product! Works perfectly and lasts a long time. first action second action","review_score":3.0} +'{"id":2,"review":"The item arrived damaged, and the packaging was poor. first action second action","review_score":5.0} +'{"id":3,"review":"Highly satisfied with the performance and value for money. first action second action","review_score":7.0} +'{"id":3,"review":"Not as good as expected. It stopped working after a week. first action second action","review_score":8.0} +'{"id":1,"review":"Fast shipping and excellent customer service. Would buy again! first action second action","review_score":8.0} +'{"id":2,"review":"Too complicated to set up. Instructions were unclear. first action second action","review_score":8.0} +'{"id":2,"review":"Good quality, but overview_scored for what it does. first action second action","review_score":8.0} +'{"id":1,"review":"Exactly what I needed. Easy to use and very reliable. first action second action","review_score":8.0} +'{"id":2,"review":"Worst purchase ever. Waste of money and time. first action second action","review_score":8.0} +'{"id":2,"review":"Looks nice and functions well, but could be more durable. 
first action second action","review_score":8.0} diff --git a/python/flink_agents/e2e_tests/resources/ground_truth/test_workflow.txt b/python/flink_agents/e2e_tests/resources/ground_truth/test_workflow.txt new file mode 100644 index 0000000..3282c31 --- /dev/null +++ b/python/flink_agents/e2e_tests/resources/ground_truth/test_workflow.txt @@ -0,0 +1,10 @@ +{"bob": {"(visit 1 times)": "The message from bob -> processed_by_first_action"}} +{"bob": {"(visit 2 times)": "The message from bob -> processed by second_action"}} +{"john": {"(visit 1 times)": "The message from john -> processed_by_first_action"}} +{"john": {"(visit 2 times)": "The message from john -> processed by second_action"}} +{"john": {"(visit 3 times)": "Second message from john -> processed_by_first_action"}} +{"john": {"(visit 4 times)": "Second message from john -> processed by second_action"}} +{"bob": {"(visit 3 times)": "Second message from bob -> processed_by_first_action"}} +{"bob": {"(visit 4 times)": "Second message from bob -> processed by second_action"}} +{"unknown": {"(visit 1 times)": "Message from unknown -> processed_by_first_action"}} +{"unknown": {"(visit 2 times)": "Message from unknown -> processed by second_action"}} \ No newline at end of file diff --git a/python/flink_agents/e2e_tests/resources/input_data.txt b/python/flink_agents/e2e_tests/resources/input/input_data.txt similarity index 100% rename from python/flink_agents/e2e_tests/resources/input_data.txt rename to python/flink_agents/e2e_tests/resources/input/input_data.txt diff --git a/python/flink_agents/e2e_tests/common_tools.py b/python/flink_agents/e2e_tests/scripts/ollama_pull_model.sh similarity index 62% copy from python/flink_agents/e2e_tests/common_tools.py copy to python/flink_agents/e2e_tests/scripts/ollama_pull_model.sh index a165d9e..d0c3c5e 100644 --- a/python/flink_agents/e2e_tests/common_tools.py +++ b/python/flink_agents/e2e_tests/scripts/ollama_pull_model.sh @@ -1,3 +1,4 @@ +#!/usr/bin/env bash 
################################################################################ # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -14,40 +15,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -################################################################################# - - -def add(a: int, b: int) -> int: - """Calculate the sum of a and b. - - Parameters - ---------- - a : int - The first operand - b : int - The second operand - - Returns: - ------- - int: - The sum of a and b - """ - return a + b - - -def multiply(a: int, b: int) -> int: - """Useful function to multiply two numbers. - - Parameters - ---------- - a : int - The first operand - b : int - The second operand +################################################################################ - Returns: - ------- - int: - The product of a and b - """ - return a * b +echo "ollama pull $1" +ollama pull $1 \ No newline at end of file diff --git a/python/flink_agents/e2e_tests/test_utils.py b/python/flink_agents/e2e_tests/test_utils.py new file mode 100644 index 0000000..eab34a0 --- /dev/null +++ b/python/flink_agents/e2e_tests/test_utils.py @@ -0,0 +1,68 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import subprocess +from pathlib import Path + +from ollama import Client + +current_dir = Path(__file__).parent + + +def pull_model(ollama_model: str) -> Client: + """Run ollama pull ollama_model.""" + try: + # prepare ollama server + subprocess.run( + ["bash", f"{current_dir}/scripts/ollama_pull_model.sh", ollama_model], + timeout=120, + check=True, + ) + client = Client() + models = client.list() + + model_found = False + for model in models["models"]: + if model.model == ollama_model: + model_found = True + break + + if not model_found: + client = None # type: ignore + except Exception: + client = None # type: ignore + + return client + + +def check_result(*, result_dir: Path, groud_truth_dir: Path) -> None: + """Util function for checking flink job execution result.""" + actual_result = [] + for file in result_dir.iterdir(): + if file.is_dir(): + for child in file.iterdir(): + with child.open() as f: + actual_result.extend(f.readlines()) + if file.is_file(): + with file.open() as f: + actual_result.extend(f.readlines()) + + with groud_truth_dir.open() as f: + expected = f.readlines().sort() + + actual_result = actual_result.sort() + assert actual_result == expected diff --git a/python/flink_agents/e2e_tests/agent_example.py b/python/flink_agents/e2e_tests/workflow_test.py similarity index 79% rename from python/flink_agents/e2e_tests/agent_example.py rename to python/flink_agents/e2e_tests/workflow_test.py index 1152393..5e8a729 100644 --- 
a/python/flink_agents/e2e_tests/agent_example.py +++ b/python/flink_agents/e2e_tests/workflow_test.py @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################# +import json +from pathlib import Path from typing import TYPE_CHECKING, Any from pydantic import BaseModel @@ -28,11 +30,14 @@ from flink_agents.api.runner_context import RunnerContext if TYPE_CHECKING: from flink_agents.api.memory_reference import MemoryRef +current_dir = Path(__file__).parent + class ProcessedData(BaseModel): # noqa D101 content: str visit_count: int + class MyEvent(Event): # noqa D101 value: Any @@ -57,15 +62,14 @@ class MyAgent(Agent): current_count = previous_data.visit_count if previous_data else 0 new_count = current_count + 1 - data_to_store = ProcessedData(content=input_message,visit_count=new_count) + data_to_store = ProcessedData(content=input_message, visit_count=new_count) data_ref = memory.set(data_path, data_to_store) ctx.send_event(MyEvent(value=data_ref)) processed_content = f"{input_message} -> processed_by_first_action" key_with_count = f"(visit {new_count} times)" - ctx.send_event(OutputEvent(output={key_with_count:processed_content})) - + ctx.send_event(OutputEvent(output={key_with_count: processed_content})) @action(MyEvent) @staticmethod @@ -79,7 +83,9 @@ class MyAgent(Agent): current_count = processed_data.visit_count new_count = current_count + 1 - updated_data_to_store = ProcessedData(content=base_message, visit_count=new_count) + updated_data_to_store = ProcessedData( + content=base_message, visit_count=new_count + ) memory.set(content_ref.path, updated_data_to_store) final_content = f"{base_message} -> processed by second_action" @@ -87,7 +93,7 @@ class MyAgent(Agent): ctx.send_event(OutputEvent(output={key_with_count: final_content})) -if __name__ == "__main__": +def test_workflow() -> None: # noqa: D103 env = 
AgentsExecutionEnvironment.get_execution_environment() input_list = [] @@ -105,5 +111,19 @@ if __name__ == "__main__": env.execute() - for output in output_list: - print(output) + expected_output = [] + with Path.open( + Path(f"{current_dir}/resources/ground_truth/test_workflow.txt") + ) as f: + for line in f: + expected_output.append(json.loads(line)) # noqa:PERF401 + + assert output_list[:8] == expected_output[:8] + assert ( + output_list[8][next(iter(output_list[8].keys()))] + == expected_output[8][next(iter(expected_output[8].keys()))] + ) + assert ( + output_list[9][next(iter(output_list[9].keys()))] + == expected_output[9][next(iter(expected_output[9].keys()))] + ) diff --git a/python/flink_agents/e2e_tests/common_tools.py b/tools/start_ollama_server.sh similarity index 63% rename from python/flink_agents/e2e_tests/common_tools.py rename to tools/start_ollama_server.sh index a165d9e..a80afcf 100644 --- a/python/flink_agents/e2e_tests/common_tools.py +++ b/tools/start_ollama_server.sh @@ -16,38 +16,13 @@ # limitations under the License. ################################################################################# - -def add(a: int, b: int) -> int: - """Calculate the sum of a and b. - - Parameters - ---------- - a : int - The first operand - b : int - The second operand - - Returns: - ------- - int: - The sum of a and b - """ - return a + b - - -def multiply(a: int, b: int) -> int: - """Useful function to multiply two numbers. - - Parameters - ---------- - a : int - The first operand - b : int - The second operand - - Returns: - ------- - int: - The product of a and b - """ - return a * b +# only works on linux +os=$(uname -s) +echo $os + +curl -fsSL https://ollama.com/install.sh | sh +ret=$? +if [ "$ret" != "0" ] +then + exit $ret +fi \ No newline at end of file
