This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 554ce49fb [#3732] refactor(client-python): Update Python client to 
align with the Java client API (#4046)
554ce49fb is described below

commit 554ce49fbc7b42ba3b858a44b7e4d7cfb146f2b4
Author: noidname01 <[email protected]>
AuthorDate: Wed Jul 3 15:48:38 2024 +0800

    [#3732] refactor(client-python): Update Python client to align with the 
Java client API (#4046)
    
    ### What changes were proposed in this pull request?
    
    After the refactoring work in the Java client is done
    (https://github.com/apache/gravitino/issues/3626), the Python client
    should also be updated to align with the Java API.
    
    * Update the catalog and schema interfaces and their implementations to
    align with `client-java`
    * Remove redundant methods in `Namespace` and `NameIdentifier`
    * Add some missing tests and modify existing tests to conform to the new
    API
    
    ### Why are the changes needed?
    
    Fix: #3732
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    UT added and test with `./gradlew clients:client-python:test`
    
    ---------
    
    Co-authored-by: TimWang <[email protected]>
---
 .../gravitino/api/supports_schemas.py              |  35 ++--
 .../gravitino/catalog/base_schema_catalog.py       |  85 ++++++----
 .../gravitino/catalog/fileset_catalog.py           |  80 ++++++---
 .../gravitino/client/gravitino_metalake.py         |  14 +-
 .../client-python/gravitino/dto/dto_converters.py  |   5 +-
 clients/client-python/gravitino/filesystem/gvfs.py |  11 +-
 clients/client-python/gravitino/name_identifier.py | 180 ++-------------------
 clients/client-python/gravitino/namespace.py       | 169 ++-----------------
 clients/client-python/gravitino/rest/rest_utils.py |  13 ++
 .../tests/integration/test_catalog.py              |   4 +-
 .../tests/integration/test_fileset_catalog.py      |  20 +--
 .../client-python/tests/integration/test_schema.py |  26 ++-
 .../tests/integration/test_simple_auth_client.py   |  18 +--
 clients/client-python/tests/unittests/mock_base.py |  14 +-
 .../tests/unittests/test_gvfs_with_local.py        |  10 +-
 .../tests/unittests/test_name_identifier.py        |  17 +-
 16 files changed, 247 insertions(+), 454 deletions(-)

diff --git a/clients/client-python/gravitino/api/supports_schemas.py 
b/clients/client-python/gravitino/api/supports_schemas.py
index 068f59c99..2459a4238 100644
--- a/clients/client-python/gravitino/api/supports_schemas.py
+++ b/clients/client-python/gravitino/api/supports_schemas.py
@@ -23,8 +23,6 @@ from typing import List, Dict
 
 from gravitino.api.schema import Schema
 from gravitino.api.schema_change import SchemaChange
-from gravitino.name_identifier import NameIdentifier
-from gravitino.namespace import Namespace
 
 
 class NoSuchSchemaException(Exception):
@@ -40,50 +38,47 @@ class SupportsSchemas(ABC):
     """
 
     @abstractmethod
-    def list_schemas(self, namespace: Namespace) -> List[NameIdentifier]:
-        """List schemas under a namespace.
+    def list_schemas(self) -> List[str]:
+        """List schemas under the entity.
 
         If an entity such as a table, view exists, its parent schemas must 
also exist and must be
         returned by this discovery method. For example, if table a.b.t exists, 
this method invoked as
-        list_schemas(a) must return [a.b] in the result array.
-
-        Args:
-            namespace: The namespace to list.
+        listSchemas(a) must return [b] in the result array
 
         Raises:
             NoSuchCatalogException: If the catalog does not exist.
 
         Returns:
-            A list of schema identifiers under the namespace.
+            A list of schema names under the namespace.
         """
         pass
 
-    def schema_exists(self, ident: NameIdentifier) -> bool:
+    def schema_exists(self, schema_name: str) -> bool:
         """Check if a schema exists.
 
         If an entity such as a table, view exists, its parent namespaces must 
also exist. For
         example, if table a.b.t exists, this method invoked as 
schema_exists(a.b) must return true.
 
         Args:
-            ident: The name identifier of the schema.
+            schema_name: The name of the schema.
 
         Returns:
             True if the schema exists, false otherwise.
         """
         try:
-            self.load_schema(ident)
+            self.load_schema(schema_name)
             return True
         except NoSuchSchemaException:
             return False
 
     @abstractmethod
     def create_schema(
-        self, ident: NameIdentifier, comment: str, properties: Dict[str, str]
+        self, schema_name: str, comment: str, properties: Dict[str, str]
     ) -> Schema:
         """Create a schema in the catalog.
 
         Args:
-            ident: The name identifier of the schema.
+            schema_name: The name of the schema.
             comment: The comment of the schema.
             properties: The properties of the schema.
 
@@ -97,11 +92,11 @@ class SupportsSchemas(ABC):
         pass
 
     @abstractmethod
-    def load_schema(self, ident: NameIdentifier) -> Schema:
+    def load_schema(self, schema_name: str) -> Schema:
         """Load metadata properties for a schema.
 
         Args:
-            ident: The name identifier of the schema.
+            schema_name: The name of the schema.
 
         Raises:
             NoSuchSchemaException: If the schema does not exist (optional).
@@ -112,11 +107,11 @@ class SupportsSchemas(ABC):
         pass
 
     @abstractmethod
-    def alter_schema(self, ident: NameIdentifier, *changes: SchemaChange) -> 
Schema:
+    def alter_schema(self, schema_name: str, *changes: SchemaChange) -> Schema:
         """Apply the metadata change to a schema in the catalog.
 
         Args:
-            ident: The name identifier of the schema.
+            schema_name: The name of the schema.
             changes: The metadata changes to apply.
 
         Raises:
@@ -128,12 +123,12 @@ class SupportsSchemas(ABC):
         pass
 
     @abstractmethod
-    def drop_schema(self, ident: NameIdentifier, cascade: bool) -> bool:
+    def drop_schema(self, schema_name: str, cascade: bool) -> bool:
         """Drop a schema from the catalog. If cascade option is true, 
recursively
         drop all objects within the schema.
 
         Args:
-            ident: The name identifier of the schema.
+            schema_name: The name of the schema.
             cascade: If true, recursively drop all objects within the schema.
 
         Returns:
diff --git a/clients/client-python/gravitino/catalog/base_schema_catalog.py 
b/clients/client-python/gravitino/catalog/base_schema_catalog.py
index b6d54bb86..93fd40aac 100644
--- a/clients/client-python/gravitino/catalog/base_schema_catalog.py
+++ b/clients/client-python/gravitino/catalog/base_schema_catalog.py
@@ -19,7 +19,7 @@ under the License.
 """
 
 import logging
-from typing import Dict
+from typing import Dict, List
 
 from gravitino.api.catalog import Catalog
 from gravitino.api.schema import Schema
@@ -33,9 +33,9 @@ from gravitino.dto.requests.schema_updates_request import 
SchemaUpdatesRequest
 from gravitino.dto.responses.drop_response import DropResponse
 from gravitino.dto.responses.entity_list_response import EntityListResponse
 from gravitino.dto.responses.schema_response import SchemaResponse
-from gravitino.name_identifier import NameIdentifier
 from gravitino.namespace import Namespace
 from gravitino.utils import HTTPClient
+from gravitino.rest.rest_utils import encode_string
 
 logger = logging.getLogger(__name__)
 
@@ -47,11 +47,15 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
     create, load, alter and drop a schema with specified identifier.
     """
 
+    # The REST client to send the requests.
     rest_client: HTTPClient
-    """The REST client to send the requests."""
+
+    # The namespace of current catalog, which is the metalake name.
+    _catalog_namespace: Namespace
 
     def __init__(
         self,
+        catalog_namespace: Namespace,
         name: str = None,
         catalog_type: Catalog.Type = Catalog.Type.UNSUPPORTED,
         provider: str = None,
@@ -69,42 +73,42 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
             _audit=audit,
         )
         self.rest_client = rest_client
+        self._catalog_namespace = catalog_namespace
+
+        self.validate()
 
     def as_schemas(self):
         return self
 
-    def list_schemas(self, namespace: Namespace) -> [NameIdentifier]:
+    def list_schemas(self) -> List[str]:
         """List all the schemas under the given catalog namespace.
 
-        Args:
-             namespace: The namespace of the catalog.
-
         Raises:
             NoSuchCatalogException if the catalog with specified namespace 
does not exist.
 
         Returns:
-             A list of {@link NameIdentifier} of the schemas under the given 
catalog namespace.
+             A list of schema names under the given catalog namespace.
         """
-        Namespace.check_schema(namespace)
         resp = self.rest_client.get(
-            BaseSchemaCatalog.format_schema_request_path(namespace)
+            
BaseSchemaCatalog.format_schema_request_path(self._schema_namespace())
         )
         entity_list_response = EntityListResponse.from_json(
             resp.body, infer_missing=True
         )
         entity_list_response.validate()
-        return entity_list_response.identifiers()
+
+        return [ident.name() for ident in entity_list_response.identifiers()]
 
     def create_schema(
         self,
-        ident: NameIdentifier = None,
+        schema_name: str = None,
         comment: str = None,
         properties: Dict[str, str] = None,
     ) -> Schema:
         """Create a new schema with specified identifier, comment and metadata.
 
         Args:
-            ident: The name identifier of the schema.
+            schema_name: The name of the schema.
             comment: The comment of the schema.
             properties: The properties of the schema.
 
@@ -115,23 +119,23 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
         Returns:
              The created Schema.
         """
-        NameIdentifier.check_schema(ident)
-        req = SchemaCreateRequest(ident.name(), comment, properties)
+        req = SchemaCreateRequest(encode_string(schema_name), comment, 
properties)
         req.validate()
 
         resp = self.rest_client.post(
-            BaseSchemaCatalog.format_schema_request_path(ident.namespace()), 
json=req
+            
BaseSchemaCatalog.format_schema_request_path(self._schema_namespace()),
+            json=req,
         )
         schema_response = SchemaResponse.from_json(resp.body, 
infer_missing=True)
         schema_response.validate()
 
         return schema_response.schema()
 
-    def load_schema(self, ident: NameIdentifier) -> Schema:
+    def load_schema(self, schema_name: str) -> Schema:
         """Load the schema with specified identifier.
 
         Args:
-            ident: The name identifier of the schema.
+            schema_name: The name of the schema.
 
         Raises:
             NoSuchSchemaException if the schema with specified identifier does 
not exist.
@@ -139,22 +143,21 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
         Returns:
             The Schema with specified identifier.
         """
-        NameIdentifier.check_schema(ident)
         resp = self.rest_client.get(
-            BaseSchemaCatalog.format_schema_request_path(ident.namespace())
+            
BaseSchemaCatalog.format_schema_request_path(self._schema_namespace())
             + "/"
-            + ident.name()
+            + encode_string(schema_name)
         )
         schema_response = SchemaResponse.from_json(resp.body, 
infer_missing=True)
         schema_response.validate()
 
         return schema_response.schema()
 
-    def alter_schema(self, ident: NameIdentifier, *changes: SchemaChange) -> 
Schema:
+    def alter_schema(self, schema_name: str, *changes: SchemaChange) -> Schema:
         """Alter the schema with specified identifier by applying the changes.
 
         Args:
-             ident: The name identifier of the schema.
+            schema_name: The name of the schema.
             changes: The metadata changes to apply.
 
         Raises:
@@ -163,27 +166,26 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
         Returns:
             The altered Schema.
         """
-        NameIdentifier.check_schema(ident)
         reqs = [
             BaseSchemaCatalog.to_schema_update_request(change) for change in 
changes
         ]
         updates_request = SchemaUpdatesRequest(reqs)
         updates_request.validate()
         resp = self.rest_client.put(
-            BaseSchemaCatalog.format_schema_request_path(ident.namespace())
+            
BaseSchemaCatalog.format_schema_request_path(self._schema_namespace())
             + "/"
-            + ident.name(),
+            + encode_string(schema_name),
             updates_request,
         )
         schema_response = SchemaResponse.from_json(resp.body, 
infer_missing=True)
         schema_response.validate()
         return schema_response.schema()
 
-    def drop_schema(self, ident: NameIdentifier, cascade: bool) -> bool:
+    def drop_schema(self, schema_name: str, cascade: bool) -> bool:
         """Drop the schema with specified identifier.
 
         Args:
-            ident: The name identifier of the schema.
+            schema_name: The name of the schema.
             cascade: Whether to drop all the tables under the schema.
 
         Raises:
@@ -192,22 +194,24 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
         Returns:
              true if the schema is dropped successfully, false otherwise.
         """
-        NameIdentifier.check_schema(ident)
         try:
             params = {"cascade": str(cascade)}
             resp = self.rest_client.delete(
-                BaseSchemaCatalog.format_schema_request_path(ident.namespace())
+                
BaseSchemaCatalog.format_schema_request_path(self._schema_namespace())
                 + "/"
-                + ident.name(),
+                + encode_string(schema_name),
                 params=params,
             )
             drop_resp = DropResponse.from_json(resp.body, infer_missing=True)
             drop_resp.validate()
             return drop_resp.dropped()
         except Exception:
-            logger.warning("Failed to drop schema %s", ident)
+            logger.warning("Failed to drop schema %s", schema_name)
             return False
 
+    def _schema_namespace(self) -> Namespace:
+        return Namespace.of(self._catalog_namespace.level(0), self.name())
+
     @staticmethod
     def format_schema_request_path(ns: Namespace):
         return "api/metalakes/" + ns.level(0) + "/catalogs/" + ns.level(1) + 
"/schemas"
@@ -221,3 +225,20 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
         if isinstance(change, SchemaChange.RemoveProperty):
             return 
SchemaUpdateRequest.RemoveSchemaPropertyRequest(change.property())
         raise ValueError(f"Unknown change type: {type(change).__name__}")
+
+    def validate(self):
+        Namespace.check(
+            self._catalog_namespace is not None
+            and self._catalog_namespace.length() == 1,
+            f"Catalog namespace must be non-null and have 1 level, the input 
namespace is {self._catalog_namespace}",
+        )
+
+        assert self.rest_client is not None, "restClient must be set"
+        assert (
+            self.name() is not None and len(self.name().strip()) > 0
+        ), "name must not be blank"
+        assert self.type() is not None, "type must not be None"
+        assert (
+            self.provider() is not None and len(self.provider().strip()) > 0
+        ), "provider must not be blank"
+        assert self.audit_info() is not None, "audit must not be None"
diff --git a/clients/client-python/gravitino/catalog/fileset_catalog.py 
b/clients/client-python/gravitino/catalog/fileset_catalog.py
index ae3bb2cc1..c7bd3af47 100644
--- a/clients/client-python/gravitino/catalog/fileset_catalog.py
+++ b/clients/client-python/gravitino/catalog/fileset_catalog.py
@@ -35,6 +35,7 @@ from gravitino.dto.responses.fileset_response import 
FilesetResponse
 from gravitino.name_identifier import NameIdentifier
 from gravitino.namespace import Namespace
 from gravitino.utils import HTTPClient
+from gravitino.rest.rest_utils import encode_string
 
 logger = logging.getLogger(__name__)
 
@@ -47,6 +48,7 @@ class FilesetCatalog(BaseSchemaCatalog):
 
     def __init__(
         self,
+        namespace: Namespace,
         name: str = None,
         catalog_type: Catalog.Type = Catalog.Type.UNSUPPORTED,
         provider: str = None,
@@ -57,7 +59,14 @@ class FilesetCatalog(BaseSchemaCatalog):
     ):
 
         super().__init__(
-            name, catalog_type, provider, comment, properties, audit, 
rest_client
+            namespace,
+            name,
+            catalog_type,
+            provider,
+            comment,
+            properties,
+            audit,
+            rest_client,
         )
 
     def as_fileset_catalog(self):
@@ -67,23 +76,29 @@ class FilesetCatalog(BaseSchemaCatalog):
         """List the filesets in a schema namespace from the catalog.
 
         Args:
-            namespace A schema namespace.
+            namespace: A schema namespace. This namespace should have 1 level, 
which is the schema name
 
         Raises:
             NoSuchSchemaException If the schema does not exist.
 
         Returns:
-            An array of fileset identifiers in the namespace.
+            A list of NameIdentifier of filesets under the given namespace.
         """
-        Namespace.check_fileset(namespace)
 
-        resp = 
self.rest_client.get(self.format_fileset_request_path(namespace))
+        self.check_fileset_namespace(namespace)
+
+        full_namespace = self._get_fileset_full_namespace(namespace)
+
+        resp = 
self.rest_client.get(self.format_fileset_request_path(full_namespace))
         entity_list_resp = EntityListResponse.from_json(resp.body, 
infer_missing=True)
         entity_list_resp.validate()
 
-        return entity_list_resp.identifiers()
+        return [
+            NameIdentifier.of(ident.namespace().level(2), ident.name())
+            for ident in entity_list_resp.identifiers()
+        ]
 
-    def load_fileset(self, ident) -> Fileset:
+    def load_fileset(self, ident: NameIdentifier) -> Fileset:
         """Load fileset metadata by {@link NameIdentifier} from the catalog.
 
         Args:
@@ -95,10 +110,12 @@ class FilesetCatalog(BaseSchemaCatalog):
         Returns:
             The fileset metadata.
         """
-        NameIdentifier.check_fileset(ident)
+        self.check_fileset_name_identifier(ident)
+
+        full_namespace = self._get_fileset_full_namespace(ident.namespace())
 
         resp = self.rest_client.get(
-            
f"{self.format_fileset_request_path(ident.namespace())}/{ident.name()}"
+            
f"{self.format_fileset_request_path(full_namespace)}/{encode_string(ident.name())}"
         )
         fileset_resp = FilesetResponse.from_json(resp.body, infer_missing=True)
         fileset_resp.validate()
@@ -134,10 +151,12 @@ class FilesetCatalog(BaseSchemaCatalog):
         Returns:
             The created fileset metadata
         """
-        NameIdentifier.check_fileset(ident)
+        self.check_fileset_name_identifier(ident)
+
+        full_namespace = self._get_fileset_full_namespace(ident.namespace())
 
         req = FilesetCreateRequest(
-            name=ident.name(),
+            name=encode_string(ident.name()),
             comment=comment,
             fileset_type=fileset_type,
             storage_location=storage_location,
@@ -145,14 +164,14 @@ class FilesetCatalog(BaseSchemaCatalog):
         )
 
         resp = self.rest_client.post(
-            self.format_fileset_request_path(ident.namespace()), req
+            self.format_fileset_request_path(full_namespace), req
         )
         fileset_resp = FilesetResponse.from_json(resp.body, infer_missing=True)
         fileset_resp.validate()
 
         return fileset_resp.fileset()
 
-    def alter_fileset(self, ident, *changes) -> Fileset:
+    def alter_fileset(self, ident: NameIdentifier, *changes) -> Fileset:
         """Update a fileset metadata in the catalog.
 
         Args:
@@ -166,7 +185,9 @@ class FilesetCatalog(BaseSchemaCatalog):
         Returns:
             The updated fileset metadata.
         """
-        NameIdentifier.check_fileset(ident)
+        self.check_fileset_name_identifier(ident)
+
+        full_namespace = self._get_fileset_full_namespace(ident.namespace())
 
         updates = [
             FilesetCatalog.to_fileset_update_request(change) for change in 
changes
@@ -175,7 +196,7 @@ class FilesetCatalog(BaseSchemaCatalog):
         req.validate()
 
         resp = self.rest_client.put(
-            
f"{self.format_fileset_request_path(ident.namespace())}/{ident.name()}", req
+            
f"{self.format_fileset_request_path(full_namespace)}/{ident.name()}", req
         )
         fileset_resp = FilesetResponse.from_json(resp.body, infer_missing=True)
         fileset_resp.validate()
@@ -195,10 +216,12 @@ class FilesetCatalog(BaseSchemaCatalog):
              true If the fileset is dropped, false the fileset did not exist.
         """
         try:
-            NameIdentifier.check_fileset(ident)
+            self.check_fileset_name_identifier(ident)
+
+            full_namespace = 
self._get_fileset_full_namespace(ident.namespace())
 
             resp = self.rest_client.delete(
-                
f"{self.format_fileset_request_path(ident.namespace())}/{ident.name()}",
+                
f"{self.format_fileset_request_path(full_namespace)}/{ident.name()}",
             )
             drop_resp = DropResponse.from_json(resp.body, infer_missing=True)
             drop_resp.validate()
@@ -208,10 +231,31 @@ class FilesetCatalog(BaseSchemaCatalog):
             logger.warning("Failed to drop fileset %s: %s", ident, e)
             return False
 
+    @staticmethod
+    def check_fileset_namespace(namespace: Namespace):
+        Namespace.check(
+            namespace is not None and namespace.length() == 1,
+            f"Fileset namespace must be non-null and have 1 level, the input 
namespace is {namespace}",
+        )
+
+    @staticmethod
+    def check_fileset_name_identifier(ident: NameIdentifier):
+        NameIdentifier.check(ident is not None, "NameIdentifier must not be 
None")
+        NameIdentifier.check(
+            ident.name() is not None and len(ident.name()) != 0,
+            "NameIdentifier name must not be empty",
+        )
+        FilesetCatalog.check_fileset_namespace(ident.namespace())
+
+    def _get_fileset_full_namespace(self, table_namespace: Namespace) -> 
Namespace:
+        return Namespace.of(
+            self._catalog_namespace.level(0), self.name(), 
table_namespace.level(0)
+        )
+
     @staticmethod
     def format_fileset_request_path(namespace: Namespace) -> str:
         schema_ns = Namespace.of(namespace.level(0), namespace.level(1))
-        return 
f"{BaseSchemaCatalog.format_schema_request_path(schema_ns)}/{namespace.level(2)}/filesets"
+        return 
f"{BaseSchemaCatalog.format_schema_request_path(schema_ns)}/{encode_string(namespace.level(2))}/filesets"
 
     @staticmethod
     def to_fileset_update_request(change: FilesetChange):
diff --git a/clients/client-python/gravitino/client/gravitino_metalake.py 
b/clients/client-python/gravitino/client/gravitino_metalake.py
index af77fbf8c..07654cb53 100644
--- a/clients/client-python/gravitino/client/gravitino_metalake.py
+++ b/clients/client-python/gravitino/client/gravitino_metalake.py
@@ -99,7 +99,7 @@ class GravitinoMetalake(MetalakeDTO):
         catalog_list = CatalogListResponse.from_json(response.body, 
infer_missing=True)
 
         return [
-            DTOConverters.to_catalog(catalog, self.rest_client)
+            DTOConverters.to_catalog(self.name(), catalog, self.rest_client)
             for catalog in catalog_list.catalogs()
         ]
 
@@ -119,7 +119,9 @@ class GravitinoMetalake(MetalakeDTO):
         response = self.rest_client.get(url)
         catalog_resp = CatalogResponse.from_json(response.body, 
infer_missing=True)
 
-        return DTOConverters.to_catalog(catalog_resp.catalog(), 
self.rest_client)
+        return DTOConverters.to_catalog(
+            self.name(), catalog_resp.catalog(), self.rest_client
+        )
 
     def create_catalog(
         self,
@@ -159,7 +161,9 @@ class GravitinoMetalake(MetalakeDTO):
         response = self.rest_client.post(url, json=catalog_create_request)
         catalog_resp = CatalogResponse.from_json(response.body, 
infer_missing=True)
 
-        return DTOConverters.to_catalog(catalog_resp.catalog(), 
self.rest_client)
+        return DTOConverters.to_catalog(
+            self.name(), catalog_resp.catalog(), self.rest_client
+        )
 
     def alter_catalog(self, name: str, *changes: CatalogChange) -> Catalog:
         """Alter the catalog with specified name by applying the changes.
@@ -185,7 +189,9 @@ class GravitinoMetalake(MetalakeDTO):
         catalog_response = CatalogResponse.from_json(response.body, 
infer_missing=True)
         catalog_response.validate()
 
-        return DTOConverters.to_catalog(catalog_response.catalog(), 
self.rest_client)
+        return DTOConverters.to_catalog(
+            self.name(), catalog_response.catalog(), self.rest_client
+        )
 
     def drop_catalog(self, name: str) -> bool:
         """Drop the catalog with specified name.
diff --git a/clients/client-python/gravitino/dto/dto_converters.py 
b/clients/client-python/gravitino/dto/dto_converters.py
index a03b0d467..3405ff9f3 100644
--- a/clients/client-python/gravitino/dto/dto_converters.py
+++ b/clients/client-python/gravitino/dto/dto_converters.py
@@ -26,6 +26,7 @@ from gravitino.dto.requests.catalog_update_request import 
CatalogUpdateRequest
 from gravitino.dto.requests.metalake_update_request import 
MetalakeUpdateRequest
 from gravitino.api.metalake_change import MetalakeChange
 from gravitino.utils import HTTPClient
+from gravitino.namespace import Namespace
 
 
 class DTOConverters:
@@ -52,9 +53,11 @@ class DTOConverters:
         raise ValueError(f"Unknown change type: {type(change).__name__}")
 
     @staticmethod
-    def to_catalog(catalog: CatalogDTO, client: HTTPClient):
+    def to_catalog(metalake: str, catalog: CatalogDTO, client: HTTPClient):
+        namespace = Namespace.of(metalake)
         if catalog.type() == Catalog.Type.FILESET:
             return FilesetCatalog(
+                namespace=namespace,
                 name=catalog.name(),
                 catalog_type=catalog.type(),
                 provider=catalog.provider(),
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py 
b/clients/client-python/gravitino/filesystem/gvfs.py
index f719851ff..45c299824 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -571,7 +571,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
 
         match = self._identifier_pattern.match(path)
         if match and len(match.groups()) == 3:
-            return NameIdentifier.of_fileset(
+            return NameIdentifier.of(
                 self._metalake, match.group(1), match.group(2), match.group(3)
             )
         raise GravitinoRuntimeException(
@@ -584,12 +584,11 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         :param identifier: The fileset identifier
         :return The fileset
         """
-        catalog: Catalog = self._client.load_catalog(
-            NameIdentifier.of_catalog(
-                identifier.namespace().level(0), 
identifier.namespace().level(1)
-            )
+        catalog: Catalog = 
self._client.load_catalog(identifier.namespace().level(1))
+
+        return catalog.as_fileset_catalog().load_fileset(
+            NameIdentifier.of(identifier.namespace().level(2), 
identifier.name())
         )
-        return catalog.as_fileset_catalog().load_fileset(identifier)
 
     def _get_actual_path_by_ident(
         self,
diff --git a/clients/client-python/gravitino/name_identifier.py 
b/clients/client-python/gravitino/name_identifier.py
index 51f0fd0d5..069e22500 100644
--- a/clients/client-python/gravitino/name_identifier.py
+++ b/clients/client-python/gravitino/name_identifier.py
@@ -28,6 +28,8 @@ from gravitino.exceptions.illegal_name_identifier_exception 
import (
 )
 from gravitino.namespace import Namespace
 
+# TODO: delete redundant methods
+
 
 @dataclass
 class NameIdentifier(DataClassJsonMixin):
@@ -40,7 +42,13 @@ class NameIdentifier(DataClassJsonMixin):
     _DOT: ClassVar[str] = "."
 
     _name: str = field(metadata=config(field_name="name"))
-    _namespace: Namespace = field(metadata=config(field_name="namespace"))
+    _namespace: Namespace = field(
+        metadata=config(
+            field_name="namespace",
+            encoder=Namespace.to_json,
+            decoder=Namespace.from_json,
+        )
+    )
 
     @classmethod
     def builder(cls, namespace: Namespace, name: str):
@@ -72,176 +80,6 @@ class NameIdentifier(DataClassJsonMixin):
 
         return NameIdentifier.builder(Namespace.of(*names[:-1]), names[-1])
 
-    @staticmethod
-    def of_namespace(namespace: Namespace, name: str) -> "NameIdentifier":
-        """Create the NameIdentifier with the given Namespace and name.
-
-        Args:
-            namespace: The namespace of the identifier
-            name: The name of the identifier
-
-        Returns:
-            The created NameIdentifier
-        """
-        return NameIdentifier.builder(namespace, name)
-
-    @staticmethod
-    def of_metalake(metalake: str) -> "NameIdentifier":
-        """Create the metalake NameIdentifier with the given name.
-
-        Args:
-            metalake: The metalake name
-
-        Returns:
-            The created metalake NameIdentifier
-        """
-        return NameIdentifier.of(metalake)
-
-    @staticmethod
-    def of_catalog(metalake: str, catalog: str) -> "NameIdentifier":
-        """Create the catalog NameIdentifier with the given metalake and 
catalog name.
-
-        Args:
-            metalake: The metalake name
-            catalog: The catalog name
-
-        Returns:
-            The created catalog NameIdentifier
-        """
-        return NameIdentifier.of(metalake, catalog)
-
-    @staticmethod
-    def of_schema(metalake: str, catalog: str, schema: str) -> 
"NameIdentifier":
-        """Create the schema NameIdentifier with the given metalake, catalog 
and schema name.
-
-        Args:
-            metalake: The metalake name
-            catalog: The catalog name
-            schema: The schema name
-
-        Returns:
-            The created schema NameIdentifier
-        """
-        return NameIdentifier.of(metalake, catalog, schema)
-
-    @staticmethod
-    def of_table(
-        metalake: str, catalog: str, schema: str, table: str
-    ) -> "NameIdentifier":
-        """Create the table NameIdentifier with the given metalake, catalog, 
schema and table name.
-
-        Args:
-            metalake: The metalake name
-            catalog: The catalog name
-            schema: The schema name
-            table: The table name
-
-        Returns:
-            The created table NameIdentifier
-        """
-        return NameIdentifier.of(metalake, catalog, schema, table)
-
-    @staticmethod
-    def of_fileset(
-        metalake: str, catalog: str, schema: str, fileset: str
-    ) -> "NameIdentifier":
-        """Create the fileset NameIdentifier with the given metalake, catalog, 
schema and fileset name.
-
-        Args:
-            metalake: The metalake name
-            catalog: The catalog name
-            schema: The schema name
-            fileset: The fileset name
-
-        Returns:
-            The created fileset NameIdentifier
-        """
-        return NameIdentifier.of(metalake, catalog, schema, fileset)
-
-    @staticmethod
-    def of_topic(
-        metalake: str, catalog: str, schema: str, topic: str
-    ) -> "NameIdentifier":
-        """Create the topic NameIdentifier with the given metalake, catalog, 
schema and topic
-        name.
-
-        Args:
-            metalake: The metalake name
-            catalog: The catalog name
-            schema: The schema name
-            topic: The topic name
-
-        Returns:
-            The created topic NameIdentifier
-        """
-        return NameIdentifier.of(metalake, catalog, schema, topic)
-
-    @staticmethod
-    def check_metalake(ident: "NameIdentifier") -> None:
-        """Check the given NameIdentifier is a metalake identifier. Throw an 
{@link
-        IllegalNameIdentifierException} if it's not.
-
-        Args:
-            ident: The metalake NameIdentifier to check.
-        """
-        NameIdentifier.check(ident is not None, "Metalake identifier must not 
be null")
-        Namespace.check_metalake(ident.namespace())
-
-    @staticmethod
-    def check_catalog(ident: "NameIdentifier") -> None:
-        """Check the given NameIdentifier is a catalog identifier. Throw an 
{@link
-        IllegalNameIdentifierException} if it's not.
-
-        Args:
-            ident: The catalog NameIdentifier to check.
-        """
-        NameIdentifier.check(ident is not None, "Catalog identifier must not 
be null")
-        Namespace.check_catalog(ident.namespace())
-
-    @staticmethod
-    def check_schema(ident: "NameIdentifier") -> None:
-        """Check the given NameIdentifier is a schema identifier. Throw an 
{@link
-        IllegalNameIdentifierException} if it's not.
-
-        Args:
-            ident: The schema NameIdentifier to check.
-        """
-        NameIdentifier.check(ident is not None, "Schema identifier must not be 
null")
-        Namespace.check_schema(ident.namespace())
-
-    @staticmethod
-    def check_table(ident: "NameIdentifier") -> None:
-        """Check the given NameIdentifier is a table identifier. Throw an 
{@link
-        IllegalNameIdentifierException} if it's not.
-
-        Args:
-            ident: The table NameIdentifier to check.
-        """
-        NameIdentifier.check(ident is not None, "Table identifier must not be 
null")
-        Namespace.check_table(ident.namespace())
-
-    @staticmethod
-    def check_fileset(ident: "NameIdentifier") -> None:
-        """Check the given NameIdentifier is a fileset identifier. Throw an 
{@link
-        IllegalNameIdentifierException} if it's not.
-
-        Args:
-            ident: The fileset NameIdentifier to check.
-        """
-        NameIdentifier.check(ident is not None, "Fileset identifier must not 
be null")
-        Namespace.check_fileset(ident.namespace())
-
-    @staticmethod
-    def check_topic(ident: "NameIdentifier") -> None:
-        """Check the given NameIdentifier is a topic identifier. Throw an 
{@link
-        IllegalNameIdentifierException} if it's not.
-
-        Args:
-            ident: The topic NameIdentifier to check.
-        """
-        NameIdentifier.check(ident is not None, "Topic identifier must not be 
null")
-        Namespace.check_topic(ident.namespace())
-
     @staticmethod
     def parse(identifier: str) -> "NameIdentifier":
         """Create a NameIdentifier from the given identifier string.
diff --git a/clients/client-python/gravitino/namespace.py 
b/clients/client-python/gravitino/namespace.py
index 8e549e2ab..597316d68 100644
--- a/clients/client-python/gravitino/namespace.py
+++ b/clients/client-python/gravitino/namespace.py
@@ -18,8 +18,11 @@ under the License.
 
 """
 
+import json
 from typing import List, ClassVar
 
+# TODO: delete redundant methods
+
 
 class Namespace:
     """A namespace is a sequence of levels separated by dots. It's used to 
identify a metalake, a
@@ -29,11 +32,21 @@ class Namespace:
 
     _DOT: ClassVar[str] = "."
 
-    _levels: List[str] = []
+    _levels: List[str]
 
     def __init__(self, levels: List[str]):
         self._levels = levels
 
+    def to_json(self):
+        return json.dumps(self._levels)
+
+    @classmethod
+    def from_json(cls, levels):
+        assert levels is not None and isinstance(
+            levels, list
+        ), f"Cannot parse name identifier from invalid JSON: {levels}"
+        return cls(levels)
+
     @staticmethod
     def empty() -> "Namespace":
         """Get an empty namespace.
@@ -67,160 +80,6 @@ class Namespace:
 
         return Namespace(list(levels))
 
-    @staticmethod
-    def of_metalake() -> "Namespace":
-        """Create a namespace for metalake.
-
-        Returns:
-            A namespace for metalake
-        """
-        return Namespace.empty()
-
-    @staticmethod
-    def of_catalog(metalake: str) -> "Namespace":
-        """Create a namespace for catalog.
-
-        Args:
-            metalake: The metalake name
-
-        Returns:
-            A namespace for catalog
-        """
-        return Namespace.of(metalake)
-
-    @staticmethod
-    def of_schema(metalake: str, catalog: str) -> "Namespace":
-        """Create a namespace for schema.
-
-        Args:
-            metalake: The metalake name
-            catalog: The catalog name
-
-        Returns:
-             A namespace for schema
-        """
-        return Namespace.of(metalake, catalog)
-
-    @staticmethod
-    def of_table(metalake: str, catalog: str, schema: str) -> "Namespace":
-        """Create a namespace for table.
-
-        Args:
-            metalake: The metalake name
-            catalog: The catalog name
-            schema: The schema name
-
-        Returns:
-             A namespace for table
-        """
-        return Namespace.of(metalake, catalog, schema)
-
-    @staticmethod
-    def of_fileset(metalake: str, catalog: str, schema: str) -> "Namespace":
-        """Create a namespace for fileset.
-
-        Args:
-            metalake: The metalake name
-            catalog: The catalog name
-            schema: The schema name
-
-        Returns:
-             A namespace for fileset
-        """
-        return Namespace.of(metalake, catalog, schema)
-
-    @staticmethod
-    def of_topic(metalake: str, catalog: str, schema: str) -> "Namespace":
-        """Create a namespace for topic.
-
-        Args:
-            metalake: The metalake name
-            catalog: The catalog name
-            schema: The schema name
-
-        Returns:
-             A namespace for topic
-        """
-        return Namespace.of(metalake, catalog, schema)
-
-    @staticmethod
-    def check_metalake(namespace: "Namespace") -> None:
-        """Check if the given metalake namespace is legal, throw an 
IllegalNamespaceException if
-        it's illegal.
-
-        Args:
-            namespace: The metalake namespace
-        """
-        Namespace.check(
-            namespace is not None and namespace.is_empty(),
-            f"Metalake namespace must be non-null and empty, the input 
namespace is {namespace}",
-        )
-
-    @staticmethod
-    def check_catalog(namespace: "Namespace") -> None:
-        """Check if the given catalog namespace is legal, throw an 
IllegalNamespaceException if
-        it's illegal.
-
-        Args:
-            namespace: The catalog namespace
-        """
-        Namespace.check(
-            namespace is not None and namespace.length() == 1,
-            f"Catalog namespace must be non-null and have 1 level, the input 
namespace is {namespace}",
-        )
-
-    @staticmethod
-    def check_schema(namespace: "Namespace") -> None:
-        """Check if the given schema namespace is legal, throw an 
IllegalNamespaceException if
-        it's illegal.
-
-        Args:
-            namespace: The schema namespace
-        """
-        Namespace.check(
-            namespace is not None and namespace.length() == 2,
-            f"Schema namespace must be non-null and have 2 levels, the input 
namespace is {namespace}",
-        )
-
-    @staticmethod
-    def check_table(namespace: "Namespace") -> None:
-        """Check if the given table namespace is legal, throw an 
IllegalNamespaceException if it's
-        illegal.
-
-        Args:
-            namespace: The table namespace
-        """
-        Namespace.check(
-            namespace is not None and namespace.length() == 3,
-            f"Table namespace must be non-null and have 3 levels, the input 
namespace is {namespace}",
-        )
-
-    @staticmethod
-    def check_fileset(namespace: "Namespace") -> None:
-        """Check if the given fileset namespace is legal, throw an 
IllegalNamespaceException if
-        it's illegal.
-
-        Args:
-            namespace: The fileset namespace
-        """
-        Namespace.check(
-            namespace is not None and namespace.length() == 3,
-            f"Fileset namespace must be non-null and have 3 levels, the input 
namespace is {namespace}",
-        )
-
-    @staticmethod
-    def check_topic(namespace: "Namespace") -> None:
-        """Check if the given topic namespace is legal, throw an 
IllegalNamespaceException if it's
-        illegal.
-
-        Args:
-            namespace: The topic namespace
-        """
-        Namespace.check(
-            namespace is not None and namespace.length() == 3,
-            f"Topic namespace must be non-null and have 3 levels, the input 
namespace is {namespace}",
-        )
-
     def levels(self) -> List[str]:
         """Get the levels of the namespace.
 
diff --git a/clients/client-python/gravitino/rest/rest_utils.py 
b/clients/client-python/gravitino/rest/rest_utils.py
new file mode 100644
index 000000000..21157602e
--- /dev/null
+++ b/clients/client-python/gravitino/rest/rest_utils.py
@@ -0,0 +1,13 @@
+"""
+Copyright 2024 Datastrato Pvt Ltd.
+This software is licensed under the Apache License version 2.
+"""
+
+import urllib.parse
+
+
+def encode_string(to_encode: str):
+
+    assert to_encode is not None, "Invalid string to encode: None"
+
+    return urllib.parse.quote(to_encode)
diff --git a/clients/client-python/tests/integration/test_catalog.py 
b/clients/client-python/tests/integration/test_catalog.py
index 1535e709c..3f8d2ae4b 100644
--- a/clients/client-python/tests/integration/test_catalog.py
+++ b/clients/client-python/tests/integration/test_catalog.py
@@ -39,9 +39,7 @@ class TestCatalog(IntegrationTestEnv):
     catalog_location_prop: str = "location"  # Fileset Catalog must set 
`location`
     catalog_provider: str = "hadoop"
 
-    catalog_ident: NameIdentifier = NameIdentifier.of_catalog(
-        metalake_name, catalog_name
-    )
+    catalog_ident: NameIdentifier = NameIdentifier.of(metalake_name, 
catalog_name)
 
     gravitino_admin_client: GravitinoAdminClient = GravitinoAdminClient(
         uri="http://localhost:8090";
diff --git a/clients/client-python/tests/integration/test_fileset_catalog.py 
b/clients/client-python/tests/integration/test_fileset_catalog.py
index 36b6362f4..a740320c9 100644
--- a/clients/client-python/tests/integration/test_fileset_catalog.py
+++ b/clients/client-python/tests/integration/test_fileset_catalog.py
@@ -58,18 +58,12 @@ class TestFilesetCatalog(IntegrationTestEnv):
     }
     fileset_new_name = fileset_name + "_new"
 
-    catalog_ident: NameIdentifier = NameIdentifier.of_catalog(
-        metalake_name, catalog_name
-    )
-    schema_ident: NameIdentifier = NameIdentifier.of_schema(
+    catalog_ident: NameIdentifier = NameIdentifier.of(metalake_name, 
catalog_name)
+    schema_ident: NameIdentifier = NameIdentifier.of(
         metalake_name, catalog_name, schema_name
     )
-    fileset_ident: NameIdentifier = NameIdentifier.of_fileset(
-        metalake_name, catalog_name, schema_name, fileset_name
-    )
-    fileset_new_ident: NameIdentifier = NameIdentifier.of_fileset(
-        metalake_name, catalog_name, schema_name, fileset_new_name
-    )
+    fileset_ident: NameIdentifier = NameIdentifier.of(schema_name, 
fileset_name)
+    fileset_new_ident: NameIdentifier = NameIdentifier.of(schema_name, 
fileset_new_name)
 
     gravitino_admin_client: GravitinoAdminClient = GravitinoAdminClient(
         uri="http://localhost:8090";
@@ -101,7 +95,9 @@ class TestFilesetCatalog(IntegrationTestEnv):
             logger.info(
                 "Drop schema %s[%s]",
                 self.schema_ident,
-                catalog.as_schemas().drop_schema(ident=self.schema_ident, 
cascade=True),
+                catalog.as_schemas().drop_schema(
+                    schema_name=self.schema_name, cascade=True
+                ),
             )
             logger.info(
                 "Drop catalog %s[%s]",
@@ -131,7 +127,7 @@ class TestFilesetCatalog(IntegrationTestEnv):
             properties={self.catalog_location_prop: "/tmp/test1"},
         )
         catalog.as_schemas().create_schema(
-            ident=self.schema_ident, comment="", properties={}
+            schema_name=self.schema_name, comment="", properties={}
         )
 
     def create_fileset(self) -> Fileset:
diff --git a/clients/client-python/tests/integration/test_schema.py 
b/clients/client-python/tests/integration/test_schema.py
index d6678d031..c2e66829e 100644
--- a/clients/client-python/tests/integration/test_schema.py
+++ b/clients/client-python/tests/integration/test_schema.py
@@ -56,13 +56,11 @@ class TestSchema(IntegrationTestEnv):
         schema_properties_key2: schema_properties_value2,
     }
 
-    catalog_ident: NameIdentifier = NameIdentifier.of_catalog(
-        metalake_name, catalog_name
-    )
-    schema_ident: NameIdentifier = NameIdentifier.of_schema(
+    catalog_ident: NameIdentifier = NameIdentifier.of(metalake_name, 
catalog_name)
+    schema_ident: NameIdentifier = NameIdentifier.of(
         metalake_name, catalog_name, schema_name
     )
-    schema_new_ident: NameIdentifier = NameIdentifier.of_schema(
+    schema_new_ident: NameIdentifier = NameIdentifier.of(
         metalake_name, catalog_name, schema_new_name
     )
 
@@ -101,12 +99,12 @@ class TestSchema(IntegrationTestEnv):
             logger.info(
                 "Drop schema %s[%s]",
                 self.schema_ident,
-                catalog.as_schemas().drop_schema(self.schema_ident, 
cascade=True),
+                catalog.as_schemas().drop_schema(self.schema_name, 
cascade=True),
             )
             logger.info(
                 "Drop schema %s[%s]",
                 self.schema_new_ident,
-                catalog.as_schemas().drop_schema(self.schema_new_ident, 
cascade=True),
+                catalog.as_schemas().drop_schema(self.schema_new_name, 
cascade=True),
             )
             logger.info(
                 "Drop catalog %s[%s]",
@@ -124,7 +122,7 @@ class TestSchema(IntegrationTestEnv):
     def create_schema(self) -> Schema:
         catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
         return catalog.as_schemas().create_schema(
-            ident=self.schema_ident,
+            schema_name=self.schema_name,
             comment=self.schema_comment,
             properties=self.schema_properties,
         )
@@ -140,21 +138,21 @@ class TestSchema(IntegrationTestEnv):
         self.create_schema()
         catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
         self.assertTrue(
-            catalog.as_schemas().drop_schema(ident=self.schema_ident, 
cascade=True)
+            catalog.as_schemas().drop_schema(schema_name=self.schema_name, 
cascade=True)
         )
 
     def test_list_schema(self):
         self.create_schema()
         catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
-        schema_list: List[NameIdentifier] = catalog.as_schemas().list_schemas(
-            namespace=self.schema_ident.namespace()
+        schema_list: List[str] = catalog.as_schemas().list_schemas()
+        self.assertTrue(
+            any(schema_name == self.schema_name for schema_name in schema_list)
         )
-        self.assertTrue(any(item.name() == self.schema_name for item in 
schema_list))
 
     def test_load_schema(self):
         self.create_schema()
         catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
-        schema = catalog.as_schemas().load_schema(ident=self.schema_ident)
+        schema = catalog.as_schemas().load_schema(schema_name=self.schema_name)
         self.assertIsNotNone(schema)
         self.assertEqual(schema.name(), self.schema_name)
         self.assertEqual(schema.comment(), self.schema_comment)
@@ -172,7 +170,7 @@ class TestSchema(IntegrationTestEnv):
             ),
         )
         catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
-        schema_new = catalog.as_schemas().alter_schema(self.schema_ident, 
*changes)
+        schema_new = catalog.as_schemas().alter_schema(self.schema_name, 
*changes)
         self.assertEqual(
             schema_new.properties().get(self.schema_properties_key2),
             schema_propertie_new_value,
diff --git a/clients/client-python/tests/integration/test_simple_auth_client.py 
b/clients/client-python/tests/integration/test_simple_auth_client.py
index 17b8095f1..dcd4e66b2 100644
--- a/clients/client-python/tests/integration/test_simple_auth_client.py
+++ b/clients/client-python/tests/integration/test_simple_auth_client.py
@@ -58,15 +58,11 @@ class TestSimpleAuthClient(unittest.TestCase):
         fileset_properties_key2: fileset_properties_value2,
     }
 
-    catalog_ident: NameIdentifier = NameIdentifier.of_catalog(
-        metalake_name, catalog_name
-    )
-    schema_ident: NameIdentifier = NameIdentifier.of_schema(
+    catalog_ident: NameIdentifier = NameIdentifier.of(metalake_name, 
catalog_name)
+    schema_ident: NameIdentifier = NameIdentifier.of(
         metalake_name, catalog_name, schema_name
     )
-    fileset_ident: NameIdentifier = NameIdentifier.of_fileset(
-        metalake_name, catalog_name, schema_name, fileset_name
-    )
+    fileset_ident: NameIdentifier = NameIdentifier.of(schema_name, 
fileset_name)
 
     def setUp(self):
         os.environ["GRAVITINO_USER"] = self.creator
@@ -89,7 +85,9 @@ class TestSimpleAuthClient(unittest.TestCase):
             logger.info(
                 "Drop schema %s[%s]",
                 self.schema_ident,
-                catalog.as_schemas().drop_schema(ident=self.schema_ident, 
cascade=True),
+                catalog.as_schemas().drop_schema(
+                    schema_name=self.schema_name, cascade=True
+                ),
             )
             logger.info(
                 "Drop catalog %s[%s]",
@@ -121,7 +119,7 @@ class TestSimpleAuthClient(unittest.TestCase):
             properties={self.catalog_location_prop: "/tmp/test1"},
         )
         catalog.as_schemas().create_schema(
-            ident=self.schema_ident, comment="", properties={}
+            schema_name=self.schema_name, comment="", properties={}
         )
         catalog.as_fileset_catalog().create_fileset(
             ident=self.fileset_ident,
@@ -141,7 +139,7 @@ class TestSimpleAuthClient(unittest.TestCase):
 
     def test_schema_creator(self):
         catalog = self.gravitino_client.load_catalog(self.catalog_name)
-        schema = catalog.as_schemas().load_schema(self.schema_ident)
+        schema = catalog.as_schemas().load_schema(self.schema_name)
         self.assertEqual(schema.audit_info().creator(), self.creator)
 
     def test_fileset_creator(self):
diff --git a/clients/client-python/tests/unittests/mock_base.py 
b/clients/client-python/tests/unittests/mock_base.py
index 3b7130279..b64aa4187 100644
--- a/clients/client-python/tests/unittests/mock_base.py
+++ b/clients/client-python/tests/unittests/mock_base.py
@@ -18,6 +18,7 @@ under the License.
 
 """
 
+import json
 from unittest.mock import patch
 
 from gravitino import GravitinoMetalake, Catalog, Fileset
@@ -25,6 +26,8 @@ from gravitino.catalog.fileset_catalog import FilesetCatalog
 from gravitino.dto.fileset_dto import FilesetDTO
 from gravitino.dto.audit_dto import AuditDTO
 from gravitino.dto.metalake_dto import MetalakeDTO
+from gravitino.namespace import Namespace
+from gravitino.utils.http_client import HTTPClient
 
 
 def mock_load_metalake():
@@ -50,14 +53,18 @@ def mock_load_fileset_catalog():
         _last_modifier="test",
         _last_modified_time="2024-04-05T10:10:35.218Z",
     )
+
+    namespace = Namespace.of("metalake_demo")
+
     catalog = FilesetCatalog(
+        namespace=namespace,
         name="fileset_catalog",
         catalog_type=Catalog.Type.FILESET,
         provider="hadoop",
         comment="this is test",
         properties={"k": "v"},
         audit=audit_dto,
-        rest_client=None,
+        rest_client=HTTPClient("http://localhost:9090";, is_debug=True),
     )
     return catalog
 
@@ -97,3 +104,8 @@ def mock_data(cls):
         pass
 
     return Wrapper
+
+
+def mock_name_identifier_json(name, namespace):
+
+    return json.dumps({"name": name, "namespace": namespace}).encode("utf-8")
diff --git a/clients/client-python/tests/unittests/test_gvfs_with_local.py 
b/clients/client-python/tests/unittests/test_gvfs_with_local.py
index 0c43cfe28..c5cebc764 100644
--- a/clients/client-python/tests/unittests/test_gvfs_with_local.py
+++ b/clients/client-python/tests/unittests/test_gvfs_with_local.py
@@ -89,7 +89,7 @@ class TestLocalFilesystem(unittest.TestCase):
         time.sleep(2)
         self.assertIsNone(
             fs.cache.get(
-                NameIdentifier.of_fileset(
+                NameIdentifier.of(
                     "metalake_demo", "fileset_catalog", "tmp", "test_cache"
                 )
             )
@@ -704,7 +704,7 @@ class TestLocalFilesystem(unittest.TestCase):
             _properties={},
         )
         mock_hdfs_context: FilesetContext = FilesetContext(
-            name_identifier=NameIdentifier.of_fileset(
+            name_identifier=NameIdentifier.of(
                 "test_metalake", "test_catalog", "test_schema", "test_f1"
             ),
             storage_type=StorageType.HDFS,
@@ -744,7 +744,7 @@ class TestLocalFilesystem(unittest.TestCase):
             _properties={},
         )
         mock_local_context: FilesetContext = FilesetContext(
-            name_identifier=NameIdentifier.of_fileset(
+            name_identifier=NameIdentifier.of(
                 "test_metalake", "test_catalog", "test_schema", "test_f1"
             ),
             storage_type=StorageType.LOCAL,
@@ -785,7 +785,7 @@ class TestLocalFilesystem(unittest.TestCase):
             _properties={},
         )
         mock_hdfs_context: FilesetContext = FilesetContext(
-            name_identifier=NameIdentifier.of_fileset(
+            name_identifier=NameIdentifier.of(
                 "test_metalake", "test_catalog", "test_schema", "test_f1"
             ),
             storage_type=StorageType.HDFS,
@@ -825,7 +825,7 @@ class TestLocalFilesystem(unittest.TestCase):
             _properties={},
         )
         mock_local_context: FilesetContext = FilesetContext(
-            name_identifier=NameIdentifier.of_fileset(
+            name_identifier=NameIdentifier.of(
                 "test_metalake", "test_catalog", "test_schema", "test_f1"
             ),
             storage_type=StorageType.LOCAL,
diff --git a/clients/client-python/tests/unittests/test_name_identifier.py 
b/clients/client-python/tests/unittests/test_name_identifier.py
index ca6ff8b27..5fd3464d1 100644
--- a/clients/client-python/tests/unittests/test_name_identifier.py
+++ b/clients/client-python/tests/unittests/test_name_identifier.py
@@ -21,17 +21,30 @@ under the License.
 import unittest
 
 from gravitino import NameIdentifier
+from tests.unittests.mock_base import mock_name_identifier_json
 
 
 class TestNameIdentifier(unittest.TestCase):
     def test_name_identifier_hash(self):
-        name_identifier1: NameIdentifier = NameIdentifier.of_fileset(
+        name_identifier1: NameIdentifier = NameIdentifier.of(
             "test_metalake", "test_catalog", "test_schema", "test_fileset1"
         )
-        name_identifier2: NameIdentifier = NameIdentifier.of_fileset(
+        name_identifier2: NameIdentifier = NameIdentifier.of(
             "test_metalake", "test_catalog", "test_schema", "test_fileset2"
         )
         identifier_dict = {name_identifier1: "test1", name_identifier2: 
"test2"}
 
         self.assertEqual("test1", identifier_dict.get(name_identifier1))
         self.assertNotEqual("test2", identifier_dict.get(name_identifier1))
+
+    def test_from_json_name_identifier(self):
+
+        test_name = "test_name_identifier"
+        test_namespace_levels = ["1", "2", "3", "4"]
+
+        json_data = mock_name_identifier_json(test_name, test_namespace_levels)
+
+        name_identifier = NameIdentifier.from_json(json_data, 
infer_missing=True)
+
+        self.assertEqual(test_name, name_identifier.name())
+        self.assertEqual(test_namespace_levels, 
name_identifier.namespace().levels())

Reply via email to