This is an automated email from the ASF dual-hosted git repository. yuqi1129 pushed a commit to branch feat/mcp-governance-task3-6 in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 3b372972f2baba7495db7e4dddae7aa9409f25c2
Author: yuqi <[email protected]>
AuthorDate: Wed Jun 10 21:21:13 2026 +0800

    [#11574] feat(mcp-server): implement write operations for catalog, schema, table, model, topic, fileset

    Add create/alter/drop (and register/link/delete for models) write tools
    across all remaining resource types, following the same pattern as the
    tag write tools. Write tools are exposed by default; authorization is
    enforced by Gravitino core.

    Per resource:
    - catalog: create_catalog, alter_catalog, drop_catalog
    - schema: create_schema, alter_schema, drop_schema
    - table: create_table, alter_table, drop_table
    - topic: create_topic, alter_topic, delete_topic
    - fileset: create_fileset, alter_fileset, drop_fileset
    - model: register_model, delete_model, link_model_version, delete_model_version

    Each adds: abstract interface method, plain REST client implementation,
    FastMCP tool registration, mock implementation, and unit tests (20 new tests).

    Also remove a leftover unused `contextlib` import in server.py.
--- mcp-server/mcp_server/client/catalog_operation.py | 19 ++++ mcp-server/mcp_server/client/fileset_operation.py | 29 +++++ mcp-server/mcp_server/client/model_operation.py | 36 +++++++ .../plain/plain_rest_client_catalog_operation.py | 35 ++++++ .../plain/plain_rest_client_fileset_operation.py | 56 +++++++++- .../plain/plain_rest_client_model_operation.py | 68 +++++++++++- .../plain/plain_rest_client_schema_operation.py | 29 +++++ .../plain/plain_rest_client_table_operation.py | 49 +++++++++ .../plain/plain_rest_client_topic_operation.py | 48 ++++++++- mcp-server/mcp_server/client/schema_operation.py | 16 +++ mcp-server/mcp_server/client/table_operation.py | 28 +++++ mcp-server/mcp_server/client/topic_operation.py | 27 +++++ mcp-server/mcp_server/server.py | 1 - mcp-server/mcp_server/tools/catalog.py | 75 +++++++++++++ mcp-server/mcp_server/tools/fileset.py | 94 +++++++++++++++++ mcp-server/mcp_server/tools/model.py | 117 +++++++++++++++++++++ mcp-server/mcp_server/tools/schema.py | 74 +++++++++++++ mcp-server/mcp_server/tools/table.py | 92 ++++++++++++++++ mcp-server/mcp_server/tools/topic.py | 82 +++++++++++++++ mcp-server/tests/unit/test_audit.py | 13 ++- mcp-server/tests/unit/test_auth_flow.py | 4 +- mcp-server/tests/unit/test_per_request_token.py | 12 ++- mcp-server/tests/unit/tools/mock_operation.py | 115 ++++++++++++++++++++ mcp-server/tests/unit/tools/test_catalog.py | 58 ++++++++++ mcp-server/tests/unit/tools/test_fileset.py | 58 ++++++++++ mcp-server/tests/unit/tools/test_model.py | 78 ++++++++++++++ mcp-server/tests/unit/tools/test_schema.py | 50 +++++++++ mcp-server/tests/unit/tools/test_table.py | 58 ++++++++++ mcp-server/tests/unit/tools/test_topic.py | 56 ++++++++++ 29 files changed, 1466 insertions(+), 11 deletions(-) diff --git a/mcp-server/mcp_server/client/catalog_operation.py b/mcp-server/mcp_server/client/catalog_operation.py index 08cf7a5b97..df9b1c91dc 100644 --- a/mcp-server/mcp_server/client/catalog_operation.py +++ 
b/mcp-server/mcp_server/client/catalog_operation.py @@ -32,3 +32,22 @@ class CatalogOperation(ABC): str: JSON-formatted string containing catalog information. """ pass + + @abstractmethod + async def create_catalog( + self, + name: str, + catalog_type: str, + provider: str, + comment: str, + properties: dict, + ) -> str: + pass + + @abstractmethod + async def alter_catalog(self, catalog_name: str, updates: list) -> str: + pass + + @abstractmethod + async def drop_catalog(self, catalog_name: str) -> str: + pass diff --git a/mcp-server/mcp_server/client/fileset_operation.py b/mcp-server/mcp_server/client/fileset_operation.py index a158c11cda..064e66d597 100644 --- a/mcp-server/mcp_server/client/fileset_operation.py +++ b/mcp-server/mcp_server/client/fileset_operation.py @@ -80,3 +80,32 @@ class FilesetOperation(ABC): str: JSON-formatted string containing list of files in the fileset """ pass + + @abstractmethod + async def create_fileset( + self, + catalog_name: str, + schema_name: str, + name: str, + fileset_type: str, + storage_location: str, + comment: str, + properties: dict, + ) -> str: + pass + + @abstractmethod + async def alter_fileset( + self, + catalog_name: str, + schema_name: str, + fileset_name: str, + updates: list, + ) -> str: + pass + + @abstractmethod + async def drop_fileset( + self, catalog_name: str, schema_name: str, fileset_name: str + ) -> str: + pass diff --git a/mcp-server/mcp_server/client/model_operation.py b/mcp-server/mcp_server/client/model_operation.py index c63eda326a..6a9aa4195e 100644 --- a/mcp-server/mcp_server/client/model_operation.py +++ b/mcp-server/mcp_server/client/model_operation.py @@ -106,3 +106,39 @@ class ModelOperation(ABC): str: JSON-formatted string containing full model version metadata """ pass + + @abstractmethod + async def register_model( + self, + catalog_name: str, + schema_name: str, + name: str, + comment: str, + properties: dict, + ) -> str: + pass + + @abstractmethod + async def delete_model( + self, 
catalog_name: str, schema_name: str, model_name: str + ) -> str: + pass + + @abstractmethod + async def link_model_version( + self, + catalog_name: str, + schema_name: str, + model_name: str, + uri: str, + aliases: list, + comment: str, + properties: dict, + ) -> str: + pass + + @abstractmethod + async def delete_model_version( + self, catalog_name: str, schema_name: str, model_name: str, version: int + ) -> str: + pass diff --git a/mcp-server/mcp_server/client/plain/plain_rest_client_catalog_operation.py b/mcp-server/mcp_server/client/plain/plain_rest_client_catalog_operation.py index 843cf3bfe7..9aece47fc3 100644 --- a/mcp-server/mcp_server/client/plain/plain_rest_client_catalog_operation.py +++ b/mcp-server/mcp_server/client/plain/plain_rest_client_catalog_operation.py @@ -34,3 +34,38 @@ class PlainRESTClientCatalogOperation(CatalogOperation): f"/api/metalakes/{encode_path_segment(self.metalake_name)}/catalogs?details=true" ) return extract_content_from_response(response, "catalogs", []) + + async def create_catalog( + self, + name: str, + catalog_type: str, + provider: str, + comment: str, + properties: dict, + ) -> str: + response = await self.rest_client.post( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}/catalogs", + json={ + "name": name, + "type": catalog_type, + "provider": provider, + "comment": comment, + "properties": properties, + }, + ) + return extract_content_from_response(response, "catalog", {}) + + async def alter_catalog(self, catalog_name: str, updates: list) -> str: + response = await self.rest_client.put( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}", + json={"updates": updates}, + ) + return extract_content_from_response(response, "catalog", {}) + + async def drop_catalog(self, catalog_name: str) -> str: + response = await self.rest_client.delete( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + 
) + return extract_content_from_response(response, "dropped", False) diff --git a/mcp-server/mcp_server/client/plain/plain_rest_client_fileset_operation.py b/mcp-server/mcp_server/client/plain/plain_rest_client_fileset_operation.py index 4c81eece1d..4c023157c5 100644 --- a/mcp-server/mcp_server/client/plain/plain_rest_client_fileset_operation.py +++ b/mcp-server/mcp_server/client/plain/plain_rest_client_fileset_operation.py @@ -16,7 +16,10 @@ # under the License. from mcp_server.client.fileset_operation import FilesetOperation -from mcp_server.client.plain.utils import encode_path_segment +from mcp_server.client.plain.utils import ( + encode_path_segment, + extract_content_from_response, +) class PlainRESTClientFilesetOperation(FilesetOperation): @@ -62,3 +65,54 @@ class PlainRESTClientFilesetOperation(FilesetOperation): params={"sub_path": sub_path, "location_name": location_name}, ) return response.json().get("files", []) + + async def create_fileset( + self, + catalog_name: str, + schema_name: str, + name: str, + fileset_type: str, + storage_location: str, + comment: str, + properties: dict, + ) -> str: + response = await self.rest_client.post( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}/filesets", + json={ + "name": name, + "type": fileset_type, + "storageLocation": storage_location, + "comment": comment, + "properties": properties, + }, + ) + return extract_content_from_response(response, "fileset", {}) + + async def alter_fileset( + self, + catalog_name: str, + schema_name: str, + fileset_name: str, + updates: list, + ) -> str: + response = await self.rest_client.put( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}" + f"/filesets/{encode_path_segment(fileset_name)}", + json={"updates": updates}, + ) + return 
extract_content_from_response(response, "fileset", {}) + + async def drop_fileset( + self, catalog_name: str, schema_name: str, fileset_name: str + ) -> str: + response = await self.rest_client.delete( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}" + f"/filesets/{encode_path_segment(fileset_name)}" + ) + return extract_content_from_response(response, "dropped", False) diff --git a/mcp-server/mcp_server/client/plain/plain_rest_client_model_operation.py b/mcp-server/mcp_server/client/plain/plain_rest_client_model_operation.py index 7fce30bd91..1f36bc8359 100644 --- a/mcp-server/mcp_server/client/plain/plain_rest_client_model_operation.py +++ b/mcp-server/mcp_server/client/plain/plain_rest_client_model_operation.py @@ -16,7 +16,10 @@ # under the License. from mcp_server.client import ModelOperation -from mcp_server.client.plain.utils import encode_path_segment +from mcp_server.client.plain.utils import ( + encode_path_segment, + extract_content_from_response, +) class PlainRESTClientModelOperation(ModelOperation): @@ -81,3 +84,66 @@ class PlainRESTClientModelOperation(ModelOperation): f"/aliases/{encode_path_segment(alias)}" ) return response.json().get("modelVersion", {}) + + async def register_model( + self, + catalog_name: str, + schema_name: str, + name: str, + comment: str, + properties: dict, + ) -> str: + response = await self.rest_client.post( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}/models", + json={"name": name, "comment": comment, "properties": properties}, + ) + return extract_content_from_response(response, "model", {}) + + async def delete_model( + self, catalog_name: str, schema_name: str, model_name: str + ) -> str: + response = await self.rest_client.delete( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + 
f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}" + f"/models/{encode_path_segment(model_name)}" + ) + return extract_content_from_response(response, "deleted", False) + + async def link_model_version( + self, + catalog_name: str, + schema_name: str, + model_name: str, + uri: str, + aliases: list, + comment: str, + properties: dict, + ) -> str: + response = await self.rest_client.post( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}" + f"/models/{encode_path_segment(model_name)}/versions", + json={ + "uri": uri, + "aliases": aliases, + "comment": comment, + "properties": properties, + }, + ) + return extract_content_from_response(response, "modelVersion", {}) + + async def delete_model_version( + self, catalog_name: str, schema_name: str, model_name: str, version: int + ) -> str: + response = await self.rest_client.delete( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}" + f"/models/{encode_path_segment(model_name)}" + f"/versions/{encode_path_segment(version)}" + ) + return extract_content_from_response(response, "deleted", False) diff --git a/mcp-server/mcp_server/client/plain/plain_rest_client_schema_operation.py b/mcp-server/mcp_server/client/plain/plain_rest_client_schema_operation.py index 4c2c76ed44..ac6554bedb 100644 --- a/mcp-server/mcp_server/client/plain/plain_rest_client_schema_operation.py +++ b/mcp-server/mcp_server/client/plain/plain_rest_client_schema_operation.py @@ -35,3 +35,32 @@ class PlainRESTClientSchemaOperation(SchemaOperation): f"/catalogs/{encode_path_segment(catalog_name)}/schemas" ) return extract_content_from_response(response, "identifiers", []) + + async def create_schema( + self, catalog_name: str, name: str, comment: str, properties: dict + ) -> str: + response = 
await self.rest_client.post( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}/schemas", + json={"name": name, "comment": comment, "properties": properties}, + ) + return extract_content_from_response(response, "schema", {}) + + async def alter_schema( + self, catalog_name: str, schema_name: str, updates: list + ) -> str: + response = await self.rest_client.put( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}", + json={"updates": updates}, + ) + return extract_content_from_response(response, "schema", {}) + + async def drop_schema(self, catalog_name: str, schema_name: str) -> str: + response = await self.rest_client.delete( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}" + ) + return extract_content_from_response(response, "dropped", False) diff --git a/mcp-server/mcp_server/client/plain/plain_rest_client_table_operation.py b/mcp-server/mcp_server/client/plain/plain_rest_client_table_operation.py index e1bee62cab..0ab05d2366 100644 --- a/mcp-server/mcp_server/client/plain/plain_rest_client_table_operation.py +++ b/mcp-server/mcp_server/client/plain/plain_rest_client_table_operation.py @@ -50,3 +50,52 @@ class PlainRESTClientTableOperation(TableOperation): f"/tables/{encode_path_segment(table_name)}" ) return extract_content_from_response(response, "table", {}) + + async def create_table( + self, + catalog_name: str, + schema_name: str, + name: str, + comment: str, + columns: list, + properties: dict, + ) -> str: + response = await self.rest_client.post( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}/tables", + json={ + "name": name, + "comment": comment, + "columns": 
columns, + "properties": properties, + }, + ) + return extract_content_from_response(response, "table", {}) + + async def alter_table( + self, + catalog_name: str, + schema_name: str, + table_name: str, + updates: list, + ) -> str: + response = await self.rest_client.put( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}" + f"/tables/{encode_path_segment(table_name)}", + json={"updates": updates}, + ) + return extract_content_from_response(response, "table", {}) + + async def drop_table( + self, catalog_name: str, schema_name: str, table_name: str + ) -> str: + response = await self.rest_client.delete( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}" + f"/tables/{encode_path_segment(table_name)}" + ) + return extract_content_from_response(response, "dropped", False) diff --git a/mcp-server/mcp_server/client/plain/plain_rest_client_topic_operation.py b/mcp-server/mcp_server/client/plain/plain_rest_client_topic_operation.py index ff04720197..18bae32709 100644 --- a/mcp-server/mcp_server/client/plain/plain_rest_client_topic_operation.py +++ b/mcp-server/mcp_server/client/plain/plain_rest_client_topic_operation.py @@ -15,7 +15,10 @@ # specific language governing permissions and limitations # under the License. 
-from mcp_server.client.plain.utils import encode_path_segment +from mcp_server.client.plain.utils import ( + encode_path_segment, + extract_content_from_response, +) from mcp_server.client.topic_operation import TopicOperation @@ -46,3 +49,46 @@ class PlainRESTClientTopicOperation(TopicOperation): f"/topics/{encode_path_segment(topic_name)}" ) return response.json().get("topic", {}) + + async def create_topic( + self, + catalog_name: str, + schema_name: str, + name: str, + comment: str, + properties: dict, + ) -> str: + response = await self.rest_client.post( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}/topics", + json={"name": name, "comment": comment, "properties": properties}, + ) + return extract_content_from_response(response, "topic", {}) + + async def alter_topic( + self, + catalog_name: str, + schema_name: str, + topic_name: str, + updates: list, + ) -> str: + response = await self.rest_client.put( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}" + f"/topics/{encode_path_segment(topic_name)}", + json={"updates": updates}, + ) + return extract_content_from_response(response, "topic", {}) + + async def delete_topic( + self, catalog_name: str, schema_name: str, topic_name: str + ) -> str: + response = await self.rest_client.delete( + f"/api/metalakes/{encode_path_segment(self.metalake_name)}" + f"/catalogs/{encode_path_segment(catalog_name)}" + f"/schemas/{encode_path_segment(schema_name)}" + f"/topics/{encode_path_segment(topic_name)}" + ) + return extract_content_from_response(response, "dropped", False) diff --git a/mcp-server/mcp_server/client/schema_operation.py b/mcp-server/mcp_server/client/schema_operation.py index b90d7b96f2..6e7376d6f9 100644 --- a/mcp-server/mcp_server/client/schema_operation.py +++ 
b/mcp-server/mcp_server/client/schema_operation.py @@ -35,3 +35,19 @@ class SchemaOperation(ABC): str: JSON-formatted string containing catalog information. """ pass + + @abstractmethod + async def create_schema( + self, catalog_name: str, name: str, comment: str, properties: dict + ) -> str: + pass + + @abstractmethod + async def alter_schema( + self, catalog_name: str, schema_name: str, updates: list + ) -> str: + pass + + @abstractmethod + async def drop_schema(self, catalog_name: str, schema_name: str) -> str: + pass diff --git a/mcp-server/mcp_server/client/table_operation.py b/mcp-server/mcp_server/client/table_operation.py index 2d188ebf25..3f70e042bc 100644 --- a/mcp-server/mcp_server/client/table_operation.py +++ b/mcp-server/mcp_server/client/table_operation.py @@ -55,3 +55,31 @@ class TableOperation(ABC): str: JSON-formatted string containing full table metadata """ pass + + @abstractmethod + async def create_table( + self, + catalog_name: str, + schema_name: str, + name: str, + comment: str, + columns: list, + properties: dict, + ) -> str: + pass + + @abstractmethod + async def alter_table( + self, + catalog_name: str, + schema_name: str, + table_name: str, + updates: list, + ) -> str: + pass + + @abstractmethod + async def drop_table( + self, catalog_name: str, schema_name: str, table_name: str + ) -> str: + pass diff --git a/mcp-server/mcp_server/client/topic_operation.py b/mcp-server/mcp_server/client/topic_operation.py index 6bc0bc94ab..449b90207e 100644 --- a/mcp-server/mcp_server/client/topic_operation.py +++ b/mcp-server/mcp_server/client/topic_operation.py @@ -53,3 +53,30 @@ class TopicOperation(ABC): str: JSON-formatted string containing full topic metadata """ pass + + @abstractmethod + async def create_topic( + self, + catalog_name: str, + schema_name: str, + name: str, + comment: str, + properties: dict, + ) -> str: + pass + + @abstractmethod + async def alter_topic( + self, + catalog_name: str, + schema_name: str, + topic_name: str, + 
updates: list, + ) -> str: + pass + + @abstractmethod + async def delete_topic( + self, catalog_name: str, schema_name: str, topic_name: str + ) -> str: + pass diff --git a/mcp-server/mcp_server/server.py b/mcp-server/mcp_server/server.py index aea7df7f78..4280932c9e 100644 --- a/mcp-server/mcp_server/server.py +++ b/mcp-server/mcp_server/server.py @@ -16,7 +16,6 @@ # under the License. import asyncio -import contextlib import logging from contextlib import asynccontextmanager from typing import AsyncIterator diff --git a/mcp-server/mcp_server/tools/catalog.py b/mcp-server/mcp_server/tools/catalog.py index 50115610e8..442e4ce02b 100644 --- a/mcp-server/mcp_server/tools/catalog.py +++ b/mcp-server/mcp_server/tools/catalog.py @@ -108,3 +108,78 @@ def load_catalog_tools(mcp: FastMCP): """ client = ctx.request_context.lifespan_context.rest_client() return await client.as_catalog_operation().get_list_of_catalogs() + + @mcp.tool(tags={"catalog"}) + async def create_catalog( + ctx: Context, + name: str, + catalog_type: str, + provider: str, + comment: str, + properties: dict, + ) -> str: + """ + Create a new catalog within the metalake. + + Authorization is enforced by Gravitino: a principal without the + required grant receives an authorization denial. + + Args: + ctx (Context): The request context object. + name (str): Name of the catalog to create. + catalog_type (str): Catalog type, one of "relational", "fileset", + "messaging", "model". + provider (str): Provider implementation, e.g. "hive", + "lakehouse-iceberg", "jdbc-postgresql", "kafka". May be empty + for model/fileset catalogs that have a single provider. + comment (str): Human-readable description. + properties (dict): Catalog configuration properties. + + Returns: + str: JSON-formatted string containing the created catalog. 
+ """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_catalog_operation().create_catalog( + name, catalog_type, provider, comment, properties + ) + + @mcp.tool(tags={"catalog"}) + async def alter_catalog( + ctx: Context, catalog_name: str, updates: list + ) -> str: + """ + Alter an existing catalog. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog to alter. + updates (list): List of update operations. Example: + [ + {"@type": "rename", "newName": "new_catalog"}, + {"@type": "updateComment", "newComment": "updated"}, + {"@type": "setProperty", "property": "k", "value": "v"}, + {"@type": "removeProperty", "property": "k"} + ] + + Returns: + str: JSON-formatted string containing the altered catalog. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_catalog_operation().alter_catalog( + catalog_name, updates + ) + + @mcp.tool(tags={"catalog"}) + async def drop_catalog(ctx: Context, catalog_name: str) -> str: + """ + Drop a catalog by its name. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog to drop. + + Returns: + str: JSON-formatted string indicating whether the catalog was dropped. 
+ """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_catalog_operation().drop_catalog(catalog_name) diff --git a/mcp-server/mcp_server/tools/fileset.py b/mcp-server/mcp_server/tools/fileset.py index c3997ac16d..0df5070b7b 100644 --- a/mcp-server/mcp_server/tools/fileset.py +++ b/mcp-server/mcp_server/tools/fileset.py @@ -143,3 +143,97 @@ def load_fileset_tools(mcp: FastMCP): return await client.as_fileset_operation().list_files_in_fileset( catalog_name, schema_name, fileset_name, location_name, sub_path ) + + # pylint:disable=too-many-positional-arguments + @mcp.tool(tags={"fileset"}) + async def create_fileset( + ctx: Context, + catalog_name: str, + schema_name: str, + name: str, + fileset_type: str, + storage_location: str, + comment: str, + properties: dict, + ) -> str: + """ + Create a new fileset within a schema. + + Authorization is enforced by Gravitino: a principal without the + required grant receives an authorization denial. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + name (str): Name of the fileset to create. + fileset_type (str): Fileset type, one of "managed" or "external". + storage_location (str): Storage location URI, e.g. "file:/tmp/fileset1". + comment (str): Human-readable description. + properties (dict): Fileset configuration properties. + + Returns: + str: JSON-formatted string containing the created fileset. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_fileset_operation().create_fileset( + catalog_name, + schema_name, + name, + fileset_type, + storage_location, + comment, + properties, + ) + + @mcp.tool(tags={"fileset"}) + async def alter_fileset( + ctx: Context, + catalog_name: str, + schema_name: str, + fileset_name: str, + updates: list, + ) -> str: + """ + Alter an existing fileset. + + Args: + ctx (Context): The request context object. 
+ catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + fileset_name (str): Name of the fileset to alter. + updates (list): List of update operations. Example: + [ + {"@type": "rename", "newName": "new_fileset"}, + {"@type": "updateComment", "newComment": "updated"}, + {"@type": "setProperty", "property": "k", "value": "v"} + ] + + Returns: + str: JSON-formatted string containing the altered fileset. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_fileset_operation().alter_fileset( + catalog_name, schema_name, fileset_name, updates + ) + + @mcp.tool(tags={"fileset"}) + async def drop_fileset( + ctx: Context, catalog_name: str, schema_name: str, fileset_name: str + ) -> str: + """ + Drop a fileset by its name. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + fileset_name (str): Name of the fileset to drop. + + Returns: + str: JSON-formatted string indicating whether the fileset was dropped. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_fileset_operation().drop_fileset( + catalog_name, schema_name, fileset_name + ) diff --git a/mcp-server/mcp_server/tools/model.py b/mcp-server/mcp_server/tools/model.py index 4e5f21fdb5..655423e168 100644 --- a/mcp-server/mcp_server/tools/model.py +++ b/mcp-server/mcp_server/tools/model.py @@ -230,3 +230,120 @@ def load_model_tools(mcp: FastMCP): return await client.as_model_operation().load_model_version_by_alias( catalog_name, schema_name, model_name, alias ) + + @mcp.tool(tags={"model"}) + async def register_model( + ctx: Context, + catalog_name: str, + schema_name: str, + name: str, + comment: str, + properties: dict, + ) -> str: + """ + Register a new model within a schema. + + Authorization is enforced by Gravitino: a principal without the + required grant receives an authorization denial. 
+ + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + name (str): Name of the model to register. + comment (str): Human-readable description. + properties (dict): Model configuration properties. + + Returns: + str: JSON-formatted string containing the registered model. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_model_operation().register_model( + catalog_name, schema_name, name, comment, properties + ) + + @mcp.tool(tags={"model"}) + async def delete_model( + ctx: Context, catalog_name: str, schema_name: str, model_name: str + ) -> str: + """ + Delete a model by its name. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + model_name (str): Name of the model to delete. + + Returns: + str: JSON-formatted string indicating whether the model was deleted. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_model_operation().delete_model( + catalog_name, schema_name, model_name + ) + + # pylint:disable=too-many-positional-arguments + @mcp.tool(tags={"model"}) + async def link_model_version( + ctx: Context, + catalog_name: str, + schema_name: str, + model_name: str, + uri: str, + aliases: list, + comment: str, + properties: dict, + ) -> str: + """ + Link a new version to an existing model. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + model_name (str): Name of the model. + uri (str): URI of the model version artifact. + aliases (list): List of string aliases for this version. + comment (str): Human-readable description. + properties (dict): Model version configuration properties. + + Returns: + str: JSON-formatted string containing the linked model version. 
+ """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_model_operation().link_model_version( + catalog_name, + schema_name, + model_name, + uri, + aliases, + comment, + properties, + ) + + @mcp.tool(tags={"model"}) + async def delete_model_version( + ctx: Context, + catalog_name: str, + schema_name: str, + model_name: str, + version: int, + ) -> str: + """ + Delete a specific version of a model. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + model_name (str): Name of the model. + version (int): Version number to delete. + + Returns: + str: JSON-formatted string indicating whether the version was deleted. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_model_operation().delete_model_version( + catalog_name, schema_name, model_name, version + ) diff --git a/mcp-server/mcp_server/tools/schema.py b/mcp-server/mcp_server/tools/schema.py index 3a37353a4b..dc5957c54d 100644 --- a/mcp-server/mcp_server/tools/schema.py +++ b/mcp-server/mcp_server/tools/schema.py @@ -61,3 +61,77 @@ def load_schema_tools(mcp: FastMCP): return await client.as_schema_operation().get_list_of_schemas( catalog_name ) + + @mcp.tool(tags={"schema"}) + async def create_schema( + ctx: Context, + catalog_name: str, + name: str, + comment: str, + properties: dict, + ) -> str: + """ + Create a new schema within a catalog. + + Authorization is enforced by Gravitino: a principal without the + required grant receives an authorization denial. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog to create the schema in. + name (str): Name of the schema to create. + comment (str): Human-readable description. + properties (dict): Schema configuration properties. + + Returns: + str: JSON-formatted string containing the created schema. 
+ """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_schema_operation().create_schema( + catalog_name, name, comment, properties + ) + + @mcp.tool(tags={"schema"}) + async def alter_schema( + ctx: Context, catalog_name: str, schema_name: str, updates: list + ) -> str: + """ + Alter an existing schema. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog containing the schema. + schema_name (str): Name of the schema to alter. + updates (list): List of update operations. Example: + [ + {"@type": "setProperty", "property": "k", "value": "v"}, + {"@type": "removeProperty", "property": "k"} + ] + + Returns: + str: JSON-formatted string containing the altered schema. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_schema_operation().alter_schema( + catalog_name, schema_name, updates + ) + + @mcp.tool(tags={"schema"}) + async def drop_schema( + ctx: Context, catalog_name: str, schema_name: str + ) -> str: + """ + Drop a schema by its name. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog containing the schema. + schema_name (str): Name of the schema to drop. + + Returns: + str: JSON-formatted string indicating whether the schema was dropped. 
+ """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_schema_operation().drop_schema( + catalog_name, schema_name + ) diff --git a/mcp-server/mcp_server/tools/table.py b/mcp-server/mcp_server/tools/table.py index af6c62da6f..fbb3f03846 100644 --- a/mcp-server/mcp_server/tools/table.py +++ b/mcp-server/mcp_server/tools/table.py @@ -257,3 +257,95 @@ def load_table_tools(mcp: FastMCP): return await client.as_table_operation().load_table( catalog_name, schema_name, table_name ) + + @mcp.tool(tags={"table"}) + async def create_table( + ctx: Context, + catalog_name: str, + schema_name: str, + name: str, + comment: str, + columns: list, + properties: dict, + ) -> str: + """ + Create a new table within a schema. + + Authorization is enforced by Gravitino: a principal without the + required grant receives an authorization denial. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + name (str): Name of the table to create. + comment (str): Human-readable description. + columns (list): List of column definitions. Each column is a dict: + { + "name": "id", + "type": "integer", + "comment": "primary id", + "nullable": false, + "autoIncrement": false + } + properties (dict): Table configuration properties. + + Returns: + str: JSON-formatted string containing the created table. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_table_operation().create_table( + catalog_name, schema_name, name, comment, columns, properties + ) + + @mcp.tool(tags={"table"}) + async def alter_table( + ctx: Context, + catalog_name: str, + schema_name: str, + table_name: str, + updates: list, + ) -> str: + """ + Alter an existing table. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + table_name (str): Name of the table to alter. 
+ updates (list): List of update operations. Example: + [ + {"@type": "rename", "newName": "new_table"}, + {"@type": "updateComment", "newComment": "updated"}, + {"@type": "setProperty", "property": "k", "value": "v"} + ] + + Returns: + str: JSON-formatted string containing the altered table. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_table_operation().alter_table( + catalog_name, schema_name, table_name, updates + ) + + @mcp.tool(tags={"table"}) + async def drop_table( + ctx: Context, catalog_name: str, schema_name: str, table_name: str + ) -> str: + """ + Drop a table by its name. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + table_name (str): Name of the table to drop. + + Returns: + str: JSON-formatted string indicating whether the table was dropped. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_table_operation().drop_table( + catalog_name, schema_name, table_name + ) diff --git a/mcp-server/mcp_server/tools/topic.py b/mcp-server/mcp_server/tools/topic.py index 93d2aa57d5..82813bcaf3 100644 --- a/mcp-server/mcp_server/tools/topic.py +++ b/mcp-server/mcp_server/tools/topic.py @@ -132,3 +132,85 @@ def load_topic_tools(mcp: FastMCP): return await client.as_topic_operation().load_topic( catalog_name, schema_name, topic_name ) + + @mcp.tool(tags={"topic"}) + async def create_topic( + ctx: Context, + catalog_name: str, + schema_name: str, + name: str, + comment: str, + properties: dict, + ) -> str: + """ + Create a new topic within a schema. + + Authorization is enforced by Gravitino: a principal without the + required grant receives an authorization denial. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + name (str): Name of the topic to create. 
+ comment (str): Human-readable description. + properties (dict): Topic configuration properties. + + Returns: + str: JSON-formatted string containing the created topic. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_topic_operation().create_topic( + catalog_name, schema_name, name, comment, properties + ) + + @mcp.tool(tags={"topic"}) + async def alter_topic( + ctx: Context, + catalog_name: str, + schema_name: str, + topic_name: str, + updates: list, + ) -> str: + """ + Alter an existing topic. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + topic_name (str): Name of the topic to alter. + updates (list): List of update operations. Example: + [ + {"@type": "updateComment", "newComment": "updated"}, + {"@type": "setProperty", "property": "k", "value": "v"} + ] + + Returns: + str: JSON-formatted string containing the altered topic. + """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_topic_operation().alter_topic( + catalog_name, schema_name, topic_name, updates + ) + + @mcp.tool(tags={"topic"}) + async def delete_topic( + ctx: Context, catalog_name: str, schema_name: str, topic_name: str + ) -> str: + """ + Delete a topic by its name. + + Args: + ctx (Context): The request context object. + catalog_name (str): Name of the catalog. + schema_name (str): Name of the schema. + topic_name (str): Name of the topic to delete. + + Returns: + str: JSON-formatted string indicating whether the topic was deleted. 
+ """ + client = ctx.request_context.lifespan_context.rest_client() + return await client.as_topic_operation().delete_topic( + catalog_name, schema_name, topic_name + ) diff --git a/mcp-server/tests/unit/test_audit.py b/mcp-server/tests/unit/test_audit.py index 37cdef29cb..c52a09a391 100644 --- a/mcp-server/tests/unit/test_audit.py +++ b/mcp-server/tests/unit/test_audit.py @@ -25,7 +25,9 @@ from fastmcp import Client from mcp_server.client.factory import RESTClientFactory from mcp_server.client.plain.exception import GravitinoException -from mcp_server.client.plain.plain_rest_client_operation import PlainRESTClientOperation +from mcp_server.client.plain.plain_rest_client_operation import ( + PlainRESTClientOperation, +) from mcp_server.core import audit from mcp_server.core.setting import Setting from mcp_server.server import GravitinoMCPServer @@ -54,7 +56,9 @@ class TestAuditEmit(unittest.TestCase): def test_allow_record_structure(self): """emit() writes a JSON record with all required fields on allow.""" - audit.emit(principal="bearer:abc12345", tool="list_catalogs", outcome="allow") + audit.emit( + principal="bearer:abc12345", tool="list_catalogs", outcome="allow" + ) self.assertEqual(len(self.log_records), 1) record = json.loads(self.log_records[0]) @@ -79,7 +83,9 @@ class TestAuditEmit(unittest.TestCase): def test_anonymous_principal(self): """emit() works with anonymous principal.""" - audit.emit(principal="anonymous", tool="get_list_of_catalogs", outcome="allow") + audit.emit( + principal="anonymous", tool="get_list_of_catalogs", outcome="allow" + ) record = json.loads(self.log_records[0]) self.assertEqual(record["principal"], "anonymous") @@ -173,6 +179,7 @@ class TestAuditMiddlewareIntegration(unittest.TestCase): # Helpers # --------------------------------------------------------------------------- + class _CapturingHandler(logging.Handler): """Logging handler that stores formatted messages in a list.""" diff --git 
a/mcp-server/tests/unit/test_auth_flow.py b/mcp-server/tests/unit/test_auth_flow.py index a413515bf9..ca6b59b671 100644 --- a/mcp-server/tests/unit/test_auth_flow.py +++ b/mcp-server/tests/unit/test_auth_flow.py @@ -45,7 +45,9 @@ class TestTokenInjection(unittest.TestCase): def test_no_token_argument_no_authorization_header(self): """When token argument is omitted entirely, no Authorization header is added.""" - client = PlainRESTClientOperation("my_metalake", "http://localhost:8090") + client = PlainRESTClientOperation( + "my_metalake", "http://localhost:8090" + ) headers = dict(client._catalog_operation.rest_client.headers) self.assertNotIn("authorization", headers) diff --git a/mcp-server/tests/unit/test_per_request_token.py b/mcp-server/tests/unit/test_per_request_token.py index c08cbeeb39..77ad709ce6 100644 --- a/mcp-server/tests/unit/test_per_request_token.py +++ b/mcp-server/tests/unit/test_per_request_token.py @@ -38,7 +38,9 @@ class TestExtractBearerToken(unittest.TestCase): """Unit tests for the token extraction helper.""" def test_well_formed_bearer_header(self): - self.assertEqual(_extract_bearer_token("Bearer mytoken123"), "mytoken123") + self.assertEqual( + _extract_bearer_token("Bearer mytoken123"), "mytoken123" + ) def test_case_insensitive_bearer(self): self.assertEqual(_extract_bearer_token("bearer MYTOKEN"), "MYTOKEN") @@ -153,8 +155,12 @@ class TestGravitinoContextPerRequestToken(unittest.TestCase): ): client_bob = ctx.rest_client() - alice_headers = dict(client_alice._catalog_operation.rest_client.headers) + alice_headers = dict( + client_alice._catalog_operation.rest_client.headers + ) bob_headers = dict(client_bob._catalog_operation.rest_client.headers) - self.assertEqual(alice_headers.get("authorization"), "Bearer alice-token") + self.assertEqual( + alice_headers.get("authorization"), "Bearer alice-token" + ) self.assertEqual(bob_headers.get("authorization"), "Bearer bob-token") self.assertIsNot(client_alice, client_bob) diff --git 
a/mcp-server/tests/unit/tools/mock_operation.py b/mcp-server/tests/unit/tools/mock_operation.py index a985bf7912..eb6c1f2cff 100644 --- a/mcp-server/tests/unit/tools/mock_operation.py +++ b/mcp-server/tests/unit/tools/mock_operation.py @@ -69,11 +69,33 @@ class MockCatalogOperation(CatalogOperation): async def get_list_of_catalogs(self) -> str: return "mock_catalogs" + async def create_catalog( + self, name, catalog_type, provider, comment, properties + ) -> str: + return f"mock_catalog_created: {name}, {catalog_type}, {provider}" + + async def alter_catalog(self, catalog_name, updates) -> str: + return f"mock_catalog_altered: {catalog_name} with updates {updates}" + + async def drop_catalog(self, catalog_name) -> str: + return f"mock_catalog_dropped: {catalog_name}" + class MockSchemaOperation(SchemaOperation): async def get_list_of_schemas(self, catalog_name: str) -> str: return "mock_schemas" + async def create_schema( + self, catalog_name, name, comment, properties + ) -> str: + return f"mock_schema_created: {catalog_name}.{name}" + + async def alter_schema(self, catalog_name, schema_name, updates) -> str: + return f"mock_schema_altered: {catalog_name}.{schema_name} with updates {updates}" + + async def drop_schema(self, catalog_name, schema_name) -> str: + return f"mock_schema_dropped: {catalog_name}.{schema_name}" + class MockTableOperation(TableOperation): async def get_list_of_tables( @@ -86,6 +108,23 @@ class MockTableOperation(TableOperation): ) -> str: return "mock_table" + # pylint: disable=R0917 + async def create_table( + self, catalog_name, schema_name, name, comment, columns, properties + ) -> str: + return f"mock_table_created: {catalog_name}.{schema_name}.{name}" + + async def alter_table( + self, catalog_name, schema_name, table_name, updates + ) -> str: + return ( + f"mock_table_altered: {catalog_name}.{schema_name}.{table_name} " + f"with updates {updates}" + ) + + async def drop_table(self, catalog_name, schema_name, table_name) -> str: + 
return f"mock_table_dropped: {catalog_name}.{schema_name}.{table_name}" + class MockFilesetOperation(FilesetOperation): async def list_of_filesets( @@ -110,6 +149,34 @@ class MockFilesetOperation(FilesetOperation): ) -> str: return "mock_files_in_fileset" + # pylint: disable=R0917 + async def create_fileset( + self, + catalog_name, + schema_name, + name, + fileset_type, + storage_location, + comment, + properties, + ) -> str: + return f"mock_fileset_created: {catalog_name}.{schema_name}.{name}" + + async def alter_fileset( + self, catalog_name, schema_name, fileset_name, updates + ) -> str: + return ( + f"mock_fileset_altered: {catalog_name}.{schema_name}.{fileset_name} " + f"with updates {updates}" + ) + + async def drop_fileset( + self, catalog_name, schema_name, fileset_name + ) -> str: + return ( + f"mock_fileset_dropped: {catalog_name}.{schema_name}.{fileset_name}" + ) + class MockPolicyOperation(PolicyOperation): async def associate_policy_with_metadata( @@ -170,6 +237,38 @@ class MockModelOperation(ModelOperation): ) -> str: return "mock_model_version_by_alias" + async def register_model( + self, catalog_name, schema_name, name, comment, properties + ) -> str: + return f"mock_model_registered: {catalog_name}.{schema_name}.{name}" + + async def delete_model(self, catalog_name, schema_name, model_name) -> str: + return f"mock_model_deleted: {catalog_name}.{schema_name}.{model_name}" + + # pylint: disable=R0917 + async def link_model_version( + self, + catalog_name, + schema_name, + model_name, + uri, + aliases, + comment, + properties, + ) -> str: + return ( + f"mock_model_version_linked: {catalog_name}.{schema_name}.{model_name} " + f"uri={uri} aliases={aliases}" + ) + + async def delete_model_version( + self, catalog_name, schema_name, model_name, version + ) -> str: + return ( + f"mock_model_version_deleted: {catalog_name}.{schema_name}.{model_name} " + f"version={version}" + ) + class MockTopicOperation(TopicOperation): async def list_of_topics(self, 
catalog_name: str, schema_name: str) -> str: @@ -180,6 +279,22 @@ class MockTopicOperation(TopicOperation): ) -> str: return "mock_topic" + async def create_topic( + self, catalog_name, schema_name, name, comment, properties + ) -> str: + return f"mock_topic_created: {catalog_name}.{schema_name}.{name}" + + async def alter_topic( + self, catalog_name, schema_name, topic_name, updates + ) -> str: + return ( + f"mock_topic_altered: {catalog_name}.{schema_name}.{topic_name} " + f"with updates {updates}" + ) + + async def delete_topic(self, catalog_name, schema_name, topic_name) -> str: + return f"mock_topic_deleted: {catalog_name}.{schema_name}.{topic_name}" + class MockTagOperation(TagOperation): async def list_of_tags(self) -> str: diff --git a/mcp-server/tests/unit/tools/test_catalog.py b/mcp-server/tests/unit/tools/test_catalog.py index ca6c03f735..bf92a9adef 100644 --- a/mcp-server/tests/unit/tools/test_catalog.py +++ b/mcp-server/tests/unit/tools/test_catalog.py @@ -45,3 +45,61 @@ class TestCatalogTool(unittest.TestCase): self.assertEqual("mock_catalogs", result.content[0].text) asyncio.run(_test_list_catalogs(self.mcp)) + + def test_create_catalog(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "create_catalog", + { + "name": "cat1", + "catalog_type": "relational", + "provider": "hive", + "comment": "c", + "properties": {"k": "v"}, + }, + ) + self.assertEqual( + "mock_catalog_created: cat1, relational, hive", + result.content[0].text, + ) + + asyncio.run(_test(self.mcp)) + + def test_alter_catalog(self): + async def _test(mcp_server): + updates = [{"@type": "rename", "newName": "cat2"}] + async with Client(mcp_server) as client: + result = await client.call_tool( + "alter_catalog", + {"catalog_name": "cat1", "updates": updates}, + ) + self.assertEqual( + f"mock_catalog_altered: cat1 with updates {updates}", + result.content[0].text, + ) + + asyncio.run(_test(self.mcp)) + + def 
test_drop_catalog(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "drop_catalog", {"catalog_name": "cat1"} + ) + self.assertEqual( + "mock_catalog_dropped: cat1", result.content[0].text + ) + + asyncio.run(_test(self.mcp)) + + def test_write_tools_exposed(self): + """Write tools must be present (authorization enforced by Gravitino).""" + + async def _test(mcp_server): + names = {t.name for t in await mcp_server.list_tools()} + self.assertIn("create_catalog", names) + self.assertIn("alter_catalog", names) + self.assertIn("drop_catalog", names) + + asyncio.run(_test(self.mcp)) diff --git a/mcp-server/tests/unit/tools/test_fileset.py b/mcp-server/tests/unit/tools/test_fileset.py index a0c0371491..7a338133d4 100644 --- a/mcp-server/tests/unit/tools/test_fileset.py +++ b/mcp-server/tests/unit/tools/test_fileset.py @@ -75,3 +75,61 @@ class TestFilesetTool(unittest.TestCase): ) asyncio.run(_test_list_files_in_fileset(self.mcp)) + + def test_create_fileset(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "create_fileset", + { + "catalog_name": "cat", + "schema_name": "sch", + "name": "fs", + "fileset_type": "managed", + "storage_location": "file:/tmp/fs", + "comment": "c", + "properties": {"k": "v"}, + }, + ) + self.assertEqual( + "mock_fileset_created: cat.sch.fs", result.content[0].text + ) + + asyncio.run(_test(self.mcp)) + + def test_alter_fileset(self): + async def _test(mcp_server): + updates = [{"@type": "rename", "newName": "fs2"}] + async with Client(mcp_server) as client: + result = await client.call_tool( + "alter_fileset", + { + "catalog_name": "cat", + "schema_name": "sch", + "fileset_name": "fs", + "updates": updates, + }, + ) + self.assertEqual( + f"mock_fileset_altered: cat.sch.fs with updates {updates}", + result.content[0].text, + ) + + asyncio.run(_test(self.mcp)) + + def test_drop_fileset(self): + async def 
_test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "drop_fileset", + { + "catalog_name": "cat", + "schema_name": "sch", + "fileset_name": "fs", + }, + ) + self.assertEqual( + "mock_fileset_dropped: cat.sch.fs", result.content[0].text + ) + + asyncio.run(_test(self.mcp)) diff --git a/mcp-server/tests/unit/tools/test_model.py b/mcp-server/tests/unit/tools/test_model.py index ff27e64d3b..09d8db2b39 100644 --- a/mcp-server/tests/unit/tools/test_model.py +++ b/mcp-server/tests/unit/tools/test_model.py @@ -106,3 +106,81 @@ class TestModelTool(unittest.TestCase): ) asyncio.run(_test_load_model_version_by_alias(self.mcp)) + + def test_register_model(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "register_model", + { + "catalog_name": "cat", + "schema_name": "sch", + "name": "m", + "comment": "c", + "properties": {"k": "v"}, + }, + ) + self.assertEqual( + "mock_model_registered: cat.sch.m", result.content[0].text + ) + + asyncio.run(_test(self.mcp)) + + def test_delete_model(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "delete_model", + { + "catalog_name": "cat", + "schema_name": "sch", + "model_name": "m", + }, + ) + self.assertEqual( + "mock_model_deleted: cat.sch.m", result.content[0].text + ) + + asyncio.run(_test(self.mcp)) + + def test_link_model_version(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "link_model_version", + { + "catalog_name": "cat", + "schema_name": "sch", + "model_name": "m", + "uri": "s3://bucket/model", + "aliases": ["latest"], + "comment": "c", + "properties": {"k": "v"}, + }, + ) + self.assertEqual( + "mock_model_version_linked: cat.sch.m " + "uri=s3://bucket/model aliases=['latest']", + result.content[0].text, + ) + + asyncio.run(_test(self.mcp)) + + def 
test_delete_model_version(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "delete_model_version", + { + "catalog_name": "cat", + "schema_name": "sch", + "model_name": "m", + "version": 1, + }, + ) + self.assertEqual( + "mock_model_version_deleted: cat.sch.m version=1", + result.content[0].text, + ) + + asyncio.run(_test(self.mcp)) diff --git a/mcp-server/tests/unit/tools/test_schema.py b/mcp-server/tests/unit/tools/test_schema.py index 443c3968dc..ef6d3cd118 100644 --- a/mcp-server/tests/unit/tools/test_schema.py +++ b/mcp-server/tests/unit/tools/test_schema.py @@ -42,3 +42,53 @@ class TestSchemaTool(unittest.TestCase): self.assertEqual("mock_schemas", result.content[0].text) asyncio.run(_test_list_schemas(self.mcp)) + + def test_create_schema(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "create_schema", + { + "catalog_name": "cat", + "name": "sch", + "comment": "c", + "properties": {"k": "v"}, + }, + ) + self.assertEqual( + "mock_schema_created: cat.sch", result.content[0].text + ) + + asyncio.run(_test(self.mcp)) + + def test_alter_schema(self): + async def _test(mcp_server): + updates = [{"@type": "setProperty", "property": "k", "value": "v"}] + async with Client(mcp_server) as client: + result = await client.call_tool( + "alter_schema", + { + "catalog_name": "cat", + "schema_name": "sch", + "updates": updates, + }, + ) + self.assertEqual( + f"mock_schema_altered: cat.sch with updates {updates}", + result.content[0].text, + ) + + asyncio.run(_test(self.mcp)) + + def test_drop_schema(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "drop_schema", + {"catalog_name": "cat", "schema_name": "sch"}, + ) + self.assertEqual( + "mock_schema_dropped: cat.sch", result.content[0].text + ) + + asyncio.run(_test(self.mcp)) diff --git 
a/mcp-server/tests/unit/tools/test_table.py b/mcp-server/tests/unit/tools/test_table.py index 1b426e5512..b14cd2d13b 100644 --- a/mcp-server/tests/unit/tools/test_table.py +++ b/mcp-server/tests/unit/tools/test_table.py @@ -57,3 +57,61 @@ class TestTableTool(unittest.TestCase): self.assertEqual("mock_table", result.content[0].text) asyncio.run(_test_load_table(self.mcp)) + + def test_create_table(self): + async def _test(mcp_server): + columns = [{"name": "id", "type": "integer", "nullable": False}] + async with Client(mcp_server) as client: + result = await client.call_tool( + "create_table", + { + "catalog_name": "cat", + "schema_name": "sch", + "name": "tbl", + "comment": "c", + "columns": columns, + "properties": {"k": "v"}, + }, + ) + self.assertEqual( + "mock_table_created: cat.sch.tbl", result.content[0].text + ) + + asyncio.run(_test(self.mcp)) + + def test_alter_table(self): + async def _test(mcp_server): + updates = [{"@type": "rename", "newName": "tbl2"}] + async with Client(mcp_server) as client: + result = await client.call_tool( + "alter_table", + { + "catalog_name": "cat", + "schema_name": "sch", + "table_name": "tbl", + "updates": updates, + }, + ) + self.assertEqual( + f"mock_table_altered: cat.sch.tbl with updates {updates}", + result.content[0].text, + ) + + asyncio.run(_test(self.mcp)) + + def test_drop_table(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "drop_table", + { + "catalog_name": "cat", + "schema_name": "sch", + "table_name": "tbl", + }, + ) + self.assertEqual( + "mock_table_dropped: cat.sch.tbl", result.content[0].text + ) + + asyncio.run(_test(self.mcp)) diff --git a/mcp-server/tests/unit/tools/test_topic.py b/mcp-server/tests/unit/tools/test_topic.py index 28422d0634..6f3f915bb8 100644 --- a/mcp-server/tests/unit/tools/test_topic.py +++ b/mcp-server/tests/unit/tools/test_topic.py @@ -57,3 +57,59 @@ class TestTopicTool(unittest.TestCase): 
self.assertEqual("mock_topic", result.content[0].text) asyncio.run(_test_load_topic(self.mcp)) + + def test_create_topic(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "create_topic", + { + "catalog_name": "cat", + "schema_name": "sch", + "name": "tp", + "comment": "c", + "properties": {"k": "v"}, + }, + ) + self.assertEqual( + "mock_topic_created: cat.sch.tp", result.content[0].text + ) + + asyncio.run(_test(self.mcp)) + + def test_alter_topic(self): + async def _test(mcp_server): + updates = [{"@type": "setProperty", "property": "k", "value": "v"}] + async with Client(mcp_server) as client: + result = await client.call_tool( + "alter_topic", + { + "catalog_name": "cat", + "schema_name": "sch", + "topic_name": "tp", + "updates": updates, + }, + ) + self.assertEqual( + f"mock_topic_altered: cat.sch.tp with updates {updates}", + result.content[0].text, + ) + + asyncio.run(_test(self.mcp)) + + def test_delete_topic(self): + async def _test(mcp_server): + async with Client(mcp_server) as client: + result = await client.call_tool( + "delete_topic", + { + "catalog_name": "cat", + "schema_name": "sch", + "topic_name": "tp", + }, + ) + self.assertEqual( + "mock_topic_deleted: cat.sch.tp", result.content[0].text + ) + + asyncio.run(_test(self.mcp))
