This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 69e8c96 [Bug] Add missing close function in MCPServer (#247)
69e8c96 is described below
commit 69e8c965e182dd1b7cd91bcf8e2bdf02a75abc64
Author: Alan Z. <[email protected]>
AuthorDate: Sat Oct 4 04:55:15 2025 -0700
[Bug] Add missing close function in MCPServer (#247)
---
python/flink_agents/api/tools/mcp.py | 4 +
python/flink_agents/e2e_tests/mcp/__init__.py | 17 +++
python/flink_agents/e2e_tests/mcp/mcp_example.py | 155 +++++++++++++++++++++++
python/flink_agents/e2e_tests/mcp/mcp_server.py | 60 +++++++++
4 files changed, 236 insertions(+)
diff --git a/python/flink_agents/api/tools/mcp.py b/python/flink_agents/api/tools/mcp.py
index d59648c..cec0fd5 100644
--- a/python/flink_agents/api/tools/mcp.py
+++ b/python/flink_agents/api/tools/mcp.py
@@ -307,3 +307,7 @@ class MCPServer(SerializableResource, ABC):
raise TypeError(err_msg)
return chat_messages
+
+ def close(self) -> None:
+ """Close the MCP server connection and clean up resources."""
+ asyncio.run(self._cleanup_connection())
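(A minimal usage sketch for the close() method added above. Illustrative only: it assumes an MCPServer constructed with an endpoint keyword, as in the e2e example further down, and that close() is called once the connection is no longer needed.)

    from flink_agents.api.tools.mcp import MCPServer

    # Hypothetical endpoint, for illustration only.
    server = MCPServer(endpoint="http://127.0.0.1:8000/mcp")
    try:
        ...  # use the server, e.g. register it as an agent resource
    finally:
        # New in this commit: tears down the connection and cleans up
        # resources via asyncio.run(self._cleanup_connection()).
        server.close()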
diff --git a/python/flink_agents/e2e_tests/mcp/__init__.py b/python/flink_agents/e2e_tests/mcp/__init__.py
new file mode 100644
index 0000000..e154fad
--- /dev/null
+++ b/python/flink_agents/e2e_tests/mcp/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
diff --git a/python/flink_agents/e2e_tests/mcp/mcp_example.py b/python/flink_agents/e2e_tests/mcp/mcp_example.py
new file mode 100644
index 0000000..6c68762
--- /dev/null
+++ b/python/flink_agents/e2e_tests/mcp/mcp_example.py
@@ -0,0 +1,155 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Example demonstrating MCP (Model Context Protocol) integration with Flink
Agents.
+
+This example shows how to:
+1. Define an MCP server connection
+2. Use MCP prompts in chat model setups
+3. Use MCP tools in actions
+
+Prerequisites:
+- Run the MCP server first: mcp_server.py
+"""
+import multiprocessing
+import os
+import runpy
+import time
+from pathlib import Path
+
+from pydantic import BaseModel
+
+from flink_agents.api.agent import Agent
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.decorators import (
+ action,
+ chat_model_connection,
+ chat_model_setup,
+ mcp_server,
+)
+from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent
+from flink_agents.api.events.event import InputEvent, OutputEvent
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.resource import ResourceDescriptor
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.tools.mcp import MCPServer
+from flink_agents.integrations.chat_models.ollama_chat_model import (
+ OllamaChatModelConnection,
+ OllamaChatModelSetup,
+)
+
+OLLAMA_MODEL = os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:8b")
+MCP_SERVER_ENDPOINT = "http://127.0.0.1:8000/mcp"
+
+
+class CalculationInput(BaseModel):
+ """Input for calculation requests."""
+
+ a: int
+ b: int
+
+
+class MyMCPAgent(Agent):
+ """Example agent demonstrating MCP prompts and tools integration."""
+
+ @mcp_server
+ @staticmethod
+ def my_mcp_server() -> MCPServer:
+ """Define MCP server connection."""
+ return MCPServer(endpoint=MCP_SERVER_ENDPOINT)
+
+ @chat_model_connection
+ @staticmethod
+ def ollama_connection() -> ResourceDescriptor:
+ """ChatModelConnection for Ollama."""
+ return ResourceDescriptor(clazz=OllamaChatModelConnection)
+
+ @chat_model_setup
+ @staticmethod
+ def math_chat_model() -> ResourceDescriptor:
+ """ChatModel using MCP prompt and tool."""
+ return ResourceDescriptor(
+ clazz=OllamaChatModelSetup,
+ connection="ollama_connection",
+ model=OLLAMA_MODEL,
+ prompt="ask_sum", # MCP prompt registered from my_mcp_server
+ tools=["add"], # MCP tool registered from my_mcp_server
+ extract_reasoning=True,
+ )
+
+ @action(InputEvent)
+ @staticmethod
+ def process_input(event: InputEvent, ctx: RunnerContext) -> None:
+ """Process input and send chat request using MCP prompt.
+
+ The MCP prompt "ask_sum" accepts parameters {a} and {b}.
+ """
+ input_data: CalculationInput = event.input
+
+ # Send chat request with MCP prompt variables
+ # The prompt template will be filled with a and b values
+ msg = ChatMessage(
+ role=MessageRole.USER,
+ extra_args={"a": str(input_data.a), "b": str(input_data.b)},
+ )
+
+        ctx.send_event(ChatRequestEvent(model="math_chat_model", messages=[msg]))
+
+ @action(ChatResponseEvent)
+ @staticmethod
+    def process_chat_response(event: ChatResponseEvent, ctx: RunnerContext) -> None:
+ """Process chat response and output result."""
+ response = event.response
+ if response and response.content:
+ ctx.send_event(OutputEvent(output=response.content))
+
+
+def run_mcp_server() -> None:
+ """Run the MCP server in a separate process."""
+ runpy.run_path(f"{current_dir}/mcp_server.py")
+
+
+current_dir = Path(__file__).parent
+
+if __name__ == "__main__":
+ # Start MCP server in background
+ print("Starting MCP server...")
+ server_process = multiprocessing.Process(target=run_mcp_server)
+ server_process.start()
+ time.sleep(5)
+
+ print(f"\nRunning MyMCPAgent with Ollama model: {OLLAMA_MODEL}")
+ print(f"MCP server endpoint: {MCP_SERVER_ENDPOINT}\n")
+
+ env = AgentsExecutionEnvironment.get_execution_environment()
+ input_list = []
+ agent = MyMCPAgent()
+
+ output_list = env.from_list(input_list).apply(agent).to_list()
+
+ # Add test inputs
+ input_list.append({"key": "calc1", "value": CalculationInput(a=1, b=2)})
+ input_list.append({"key": "calc2", "value": CalculationInput(a=12, b=34)})
+
+ env.execute()
+
+ print("Results:")
+ for output in output_list:
+ for key, value in output.items():
+ print(f"{key}: {value}")
+
+ server_process.kill()
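(A hand-worked illustration of how the chat request in process_input is assembled from the MCP prompt; based on the ask_sum definition in mcp_server.py below, not on an actual pipeline run.)

    # extra_args sent by process_input for the second test input:
    #   {"a": "12", "b": "34"}
    # ask_sum prompt registered by the MCP server:
    #   "Calculate the sum of {a} and {b}?"
    # user message the chat model receives after the template is filled:
    #   "Calculate the sum of 12 and 34?"
    # the model can then call the MCP tool "add", which returns 12 + 34 == 46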
diff --git a/python/flink_agents/e2e_tests/mcp/mcp_server.py b/python/flink_agents/e2e_tests/mcp/mcp_server.py
new file mode 100644
index 0000000..2ec0e12
--- /dev/null
+++ b/python/flink_agents/e2e_tests/mcp/mcp_server.py
@@ -0,0 +1,60 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""MCP server providing prompts and tools for calculation tasks."""
+import dotenv
+from mcp.server.fastmcp import FastMCP
+
+dotenv.load_dotenv()
+
+# Create MCP server
+mcp = FastMCP("MathServer")
+
+
[email protected]()
+def ask_sum(a: int, b: int) -> str:
+ """Generate a prompt asking to calculate the sum of two numbers.
+
+ This prompt will be used by chat models to request calculations.
+
+ Args:
+ a: The first operand
+ b: The second operand
+
+ Returns:
+ A formatted prompt string
+ """
+ return f"Calculate the sum of {a} and {b}?"
+
+
[email protected]()
+async def add(a: int, b: int) -> int:
+ """Calculate the sum of two numbers.
+
+ This tool can be called by chat models to perform the actual calculation.
+
+ Args:
+ a: The first operand
+ b: The second operand
+
+ Returns:
+ The sum of a and b
+ """
+ return a + b
+
+
+mcp.run("streamable-http")