This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 2c63bc9603 [#6604] Improvement(GVFS): extract methods from gvfs to 
utils for reusing (#6959)
2c63bc9603 is described below

commit 2c63bc9603142394652f4e66119ba0cad85c07ff
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Apr 16 14:02:27 2025 +0800

    [#6604] Improvement(GVFS): extract methods from gvfs to utils for reusing 
(#6959)
    
    ### What changes were proposed in this pull request?
    
    extract methods from gvfs to utils for reusing
    
    ### Why are the changes needed?
    
    Fix: #6604
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    CI pass
    
    Co-authored-by: mchades <[email protected]>
---
 clients/client-python/gravitino/filesystem/gvfs.py | 146 +++++-------------
 .../gravitino/filesystem/gvfs_utils.py             | 165 +++++++++++++++++++++
 .../tests/unittests/test_gvfs_with_local.py        | 118 +++++++--------
 .../hadoop/GravitinoVirtualFileSystem.java         |  68 ++-------
 .../hadoop/GravitinoVirtualFileSystemUtils.java    | 117 ++++++++++++++-
 .../filesystem/hadoop/GravitinoMockServerBase.java |   3 +-
 .../gravitino/filesystem/hadoop/TestGvfsBase.java  | 118 +++++++++------
 7 files changed, 457 insertions(+), 278 deletions(-)

diff --git a/clients/client-python/gravitino/filesystem/gvfs.py 
b/clients/client-python/gravitino/filesystem/gvfs.py
index 8c1d89c757..1d4d680a24 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,8 +14,10 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import importlib
 import logging
 import os
+import re
 import sys
 
 # Disable C0302: Too many lines in module
@@ -24,45 +26,42 @@ import time
 from enum import Enum
 from pathlib import PurePosixPath
 from typing import Dict, Tuple, List
-import re
-import importlib
-import fsspec
 
+import fsspec
 from cachetools import TTLCache, LRUCache
 from fsspec import AbstractFileSystem
-from fsspec.implementations.local import LocalFileSystem
 from fsspec.implementations.arrow import ArrowFSWrapper
+from fsspec.implementations.local import LocalFileSystem
 from fsspec.utils import infer_storage_options
-
 from readerwriterlock import rwlock
 
 from gravitino.api.catalog import Catalog
+from gravitino.api.credential.adls_token_credential import ADLSTokenCredential
+from gravitino.api.credential.azure_account_key_credential import (
+    AzureAccountKeyCredential,
+)
 from gravitino.api.credential.credential import Credential
+from gravitino.api.credential.gcs_token_credential import GCSTokenCredential
+from gravitino.api.credential.oss_secret_key_credential import 
OSSSecretKeyCredential
+from gravitino.api.credential.oss_token_credential import OSSTokenCredential
+from gravitino.api.credential.s3_secret_key_credential import 
S3SecretKeyCredential
+from gravitino.api.credential.s3_token_credential import S3TokenCredential
 from gravitino.audit.caller_context import CallerContext, CallerContextHolder
 from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
 from gravitino.audit.fileset_data_operation import FilesetDataOperation
 from gravitino.audit.internal_client_type import InternalClientType
-from gravitino.auth.default_oauth2_token_provider import 
DefaultOAuth2TokenProvider
-from gravitino.auth.oauth2_token_provider import OAuth2TokenProvider
-from gravitino.auth.simple_auth_provider import SimpleAuthProvider
-from gravitino.client.generic_fileset import GenericFileset
 from gravitino.client.fileset_catalog import FilesetCatalog
-from gravitino.client.gravitino_client import GravitinoClient
+from gravitino.client.generic_fileset import GenericFileset
 from gravitino.exceptions.base import (
     GravitinoRuntimeException,
 )
 from gravitino.filesystem.gvfs_config import GVFSConfig
-from gravitino.name_identifier import NameIdentifier
-
-from gravitino.api.credential.adls_token_credential import ADLSTokenCredential
-from gravitino.api.credential.azure_account_key_credential import (
-    AzureAccountKeyCredential,
+from gravitino.filesystem.gvfs_utils import (
+    create_client,
+    extract_identifier,
+    get_sub_path_from_virtual_path,
 )
-from gravitino.api.credential.gcs_token_credential import GCSTokenCredential
-from gravitino.api.credential.oss_secret_key_credential import 
OSSSecretKeyCredential
-from gravitino.api.credential.oss_token_credential import OSSTokenCredential
-from gravitino.api.credential.s3_secret_key_credential import 
S3SecretKeyCredential
-from gravitino.api.credential.s3_token_credential import S3TokenCredential
+from gravitino.name_identifier import NameIdentifier
 
 logger = logging.getLogger(__name__)
 
@@ -128,46 +127,7 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         :param kwargs: Extra args for super filesystem
         """
         self._metalake = metalake_name
-        auth_type = (
-            GVFSConfig.SIMPLE_AUTH_TYPE
-            if options is None
-            else options.get(GVFSConfig.AUTH_TYPE, GVFSConfig.SIMPLE_AUTH_TYPE)
-        )
-        if auth_type == GVFSConfig.SIMPLE_AUTH_TYPE:
-            self._client = GravitinoClient(
-                uri=server_uri,
-                metalake_name=metalake_name,
-                auth_data_provider=SimpleAuthProvider(),
-            )
-        elif auth_type == GVFSConfig.OAUTH2_AUTH_TYPE:
-            oauth2_server_uri = options.get(GVFSConfig.OAUTH2_SERVER_URI)
-            self._check_auth_config(
-                auth_type, GVFSConfig.OAUTH2_SERVER_URI, oauth2_server_uri
-            )
-
-            oauth2_credential = options.get(GVFSConfig.OAUTH2_CREDENTIAL)
-            self._check_auth_config(
-                auth_type, GVFSConfig.OAUTH2_CREDENTIAL, oauth2_credential
-            )
-
-            oauth2_path = options.get(GVFSConfig.OAUTH2_PATH)
-            self._check_auth_config(auth_type, GVFSConfig.OAUTH2_PATH, 
oauth2_path)
-
-            oauth2_scope = options.get(GVFSConfig.OAUTH2_SCOPE)
-            self._check_auth_config(auth_type, GVFSConfig.OAUTH2_SCOPE, 
oauth2_scope)
-
-            oauth2_token_provider: OAuth2TokenProvider = 
DefaultOAuth2TokenProvider(
-                oauth2_server_uri, oauth2_credential, oauth2_path, oauth2_scope
-            )
-            self._client = GravitinoClient(
-                uri=server_uri,
-                metalake_name=metalake_name,
-                auth_data_provider=oauth2_token_provider,
-            )
-        else:
-            raise GravitinoRuntimeException(
-                f"Authentication type {auth_type} is not supported."
-            )
+        self._client = create_client(options, server_uri, metalake_name)
         cache_size = (
             GVFSConfig.DEFAULT_CACHE_SIZE
             if options is None
@@ -216,10 +176,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         pre_process_path: str = self._pre_process_path(path)
-        identifier: NameIdentifier = self._extract_identifier(pre_process_path)
-        sub_path: str = self._get_sub_path_from_virtual_path(
-            identifier, pre_process_path
+        identifier: NameIdentifier = extract_identifier(
+            self._metalake, pre_process_path
         )
+        sub_path: str = get_sub_path_from_virtual_path(identifier, 
pre_process_path)
         storage_location: str = actual_path[: len(actual_path) - len(sub_path)]
         # return entries with details
         if detail:
@@ -259,10 +219,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         pre_process_path: str = self._pre_process_path(path)
-        identifier: NameIdentifier = self._extract_identifier(pre_process_path)
-        sub_path: str = self._get_sub_path_from_virtual_path(
-            identifier, pre_process_path
+        identifier: NameIdentifier = extract_identifier(
+            self._metalake, pre_process_path
         )
+        sub_path: str = get_sub_path_from_virtual_path(identifier, 
pre_process_path)
         storage_location: str = actual_path[: len(actual_path) - len(sub_path)]
         actual_info: Dict = context_pair.filesystem().info(
             self._strip_storage_protocol(storage_type, actual_path)
@@ -294,8 +254,8 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         """
         src_path = self._pre_process_path(path1)
         dst_path = self._pre_process_path(path2)
-        src_identifier: NameIdentifier = self._extract_identifier(src_path)
-        dst_identifier: NameIdentifier = self._extract_identifier(dst_path)
+        src_identifier: NameIdentifier = extract_identifier(self._metalake, 
src_path)
+        dst_identifier: NameIdentifier = extract_identifier(self._metalake, 
dst_path)
         if src_identifier != dst_identifier:
             raise GravitinoRuntimeException(
                 f"Destination file path identifier: `{dst_identifier}` should 
be same with src file path "
@@ -329,8 +289,8 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         """
         src_path = self._pre_process_path(path1)
         dst_path = self._pre_process_path(path2)
-        src_identifier: NameIdentifier = self._extract_identifier(src_path)
-        dst_identifier: NameIdentifier = self._extract_identifier(dst_path)
+        src_identifier: NameIdentifier = extract_identifier(self._metalake, 
src_path)
+        dst_identifier: NameIdentifier = extract_identifier(self._metalake, 
dst_path)
         if src_identifier != dst_identifier:
             raise GravitinoRuntimeException(
                 f"Destination file path identifier: `{dst_identifier}`"
@@ -705,7 +665,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         :return A fileset context pair
         """
         virtual_path: str = self._pre_process_path(virtual_path)
-        identifier: NameIdentifier = self._extract_identifier(virtual_path)
+        identifier: NameIdentifier = extract_identifier(self._metalake, 
virtual_path)
         catalog_ident: NameIdentifier = NameIdentifier.of(
             self._metalake, identifier.namespace().level(1)
         )
@@ -714,7 +674,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             raise GravitinoRuntimeException(
                 f"Loaded fileset catalog: {catalog_ident} is null."
             )
-        sub_path: str = self._get_sub_path_from_virtual_path(identifier, 
virtual_path)
+        sub_path: str = get_sub_path_from_virtual_path(identifier, 
virtual_path)
         context = {
             FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION: 
operation.name,
             FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE: 
InternalClientType.PYTHON_GVFS.name,
@@ -739,25 +699,6 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             ),
         )
 
-    def _extract_identifier(self, path):
-        """Extract the fileset identifier from the path.
-        :param path: The virtual fileset path
-        :return The fileset identifier
-        """
-        if path is None:
-            raise GravitinoRuntimeException(
-                "path which need be extracted cannot be null or empty."
-            )
-
-        match = self._identifier_pattern.match(path)
-        if match and len(match.groups()) == 3:
-            return NameIdentifier.of(
-                self._metalake, match.group(1), match.group(2), match.group(3)
-            )
-        raise GravitinoRuntimeException(
-            f"path: `{path}` doesn't contains valid identifier."
-        )
-
     @staticmethod
     def _get_virtual_location(identifier: NameIdentifier):
         """Get the virtual location of the fileset.
@@ -815,14 +756,6 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             f"Storage type doesn't support now. Path:{path}"
         )
 
-    @staticmethod
-    def _get_sub_path_from_virtual_path(identifier: NameIdentifier, 
virtual_path: str):
-        return virtual_path[
-            len(
-                
f"fileset/{identifier.namespace().level(1)}/{identifier.namespace().level(2)}/{identifier.name()}"
-            ) :
-        ]
-
     @staticmethod
     def _strip_storage_protocol(storage_type: StorageType, path: str):
         """Strip the storage protocol from the path.
@@ -867,7 +800,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         if storage_type == StorageType.LOCAL:
             return path[len(f"{StorageType.LOCAL.value}:") :]
 
-        ## We need to remove the protocol and accout from the path, for 
instance,
+        ## We need to remove the protocol and account from the path, for 
instance,
         # the path can be converted from 'abfss://container@account/path' to
         # 'container/path'.
         if storage_type == StorageType.ABS:
@@ -885,19 +818,6 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             f"Storage type:{storage_type} doesn't support now."
         )
 
-    @staticmethod
-    def _check_auth_config(auth_type: str, config_key: str, config_value: str):
-        """Check if the config value is null.
-        :param auth_type: The auth type
-        :param config_key: The config key
-        :param config_value: The config value
-        """
-        if config_value is None:
-            raise GravitinoRuntimeException(
-                f"{config_key} should not be null"
-                f" if {GVFSConfig.AUTH_TYPE} is set to {auth_type}."
-            )
-
     def _get_fileset_catalog(self, catalog_ident: NameIdentifier):
         read_lock = self._catalog_cache_lock.gen_rlock()
         try:
diff --git a/clients/client-python/gravitino/filesystem/gvfs_utils.py 
b/clients/client-python/gravitino/filesystem/gvfs_utils.py
new file mode 100644
index 0000000000..b3b7f8f385
--- /dev/null
+++ b/clients/client-python/gravitino/filesystem/gvfs_utils.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import re
+from typing import Dict
+
+from gravitino.client.gravitino_client import GravitinoClient
+from gravitino.auth.default_oauth2_token_provider import 
DefaultOAuth2TokenProvider
+from gravitino.auth.oauth2_token_provider import OAuth2TokenProvider
+from gravitino.auth.simple_auth_provider import SimpleAuthProvider
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from gravitino.name_identifier import NameIdentifier
+from gravitino.exceptions.base import GravitinoRuntimeException
+
+_identifier_pattern = re.compile(
+    "^(?:gvfs://)?fileset/([^/]+)/([^/]+)/([^/]+)(?:/[^/]+)*/?$"
+)
+
+
+def _check_auth_config(auth_type: str, config_key: str, config_value: str):
+    """Check if the config value is null.
+    :param auth_type: The auth type
+    :param config_key: The config key
+    :param config_value: The config value
+    """
+    if config_value is None:
+        raise GravitinoRuntimeException(
+            f"{config_key} should not be null"
+            f" if {GVFSConfig.AUTH_TYPE} is set to {auth_type}."
+        )
+
+
+def create_client(
+    options: Dict[str, str],
+    server_uri: str,
+    metalake_name: str,
+    request_headers: dict = None,
+):
+    """Create the Gravitino client.
+    :param options: The options
+    :param server_uri: The server URI
+    :param metalake_name: The metalake name
+    :param request_headers: The request headers
+    :return The Gravitino client
+    """
+    auth_type = (
+        GVFSConfig.SIMPLE_AUTH_TYPE
+        if options is None
+        else options.get(GVFSConfig.AUTH_TYPE, GVFSConfig.SIMPLE_AUTH_TYPE)
+    )
+
+    if auth_type == GVFSConfig.SIMPLE_AUTH_TYPE:
+        return GravitinoClient(
+            uri=server_uri,
+            metalake_name=metalake_name,
+            auth_data_provider=SimpleAuthProvider(),
+        )
+
+    if auth_type == GVFSConfig.OAUTH2_AUTH_TYPE:
+        oauth2_server_uri = options.get(GVFSConfig.OAUTH2_SERVER_URI)
+        _check_auth_config(auth_type, GVFSConfig.OAUTH2_SERVER_URI, 
oauth2_server_uri)
+
+        oauth2_credential = options.get(GVFSConfig.OAUTH2_CREDENTIAL)
+        _check_auth_config(auth_type, GVFSConfig.OAUTH2_CREDENTIAL, 
oauth2_credential)
+
+        oauth2_path = options.get(GVFSConfig.OAUTH2_PATH)
+        _check_auth_config(auth_type, GVFSConfig.OAUTH2_PATH, oauth2_path)
+
+        oauth2_scope = options.get(GVFSConfig.OAUTH2_SCOPE)
+        _check_auth_config(auth_type, GVFSConfig.OAUTH2_SCOPE, oauth2_scope)
+
+        oauth2_token_provider: OAuth2TokenProvider = 
DefaultOAuth2TokenProvider(
+            oauth2_server_uri, oauth2_credential, oauth2_path, oauth2_scope
+        )
+
+        return GravitinoClient(
+            uri=server_uri,
+            metalake_name=metalake_name,
+            auth_data_provider=oauth2_token_provider,
+        )
+
+    raise GravitinoRuntimeException(
+        f"Authentication type {auth_type} is not supported."
+    )
+
+
+def extract_identifier(metalake_name: str, path: str):
+    """Extract the fileset identifier from the path.
+    :param metalake_name: The metalake name of the fileset.
+    :param path: The virtual fileset path, format is
+    
[gvfs://fileset]/{fileset_catalog}/{fileset_schema}/{fileset_name}[/sub_path]
+    :return The fileset identifier
+    """
+    if not metalake_name or metalake_name.isspace():
+        raise ValueError("Virtual path cannot be null or empty.")
+
+    if not path or (isinstance(path, str) and path.isspace()):
+        raise GravitinoRuntimeException(
+            "path which need be extracted cannot be null or empty."
+        )
+
+    match = _identifier_pattern.match(str(path))
+    if match and len(match.groups()) == 3:
+        return NameIdentifier.of(
+            metalake_name, match.group(1), match.group(2), match.group(3)
+        )
+    raise GravitinoRuntimeException(
+        f"path: `{path}` doesn't contains valid identifier."
+    )
+
+
+def get_sub_path_from_virtual_path(identifier: NameIdentifier, virtual_path: 
str):
+    """Get the sub path from the virtual path.
+    :param identifier: The identifier of the fileset.
+    :param virtual_path: The virtual fileset path, format is
+    fileset/{fileset_catalog}/{fileset_schema}/{fileset_name}[/sub_path]
+    :return The sub path.
+    """
+    if not virtual_path or virtual_path.isspace():
+        raise ValueError("Virtual path cannot be null or empty.")
+
+    gvfs_path_prefix = "fileset/"
+    if not virtual_path.startswith(gvfs_path_prefix):
+        raise ValueError(f"Virtual path should start with 
'{gvfs_path_prefix}'")
+
+    prefix = to_gvfs_path_prefix(identifier)
+    if not virtual_path.startswith(prefix):
+        raise ValueError(
+            f"Virtual path '{virtual_path}' doesn't match fileset identifier 
'{identifier}'"
+        )
+
+    return virtual_path[len(prefix) :]
+
+
+def to_gvfs_path_prefix(identifier: NameIdentifier):
+    """Convert the fileset identifier to the virtual path prefix.
+    :param identifier: The name identifier of the fileset.
+    :return The virtual path prefix, format is: 
fileset/{fileset_catalog}/{fileset_schema}/{fileset_name}
+    """
+    if len(identifier.namespace().levels()) != 3:
+        raise ValueError(
+            f"The namespace of the identifier should have 3 levels, but got 
{len(identifier.namespace().levels())}"
+        )
+
+    if not identifier.name():
+        raise ValueError("The identifier name should not be null or empty.")
+
+    return (
+        f"fileset/{identifier.namespace().level(1)}"
+        f"/{identifier.namespace().level(2)}"
+        f"/{identifier.name()}"
+    )
diff --git a/clients/client-python/tests/unittests/test_gvfs_with_local.py 
b/clients/client-python/tests/unittests/test_gvfs_with_local.py
index 7ee935e929..a63d3b47f3 100644
--- a/clients/client-python/tests/unittests/test_gvfs_with_local.py
+++ b/clients/client-python/tests/unittests/test_gvfs_with_local.py
@@ -40,6 +40,7 @@ from gravitino.exceptions.base import (
     BadRequestException,
 )
 from gravitino.filesystem.gvfs_config import GVFSConfig
+from gravitino.filesystem.gvfs_utils import extract_identifier
 from tests.unittests import mock_base
 from tests.unittests.auth.mock_base import (
     mock_jwt,
@@ -58,6 +59,8 @@ def generate_unique_random_string(length):
 
 @mock_base.mock_data
 class TestLocalFilesystem(unittest.TestCase):
+    _metalake_name: str = "metalake_demo"
+    _server_uri = "http://localhost:9090";
     _local_base_dir_path: str = "file:/tmp/fileset"
     _fileset_dir: str = (
         
f"{_local_base_dir_path}/{generate_unique_random_string(10)}/fileset_catalog/tmp"
@@ -86,8 +89,8 @@ class TestLocalFilesystem(unittest.TestCase):
             self.assertTrue(local_fs.exists(fileset_storage_location))
             options = {GVFSConfig.CACHE_SIZE: 1, 
GVFSConfig.CACHE_EXPIRED_TIME: 1}
             fs = gvfs.GravitinoVirtualFileSystem(
-                server_uri="http://localhost:9090";,
-                metalake_name="metalake_demo",
+                server_uri=self._server_uri,
+                metalake_name=self._metalake_name,
                 options=options,
                 skip_instance_cache=True,
             )
@@ -104,8 +107,8 @@ class TestLocalFilesystem(unittest.TestCase):
         user = "test_gvfs"
         os.environ["user.name"] = user
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             options=options,
             skip_instance_cache=True,
         )
@@ -152,8 +155,8 @@ class TestLocalFilesystem(unittest.TestCase):
                 local_fs.touch(sub_file_path)
                 self.assertTrue(local_fs.exists(sub_file_path))
                 fs = gvfs.GravitinoVirtualFileSystem(
-                    server_uri="http://localhost:9090";,
-                    metalake_name="metalake_demo",
+                    server_uri=self._server_uri,
+                    metalake_name=self._metalake_name,
                     options=fs_options,
                     skip_instance_cache=True,
                 )
@@ -167,8 +170,8 @@ class TestLocalFilesystem(unittest.TestCase):
         ):
             with self.assertRaises(IllegalArgumentException):
                 gvfs.GravitinoVirtualFileSystem(
-                    server_uri="http://localhost:9090";,
-                    metalake_name="metalake_demo",
+                    server_uri=self._server_uri,
+                    metalake_name=self._metalake_name,
                     options=fs_options,
                     skip_instance_cache=True,
                 )
@@ -180,8 +183,8 @@ class TestLocalFilesystem(unittest.TestCase):
         ):
             with self.assertRaises(BadRequestException):
                 gvfs.GravitinoVirtualFileSystem(
-                    server_uri="http://localhost:9090";,
-                    metalake_name="metalake_demo",
+                    server_uri=self._server_uri,
+                    metalake_name=self._metalake_name,
                     options=fs_options,
                     skip_instance_cache=True,
                 )
@@ -204,8 +207,8 @@ class TestLocalFilesystem(unittest.TestCase):
             self.assertTrue(local_fs.exists(sub_file_path))
 
             fs = gvfs.GravitinoVirtualFileSystem(
-                server_uri="http://localhost:9090";,
-                metalake_name="metalake_demo",
+                server_uri=self._server_uri,
+                metalake_name=self._metalake_name,
                 skip_instance_cache=True,
             )
             self.assertTrue(fs.exists(fileset_virtual_location))
@@ -248,8 +251,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_file_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -290,8 +293,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_file_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -330,8 +333,8 @@ class TestLocalFilesystem(unittest.TestCase):
             f.write(b"test_file_1")
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -382,8 +385,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(another_dir_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -452,8 +455,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_dir_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -504,8 +507,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_dir_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -551,8 +554,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_dir_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -598,8 +601,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_dir_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -646,8 +649,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_dir_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -695,8 +698,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_dir_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -733,8 +736,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_dir_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -764,8 +767,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_dir_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -799,8 +802,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_dir_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -851,8 +854,8 @@ class TestLocalFilesystem(unittest.TestCase):
         self.assertTrue(local_fs.exists(sub_dir_path))
 
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
@@ -899,8 +902,8 @@ class TestLocalFilesystem(unittest.TestCase):
 
     def test_convert_actual_path(self, *mock_methods):
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         storage_location = "hdfs://localhost:8090/fileset/test_f1"
@@ -923,8 +926,8 @@ class TestLocalFilesystem(unittest.TestCase):
 
         # test convert actual local path
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         storage_location = "file:/tmp/fileset/test_f1"
@@ -981,8 +984,8 @@ class TestLocalFilesystem(unittest.TestCase):
 
     def test_convert_info(self, *mock_methods):
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         # test actual path not start with storage location
@@ -1013,8 +1016,8 @@ class TestLocalFilesystem(unittest.TestCase):
 
         # test convert actual local path
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         # test actual path not start with storage location
@@ -1044,21 +1047,18 @@ class TestLocalFilesystem(unittest.TestCase):
         )
 
     def test_extract_identifier(self, *mock_methods):
-        fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
-            skip_instance_cache=True,
-        )
         with self.assertRaises(GravitinoRuntimeException):
-            fs._extract_identifier(path=None)
+            extract_identifier(self._metalake_name, path=None)
 
         invalid_path = "s3://bucket_1/test_catalog/schema/fileset/ttt"
         with self.assertRaises(GravitinoRuntimeException):
-            fs._extract_identifier(path=invalid_path)
+            extract_identifier(self._metalake_name, path=invalid_path)
 
         valid_path = "fileset/test_catalog/schema/fileset/ttt"
-        identifier: NameIdentifier = fs._extract_identifier(path=valid_path)
-        self.assertEqual("metalake_demo", identifier.namespace().level(0))
+        identifier: NameIdentifier = extract_identifier(
+            self._metalake_name, path=valid_path
+        )
+        self.assertEqual(self._metalake_name, identifier.namespace().level(0))
         self.assertEqual("test_catalog", identifier.namespace().level(1))
         self.assertEqual("schema", identifier.namespace().level(2))
         self.assertEqual("fileset", identifier.name())
@@ -1168,8 +1168,8 @@ class TestLocalFilesystem(unittest.TestCase):
 
         actual_path = fileset_storage_location
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090";,
-            metalake_name="metalake_demo",
+            server_uri=self._server_uri,
+            metalake_name=self._metalake_name,
             skip_instance_cache=True,
         )
         with patch(
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index 10000b3da2..92dea08b75 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -19,6 +19,9 @@
 package org.apache.gravitino.filesystem.hadoop;
 
 import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CURRENT_LOCATION_NAME;
+import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.extractIdentifier;
+import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.getConfigMap;
+import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.getSubPathFromGvfsPath;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -42,8 +45,6 @@ import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
@@ -98,12 +99,6 @@ public class GravitinoVirtualFileSystem extends FileSystem {
   private Cache<Pair<NameIdentifier, String>, FileSystem> 
internalFileSystemCache;
   private ScheduledThreadPoolExecutor internalFileSystemCleanScheduler;
 
-  // The pattern is used to match gvfs path. The scheme prefix 
(gvfs://fileset) is optional.
-  // The following path can be match:
-  //     gvfs://fileset/fileset_catalog/fileset_schema/fileset1/file.txt
-  //     /fileset_catalog/fileset_schema/fileset1/sub_dir/
-  private static final Pattern IDENTIFIER_PATTERN =
-      
Pattern.compile("^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?>/[^/]+)*/?$");
   private static final String SLASH = "/";
   private final Map<String, FileSystemProvider> fileSystemProvidersMap = 
Maps.newHashMap();
   private String currentLocationEnvVar;
@@ -258,26 +253,10 @@ public class GravitinoVirtualFileSystem extends 
FileSystem {
     return fileStatus;
   }
 
-  @VisibleForTesting
-  NameIdentifier extractIdentifier(URI virtualUri) {
-    String virtualPath = virtualUri.toString();
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(virtualPath),
-        "Uri which need be extracted cannot be null or empty.");
-
-    Matcher matcher = IDENTIFIER_PATTERN.matcher(virtualPath);
-    Preconditions.checkArgument(
-        matcher.matches() && matcher.groupCount() == 3,
-        "URI %s doesn't contains valid identifier",
-        virtualPath);
-
-    return NameIdentifier.of(metalakeName, matcher.group(1), matcher.group(2), 
matcher.group(3));
-  }
-
   private FilesetContextPair getFilesetContext(Path virtualPath, 
FilesetDataOperation operation) {
-    NameIdentifier identifier = extractIdentifier(virtualPath.toUri());
+    NameIdentifier identifier = extractIdentifier(metalakeName, 
virtualPath.toString());
     String virtualPathString = virtualPath.toString();
-    String subPath = getSubPathFromVirtualPath(identifier, virtualPathString);
+    String subPath = getSubPathFromGvfsPath(identifier, virtualPathString);
 
     NameIdentifier catalogIdent = NameIdentifier.of(metalakeName, 
identifier.namespace().level(1));
     FilesetCatalog filesetCatalog =
@@ -402,31 +381,6 @@ public class GravitinoVirtualFileSystem extends FileSystem 
{
     }
   }
 
-  private Map<String, String> getConfigMap(Configuration configuration) {
-    Map<String, String> maps = Maps.newHashMap();
-    configuration.forEach(entry -> maps.put(entry.getKey(), entry.getValue()));
-    return maps;
-  }
-
-  private String getSubPathFromVirtualPath(NameIdentifier identifier, String 
virtualPathString) {
-    return 
virtualPathString.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)
-        ? virtualPathString.substring(
-            String.format(
-                    "%s/%s/%s/%s",
-                    
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX,
-                    identifier.namespace().level(1),
-                    identifier.namespace().level(2),
-                    identifier.name())
-                .length())
-        : virtualPathString.substring(
-            String.format(
-                    "/%s/%s/%s",
-                    identifier.namespace().level(1),
-                    identifier.namespace().level(2),
-                    identifier.name())
-                .length());
-  }
-
   @Override
   public URI getUri() {
     return this.uri;
@@ -484,8 +438,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
   public boolean rename(Path src, Path dst) throws IOException {
     // Fileset identifier is not allowed to be renamed, only its 
subdirectories can be renamed
     // which not in the storage location of the fileset;
-    NameIdentifier srcIdentifier = extractIdentifier(src.toUri());
-    NameIdentifier dstIdentifier = extractIdentifier(dst.toUri());
+    NameIdentifier srcIdentifier = extractIdentifier(metalakeName, 
src.toString());
+    NameIdentifier dstIdentifier = extractIdentifier(metalakeName, 
dst.toString());
     Preconditions.checkArgument(
         srcIdentifier.equals(dstIdentifier),
         "Destination path fileset identifier: %s should be same with src path 
fileset identifier: %s.",
@@ -510,8 +464,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
   public FileStatus getFileStatus(Path path) throws IOException {
     FilesetContextPair context = getFilesetContext(path, 
FilesetDataOperation.GET_FILE_STATUS);
     FileStatus fileStatus = 
context.getFileSystem().getFileStatus(context.getActualFileLocation());
-    NameIdentifier identifier = extractIdentifier(path.toUri());
-    String subPath = getSubPathFromVirtualPath(identifier, path.toString());
+    NameIdentifier identifier = extractIdentifier(metalakeName, 
path.toString());
+    String subPath = getSubPathFromGvfsPath(identifier, path.toString());
     String storageLocation =
         context
             .getActualFileLocation()
@@ -526,8 +480,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
     FilesetContextPair context = getFilesetContext(path, 
FilesetDataOperation.LIST_STATUS);
     FileStatus[] fileStatusResults =
         context.getFileSystem().listStatus(context.getActualFileLocation());
-    NameIdentifier identifier = extractIdentifier(path.toUri());
-    String subPath = getSubPathFromVirtualPath(identifier, path.toString());
+    NameIdentifier identifier = extractIdentifier(metalakeName, 
path.toString());
+    String subPath = getSubPathFromGvfsPath(identifier, path.toString());
     String storageLocation =
         context
             .getActualFileLocation()
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
index 8a0d1d8743..181843cfd5 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
@@ -20,8 +20,13 @@
 package org.apache.gravitino.filesystem.hadoop;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import java.io.File;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
 import org.apache.gravitino.client.GravitinoClient;
 import org.apache.gravitino.client.KerberosTokenProvider;
@@ -30,6 +35,27 @@ import org.apache.hadoop.conf.Configuration;
 /** Utility class for Gravitino Virtual File System. */
 public class GravitinoVirtualFileSystemUtils {
 
+  // The pattern is used to match gvfs path. The scheme prefix 
(gvfs://fileset) is optional.
+  // The following path can be match:
+  //     gvfs://fileset/fileset_catalog/fileset_schema/fileset1/file.txt
+  //     /fileset_catalog/fileset_schema/fileset1/sub_dir/
+  private static final Pattern IDENTIFIER_PATTERN =
+      
Pattern.compile("^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?>/[^/]+)*/?$");
+
+  /**
+   * Transform the Hadoop configuration to a map.
+   *
+   * @param configuration The Hadoop configuration.
+   * @return The configuration map.
+   */
+  public static Map<String, String> getConfigMap(Configuration configuration) {
+    Map<String, String> maps = Maps.newHashMap();
+    // Don't use entry.getKey() directly in the lambda, because it cannot
+    // handle variable expansion in the Configuration values.
+    configuration.forEach(entry -> maps.put(entry.getKey(), 
configuration.get(entry.getKey())));
+    return maps;
+  }
+
   /**
    * Get Gravitino client by the configuration.
    *
@@ -37,6 +63,16 @@ public class GravitinoVirtualFileSystemUtils {
    * @return The Gravitino client.
    */
   public static GravitinoClient createClient(Configuration configuration) {
+    return createClient(getConfigMap(configuration));
+  }
+
+  /**
+   * Get Gravitino client by the configuration.
+   *
+   * @param configuration The configuration for the Gravitino client.
+   * @return The Gravitino client.
+   */
+  public static GravitinoClient createClient(Map<String, String> 
configuration) {
     // initialize the Gravitino client
     String serverUri =
         
configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
@@ -48,7 +84,7 @@ public class GravitinoVirtualFileSystemUtils {
         GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
 
     String authType =
-        configuration.get(
+        configuration.getOrDefault(
             
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY,
             GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE);
     if 
(authType.equalsIgnoreCase(GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE))
 {
@@ -140,6 +176,85 @@ public class GravitinoVirtualFileSystemUtils {
     }
   }
 
+  /**
+   * Extract the full identifier of the fileset from the virtual path.
+   *
+   * @param metalakeName The metalake name of the fileset.
+   * @param gvfsPath The virtual path, format is
+   *     
[gvfs://fileset]/{fileset_catalog}/{fileset_schema}/{fileset_name}[/sub_path]
+   * @return The full identifier of the fileset.
+   * @throws IllegalArgumentException If the URI doesn't contain a valid 
identifier.
+   */
+  public static NameIdentifier extractIdentifier(String metalakeName, String 
gvfsPath)
+      throws IllegalArgumentException {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(metalakeName), "The metalake name cannot be 
null or empty.");
+
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(gvfsPath),
+        "The gvfsPath which need be extracted cannot be null or empty.");
+
+    Matcher matcher = IDENTIFIER_PATTERN.matcher(gvfsPath);
+    Preconditions.checkArgument(
+        matcher.matches() && matcher.groupCount() == 3,
+        "URI %s doesn't contains valid identifier",
+        gvfsPath);
+
+    return NameIdentifier.of(metalakeName, matcher.group(1), matcher.group(2), 
matcher.group(3));
+  }
+
+  /**
+   * Get the sub path from the virtual path.
+   *
+   * @param identifier The identifier of the fileset.
+   * @param gvfsPath The virtual path, format is
+   *     
[gvfs://fileset]/{fileset_catalog}/{fileset_schema}/{fileset_name}[/sub_path]
+   * @return The sub path.
+   * @throws IllegalArgumentException If the virtual path doesn't match the 
identifier.
+   */
+  public static String getSubPathFromGvfsPath(NameIdentifier identifier, 
String gvfsPath)
+      throws IllegalArgumentException {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(gvfsPath), "Virtual path cannot be null or 
empty.");
+
+    Preconditions.checkArgument(
+        identifier.namespace().levels().length == 3,
+        "The identifier should have 3 levels, but got %s",
+        identifier.namespace().levels().length);
+
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(identifier.name()),
+        "The identifier name should not be null or empty.");
+
+    Preconditions.checkArgument(
+        
gvfsPath.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)
+            || gvfsPath.startsWith("/"),
+        "Virtual path should start with '%s' or '/'",
+        GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX);
+
+    String prefix =
+        
gvfsPath.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)
+            ? String.format(
+                "%s/%s/%s/%s",
+                GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX,
+                identifier.namespace().level(1),
+                identifier.namespace().level(2),
+                identifier.name())
+            : String.format(
+                "/%s/%s/%s",
+                identifier.namespace().level(1),
+                identifier.namespace().level(2),
+                identifier.name());
+
+    Preconditions.checkArgument(
+        gvfsPath.startsWith(prefix),
+        "Virtual path '%s' doesn't match fileset identifier '%s'",
+        gvfsPath,
+        identifier);
+
+    return gvfsPath.substring(prefix.length());
+  }
+
   private static void checkAuthConfig(String authType, String configKey, 
String configValue) {
     Preconditions.checkArgument(
         StringUtils.isNotBlank(configValue),
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java
index 531cb37eb1..0a4e1cd908 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java
@@ -41,6 +41,7 @@ import org.apache.gravitino.dto.responses.MetalakeResponse;
 import org.apache.gravitino.dto.responses.VersionResponse;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.rest.RESTUtils;
 import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.Method;
 import org.junit.jupiter.api.AfterAll;
@@ -184,7 +185,7 @@ public abstract class GravitinoMockServerBase {
     String filesetPath =
         String.format(
             "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s",
-            metalakeName, catalogName, schemaName, filesetName);
+            metalakeName, catalogName, schemaName, 
RESTUtils.encodeString(filesetName));
     Map<String, String> locations =
         location == null
             ? Collections.emptyMap()
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
index 6403a0200e..be7a7d35f5 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.filesystem.hadoop;
 
 import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
+import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.extractIdentifier;
 import static org.apache.hc.core5.http.HttpStatus.SC_OK;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -30,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
-import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.dto.AuditDTO;
 import org.apache.gravitino.dto.credential.CredentialDTO;
@@ -62,8 +63,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.testcontainers.shaded.org.apache.commons.lang3.tuple.Pair;
 
 public class TestGvfsBase extends GravitinoMockServerBase {
   protected static final String GVFS_IMPL_CLASS = 
GravitinoVirtualFileSystem.class.getName();
@@ -221,16 +222,20 @@ public class TestGvfsBase extends GravitinoMockServerBase 
{
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testCreate(boolean withScheme) throws IOException {
-    String filesetName = "testCreate";
+  @CsvSource({
+    "true, testCreate",
+    "false, testCreate",
+    "true, testCreate%2Fabc",
+    "false, testCreate%2Fabc"
+  })
+  public void testCreate(boolean withScheme, String filesetName) throws 
IOException {
     Path managedFilesetPath =
         FileSystemTestUtils.createFilesetPath(catalogName, schemaName, 
filesetName, true);
     Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, 
schemaName, filesetName);
     String locationPath =
         String.format(
             "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
-            metalakeName, catalogName, schemaName, filesetName);
+            metalakeName, catalogName, schemaName, 
RESTUtils.encodeString(filesetName));
     try (FileSystem gravitinoFileSystem = 
managedFilesetPath.getFileSystem(conf);
         FileSystem localFileSystem = localPath.getFileSystem(conf)) {
       FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -272,17 +277,21 @@ public class TestGvfsBase extends GravitinoMockServerBase 
{
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
+  @CsvSource({
+    "true, testAppend",
+    "false, testAppend",
+    "true, testAppend%2Fabc",
+    "false, testAppend%2Fabc"
+  })
   @Disabled("Append operation is not supported in LocalFileSystem. We can't 
test it now.")
-  public void testAppend(boolean withScheme) throws IOException {
-    String filesetName = "testAppend";
+  public void testAppend(boolean withScheme, String filesetName) throws 
IOException {
     Path managedFilesetPath =
         FileSystemTestUtils.createFilesetPath(catalogName, schemaName, 
filesetName, true);
     Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, 
schemaName, filesetName);
     String locationPath =
         String.format(
             "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
-            metalakeName, catalogName, schemaName, filesetName);
+            metalakeName, catalogName, schemaName, 
RESTUtils.encodeString(filesetName));
     try (FileSystem gravitinoFileSystem = 
managedFilesetPath.getFileSystem(conf);
         FileSystem localFileSystem = localPath.getFileSystem(conf)) {
       FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -355,16 +364,20 @@ public class TestGvfsBase extends GravitinoMockServerBase 
{
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testRename(boolean withScheme) throws IOException {
-    String filesetName = "testRename";
+  @CsvSource({
+    "true, testRename",
+    "false, testRename",
+    "true, testRename%2Fabc",
+    "false, testRename%2Fabc"
+  })
+  public void testRename(boolean withScheme, String filesetName) throws 
IOException {
     Path managedFilesetPath =
         FileSystemTestUtils.createFilesetPath(catalogName, schemaName, 
filesetName, true);
     Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, 
schemaName, filesetName);
     String locationPath =
         String.format(
             "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
-            metalakeName, catalogName, schemaName, filesetName);
+            metalakeName, catalogName, schemaName, 
RESTUtils.encodeString(filesetName));
     try (FileSystem gravitinoFileSystem = 
managedFilesetPath.getFileSystem(conf);
         FileSystem localFileSystem = localPath.getFileSystem(conf)) {
       FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -433,16 +446,20 @@ public class TestGvfsBase extends GravitinoMockServerBase 
{
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testDelete(boolean withScheme) throws IOException {
-    String filesetName = "testDelete";
+  @CsvSource({
+    "true, testDelete",
+    "false, testDelete",
+    "true, testDelete%2Fabc",
+    "false, testDelete%2Fabc"
+  })
+  public void testDelete(boolean withScheme, String filesetName) throws 
IOException {
     Path managedFilesetPath =
         FileSystemTestUtils.createFilesetPath(catalogName, schemaName, 
filesetName, true);
     Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, 
schemaName, filesetName);
     String locationPath =
         String.format(
             "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
-            metalakeName, catalogName, schemaName, filesetName);
+            metalakeName, catalogName, schemaName, 
RESTUtils.encodeString(filesetName));
     try (FileSystem gravitinoFileSystem = 
managedFilesetPath.getFileSystem(conf);
         FileSystem localFileSystem = localPath.getFileSystem(conf)) {
       FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -482,16 +499,16 @@ public class TestGvfsBase extends GravitinoMockServerBase 
{
     }
   }
 
-  @Test
-  public void testGetStatus() throws IOException {
-    String filesetName = "testGetStatus";
+  @ParameterizedTest
+  @ValueSource(strings = {"testGetFileStatus", "testGetFileStatus%2Fabc"})
+  public void testGetStatus(String filesetName) throws IOException {
     Path managedFilesetPath =
         FileSystemTestUtils.createFilesetPath(catalogName, schemaName, 
filesetName, true);
     Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, 
schemaName, filesetName);
     String locationPath =
         String.format(
             "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
-            metalakeName, catalogName, schemaName, filesetName);
+            metalakeName, catalogName, schemaName, 
RESTUtils.encodeString(filesetName));
     try (FileSystem gravitinoFileSystem = 
managedFilesetPath.getFileSystem(conf);
         FileSystem localFileSystem = localPath.getFileSystem(conf)) {
       FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -520,16 +537,16 @@ public class TestGvfsBase extends GravitinoMockServerBase 
{
     }
   }
 
-  @Test
-  public void testListStatus() throws IOException {
-    String filesetName = "testListStatus";
+  @ParameterizedTest
+  @ValueSource(strings = {"testListStatus", "testListStatus%2Fabc"})
+  public void testListStatus(String filesetName) throws IOException {
     Path managedFilesetPath =
         FileSystemTestUtils.createFilesetPath(catalogName, schemaName, 
filesetName, true);
     Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, 
schemaName, filesetName);
     String locationPath =
         String.format(
             "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
-            metalakeName, catalogName, schemaName, filesetName);
+            metalakeName, catalogName, schemaName, 
RESTUtils.encodeString(filesetName));
     try (FileSystem gravitinoFileSystem = 
managedFilesetPath.getFileSystem(conf);
         FileSystem localFileSystem = localPath.getFileSystem(conf)) {
       FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -576,16 +593,16 @@ public class TestGvfsBase extends GravitinoMockServerBase 
{
     }
   }
 
-  @Test
-  public void testMkdirs() throws IOException {
-    String filesetName = "testMkdirs";
+  @ParameterizedTest
+  @ValueSource(strings = {"testMkdirs", "testMkdirs%2Fabc"})
+  public void testMkdirs(String filesetName) throws IOException {
     Path managedFilesetPath =
         FileSystemTestUtils.createFilesetPath(catalogName, schemaName, 
filesetName, true);
     Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, 
schemaName, filesetName);
     String locationPath =
         String.format(
             "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
-            metalakeName, catalogName, schemaName, filesetName);
+            metalakeName, catalogName, schemaName, 
RESTUtils.encodeString(filesetName));
     try (FileSystem gravitinoFileSystem = 
managedFilesetPath.getFileSystem(conf);
         FileSystem localFileSystem = localPath.getFileSystem(conf)) {
       FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -632,40 +649,41 @@ public class TestGvfsBase extends GravitinoMockServerBase 
{
     try (GravitinoVirtualFileSystem fs =
         (GravitinoVirtualFileSystem) managedFilesetPath.getFileSystem(conf)) {
       NameIdentifier identifier =
-          fs.extractIdentifier(new 
URI("gvfs://fileset/catalog1/schema1/fileset1"));
+          extractIdentifier(metalakeName, 
"gvfs://fileset/catalog1/schema1/fileset1");
       assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"fileset1"), identifier);
 
       NameIdentifier identifier2 =
-          fs.extractIdentifier(new 
URI("gvfs://fileset/catalog1/schema1/fileset1/"));
+          extractIdentifier(metalakeName, 
"gvfs://fileset/catalog1/schema1/fileset1/");
       assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"fileset1"), identifier2);
 
       NameIdentifier identifier3 =
-          fs.extractIdentifier(new 
URI("gvfs://fileset/catalog1/schema1/fileset1/files"));
+          extractIdentifier(metalakeName, 
"gvfs://fileset/catalog1/schema1/fileset1/files");
       assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"fileset1"), identifier3);
 
       NameIdentifier identifier4 =
-          fs.extractIdentifier(new 
URI("gvfs://fileset/catalog1/schema1/fileset1/dir/dir"));
+          extractIdentifier(metalakeName, 
"gvfs://fileset/catalog1/schema1/fileset1/dir/dir");
       assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"fileset1"), identifier4);
 
       NameIdentifier identifier5 =
-          fs.extractIdentifier(new 
URI("gvfs://fileset/catalog1/schema1/fileset1/dir/dir/"));
+          extractIdentifier(metalakeName, 
"gvfs://fileset/catalog1/schema1/fileset1/dir/dir/");
       assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"fileset1"), identifier5);
 
-      NameIdentifier identifier6 = fs.extractIdentifier(new 
URI("/catalog1/schema1/fileset1"));
+      NameIdentifier identifier6 = extractIdentifier(metalakeName, 
"/catalog1/schema1/fileset1");
       assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"fileset1"), identifier6);
 
-      NameIdentifier identifier7 = fs.extractIdentifier(new 
URI("/catalog1/schema1/fileset1/"));
+      NameIdentifier identifier7 = extractIdentifier(metalakeName, 
"/catalog1/schema1/fileset1/");
       assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"fileset1"), identifier7);
 
-      NameIdentifier identifier8 = fs.extractIdentifier(new 
URI("/catalog1/schema1/fileset1/dir"));
+      NameIdentifier identifier8 =
+          extractIdentifier(metalakeName, "/catalog1/schema1/fileset1/dir");
       assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"fileset1"), identifier8);
 
       NameIdentifier identifier9 =
-          fs.extractIdentifier(new URI("/catalog1/schema1/fileset1/dir/dir/"));
+          extractIdentifier(metalakeName, 
"/catalog1/schema1/fileset1/dir/dir/");
       assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"fileset1"), identifier9);
 
       NameIdentifier identifier10 =
-          fs.extractIdentifier(new URI("/catalog1/schema1/fileset1/dir/dir"));
+          extractIdentifier(metalakeName, 
"/catalog1/schema1/fileset1/dir/dir");
       assertEquals(
           NameIdentifier.of(metalakeName, "catalog1", "schema1", "fileset1"), 
identifier10);
 
@@ -673,29 +691,35 @@ public class TestGvfsBase extends GravitinoMockServerBase 
{
       for (int i = 0; i < 1500; i++) {
         longUri.append("/dir");
       }
-      NameIdentifier identifier11 = fs.extractIdentifier(new 
URI(longUri.toString()));
+      NameIdentifier identifier11 = extractIdentifier(metalakeName, 
longUri.toString());
       assertEquals(
           NameIdentifier.of(metalakeName, "catalog1", "schema1", "fileset1"), 
identifier11);
 
-      NameIdentifier identifier12 = fs.extractIdentifier(new 
URI(longUri.delete(0, 14).toString()));
+      NameIdentifier identifier12 =
+          extractIdentifier(metalakeName, longUri.delete(0, 14).toString());
       assertEquals(
           NameIdentifier.of(metalakeName, "catalog1", "schema1", "fileset1"), 
identifier12);
 
+      NameIdentifier identifier13 =
+          extractIdentifier(metalakeName, 
"gvfs://fileset/catalog1/schema1/abc%2Fdef%2Fghi");
+      assertEquals(
+          NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"abc%2Fdef%2Fghi"), identifier13);
+
       assertThrows(
           IllegalArgumentException.class,
-          () -> fs.extractIdentifier(new URI("gvfs://fileset/catalog1/")));
+          () -> extractIdentifier(metalakeName, "gvfs://fileset/catalog1/"));
       assertThrows(
           IllegalArgumentException.class,
-          () -> fs.extractIdentifier(new 
URI("hdfs://fileset/catalog1/schema1/fileset1")));
+          () -> extractIdentifier(metalakeName, 
"hdfs://fileset/catalog1/schema1/fileset1"));
       assertThrows(
           IllegalArgumentException.class,
-          () -> fs.extractIdentifier(new URI("/catalog1/schema1/")));
+          () -> extractIdentifier(metalakeName, "/catalog1/schema1/"));
       assertThrows(
           IllegalArgumentException.class,
-          () -> fs.extractIdentifier(new 
URI("gvfs://fileset/catalog1/schema1/fileset1//")));
+          () -> extractIdentifier(metalakeName, 
"gvfs://fileset/catalog1/schema1/fileset1//"));
       assertThrows(
           IllegalArgumentException.class,
-          () -> fs.extractIdentifier(new 
URI("/catalog1/schema1/fileset1/dir//")));
+          () -> extractIdentifier(metalakeName, 
"/catalog1/schema1/fileset1/dir//"));
     }
   }
 

Reply via email to