xintongsong commented on code in PR #107: URL: https://github.com/apache/flink-agents/pull/107#discussion_r2321213307
########## python/flink_agents/api/tools/mcp.py: ########## @@ -0,0 +1,281 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from __future__ import annotations + +from abc import ABC +import asyncio +from contextlib import AsyncExitStack +from datetime import timedelta +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse + +import httpx +from mcp.client.session import ClientSession +from mcp.client.streamable_http import streamablehttp_client +from pydantic import Field +from typing_extensions import override + +from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.tools.tool import BaseTool, ToolMetadata, ToolType +from flink_agents.api.tools.utils import extract_mcp_content_item +from flink_agents.api.prompts.prompt import Prompt + + + +class MCPToolDefinition(BaseTool): + """MCP tool definition that can be called directly. + + This represents a single tool from an MCP server. + """ + + mcp_server: Optional["MCPServer"] = Field(default=None, exclude=True) + + @classmethod + @override + def tool_type(cls) -> ToolType: + return ToolType.MCP + + @override + def call(self, *args: Any, **kwargs: Any) -> Any: + """Call the MCP tool with the given arguments.""" + if self.mcp_server is None: + raise ValueError("MCP tool call requires a reference to the MCP server") + + return asyncio.run(self.mcp_server._call_tool_async(self.metadata.name, *args, **kwargs)) + + +class MCPPrompt(Prompt): + """MCP prompt definition that extends the base Prompt class. + + This represents a prompt template from an MCP server. + """ + + description: Optional[str] = None + arguments: Dict[str, Any] = Field(default_factory=dict) + mcp_server: Optional["MCPServer"] = Field(default=None, exclude=True) + + def get_content(self, **arguments: Any) -> Any: + """Get the actual prompt content from the MCP server. + + Returns a list of messages, each containing role and content information. + """ + if self.mcp_server is None: + raise ValueError("MCP prompt requires a reference to the MCP server") + + return asyncio.run(self.mcp_server._get_prompt_content_async(self.name, arguments)) + + +class MCPServer(Resource, ABC): + """Resource representing an MCP server and exposing its tools/prompts. + + This is a logical container for MCP tools and prompts; it is not directly invokable. + """ + + model_config = {"arbitrary_types_allowed": True} + + # HTTP connection parameters + endpoint: str + headers: Optional[Dict[str, Any]] = None + timeout: timedelta = timedelta(seconds=30) + sse_read_timeout: timedelta = timedelta(seconds=60 * 5) + auth: Optional[httpx.Auth] = None + + session: Optional[ClientSession] = Field(default=None, exclude=True) + connection_context: Optional[AsyncExitStack] = Field(default=None, exclude=True) + + @classmethod + @override + def resource_type(cls) -> ResourceType: + return ResourceType.MCP_SERVER + + def _is_valid_http_url(self) -> bool: + """Check if the endpoint is a valid HTTP URL.""" + try: + parsed = urlparse(self.endpoint) + return parsed.scheme in ("http", "https") and bool(parsed.netloc) + except Exception: + return False + + async def _get_session(self) -> ClientSession: + """Get or create an MCP client session using the official SDK's streamable HTTP transport.""" + if self.session is not None: + return self.session + + if not self._is_valid_http_url(): + raise ValueError(f"Invalid HTTP endpoint: {self.endpoint}") + + try: + self.connection_context = AsyncExitStack() + + # Use streamable HTTP client with context management + read_stream, write_stream, _ = await self.connection_context.enter_async_context( + streamablehttp_client( + url=self.endpoint, + headers=self.headers, + timeout=self.timeout, + sse_read_timeout=self.sse_read_timeout, + auth=self.auth + ) + ) + + self.session = await self.connection_context.enter_async_context( + ClientSession(read_stream, write_stream) + ) + + await self.session.initialize() + + return self.session + + except Exception as e: + await self._cleanup_connection() + raise + + async def _cleanup_connection(self) -> None: + """Clean up connection resources.""" + try: + if self.connection_context is not None: + await self.connection_context.aclose() + self.connection_context = None + self.session = None + except Exception: + pass + + async def _call_tool_async(self, tool_name: str, *args: Any, **kwargs: Any) -> Any: + """Call a tool on the MCP server asynchronously.""" + session = await self._get_session() + + arguments = kwargs if kwargs else (args[0] if args else {}) + + result = await session.call_tool(tool_name, arguments) + + content = [] + for item in result.content: + content.append(extract_mcp_content_item(item)) + + return content + + def list_tools(self) -> List[MCPToolDefinition]: + """List available tool definitions from the MCP server.""" + return asyncio.run(self._list_tools_async()) + + async def _list_tools_async(self) -> List[MCPToolDefinition]: + """Async implementation of list_tools.""" + session = await self._get_session() + tools_response = await session.list_tools() + + tools = [] + for tool_data in tools_response.tools or []: + metadata = ToolMetadata( + name=tool_data.name, + description=tool_data.description or "", + args_schema=tool_data.inputSchema or {"type": "object", "properties": {}} + ) + + tool = MCPToolDefinition( + metadata=metadata, + mcp_server=self, + timeout=self.timeout + ) + tools.append(tool) + + return tools + + def get_tool(self, name: str) -> MCPToolDefinition: + """Get a single callable tool by name.""" + return asyncio.run(self._get_tool_async(name)) + + async def _get_tool_async(self, name: str) -> MCPToolDefinition: + """Async implementation of get_tool.""" + tools, _ = await self._list_tools_async() + for tool in tools: + if tool.metadata.name == name: + return tool + raise ValueError(f"Tool '{name}' not found on MCP server at {self.endpoint}") + + def get_tool_definition(self, name: str) -> ToolMetadata: + """Get a single tool definition metadata by name.""" + tool = self.get_tool(name) + return tool.metadata + + def list_prompts(self) -> List[MCPPrompt]: + """List available prompts from the MCP server.""" + return asyncio.run(self._list_prompts_async()) + + async def _list_prompts_async(self) -> List[MCPPrompt]: + """Async implementation of list_prompts.""" + session = await self._get_session() + prompts_response = await session.list_prompts() + + prompts = [] + for prompt_data in prompts_response.prompts or []: + arguments_dict = {} + if prompt_data.arguments: + for arg in prompt_data.arguments: + arguments_dict[arg.name] = { + "description": arg.description, + "required": getattr(arg, 'required', False) + } + + prompt = MCPPrompt( + name=prompt_data.name, + template="", # MCP prompts don't have templates in metadata - content comes from get_prompt call Review Comment: I think you mean `get_content`, not `get_prompt`? Still, users should not be aware of any differences in using a `Prompt` and a `MCPPrompt`. We might consider overriding `format_string()` and `from_text()` rather than introducing a new `get_content`. And if `template` doesn't make sense for `MCPPrompt`, that probably indicates that we should refactor the hierarchy of prompts. Maybe introducing a new `BasePrompt` abstraction, and change the current `Prompt` into something like `LocalPrompt`? ########## python/flink_agents/api/tools/mcp.py: ########## @@ -0,0 +1,281 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from __future__ import annotations + +from abc import ABC +import asyncio +from contextlib import AsyncExitStack +from datetime import timedelta +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse + +import httpx +from mcp.client.session import ClientSession +from mcp.client.streamable_http import streamablehttp_client +from pydantic import Field +from typing_extensions import override + +from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.tools.tool import BaseTool, ToolMetadata, ToolType +from flink_agents.api.tools.utils import extract_mcp_content_item +from flink_agents.api.prompts.prompt import Prompt + + + +class MCPToolDefinition(BaseTool): + """MCP tool definition that can be called directly. + + This represents a single tool from an MCP server. + """ + + mcp_server: Optional["MCPServer"] = Field(default=None, exclude=True) + + @classmethod + @override + def tool_type(cls) -> ToolType: + return ToolType.MCP + + @override + def call(self, *args: Any, **kwargs: Any) -> Any: + """Call the MCP tool with the given arguments.""" + if self.mcp_server is None: + raise ValueError("MCP tool call requires a reference to the MCP server") + + return asyncio.run(self.mcp_server._call_tool_async(self.metadata.name, *args, **kwargs)) + + +class MCPPrompt(Prompt): + """MCP prompt definition that extends the base Prompt class. + + This represents a prompt template from an MCP server. + """ + + description: Optional[str] = None + arguments: Dict[str, Any] = Field(default_factory=dict) + mcp_server: Optional["MCPServer"] = Field(default=None, exclude=True) + + def get_content(self, **arguments: Any) -> Any: + """Get the actual prompt content from the MCP server. + + Returns a list of messages, each containing role and content information. + """ + if self.mcp_server is None: + raise ValueError("MCP prompt requires a reference to the MCP server") + + return asyncio.run(self.mcp_server._get_prompt_content_async(self.name, arguments)) + + +class MCPServer(Resource, ABC): + """Resource representing an MCP server and exposing its tools/prompts. + + This is a logical container for MCP tools and prompts; it is not directly invokable. + """ + + model_config = {"arbitrary_types_allowed": True} Review Comment: What is this and how is it used? ########## python/flink_agents/api/tools/mcp.py: ########## @@ -0,0 +1,281 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from __future__ import annotations + +from abc import ABC +import asyncio +from contextlib import AsyncExitStack +from datetime import timedelta +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse + +import httpx +from mcp.client.session import ClientSession +from mcp.client.streamable_http import streamablehttp_client +from pydantic import Field +from typing_extensions import override + +from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.tools.tool import BaseTool, ToolMetadata, ToolType +from flink_agents.api.tools.utils import extract_mcp_content_item +from flink_agents.api.prompts.prompt import Prompt + + + +class MCPToolDefinition(BaseTool): + """MCP tool definition that can be called directly. + + This represents a single tool from an MCP server. + """ + + mcp_server: Optional["MCPServer"] = Field(default=None, exclude=True) + + @classmethod + @override + def tool_type(cls) -> ToolType: + return ToolType.MCP + + @override + def call(self, *args: Any, **kwargs: Any) -> Any: + """Call the MCP tool with the given arguments.""" + if self.mcp_server is None: + raise ValueError("MCP tool call requires a reference to the MCP server") + + return asyncio.run(self.mcp_server._call_tool_async(self.metadata.name, *args, **kwargs)) + + +class MCPPrompt(Prompt): Review Comment: How does user use this prompt? I'd expect to use a `MCPPrompt` just like using any other `Prompt`. That means calling `from_text()` or `format_messages()` on it. That may not work because `MCPPrompt` neither override these methods, nor set the properties that these methods use in its super class. ########## python/flink_agents/api/tools/mcp.py: ########## @@ -0,0 +1,281 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from __future__ import annotations + +from abc import ABC +import asyncio +from contextlib import AsyncExitStack +from datetime import timedelta +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse + +import httpx +from mcp.client.session import ClientSession +from mcp.client.streamable_http import streamablehttp_client +from pydantic import Field +from typing_extensions import override + +from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.tools.tool import BaseTool, ToolMetadata, ToolType +from flink_agents.api.tools.utils import extract_mcp_content_item +from flink_agents.api.prompts.prompt import Prompt + + + +class MCPToolDefinition(BaseTool): + """MCP tool definition that can be called directly. + + This represents a single tool from an MCP server. + """ + + mcp_server: Optional["MCPServer"] = Field(default=None, exclude=True) + + @classmethod + @override + def tool_type(cls) -> ToolType: + return ToolType.MCP + + @override + def call(self, *args: Any, **kwargs: Any) -> Any: + """Call the MCP tool with the given arguments.""" + if self.mcp_server is None: + raise ValueError("MCP tool call requires a reference to the MCP server") + + return asyncio.run(self.mcp_server._call_tool_async(self.metadata.name, *args, **kwargs)) + + +class MCPPrompt(Prompt): + """MCP prompt definition that extends the base Prompt class. + + This represents a prompt template from an MCP server. + """ + + description: Optional[str] = None + arguments: Dict[str, Any] = Field(default_factory=dict) + mcp_server: Optional["MCPServer"] = Field(default=None, exclude=True) + + def get_content(self, **arguments: Any) -> Any: + """Get the actual prompt content from the MCP server. + + Returns a list of messages, each containing role and content information. + """ + if self.mcp_server is None: + raise ValueError("MCP prompt requires a reference to the MCP server") + + return asyncio.run(self.mcp_server._get_prompt_content_async(self.name, arguments)) + + +class MCPServer(Resource, ABC): + """Resource representing an MCP server and exposing its tools/prompts. + + This is a logical container for MCP tools and prompts; it is not directly invokable. + """ + + model_config = {"arbitrary_types_allowed": True} + + # HTTP connection parameters + endpoint: str + headers: Optional[Dict[str, Any]] = None + timeout: timedelta = timedelta(seconds=30) + sse_read_timeout: timedelta = timedelta(seconds=60 * 5) + auth: Optional[httpx.Auth] = None + + session: Optional[ClientSession] = Field(default=None, exclude=True) + connection_context: Optional[AsyncExitStack] = Field(default=None, exclude=True) + + @classmethod + @override + def resource_type(cls) -> ResourceType: + return ResourceType.MCP_SERVER + + def _is_valid_http_url(self) -> bool: + """Check if the endpoint is a valid HTTP URL.""" + try: + parsed = urlparse(self.endpoint) + return parsed.scheme in ("http", "https") and bool(parsed.netloc) + except Exception: + return False + + async def _get_session(self) -> ClientSession: + """Get or create an MCP client session using the official SDK's streamable HTTP transport.""" + if self.session is not None: + return self.session + + if not self._is_valid_http_url(): + raise ValueError(f"Invalid HTTP endpoint: {self.endpoint}") + + try: + self.connection_context = AsyncExitStack() + + # Use streamable HTTP client with context management + read_stream, write_stream, _ = await self.connection_context.enter_async_context( + streamablehttp_client( + url=self.endpoint, + headers=self.headers, + timeout=self.timeout, + sse_read_timeout=self.sse_read_timeout, + auth=self.auth + ) + ) + + self.session = await self.connection_context.enter_async_context( + ClientSession(read_stream, write_stream) + ) + + await self.session.initialize() + + return self.session + + except Exception as e: + await self._cleanup_connection() + raise + + async def _cleanup_connection(self) -> None: + """Clean up connection resources.""" + try: + if self.connection_context is not None: + await self.connection_context.aclose() + self.connection_context = None + self.session = None + except Exception: + pass + + async def _call_tool_async(self, tool_name: str, *args: Any, **kwargs: Any) -> Any: + """Call a tool on the MCP server asynchronously.""" + session = await self._get_session() + + arguments = kwargs if kwargs else (args[0] if args else {}) + + result = await session.call_tool(tool_name, arguments) + + content = [] + for item in result.content: + content.append(extract_mcp_content_item(item)) + + return content + + def list_tools(self) -> List[MCPToolDefinition]: + """List available tool definitions from the MCP server.""" + return asyncio.run(self._list_tools_async()) + + async def _list_tools_async(self) -> List[MCPToolDefinition]: + """Async implementation of list_tools.""" + session = await self._get_session() + tools_response = await session.list_tools() + + tools = [] + for tool_data in tools_response.tools or []: + metadata = ToolMetadata( + name=tool_data.name, + description=tool_data.description or "", + args_schema=tool_data.inputSchema or {"type": "object", "properties": {}} + ) + + tool = MCPToolDefinition( + metadata=metadata, + mcp_server=self, + timeout=self.timeout + ) + tools.append(tool) + + return tools + + def get_tool(self, name: str) -> MCPToolDefinition: + """Get a single callable tool by name.""" + return asyncio.run(self._get_tool_async(name)) + + async def _get_tool_async(self, name: str) -> MCPToolDefinition: + """Async implementation of get_tool.""" + tools, _ = await self._list_tools_async() + for tool in tools: + if tool.metadata.name == name: + return tool + raise ValueError(f"Tool '{name}' not found on MCP server at {self.endpoint}") + + def get_tool_definition(self, name: str) -> ToolMetadata: Review Comment: ```suggestion def get_tool_metadata(self, name: str) -> ToolMetadata: ``` ########## python/flink_agents/api/chat_models/chat_model.py: ########## @@ -60,3 +65,20 @@ def chat(self, messages: Sequence[ChatMessage]) -> ChatMessage: ChatMessage Response from the ChatModel. """ + + @abstractmethod + def convert_tool_to_model_format(self, tool_metadata: ToolMetadata) -> Dict[str, Any]: Review Comment: Why do we need this method? It's never implemented or called. ########## python/flink_agents/api/tools/mcp.py: ########## @@ -0,0 +1,281 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from __future__ import annotations + +from abc import ABC +import asyncio +from contextlib import AsyncExitStack +from datetime import timedelta +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse + +import httpx +from mcp.client.session import ClientSession +from mcp.client.streamable_http import streamablehttp_client +from pydantic import Field +from typing_extensions import override + +from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.tools.tool import BaseTool, ToolMetadata, ToolType +from flink_agents.api.tools.utils import extract_mcp_content_item +from flink_agents.api.prompts.prompt import Prompt + + + +class MCPToolDefinition(BaseTool): Review Comment: Maybe just call it `MCPTool`, to align with `FunctionTool`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org