This is an automated email from the ASF dual-hosted git repository.

yuqi1129 pushed a commit to branch feat/mcp-governance-task3-6
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 3b372972f2baba7495db7e4dddae7aa9409f25c2
Author: yuqi <[email protected]>
AuthorDate: Wed Jun 10 21:21:13 2026 +0800

    [#11574] feat(mcp-server): implement write operations for catalog, schema, 
table, model, topic, fileset
    
    Add create/alter/drop (and register/link/delete for models) write tools 
across
    all remaining resource types, following the same pattern as the tag write 
tools.
    Write tools are exposed by default; authorization is enforced by Gravitino 
core.
    
    Per resource:
    - catalog: create_catalog, alter_catalog, drop_catalog
    - schema:  create_schema, alter_schema, drop_schema
    - table:   create_table, alter_table, drop_table
    - topic:   create_topic, alter_topic, delete_topic
    - fileset: create_fileset, alter_fileset, drop_fileset
    - model:   register_model, delete_model, link_model_version, 
delete_model_version
    
    Each adds: abstract interface method, plain REST client implementation, 
FastMCP
    tool registration, mock implementation, and unit tests (20 new tests).
    
    Also remove a leftover unused `contextlib` import in server.py.
---
 mcp-server/mcp_server/client/catalog_operation.py  |  19 ++++
 mcp-server/mcp_server/client/fileset_operation.py  |  29 +++++
 mcp-server/mcp_server/client/model_operation.py    |  36 +++++++
 .../plain/plain_rest_client_catalog_operation.py   |  35 ++++++
 .../plain/plain_rest_client_fileset_operation.py   |  56 +++++++++-
 .../plain/plain_rest_client_model_operation.py     |  68 +++++++++++-
 .../plain/plain_rest_client_schema_operation.py    |  29 +++++
 .../plain/plain_rest_client_table_operation.py     |  49 +++++++++
 .../plain/plain_rest_client_topic_operation.py     |  48 ++++++++-
 mcp-server/mcp_server/client/schema_operation.py   |  16 +++
 mcp-server/mcp_server/client/table_operation.py    |  28 +++++
 mcp-server/mcp_server/client/topic_operation.py    |  27 +++++
 mcp-server/mcp_server/server.py                    |   1 -
 mcp-server/mcp_server/tools/catalog.py             |  75 +++++++++++++
 mcp-server/mcp_server/tools/fileset.py             |  94 +++++++++++++++++
 mcp-server/mcp_server/tools/model.py               | 117 +++++++++++++++++++++
 mcp-server/mcp_server/tools/schema.py              |  74 +++++++++++++
 mcp-server/mcp_server/tools/table.py               |  92 ++++++++++++++++
 mcp-server/mcp_server/tools/topic.py               |  82 +++++++++++++++
 mcp-server/tests/unit/test_audit.py                |  13 ++-
 mcp-server/tests/unit/test_auth_flow.py            |   4 +-
 mcp-server/tests/unit/test_per_request_token.py    |  12 ++-
 mcp-server/tests/unit/tools/mock_operation.py      | 115 ++++++++++++++++++++
 mcp-server/tests/unit/tools/test_catalog.py        |  58 ++++++++++
 mcp-server/tests/unit/tools/test_fileset.py        |  58 ++++++++++
 mcp-server/tests/unit/tools/test_model.py          |  78 ++++++++++++++
 mcp-server/tests/unit/tools/test_schema.py         |  50 +++++++++
 mcp-server/tests/unit/tools/test_table.py          |  58 ++++++++++
 mcp-server/tests/unit/tools/test_topic.py          |  56 ++++++++++
 29 files changed, 1466 insertions(+), 11 deletions(-)

diff --git a/mcp-server/mcp_server/client/catalog_operation.py 
b/mcp-server/mcp_server/client/catalog_operation.py
index 08cf7a5b97..df9b1c91dc 100644
--- a/mcp-server/mcp_server/client/catalog_operation.py
+++ b/mcp-server/mcp_server/client/catalog_operation.py
@@ -32,3 +32,22 @@ class CatalogOperation(ABC):
             str: JSON-formatted string containing catalog information.
         """
         pass
+
+    @abstractmethod
+    async def create_catalog(
+        self,
+        name: str,
+        catalog_type: str,
+        provider: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def alter_catalog(self, catalog_name: str, updates: list) -> str:
+        pass
+
+    @abstractmethod
+    async def drop_catalog(self, catalog_name: str) -> str:
+        pass
diff --git a/mcp-server/mcp_server/client/fileset_operation.py 
b/mcp-server/mcp_server/client/fileset_operation.py
index a158c11cda..064e66d597 100644
--- a/mcp-server/mcp_server/client/fileset_operation.py
+++ b/mcp-server/mcp_server/client/fileset_operation.py
@@ -80,3 +80,32 @@ class FilesetOperation(ABC):
             str: JSON-formatted string containing list of files in the fileset
         """
         pass
+
+    @abstractmethod
+    async def create_fileset(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        fileset_type: str,
+        storage_location: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def alter_fileset(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        fileset_name: str,
+        updates: list,
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def drop_fileset(
+        self, catalog_name: str, schema_name: str, fileset_name: str
+    ) -> str:
+        pass
diff --git a/mcp-server/mcp_server/client/model_operation.py 
b/mcp-server/mcp_server/client/model_operation.py
index c63eda326a..6a9aa4195e 100644
--- a/mcp-server/mcp_server/client/model_operation.py
+++ b/mcp-server/mcp_server/client/model_operation.py
@@ -106,3 +106,39 @@ class ModelOperation(ABC):
             str: JSON-formatted string containing full model version metadata
         """
         pass
+
+    @abstractmethod
+    async def register_model(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def delete_model(
+        self, catalog_name: str, schema_name: str, model_name: str
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def link_model_version(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        model_name: str,
+        uri: str,
+        aliases: list,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def delete_model_version(
+        self, catalog_name: str, schema_name: str, model_name: str, version: 
int
+    ) -> str:
+        pass
diff --git 
a/mcp-server/mcp_server/client/plain/plain_rest_client_catalog_operation.py 
b/mcp-server/mcp_server/client/plain/plain_rest_client_catalog_operation.py
index 843cf3bfe7..9aece47fc3 100644
--- a/mcp-server/mcp_server/client/plain/plain_rest_client_catalog_operation.py
+++ b/mcp-server/mcp_server/client/plain/plain_rest_client_catalog_operation.py
@@ -34,3 +34,38 @@ class PlainRESTClientCatalogOperation(CatalogOperation):
             
f"/api/metalakes/{encode_path_segment(self.metalake_name)}/catalogs?details=true"
         )
         return extract_content_from_response(response, "catalogs", [])
+
+    async def create_catalog(
+        self,
+        name: str,
+        catalog_type: str,
+        provider: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        response = await self.rest_client.post(
+            
f"/api/metalakes/{encode_path_segment(self.metalake_name)}/catalogs",
+            json={
+                "name": name,
+                "type": catalog_type,
+                "provider": provider,
+                "comment": comment,
+                "properties": properties,
+            },
+        )
+        return extract_content_from_response(response, "catalog", {})
+
+    async def alter_catalog(self, catalog_name: str, updates: list) -> str:
+        response = await self.rest_client.put(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}",
+            json={"updates": updates},
+        )
+        return extract_content_from_response(response, "catalog", {})
+
+    async def drop_catalog(self, catalog_name: str) -> str:
+        response = await self.rest_client.delete(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+        )
+        return extract_content_from_response(response, "dropped", False)
diff --git 
a/mcp-server/mcp_server/client/plain/plain_rest_client_fileset_operation.py 
b/mcp-server/mcp_server/client/plain/plain_rest_client_fileset_operation.py
index 4c81eece1d..4c023157c5 100644
--- a/mcp-server/mcp_server/client/plain/plain_rest_client_fileset_operation.py
+++ b/mcp-server/mcp_server/client/plain/plain_rest_client_fileset_operation.py
@@ -16,7 +16,10 @@
 # under the License.
 
 from mcp_server.client.fileset_operation import FilesetOperation
-from mcp_server.client.plain.utils import encode_path_segment
+from mcp_server.client.plain.utils import (
+    encode_path_segment,
+    extract_content_from_response,
+)
 
 
 class PlainRESTClientFilesetOperation(FilesetOperation):
@@ -62,3 +65,54 @@ class PlainRESTClientFilesetOperation(FilesetOperation):
             params={"sub_path": sub_path, "location_name": location_name},
         )
         return response.json().get("files", [])
+
+    async def create_fileset(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        fileset_type: str,
+        storage_location: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        response = await self.rest_client.post(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}/filesets",
+            json={
+                "name": name,
+                "type": fileset_type,
+                "storageLocation": storage_location,
+                "comment": comment,
+                "properties": properties,
+            },
+        )
+        return extract_content_from_response(response, "fileset", {})
+
+    async def alter_fileset(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        fileset_name: str,
+        updates: list,
+    ) -> str:
+        response = await self.rest_client.put(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}"
+            f"/filesets/{encode_path_segment(fileset_name)}",
+            json={"updates": updates},
+        )
+        return extract_content_from_response(response, "fileset", {})
+
+    async def drop_fileset(
+        self, catalog_name: str, schema_name: str, fileset_name: str
+    ) -> str:
+        response = await self.rest_client.delete(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}"
+            f"/filesets/{encode_path_segment(fileset_name)}"
+        )
+        return extract_content_from_response(response, "dropped", False)
diff --git 
a/mcp-server/mcp_server/client/plain/plain_rest_client_model_operation.py 
b/mcp-server/mcp_server/client/plain/plain_rest_client_model_operation.py
index 7fce30bd91..1f36bc8359 100644
--- a/mcp-server/mcp_server/client/plain/plain_rest_client_model_operation.py
+++ b/mcp-server/mcp_server/client/plain/plain_rest_client_model_operation.py
@@ -16,7 +16,10 @@
 # under the License.
 
 from mcp_server.client import ModelOperation
-from mcp_server.client.plain.utils import encode_path_segment
+from mcp_server.client.plain.utils import (
+    encode_path_segment,
+    extract_content_from_response,
+)
 
 
 class PlainRESTClientModelOperation(ModelOperation):
@@ -81,3 +84,66 @@ class PlainRESTClientModelOperation(ModelOperation):
             f"/aliases/{encode_path_segment(alias)}"
         )
         return response.json().get("modelVersion", {})
+
+    async def register_model(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        response = await self.rest_client.post(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}/models",
+            json={"name": name, "comment": comment, "properties": properties},
+        )
+        return extract_content_from_response(response, "model", {})
+
+    async def delete_model(
+        self, catalog_name: str, schema_name: str, model_name: str
+    ) -> str:
+        response = await self.rest_client.delete(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}"
+            f"/models/{encode_path_segment(model_name)}"
+        )
+        return extract_content_from_response(response, "deleted", False)
+
+    async def link_model_version(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        model_name: str,
+        uri: str,
+        aliases: list,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        response = await self.rest_client.post(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}"
+            f"/models/{encode_path_segment(model_name)}/versions",
+            json={
+                "uri": uri,
+                "aliases": aliases,
+                "comment": comment,
+                "properties": properties,
+            },
+        )
+        return extract_content_from_response(response, "modelVersion", {})
+
+    async def delete_model_version(
+        self, catalog_name: str, schema_name: str, model_name: str, version: 
int
+    ) -> str:
+        response = await self.rest_client.delete(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}"
+            f"/models/{encode_path_segment(model_name)}"
+            f"/versions/{encode_path_segment(version)}"
+        )
+        return extract_content_from_response(response, "deleted", False)
diff --git 
a/mcp-server/mcp_server/client/plain/plain_rest_client_schema_operation.py 
b/mcp-server/mcp_server/client/plain/plain_rest_client_schema_operation.py
index 4c2c76ed44..ac6554bedb 100644
--- a/mcp-server/mcp_server/client/plain/plain_rest_client_schema_operation.py
+++ b/mcp-server/mcp_server/client/plain/plain_rest_client_schema_operation.py
@@ -35,3 +35,32 @@ class PlainRESTClientSchemaOperation(SchemaOperation):
             f"/catalogs/{encode_path_segment(catalog_name)}/schemas"
         )
         return extract_content_from_response(response, "identifiers", [])
+
+    async def create_schema(
+        self, catalog_name: str, name: str, comment: str, properties: dict
+    ) -> str:
+        response = await self.rest_client.post(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}/schemas",
+            json={"name": name, "comment": comment, "properties": properties},
+        )
+        return extract_content_from_response(response, "schema", {})
+
+    async def alter_schema(
+        self, catalog_name: str, schema_name: str, updates: list
+    ) -> str:
+        response = await self.rest_client.put(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}",
+            json={"updates": updates},
+        )
+        return extract_content_from_response(response, "schema", {})
+
+    async def drop_schema(self, catalog_name: str, schema_name: str) -> str:
+        response = await self.rest_client.delete(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}"
+        )
+        return extract_content_from_response(response, "dropped", False)
diff --git 
a/mcp-server/mcp_server/client/plain/plain_rest_client_table_operation.py 
b/mcp-server/mcp_server/client/plain/plain_rest_client_table_operation.py
index e1bee62cab..0ab05d2366 100644
--- a/mcp-server/mcp_server/client/plain/plain_rest_client_table_operation.py
+++ b/mcp-server/mcp_server/client/plain/plain_rest_client_table_operation.py
@@ -50,3 +50,52 @@ class PlainRESTClientTableOperation(TableOperation):
             f"/tables/{encode_path_segment(table_name)}"
         )
         return extract_content_from_response(response, "table", {})
+
+    async def create_table(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        comment: str,
+        columns: list,
+        properties: dict,
+    ) -> str:
+        response = await self.rest_client.post(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}/tables",
+            json={
+                "name": name,
+                "comment": comment,
+                "columns": columns,
+                "properties": properties,
+            },
+        )
+        return extract_content_from_response(response, "table", {})
+
+    async def alter_table(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        table_name: str,
+        updates: list,
+    ) -> str:
+        response = await self.rest_client.put(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}"
+            f"/tables/{encode_path_segment(table_name)}",
+            json={"updates": updates},
+        )
+        return extract_content_from_response(response, "table", {})
+
+    async def drop_table(
+        self, catalog_name: str, schema_name: str, table_name: str
+    ) -> str:
+        response = await self.rest_client.delete(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}"
+            f"/tables/{encode_path_segment(table_name)}"
+        )
+        return extract_content_from_response(response, "dropped", False)
diff --git 
a/mcp-server/mcp_server/client/plain/plain_rest_client_topic_operation.py 
b/mcp-server/mcp_server/client/plain/plain_rest_client_topic_operation.py
index ff04720197..18bae32709 100644
--- a/mcp-server/mcp_server/client/plain/plain_rest_client_topic_operation.py
+++ b/mcp-server/mcp_server/client/plain/plain_rest_client_topic_operation.py
@@ -15,7 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from mcp_server.client.plain.utils import encode_path_segment
+from mcp_server.client.plain.utils import (
+    encode_path_segment,
+    extract_content_from_response,
+)
 from mcp_server.client.topic_operation import TopicOperation
 
 
@@ -46,3 +49,46 @@ class PlainRESTClientTopicOperation(TopicOperation):
             f"/topics/{encode_path_segment(topic_name)}"
         )
         return response.json().get("topic", {})
+
+    async def create_topic(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        response = await self.rest_client.post(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}/topics",
+            json={"name": name, "comment": comment, "properties": properties},
+        )
+        return extract_content_from_response(response, "topic", {})
+
+    async def alter_topic(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        topic_name: str,
+        updates: list,
+    ) -> str:
+        response = await self.rest_client.put(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}"
+            f"/topics/{encode_path_segment(topic_name)}",
+            json={"updates": updates},
+        )
+        return extract_content_from_response(response, "topic", {})
+
+    async def delete_topic(
+        self, catalog_name: str, schema_name: str, topic_name: str
+    ) -> str:
+        response = await self.rest_client.delete(
+            f"/api/metalakes/{encode_path_segment(self.metalake_name)}"
+            f"/catalogs/{encode_path_segment(catalog_name)}"
+            f"/schemas/{encode_path_segment(schema_name)}"
+            f"/topics/{encode_path_segment(topic_name)}"
+        )
+        return extract_content_from_response(response, "dropped", False)
diff --git a/mcp-server/mcp_server/client/schema_operation.py 
b/mcp-server/mcp_server/client/schema_operation.py
index b90d7b96f2..6e7376d6f9 100644
--- a/mcp-server/mcp_server/client/schema_operation.py
+++ b/mcp-server/mcp_server/client/schema_operation.py
@@ -35,3 +35,19 @@ class SchemaOperation(ABC):
             str: JSON-formatted string containing catalog information.
         """
         pass
+
+    @abstractmethod
+    async def create_schema(
+        self, catalog_name: str, name: str, comment: str, properties: dict
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def alter_schema(
+        self, catalog_name: str, schema_name: str, updates: list
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def drop_schema(self, catalog_name: str, schema_name: str) -> str:
+        pass
diff --git a/mcp-server/mcp_server/client/table_operation.py 
b/mcp-server/mcp_server/client/table_operation.py
index 2d188ebf25..3f70e042bc 100644
--- a/mcp-server/mcp_server/client/table_operation.py
+++ b/mcp-server/mcp_server/client/table_operation.py
@@ -55,3 +55,31 @@ class TableOperation(ABC):
             str: JSON-formatted string containing full table metadata
         """
         pass
+
+    @abstractmethod
+    async def create_table(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        comment: str,
+        columns: list,
+        properties: dict,
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def alter_table(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        table_name: str,
+        updates: list,
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def drop_table(
+        self, catalog_name: str, schema_name: str, table_name: str
+    ) -> str:
+        pass
diff --git a/mcp-server/mcp_server/client/topic_operation.py 
b/mcp-server/mcp_server/client/topic_operation.py
index 6bc0bc94ab..449b90207e 100644
--- a/mcp-server/mcp_server/client/topic_operation.py
+++ b/mcp-server/mcp_server/client/topic_operation.py
@@ -53,3 +53,30 @@ class TopicOperation(ABC):
             str: JSON-formatted string containing full topic metadata
         """
         pass
+
+    @abstractmethod
+    async def create_topic(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def alter_topic(
+        self,
+        catalog_name: str,
+        schema_name: str,
+        topic_name: str,
+        updates: list,
+    ) -> str:
+        pass
+
+    @abstractmethod
+    async def delete_topic(
+        self, catalog_name: str, schema_name: str, topic_name: str
+    ) -> str:
+        pass
diff --git a/mcp-server/mcp_server/server.py b/mcp-server/mcp_server/server.py
index aea7df7f78..4280932c9e 100644
--- a/mcp-server/mcp_server/server.py
+++ b/mcp-server/mcp_server/server.py
@@ -16,7 +16,6 @@
 # under the License.
 
 import asyncio
-import contextlib
 import logging
 from contextlib import asynccontextmanager
 from typing import AsyncIterator
diff --git a/mcp-server/mcp_server/tools/catalog.py 
b/mcp-server/mcp_server/tools/catalog.py
index 50115610e8..442e4ce02b 100644
--- a/mcp-server/mcp_server/tools/catalog.py
+++ b/mcp-server/mcp_server/tools/catalog.py
@@ -108,3 +108,78 @@ def load_catalog_tools(mcp: FastMCP):
         """
         client = ctx.request_context.lifespan_context.rest_client()
         return await client.as_catalog_operation().get_list_of_catalogs()
+
+    @mcp.tool(tags={"catalog"})
+    async def create_catalog(
+        ctx: Context,
+        name: str,
+        catalog_type: str,
+        provider: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        """
+        Create a new catalog within the metalake.
+
+        Authorization is enforced by Gravitino: a principal without the
+        required grant receives an authorization denial.
+
+        Args:
+            ctx (Context): The request context object.
+            name (str): Name of the catalog to create.
+            catalog_type (str): Catalog type, one of "relational", "fileset",
+                "messaging", "model".
+            provider (str): Provider implementation, e.g. "hive",
+                "lakehouse-iceberg", "jdbc-postgresql", "kafka". May be empty
+                for model/fileset catalogs that have a single provider.
+            comment (str): Human-readable description.
+            properties (dict): Catalog configuration properties.
+
+        Returns:
+            str: JSON-formatted string containing the created catalog.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_catalog_operation().create_catalog(
+            name, catalog_type, provider, comment, properties
+        )
+
+    @mcp.tool(tags={"catalog"})
+    async def alter_catalog(
+        ctx: Context, catalog_name: str, updates: list
+    ) -> str:
+        """
+        Alter an existing catalog.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog to alter.
+            updates (list): List of update operations. Example:
+                [
+                  {"@type": "rename", "newName": "new_catalog"},
+                  {"@type": "updateComment", "newComment": "updated"},
+                  {"@type": "setProperty", "property": "k", "value": "v"},
+                  {"@type": "removeProperty", "property": "k"}
+                ]
+
+        Returns:
+            str: JSON-formatted string containing the altered catalog.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_catalog_operation().alter_catalog(
+            catalog_name, updates
+        )
+
+    @mcp.tool(tags={"catalog"})
+    async def drop_catalog(ctx: Context, catalog_name: str) -> str:
+        """
+        Drop a catalog by its name.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog to drop.
+
+        Returns:
+            str: JSON-formatted string indicating whether the catalog was 
dropped.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_catalog_operation().drop_catalog(catalog_name)
diff --git a/mcp-server/mcp_server/tools/fileset.py 
b/mcp-server/mcp_server/tools/fileset.py
index c3997ac16d..0df5070b7b 100644
--- a/mcp-server/mcp_server/tools/fileset.py
+++ b/mcp-server/mcp_server/tools/fileset.py
@@ -143,3 +143,97 @@ def load_fileset_tools(mcp: FastMCP):
         return await client.as_fileset_operation().list_files_in_fileset(
             catalog_name, schema_name, fileset_name, location_name, sub_path
         )
+
+    # pylint:disable=too-many-positional-arguments
+    @mcp.tool(tags={"fileset"})
+    async def create_fileset(
+        ctx: Context,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        fileset_type: str,
+        storage_location: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        """
+        Create a new fileset within a schema.
+
+        Authorization is enforced by Gravitino: a principal without the
+        required grant receives an authorization denial.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            name (str): Name of the fileset to create.
+            fileset_type (str): Fileset type, one of "managed" or "external".
+            storage_location (str): Storage location URI, e.g. 
"file:/tmp/fileset1".
+            comment (str): Human-readable description.
+            properties (dict): Fileset configuration properties.
+
+        Returns:
+            str: JSON-formatted string containing the created fileset.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_fileset_operation().create_fileset(
+            catalog_name,
+            schema_name,
+            name,
+            fileset_type,
+            storage_location,
+            comment,
+            properties,
+        )
+
+    @mcp.tool(tags={"fileset"})
+    async def alter_fileset(
+        ctx: Context,
+        catalog_name: str,
+        schema_name: str,
+        fileset_name: str,
+        updates: list,
+    ) -> str:
+        """
+        Alter an existing fileset.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            fileset_name (str): Name of the fileset to alter.
+            updates (list): List of update operations. Example:
+                [
+                  {"@type": "rename", "newName": "new_fileset"},
+                  {"@type": "updateComment", "newComment": "updated"},
+                  {"@type": "setProperty", "property": "k", "value": "v"}
+                ]
+
+        Returns:
+            str: JSON-formatted string containing the altered fileset.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_fileset_operation().alter_fileset(
+            catalog_name, schema_name, fileset_name, updates
+        )
+
+    @mcp.tool(tags={"fileset"})
+    async def drop_fileset(
+        ctx: Context, catalog_name: str, schema_name: str, fileset_name: str
+    ) -> str:
+        """
+        Drop a fileset by its name.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            fileset_name (str): Name of the fileset to drop.
+
+        Returns:
+            str: JSON-formatted string indicating whether the fileset was 
dropped.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_fileset_operation().drop_fileset(
+            catalog_name, schema_name, fileset_name
+        )
diff --git a/mcp-server/mcp_server/tools/model.py 
b/mcp-server/mcp_server/tools/model.py
index 4e5f21fdb5..655423e168 100644
--- a/mcp-server/mcp_server/tools/model.py
+++ b/mcp-server/mcp_server/tools/model.py
@@ -230,3 +230,120 @@ def load_model_tools(mcp: FastMCP):
         return await client.as_model_operation().load_model_version_by_alias(
             catalog_name, schema_name, model_name, alias
         )
+
+    @mcp.tool(tags={"model"})
+    async def register_model(
+        ctx: Context,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        """
+        Register a new model within a schema.
+
+        Authorization is enforced by Gravitino: a principal without the
+        required grant receives an authorization denial.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            name (str): Name of the model to register.
+            comment (str): Human-readable description.
+            properties (dict): Model configuration properties.
+
+        Returns:
+            str: JSON-formatted string containing the registered model.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_model_operation().register_model(
+            catalog_name, schema_name, name, comment, properties
+        )
+
+    @mcp.tool(tags={"model"})
+    async def delete_model(
+        ctx: Context, catalog_name: str, schema_name: str, model_name: str
+    ) -> str:
+        """
+        Delete a model by its name.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            model_name (str): Name of the model to delete.
+
+        Returns:
+            str: JSON-formatted string indicating whether the model was 
deleted.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_model_operation().delete_model(
+            catalog_name, schema_name, model_name
+        )
+
+    # pylint:disable=too-many-positional-arguments
+    @mcp.tool(tags={"model"})
+    async def link_model_version(
+        ctx: Context,
+        catalog_name: str,
+        schema_name: str,
+        model_name: str,
+        uri: str,
+        aliases: list,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        """
+        Link a new version to an existing model.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            model_name (str): Name of the model.
+            uri (str): URI of the model version artifact.
+            aliases (list): List of string aliases for this version.
+            comment (str): Human-readable description.
+            properties (dict): Model version configuration properties.
+
+        Returns:
+            str: JSON-formatted string containing the linked model version.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_model_operation().link_model_version(
+            catalog_name,
+            schema_name,
+            model_name,
+            uri,
+            aliases,
+            comment,
+            properties,
+        )
+
+    @mcp.tool(tags={"model"})
+    async def delete_model_version(
+        ctx: Context,
+        catalog_name: str,
+        schema_name: str,
+        model_name: str,
+        version: int,
+    ) -> str:
+        """
+        Delete a specific version of a model.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            model_name (str): Name of the model.
+            version (int): Version number to delete.
+
+        Returns:
+            str: JSON-formatted string indicating whether the version was 
deleted.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_model_operation().delete_model_version(
+            catalog_name, schema_name, model_name, version
+        )
diff --git a/mcp-server/mcp_server/tools/schema.py 
b/mcp-server/mcp_server/tools/schema.py
index 3a37353a4b..dc5957c54d 100644
--- a/mcp-server/mcp_server/tools/schema.py
+++ b/mcp-server/mcp_server/tools/schema.py
@@ -61,3 +61,77 @@ def load_schema_tools(mcp: FastMCP):
         return await client.as_schema_operation().get_list_of_schemas(
             catalog_name
         )
+
+    @mcp.tool(tags={"schema"})
+    async def create_schema(
+        ctx: Context,
+        catalog_name: str,
+        name: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        """
+        Create a new schema within a catalog.
+
+        Authorization is enforced by Gravitino: a principal without the
+        required grant receives an authorization denial.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog to create the schema in.
+            name (str): Name of the schema to create.
+            comment (str): Human-readable description.
+            properties (dict): Schema configuration properties.
+
+        Returns:
+            str: JSON-formatted string containing the created schema.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_schema_operation().create_schema(
+            catalog_name, name, comment, properties
+        )
+
+    @mcp.tool(tags={"schema"})
+    async def alter_schema(
+        ctx: Context, catalog_name: str, schema_name: str, updates: list
+    ) -> str:
+        """
+        Alter an existing schema.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog containing the schema.
+            schema_name (str): Name of the schema to alter.
+            updates (list): List of update operations. Example:
+                [
+                  {"@type": "setProperty", "property": "k", "value": "v"},
+                  {"@type": "removeProperty", "property": "k"}
+                ]
+
+        Returns:
+            str: JSON-formatted string containing the altered schema.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_schema_operation().alter_schema(
+            catalog_name, schema_name, updates
+        )
+
+    @mcp.tool(tags={"schema"})
+    async def drop_schema(
+        ctx: Context, catalog_name: str, schema_name: str
+    ) -> str:
+        """
+        Drop a schema by its name.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog containing the schema.
+            schema_name (str): Name of the schema to drop.
+
+        Returns:
+            str: JSON-formatted string indicating whether the schema was 
dropped.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_schema_operation().drop_schema(
+            catalog_name, schema_name
+        )
diff --git a/mcp-server/mcp_server/tools/table.py 
b/mcp-server/mcp_server/tools/table.py
index af6c62da6f..fbb3f03846 100644
--- a/mcp-server/mcp_server/tools/table.py
+++ b/mcp-server/mcp_server/tools/table.py
@@ -257,3 +257,95 @@ def load_table_tools(mcp: FastMCP):
         return await client.as_table_operation().load_table(
             catalog_name, schema_name, table_name
         )
+
+    @mcp.tool(tags={"table"})
+    async def create_table(
+        ctx: Context,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        comment: str,
+        columns: list,
+        properties: dict,
+    ) -> str:
+        """
+        Create a new table within a schema.
+
+        Authorization is enforced by Gravitino: a principal without the
+        required grant receives an authorization denial.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            name (str): Name of the table to create.
+            comment (str): Human-readable description.
+            columns (list): List of column definitions. Each column is a dict:
+                {
+                  "name": "id",
+                  "type": "integer",
+                  "comment": "primary id",
+                  "nullable": false,
+                  "autoIncrement": false
+                }
+            properties (dict): Table configuration properties.
+
+        Returns:
+            str: JSON-formatted string containing the created table.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_table_operation().create_table(
+            catalog_name, schema_name, name, comment, columns, properties
+        )
+
+    @mcp.tool(tags={"table"})
+    async def alter_table(
+        ctx: Context,
+        catalog_name: str,
+        schema_name: str,
+        table_name: str,
+        updates: list,
+    ) -> str:
+        """
+        Alter an existing table.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            table_name (str): Name of the table to alter.
+            updates (list): List of update operations. Example:
+                [
+                  {"@type": "rename", "newName": "new_table"},
+                  {"@type": "updateComment", "newComment": "updated"},
+                  {"@type": "setProperty", "property": "k", "value": "v"}
+                ]
+
+        Returns:
+            str: JSON-formatted string containing the altered table.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_table_operation().alter_table(
+            catalog_name, schema_name, table_name, updates
+        )
+
+    @mcp.tool(tags={"table"})
+    async def drop_table(
+        ctx: Context, catalog_name: str, schema_name: str, table_name: str
+    ) -> str:
+        """
+        Drop a table by its name.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            table_name (str): Name of the table to drop.
+
+        Returns:
+            str: JSON-formatted string indicating whether the table was 
dropped.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_table_operation().drop_table(
+            catalog_name, schema_name, table_name
+        )
diff --git a/mcp-server/mcp_server/tools/topic.py 
b/mcp-server/mcp_server/tools/topic.py
index 93d2aa57d5..82813bcaf3 100644
--- a/mcp-server/mcp_server/tools/topic.py
+++ b/mcp-server/mcp_server/tools/topic.py
@@ -132,3 +132,85 @@ def load_topic_tools(mcp: FastMCP):
         return await client.as_topic_operation().load_topic(
             catalog_name, schema_name, topic_name
         )
+
+    @mcp.tool(tags={"topic"})
+    async def create_topic(
+        ctx: Context,
+        catalog_name: str,
+        schema_name: str,
+        name: str,
+        comment: str,
+        properties: dict,
+    ) -> str:
+        """
+        Create a new topic within a schema.
+
+        Authorization is enforced by Gravitino: a principal without the
+        required grant receives an authorization denial.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            name (str): Name of the topic to create.
+            comment (str): Human-readable description.
+            properties (dict): Topic configuration properties.
+
+        Returns:
+            str: JSON-formatted string containing the created topic.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_topic_operation().create_topic(
+            catalog_name, schema_name, name, comment, properties
+        )
+
+    @mcp.tool(tags={"topic"})
+    async def alter_topic(
+        ctx: Context,
+        catalog_name: str,
+        schema_name: str,
+        topic_name: str,
+        updates: list,
+    ) -> str:
+        """
+        Alter an existing topic.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            topic_name (str): Name of the topic to alter.
+            updates (list): List of update operations. Example:
+                [
+                  {"@type": "updateComment", "newComment": "updated"},
+                  {"@type": "setProperty", "property": "k", "value": "v"}
+                ]
+
+        Returns:
+            str: JSON-formatted string containing the altered topic.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_topic_operation().alter_topic(
+            catalog_name, schema_name, topic_name, updates
+        )
+
+    @mcp.tool(tags={"topic"})
+    async def delete_topic(
+        ctx: Context, catalog_name: str, schema_name: str, topic_name: str
+    ) -> str:
+        """
+        Delete a topic by its name.
+
+        Args:
+            ctx (Context): The request context object.
+            catalog_name (str): Name of the catalog.
+            schema_name (str): Name of the schema.
+            topic_name (str): Name of the topic to delete.
+
+        Returns:
+            str: JSON-formatted string indicating whether the topic was 
deleted.
+        """
+        client = ctx.request_context.lifespan_context.rest_client()
+        return await client.as_topic_operation().delete_topic(
+            catalog_name, schema_name, topic_name
+        )
diff --git a/mcp-server/tests/unit/test_audit.py 
b/mcp-server/tests/unit/test_audit.py
index 37cdef29cb..c52a09a391 100644
--- a/mcp-server/tests/unit/test_audit.py
+++ b/mcp-server/tests/unit/test_audit.py
@@ -25,7 +25,9 @@ from fastmcp import Client
 
 from mcp_server.client.factory import RESTClientFactory
 from mcp_server.client.plain.exception import GravitinoException
-from mcp_server.client.plain.plain_rest_client_operation import 
PlainRESTClientOperation
+from mcp_server.client.plain.plain_rest_client_operation import (
+    PlainRESTClientOperation,
+)
 from mcp_server.core import audit
 from mcp_server.core.setting import Setting
 from mcp_server.server import GravitinoMCPServer
@@ -54,7 +56,9 @@ class TestAuditEmit(unittest.TestCase):
 
     def test_allow_record_structure(self):
         """emit() writes a JSON record with all required fields on allow."""
-        audit.emit(principal="bearer:abc12345", tool="list_catalogs", 
outcome="allow")
+        audit.emit(
+            principal="bearer:abc12345", tool="list_catalogs", outcome="allow"
+        )
 
         self.assertEqual(len(self.log_records), 1)
         record = json.loads(self.log_records[0])
@@ -79,7 +83,9 @@ class TestAuditEmit(unittest.TestCase):
 
     def test_anonymous_principal(self):
         """emit() works with anonymous principal."""
-        audit.emit(principal="anonymous", tool="get_list_of_catalogs", 
outcome="allow")
+        audit.emit(
+            principal="anonymous", tool="get_list_of_catalogs", outcome="allow"
+        )
         record = json.loads(self.log_records[0])
         self.assertEqual(record["principal"], "anonymous")
 
@@ -173,6 +179,7 @@ class TestAuditMiddlewareIntegration(unittest.TestCase):
 # Helpers
 # ---------------------------------------------------------------------------
 
+
 class _CapturingHandler(logging.Handler):
     """Logging handler that stores formatted messages in a list."""
 
diff --git a/mcp-server/tests/unit/test_auth_flow.py 
b/mcp-server/tests/unit/test_auth_flow.py
index a413515bf9..ca6b59b671 100644
--- a/mcp-server/tests/unit/test_auth_flow.py
+++ b/mcp-server/tests/unit/test_auth_flow.py
@@ -45,7 +45,9 @@ class TestTokenInjection(unittest.TestCase):
 
     def test_no_token_argument_no_authorization_header(self):
         """When token argument is omitted entirely, no Authorization header is 
added."""
-        client = PlainRESTClientOperation("my_metalake", 
"http://localhost:8090";)
+        client = PlainRESTClientOperation(
+            "my_metalake", "http://localhost:8090";
+        )
         headers = dict(client._catalog_operation.rest_client.headers)
         self.assertNotIn("authorization", headers)
 
diff --git a/mcp-server/tests/unit/test_per_request_token.py 
b/mcp-server/tests/unit/test_per_request_token.py
index c08cbeeb39..77ad709ce6 100644
--- a/mcp-server/tests/unit/test_per_request_token.py
+++ b/mcp-server/tests/unit/test_per_request_token.py
@@ -38,7 +38,9 @@ class TestExtractBearerToken(unittest.TestCase):
     """Unit tests for the token extraction helper."""
 
     def test_well_formed_bearer_header(self):
-        self.assertEqual(_extract_bearer_token("Bearer mytoken123"), 
"mytoken123")
+        self.assertEqual(
+            _extract_bearer_token("Bearer mytoken123"), "mytoken123"
+        )
 
     def test_case_insensitive_bearer(self):
         self.assertEqual(_extract_bearer_token("bearer MYTOKEN"), "MYTOKEN")
@@ -153,8 +155,12 @@ class 
TestGravitinoContextPerRequestToken(unittest.TestCase):
         ):
             client_bob = ctx.rest_client()
 
-        alice_headers = 
dict(client_alice._catalog_operation.rest_client.headers)
+        alice_headers = dict(
+            client_alice._catalog_operation.rest_client.headers
+        )
         bob_headers = dict(client_bob._catalog_operation.rest_client.headers)
-        self.assertEqual(alice_headers.get("authorization"), "Bearer 
alice-token")
+        self.assertEqual(
+            alice_headers.get("authorization"), "Bearer alice-token"
+        )
         self.assertEqual(bob_headers.get("authorization"), "Bearer bob-token")
         self.assertIsNot(client_alice, client_bob)
diff --git a/mcp-server/tests/unit/tools/mock_operation.py 
b/mcp-server/tests/unit/tools/mock_operation.py
index a985bf7912..eb6c1f2cff 100644
--- a/mcp-server/tests/unit/tools/mock_operation.py
+++ b/mcp-server/tests/unit/tools/mock_operation.py
@@ -69,11 +69,33 @@ class MockCatalogOperation(CatalogOperation):
     async def get_list_of_catalogs(self) -> str:
         return "mock_catalogs"
 
+    async def create_catalog(
+        self, name, catalog_type, provider, comment, properties
+    ) -> str:
+        return f"mock_catalog_created: {name}, {catalog_type}, {provider}"
+
+    async def alter_catalog(self, catalog_name, updates) -> str:
+        return f"mock_catalog_altered: {catalog_name} with updates {updates}"
+
+    async def drop_catalog(self, catalog_name) -> str:
+        return f"mock_catalog_dropped: {catalog_name}"
+
 
 class MockSchemaOperation(SchemaOperation):
     async def get_list_of_schemas(self, catalog_name: str) -> str:
         return "mock_schemas"
 
+    async def create_schema(
+        self, catalog_name, name, comment, properties
+    ) -> str:
+        return f"mock_schema_created: {catalog_name}.{name}"
+
+    async def alter_schema(self, catalog_name, schema_name, updates) -> str:
+        return f"mock_schema_altered: {catalog_name}.{schema_name} with 
updates {updates}"
+
+    async def drop_schema(self, catalog_name, schema_name) -> str:
+        return f"mock_schema_dropped: {catalog_name}.{schema_name}"
+
 
 class MockTableOperation(TableOperation):
     async def get_list_of_tables(
@@ -86,6 +108,23 @@ class MockTableOperation(TableOperation):
     ) -> str:
         return "mock_table"
 
+    # pylint: disable=R0917
+    async def create_table(
+        self, catalog_name, schema_name, name, comment, columns, properties
+    ) -> str:
+        return f"mock_table_created: {catalog_name}.{schema_name}.{name}"
+
+    async def alter_table(
+        self, catalog_name, schema_name, table_name, updates
+    ) -> str:
+        return (
+            f"mock_table_altered: {catalog_name}.{schema_name}.{table_name} "
+            f"with updates {updates}"
+        )
+
+    async def drop_table(self, catalog_name, schema_name, table_name) -> str:
+        return f"mock_table_dropped: {catalog_name}.{schema_name}.{table_name}"
+
 
 class MockFilesetOperation(FilesetOperation):
     async def list_of_filesets(
@@ -110,6 +149,34 @@ class MockFilesetOperation(FilesetOperation):
     ) -> str:
         return "mock_files_in_fileset"
 
+    # pylint: disable=R0917
+    async def create_fileset(
+        self,
+        catalog_name,
+        schema_name,
+        name,
+        fileset_type,
+        storage_location,
+        comment,
+        properties,
+    ) -> str:
+        return f"mock_fileset_created: {catalog_name}.{schema_name}.{name}"
+
+    async def alter_fileset(
+        self, catalog_name, schema_name, fileset_name, updates
+    ) -> str:
+        return (
+            f"mock_fileset_altered: 
{catalog_name}.{schema_name}.{fileset_name} "
+            f"with updates {updates}"
+        )
+
+    async def drop_fileset(
+        self, catalog_name, schema_name, fileset_name
+    ) -> str:
+        return (
+            f"mock_fileset_dropped: 
{catalog_name}.{schema_name}.{fileset_name}"
+        )
+
 
 class MockPolicyOperation(PolicyOperation):
     async def associate_policy_with_metadata(
@@ -170,6 +237,38 @@ class MockModelOperation(ModelOperation):
     ) -> str:
         return "mock_model_version_by_alias"
 
+    async def register_model(
+        self, catalog_name, schema_name, name, comment, properties
+    ) -> str:
+        return f"mock_model_registered: {catalog_name}.{schema_name}.{name}"
+
+    async def delete_model(self, catalog_name, schema_name, model_name) -> str:
+        return f"mock_model_deleted: {catalog_name}.{schema_name}.{model_name}"
+
+    # pylint: disable=R0917
+    async def link_model_version(
+        self,
+        catalog_name,
+        schema_name,
+        model_name,
+        uri,
+        aliases,
+        comment,
+        properties,
+    ) -> str:
+        return (
+            f"mock_model_version_linked: 
{catalog_name}.{schema_name}.{model_name} "
+            f"uri={uri} aliases={aliases}"
+        )
+
+    async def delete_model_version(
+        self, catalog_name, schema_name, model_name, version
+    ) -> str:
+        return (
+            f"mock_model_version_deleted: 
{catalog_name}.{schema_name}.{model_name} "
+            f"version={version}"
+        )
+
 
 class MockTopicOperation(TopicOperation):
     async def list_of_topics(self, catalog_name: str, schema_name: str) -> str:
@@ -180,6 +279,22 @@ class MockTopicOperation(TopicOperation):
     ) -> str:
         return "mock_topic"
 
+    async def create_topic(
+        self, catalog_name, schema_name, name, comment, properties
+    ) -> str:
+        return f"mock_topic_created: {catalog_name}.{schema_name}.{name}"
+
+    async def alter_topic(
+        self, catalog_name, schema_name, topic_name, updates
+    ) -> str:
+        return (
+            f"mock_topic_altered: {catalog_name}.{schema_name}.{topic_name} "
+            f"with updates {updates}"
+        )
+
+    async def delete_topic(self, catalog_name, schema_name, topic_name) -> str:
+        return f"mock_topic_deleted: {catalog_name}.{schema_name}.{topic_name}"
+
 
 class MockTagOperation(TagOperation):
     async def list_of_tags(self) -> str:
diff --git a/mcp-server/tests/unit/tools/test_catalog.py 
b/mcp-server/tests/unit/tools/test_catalog.py
index ca6c03f735..bf92a9adef 100644
--- a/mcp-server/tests/unit/tools/test_catalog.py
+++ b/mcp-server/tests/unit/tools/test_catalog.py
@@ -45,3 +45,61 @@ class TestCatalogTool(unittest.TestCase):
                 self.assertEqual("mock_catalogs", result.content[0].text)
 
         asyncio.run(_test_list_catalogs(self.mcp))
+
+    def test_create_catalog(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "create_catalog",
+                    {
+                        "name": "cat1",
+                        "catalog_type": "relational",
+                        "provider": "hive",
+                        "comment": "c",
+                        "properties": {"k": "v"},
+                    },
+                )
+                self.assertEqual(
+                    "mock_catalog_created: cat1, relational, hive",
+                    result.content[0].text,
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_alter_catalog(self):
+        async def _test(mcp_server):
+            updates = [{"@type": "rename", "newName": "cat2"}]
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "alter_catalog",
+                    {"catalog_name": "cat1", "updates": updates},
+                )
+                self.assertEqual(
+                    f"mock_catalog_altered: cat1 with updates {updates}",
+                    result.content[0].text,
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_drop_catalog(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "drop_catalog", {"catalog_name": "cat1"}
+                )
+                self.assertEqual(
+                    "mock_catalog_dropped: cat1", result.content[0].text
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_write_tools_exposed(self):
+        """Write tools must be present (authorization enforced by 
Gravitino)."""
+
+        async def _test(mcp_server):
+            names = {t.name for t in await mcp_server.list_tools()}
+            self.assertIn("create_catalog", names)
+            self.assertIn("alter_catalog", names)
+            self.assertIn("drop_catalog", names)
+
+        asyncio.run(_test(self.mcp))
diff --git a/mcp-server/tests/unit/tools/test_fileset.py 
b/mcp-server/tests/unit/tools/test_fileset.py
index a0c0371491..7a338133d4 100644
--- a/mcp-server/tests/unit/tools/test_fileset.py
+++ b/mcp-server/tests/unit/tools/test_fileset.py
@@ -75,3 +75,61 @@ class TestFilesetTool(unittest.TestCase):
                 )
 
         asyncio.run(_test_list_files_in_fileset(self.mcp))
+
+    def test_create_fileset(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "create_fileset",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "name": "fs",
+                        "fileset_type": "managed",
+                        "storage_location": "file:/tmp/fs",
+                        "comment": "c",
+                        "properties": {"k": "v"},
+                    },
+                )
+                self.assertEqual(
+                    "mock_fileset_created: cat.sch.fs", result.content[0].text
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_alter_fileset(self):
+        async def _test(mcp_server):
+            updates = [{"@type": "rename", "newName": "fs2"}]
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "alter_fileset",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "fileset_name": "fs",
+                        "updates": updates,
+                    },
+                )
+                self.assertEqual(
+                    f"mock_fileset_altered: cat.sch.fs with updates {updates}",
+                    result.content[0].text,
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_drop_fileset(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "drop_fileset",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "fileset_name": "fs",
+                    },
+                )
+                self.assertEqual(
+                    "mock_fileset_dropped: cat.sch.fs", result.content[0].text
+                )
+
+        asyncio.run(_test(self.mcp))
diff --git a/mcp-server/tests/unit/tools/test_model.py 
b/mcp-server/tests/unit/tools/test_model.py
index ff27e64d3b..09d8db2b39 100644
--- a/mcp-server/tests/unit/tools/test_model.py
+++ b/mcp-server/tests/unit/tools/test_model.py
@@ -106,3 +106,81 @@ class TestModelTool(unittest.TestCase):
                 )
 
         asyncio.run(_test_load_model_version_by_alias(self.mcp))
+
+    def test_register_model(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "register_model",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "name": "m",
+                        "comment": "c",
+                        "properties": {"k": "v"},
+                    },
+                )
+                self.assertEqual(
+                    "mock_model_registered: cat.sch.m", result.content[0].text
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_delete_model(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "delete_model",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "model_name": "m",
+                    },
+                )
+                self.assertEqual(
+                    "mock_model_deleted: cat.sch.m", result.content[0].text
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_link_model_version(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "link_model_version",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "model_name": "m",
+                        "uri": "s3://bucket/model",
+                        "aliases": ["latest"],
+                        "comment": "c",
+                        "properties": {"k": "v"},
+                    },
+                )
+                self.assertEqual(
+                    "mock_model_version_linked: cat.sch.m "
+                    "uri=s3://bucket/model aliases=['latest']",
+                    result.content[0].text,
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_delete_model_version(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "delete_model_version",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "model_name": "m",
+                        "version": 1,
+                    },
+                )
+                self.assertEqual(
+                    "mock_model_version_deleted: cat.sch.m version=1",
+                    result.content[0].text,
+                )
+
+        asyncio.run(_test(self.mcp))
diff --git a/mcp-server/tests/unit/tools/test_schema.py 
b/mcp-server/tests/unit/tools/test_schema.py
index 443c3968dc..ef6d3cd118 100644
--- a/mcp-server/tests/unit/tools/test_schema.py
+++ b/mcp-server/tests/unit/tools/test_schema.py
@@ -42,3 +42,53 @@ class TestSchemaTool(unittest.TestCase):
                 self.assertEqual("mock_schemas", result.content[0].text)
 
         asyncio.run(_test_list_schemas(self.mcp))
+
+    def test_create_schema(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "create_schema",
+                    {
+                        "catalog_name": "cat",
+                        "name": "sch",
+                        "comment": "c",
+                        "properties": {"k": "v"},
+                    },
+                )
+                self.assertEqual(
+                    "mock_schema_created: cat.sch", result.content[0].text
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_alter_schema(self):
+        async def _test(mcp_server):
+            updates = [{"@type": "setProperty", "property": "k", "value": "v"}]
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "alter_schema",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "updates": updates,
+                    },
+                )
+                self.assertEqual(
+                    f"mock_schema_altered: cat.sch with updates {updates}",
+                    result.content[0].text,
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_drop_schema(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "drop_schema",
+                    {"catalog_name": "cat", "schema_name": "sch"},
+                )
+                self.assertEqual(
+                    "mock_schema_dropped: cat.sch", result.content[0].text
+                )
+
+        asyncio.run(_test(self.mcp))
diff --git a/mcp-server/tests/unit/tools/test_table.py 
b/mcp-server/tests/unit/tools/test_table.py
index 1b426e5512..b14cd2d13b 100644
--- a/mcp-server/tests/unit/tools/test_table.py
+++ b/mcp-server/tests/unit/tools/test_table.py
@@ -57,3 +57,61 @@ class TestTableTool(unittest.TestCase):
                 self.assertEqual("mock_table", result.content[0].text)
 
         asyncio.run(_test_load_table(self.mcp))
+
+    def test_create_table(self):
+        async def _test(mcp_server):
+            columns = [{"name": "id", "type": "integer", "nullable": False}]
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "create_table",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "name": "tbl",
+                        "comment": "c",
+                        "columns": columns,
+                        "properties": {"k": "v"},
+                    },
+                )
+                self.assertEqual(
+                    "mock_table_created: cat.sch.tbl", result.content[0].text
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_alter_table(self):
+        async def _test(mcp_server):
+            updates = [{"@type": "rename", "newName": "tbl2"}]
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "alter_table",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "table_name": "tbl",
+                        "updates": updates,
+                    },
+                )
+                self.assertEqual(
+                    f"mock_table_altered: cat.sch.tbl with updates {updates}",
+                    result.content[0].text,
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_drop_table(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "drop_table",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "table_name": "tbl",
+                    },
+                )
+                self.assertEqual(
+                    "mock_table_dropped: cat.sch.tbl", result.content[0].text
+                )
+
+        asyncio.run(_test(self.mcp))
diff --git a/mcp-server/tests/unit/tools/test_topic.py 
b/mcp-server/tests/unit/tools/test_topic.py
index 28422d0634..6f3f915bb8 100644
--- a/mcp-server/tests/unit/tools/test_topic.py
+++ b/mcp-server/tests/unit/tools/test_topic.py
@@ -57,3 +57,59 @@ class TestTopicTool(unittest.TestCase):
                 self.assertEqual("mock_topic", result.content[0].text)
 
         asyncio.run(_test_load_topic(self.mcp))
+
+    def test_create_topic(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "create_topic",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "name": "tp",
+                        "comment": "c",
+                        "properties": {"k": "v"},
+                    },
+                )
+                self.assertEqual(
+                    "mock_topic_created: cat.sch.tp", result.content[0].text
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_alter_topic(self):
+        async def _test(mcp_server):
+            updates = [{"@type": "setProperty", "property": "k", "value": "v"}]
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "alter_topic",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "topic_name": "tp",
+                        "updates": updates,
+                    },
+                )
+                self.assertEqual(
+                    f"mock_topic_altered: cat.sch.tp with updates {updates}",
+                    result.content[0].text,
+                )
+
+        asyncio.run(_test(self.mcp))
+
+    def test_delete_topic(self):
+        async def _test(mcp_server):
+            async with Client(mcp_server) as client:
+                result = await client.call_tool(
+                    "delete_topic",
+                    {
+                        "catalog_name": "cat",
+                        "schema_name": "sch",
+                        "topic_name": "tp",
+                    },
+                )
+                self.assertEqual(
+                    "mock_topic_deleted: cat.sch.tp", result.content[0].text
+                )
+
+        asyncio.run(_test(self.mcp))

Reply via email to