This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 23039b8 [api][integration] Move mcp module from api to integration. (#415)
23039b8 is described below
commit 23039b8256c8d38ded08363605e82a164cc713ce
Author: Wenjin Xie <[email protected]>
AuthorDate: Wed Jan 7 16:50:18 2026 +0800
[api][integration] Move mcp module from api to integration. (#415)
---
python/flink_agents/api/agents/agent.py | 2 -
python/flink_agents/api/tests/mcp/test_mcp.py | 56 --------------
python/flink_agents/api/tests/test_tool.py | 75 ------------------
python/flink_agents/api/tools/utils.py | 44 +----------
.../e2e_tests_mcp/mcp_test.py | 2 +-
.../mcp/mcp_server.py => integrations/__init__.py} | 34 ---------
.../mcp_server.py => integrations/mcp/__init__.py} | 34 ---------
.../{api/tools => integrations/mcp}/mcp.py | 2 +-
.../mcp/tests/__init__.py} | 34 ---------
.../mcp => integrations/mcp/tests}/mcp_server.py | 0
.../mcp/tests/test_mcp.py} | 89 +++++++++-------------
python/flink_agents/integrations/mcp/utils.py | 65 ++++++++++++++++
python/flink_agents/plan/agent_plan.py | 2 +-
13 files changed, 104 insertions(+), 335 deletions(-)
diff --git a/python/flink_agents/api/agents/agent.py b/python/flink_agents/api/agents/agent.py
index 3619338..bef4f10 100644
--- a/python/flink_agents/api/agents/agent.py
+++ b/python/flink_agents/api/agents/agent.py
@@ -24,7 +24,6 @@ from flink_agents.api.resource import (
ResourceType,
SerializableResource,
)
-from flink_agents.api.tools.mcp import MCPServer
STRUCTURED_OUTPUT = "structured_output"
@@ -88,7 +87,6 @@ class Agent(ABC):
_actions: Dict[str, Tuple[List[Type[Event]], Callable, Dict[str, Any]]]
_resources: Dict[ResourceType, Dict[str, Any]]
- _mcp_servers: Dict[str, MCPServer]
def __init__(self) -> None:
"""Init method."""
diff --git a/python/flink_agents/api/tests/mcp/test_mcp.py b/python/flink_agents/api/tests/mcp/test_mcp.py
deleted file mode 100644
index 80aecac..0000000
--- a/python/flink_agents/api/tests/mcp/test_mcp.py
+++ /dev/null
@@ -1,56 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#################################################################################
-import multiprocessing
-import runpy
-import time
-from pathlib import Path
-
-from flink_agents.api.chat_message import ChatMessage, MessageRole
-from flink_agents.api.tools.mcp import MCPServer
-
-
-def run_server() -> None: # noqa : D103
- runpy.run_path(f"{current_dir}/mcp_server.py")
-
-current_dir = Path(__file__).parent
-def test_mcp() -> None: # noqa : D103
- process = multiprocessing.Process(target=run_server)
- process.start()
- time.sleep(5)
-
- mcp_server = MCPServer(endpoint="http://127.0.0.1:8000/mcp")
- prompts = mcp_server.list_prompts()
- assert len(prompts) == 1
- prompt = prompts[0]
- assert prompt.name == "ask_sum"
- message = prompt.format_messages(role=MessageRole.SYSTEM, a="1", b="2")
- assert [ChatMessage(
- role=MessageRole.USER,
- content="Can you please calculate the sum of 1 and 2?",
- )] == message
- tools = mcp_server.list_tools()
- assert len(tools) == 1
- tool = tools[0]
- assert tool.name == "add"
-
- process.kill()
-
-
-
-
-
diff --git a/python/flink_agents/api/tests/test_tool.py b/python/flink_agents/api/tests/test_tool.py
index 1630833..203d654 100644
--- a/python/flink_agents/api/tests/test_tool.py
+++ b/python/flink_agents/api/tests/test_tool.py
@@ -18,23 +18,13 @@
from __future__ import annotations
import json
-from datetime import timedelta
from pathlib import Path
-from typing import TYPE_CHECKING
-from urllib.parse import parse_qs, urlparse
import pytest
-from mcp.client.auth import OAuthClientProvider, TokenStorage
-from mcp.shared.auth import OAuthClientMetadata
-from pydantic import AnyUrl
-from flink_agents.api.tools.mcp import MCPServer
from flink_agents.api.tools.tool import ToolMetadata
from flink_agents.api.tools.utils import create_schema_from_function
-if TYPE_CHECKING:
- from mcp.shared.auth import OAuthClientInformationFull, OAuthToken
-
current_dir = Path(__file__).parent
@@ -79,68 +69,3 @@ def test_deserialize_tool_metadata(tool_metadata: ToolMetadata) -> None: # noqa
expected_json = f.read()
actual_tool_metadata = tool_metadata.model_validate_json(expected_json)
assert actual_tool_metadata == tool_metadata
-
-
-class InMemoryTokenStorage(TokenStorage):
- """Demo In-memory token storage implementation."""
-
- def __init__(self) -> None: # noqa:D107
- self.tokens: OAuthToken | None = None
- self.client_info: OAuthClientInformationFull | None = None
-
- async def get_tokens(self) -> OAuthToken | None:
- """Get stored tokens."""
- return self.tokens
-
- async def set_tokens(self, tokens: OAuthToken) -> None:
- """Store tokens."""
- self.tokens = tokens
-
- async def get_client_info(self) -> OAuthClientInformationFull | None:
- """Get stored client information."""
- return self.client_info
-
- async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
- """Store client information."""
- self.client_info = client_info
-
-
-async def handle_redirect(auth_url: str) -> None: # noqa:D103
- print(f"Visit: {auth_url}")
-
-
-async def handle_callback() -> tuple[str, str | None]: # noqa:D103
- callback_url = input("Paste callback URL: ")
- params = parse_qs(urlparse(callback_url).query)
- return params["code"][0], params.get("state", [None])[0]
-
-
-def test_serialize_mcp_server() -> None: # noqa:D103
- oauth_auth = OAuthClientProvider(
- server_url="http://localhost:8001",
- client_metadata=OAuthClientMetadata(
- client_name="Example MCP Client",
- redirect_uris=[AnyUrl("http://localhost:3000/callback")],
- grant_types=["authorization_code", "refresh_token"],
- response_types=["code"],
- scope="user",
- ),
- storage=InMemoryTokenStorage(),
- redirect_handler=handle_redirect,
- callback_handler=handle_callback,
- )
- mcp_server = MCPServer(
- endpoint="http://localhost:8080",
- auth=oauth_auth,
- timeout=timedelta(seconds=5),
- )
- data = mcp_server.model_dump_json(serialize_as_any=True)
-
- deserialized = mcp_server.model_validate_json(data)
- assert deserialized.endpoint == mcp_server.endpoint
- assert deserialized.timeout == mcp_server.timeout
- assert deserialized.auth.context.server_url == mcp_server.auth.context.server_url
- assert (
- deserialized.auth.context.client_metadata
- == mcp_server.auth.context.client_metadata
- )
diff --git a/python/flink_agents/api/tools/utils.py b/python/flink_agents/api/tools/utils.py
index 976fcde..6b5cd4b 100644
--- a/python/flink_agents/api/tools/utils.py
+++ b/python/flink_agents/api/tools/utils.py
@@ -18,10 +18,9 @@
import json
import typing
from inspect import signature
-from typing import Any, Callable, Dict, Optional, Type, Union
+from typing import Any, Callable, Optional, Type, Union
from docstring_parser import parse
-from mcp import types
from pydantic import BaseModel, create_model
from pydantic.fields import Field, FieldInfo
@@ -192,44 +191,3 @@ def create_model_from_java_tool_schema_str(name: str, schema_str: str) -> type[B
type = TYPE_MAPPING.get(properties[param_name]["type"])
fields[param_name] = (type, FieldInfo(description=description))
return create_model(name, **fields)
-
-def extract_mcp_content_item(content_item: Any) -> Dict[str, Any] | str:
- """Extract and normalize a single MCP content item.
-
- Args:
- content_item: A single MCP content item (TextContent, ImageContent, etc.)
-
- Returns:
- Dict representation of the content item
-
- Raises:
- ImportError: If MCP types are not available
- """
- if types is None:
- err_msg = "MCP types not available. Please install the mcp package."
- raise ImportError(err_msg)
-
- if isinstance(content_item, types.TextContent):
- return content_item.text
- elif isinstance(content_item, types.ImageContent):
- return {
- "type": "image",
- "data": content_item.data,
- "mimeType": content_item.mimeType
- }
- elif isinstance(content_item, types.EmbeddedResource):
- if isinstance(content_item.resource, types.TextResourceContents):
- return {
- "type": "resource",
- "uri": content_item.resource.uri,
- "text": content_item.resource.text
- }
- elif isinstance(content_item.resource, types.BlobResourceContents):
- return {
- "type": "resource",
- "uri": content_item.resource.uri,
- "blob": content_item.resource.blob
- }
- else:
- # Handle unknown content types as generic dict
- return content_item.model_dump() if hasattr(content_item, 'model_dump') else str(content_item)
diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
index 0389fbf..df00cce 100644
--- a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
+++ b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
@@ -48,12 +48,12 @@ from flink_agents.api.events.event import InputEvent, OutputEvent
from flink_agents.api.execution_environment import AgentsExecutionEnvironment
from flink_agents.api.resource import ResourceDescriptor
from flink_agents.api.runner_context import RunnerContext
-from flink_agents.api.tools.mcp import MCPServer
from flink_agents.e2e_tests.test_utils import pull_model
from flink_agents.integrations.chat_models.ollama_chat_model import (
OllamaChatModelConnection,
OllamaChatModelSetup,
)
+from flink_agents.integrations.mcp.mcp import MCPServer
OLLAMA_MODEL = os.environ.get("MCP_OLLAMA_CHAT_MODEL", "qwen3:1.7b")
MCP_SERVER_ENDPOINT = "http://127.0.0.1:8000/mcp"
diff --git a/python/flink_agents/api/tests/mcp/mcp_server.py b/python/flink_agents/integrations/__init__.py
similarity index 59%
copy from python/flink_agents/api/tests/mcp/mcp_server.py
copy to python/flink_agents/integrations/__init__.py
index 2e295d0..e154fad 100644
--- a/python/flink_agents/api/tests/mcp/mcp_server.py
+++ b/python/flink_agents/integrations/__init__.py
@@ -15,37 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
-
-try:
- import dotenv
- dotenv.load_dotenv()
-except ImportError:
- # dotenv is optional for this test server
- pass
-
-from mcp.server.fastmcp import FastMCP
-
-# Create MCP server
-mcp = FastMCP("BasicServer")
-
-
[email protected]()
-def ask_sum(a: int, b: int) -> str:
- """Prompt of add tool."""
- return f"Can you please calculate the sum of {a} and {b}?"
-
[email protected]()
-async def add(a: int, b: int) -> int:
- """Get the detailed information of a specified IP address.
-
- Args:
- a: The first operand.
- b: The second operand.
-
- Returns:
- int: The sum of a and b.
- """
- return a + b
-
-mcp.run("streamable-http")
-
diff --git a/python/flink_agents/api/tests/mcp/mcp_server.py b/python/flink_agents/integrations/mcp/__init__.py
similarity index 59%
copy from python/flink_agents/api/tests/mcp/mcp_server.py
copy to python/flink_agents/integrations/mcp/__init__.py
index 2e295d0..e154fad 100644
--- a/python/flink_agents/api/tests/mcp/mcp_server.py
+++ b/python/flink_agents/integrations/mcp/__init__.py
@@ -15,37 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
-
-try:
- import dotenv
- dotenv.load_dotenv()
-except ImportError:
- # dotenv is optional for this test server
- pass
-
-from mcp.server.fastmcp import FastMCP
-
-# Create MCP server
-mcp = FastMCP("BasicServer")
-
-
[email protected]()
-def ask_sum(a: int, b: int) -> str:
- """Prompt of add tool."""
- return f"Can you please calculate the sum of {a} and {b}?"
-
[email protected]()
-async def add(a: int, b: int) -> int:
- """Get the detailed information of a specified IP address.
-
- Args:
- a: The first operand.
- b: The second operand.
-
- Returns:
- int: The sum of a and b.
- """
- return a + b
-
-mcp.run("streamable-http")
-
diff --git a/python/flink_agents/api/tools/mcp.py b/python/flink_agents/integrations/mcp/mcp.py
similarity index 99%
rename from python/flink_agents/api/tools/mcp.py
rename to python/flink_agents/integrations/mcp/mcp.py
index cec0fd5..32ac405 100644
--- a/python/flink_agents/api/tools/mcp.py
+++ b/python/flink_agents/integrations/mcp/mcp.py
@@ -40,7 +40,7 @@ from flink_agents.api.chat_message import ChatMessage, MessageRole
from flink_agents.api.prompts.prompt import Prompt
from flink_agents.api.resource import ResourceType, SerializableResource
from flink_agents.api.tools.tool import Tool, ToolMetadata, ToolType
-from flink_agents.api.tools.utils import extract_mcp_content_item
+from flink_agents.integrations.mcp.utils import extract_mcp_content_item
class MCPTool(Tool):
diff --git a/python/flink_agents/api/tests/mcp/mcp_server.py b/python/flink_agents/integrations/mcp/tests/__init__.py
similarity index 59%
copy from python/flink_agents/api/tests/mcp/mcp_server.py
copy to python/flink_agents/integrations/mcp/tests/__init__.py
index 2e295d0..e154fad 100644
--- a/python/flink_agents/api/tests/mcp/mcp_server.py
+++ b/python/flink_agents/integrations/mcp/tests/__init__.py
@@ -15,37 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
-
-try:
- import dotenv
- dotenv.load_dotenv()
-except ImportError:
- # dotenv is optional for this test server
- pass
-
-from mcp.server.fastmcp import FastMCP
-
-# Create MCP server
-mcp = FastMCP("BasicServer")
-
-
[email protected]()
-def ask_sum(a: int, b: int) -> str:
- """Prompt of add tool."""
- return f"Can you please calculate the sum of {a} and {b}?"
-
[email protected]()
-async def add(a: int, b: int) -> int:
- """Get the detailed information of a specified IP address.
-
- Args:
- a: The first operand.
- b: The second operand.
-
- Returns:
- int: The sum of a and b.
- """
- return a + b
-
-mcp.run("streamable-http")
-
diff --git a/python/flink_agents/api/tests/mcp/mcp_server.py b/python/flink_agents/integrations/mcp/tests/mcp_server.py
similarity index 100%
rename from python/flink_agents/api/tests/mcp/mcp_server.py
rename to python/flink_agents/integrations/mcp/tests/mcp_server.py
diff --git a/python/flink_agents/api/tests/test_tool.py b/python/flink_agents/integrations/mcp/tests/test_mcp.py
similarity index 67%
copy from python/flink_agents/api/tests/test_tool.py
copy to python/flink_agents/integrations/mcp/tests/test_mcp.py
index 1630833..6e5aa3c 100644
--- a/python/flink_agents/api/tests/test_tool.py
+++ b/python/flink_agents/integrations/mcp/tests/test_mcp.py
@@ -15,70 +15,47 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
-from __future__ import annotations
-
-import json
+import multiprocessing
+import runpy
+import time
from datetime import timedelta
from pathlib import Path
-from typing import TYPE_CHECKING
from urllib.parse import parse_qs, urlparse
-import pytest
from mcp.client.auth import OAuthClientProvider, TokenStorage
-from mcp.shared.auth import OAuthClientMetadata
+from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl
-from flink_agents.api.tools.mcp import MCPServer
-from flink_agents.api.tools.tool import ToolMetadata
-from flink_agents.api.tools.utils import create_schema_from_function
-
-if TYPE_CHECKING:
- from mcp.shared.auth import OAuthClientInformationFull, OAuthToken
-
-current_dir = Path(__file__).parent
-
-
-def foo(bar: int, baz: str) -> str:
- """Function for testing ToolMetadata.
-
- Parameters
- ----------
- bar : int
- The bar value.
- baz : str
- The baz value.
-
- Returns:
- -------
- str
- Response string value.
- """
- raise NotImplementedError
-
-
[email protected](scope="module")
-def tool_metadata() -> ToolMetadata: # noqa: D103
- return ToolMetadata(
- name="foo",
- description="Function for testing ToolMetadata",
- args_schema=create_schema_from_function(name="foo", func=foo),
- )
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.integrations.mcp.mcp import MCPServer
-def test_serialize_tool_metadata(tool_metadata: ToolMetadata) -> None: # noqa: D103
- json_value = tool_metadata.model_dump_json(serialize_as_any=True)
- with Path(f"{current_dir}/resources/tool_metadata.json").open() as f:
- expected_json = f.read()
- actual = json.loads(json_value)
- expected = json.loads(expected_json)
- assert actual == expected
+def run_server() -> None: # noqa : D103
+ runpy.run_path(f"{current_dir}/mcp_server.py")
+current_dir = Path(__file__).parent
+def test_mcp() -> None: # noqa : D103
+ process = multiprocessing.Process(target=run_server)
+ process.start()
+ time.sleep(5)
+
+ mcp_server = MCPServer(endpoint="http://127.0.0.1:8000/mcp")
+ prompts = mcp_server.list_prompts()
+ assert len(prompts) == 1
+ prompt = prompts[0]
+ assert prompt.name == "ask_sum"
+ message = prompt.format_messages(role=MessageRole.SYSTEM, a="1", b="2")
+ assert [ChatMessage(
+ role=MessageRole.USER,
+ content="Can you please calculate the sum of 1 and 2?",
+ )] == message
+ tools = mcp_server.list_tools()
+ assert len(tools) == 1
+ tool = tools[0]
+ assert tool.name == "add"
+
+ process.kill()
-def test_deserialize_tool_metadata(tool_metadata: ToolMetadata) -> None: # noqa: D103
- with Path(f"{current_dir}/resources/tool_metadata.json").open() as f:
- expected_json = f.read()
- actual_tool_metadata = tool_metadata.model_validate_json(expected_json)
- assert actual_tool_metadata == tool_metadata
class InMemoryTokenStorage(TokenStorage):
@@ -114,7 +91,6 @@ async def handle_callback() -> tuple[str, str | None]: # noqa:D103
params = parse_qs(urlparse(callback_url).query)
return params["code"][0], params.get("state", [None])[0]
-
def test_serialize_mcp_server() -> None: # noqa:D103
oauth_auth = OAuthClientProvider(
server_url="http://localhost:8001",
@@ -144,3 +120,8 @@ def test_serialize_mcp_server() -> None: # noqa:D103
deserialized.auth.context.client_metadata
== mcp_server.auth.context.client_metadata
)
+
+
+
+
+
diff --git a/python/flink_agents/integrations/mcp/utils.py b/python/flink_agents/integrations/mcp/utils.py
new file mode 100644
index 0000000..ba96aa7
--- /dev/null
+++ b/python/flink_agents/integrations/mcp/utils.py
@@ -0,0 +1,65 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any, Dict
+
+from mcp import types
+
+
+def extract_mcp_content_item(content_item: Any) -> Dict[str, Any] | str:
+ """Extract and normalize a single MCP content item.
+
+ Args:
+ content_item: A single MCP content item (TextContent, ImageContent, etc.)
+
+ Returns:
+ Dict representation of the content item
+
+ Raises:
+ ImportError: If MCP types are not available
+ """
+ if types is None:
+ err_msg = "MCP types not available. Please install the mcp package."
+ raise ImportError(err_msg)
+
+ if isinstance(content_item, types.TextContent):
+ return content_item.text
+ elif isinstance(content_item, types.ImageContent):
+ return {
+ "type": "image",
+ "data": content_item.data,
+ "mimeType": content_item.mimeType
+ }
+ elif isinstance(content_item, types.EmbeddedResource):
+ if isinstance(content_item.resource, types.TextResourceContents):
+ return {
+ "type": "resource",
+ "uri": content_item.resource.uri,
+ "text": content_item.resource.text
+ }
+ elif isinstance(content_item.resource, types.BlobResourceContents):
+ return {
+ "type": "resource",
+ "uri": content_item.resource.uri,
+ "blob": content_item.resource.blob
+ }
+ else:
+ err_msg = f"Unsupported content type: {type(content_item)}"
+ raise TypeError(err_msg)
+ else:
+ # Handle unknown content types as generic dict
+ return content_item.model_dump() if hasattr(content_item, 'model_dump') else str(content_item)
diff --git a/python/flink_agents/plan/agent_plan.py b/python/flink_agents/plan/agent_plan.py
index 3a802ee..98de608 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -21,7 +21,7 @@ from pydantic import BaseModel, field_serializer, model_validator
from flink_agents.api.agents.agent import Agent
from flink_agents.api.resource import Resource, ResourceType
-from flink_agents.api.tools.mcp import MCPServer
+from flink_agents.integrations.mcp.mcp import MCPServer
from flink_agents.plan.actions.action import Action
from flink_agents.plan.actions.chat_model_action import CHAT_MODEL_ACTION
from flink_agents.plan.actions.context_retrieval_action import CONTEXT_RETRIEVAL_ACTION