This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 2c63bc9603 [#6604] Improvement(GVFS): extract methods from gvfs to utils for reusing (#6959)
2c63bc9603 is described below
commit 2c63bc9603142394652f4e66119ba0cad85c07ff
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Apr 16 14:02:27 2025 +0800
[#6604] Improvement(GVFS): extract methods from gvfs to utils for reusing (#6959)
### What changes were proposed in this pull request?
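Extract the client-creation and path/identifier helpers from the GVFS implementations into shared utility modules so they can be reused. In Python, `create_client`, `extract_identifier` and `get_sub_path_from_virtual_path` now live in the new `gvfs_utils.py`. In Java, `extractIdentifier`, `getConfigMap` and `getSubPathFromGvfsPath` are now provided by `GravitinoVirtualFileSystemUtils`.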
### Why are the changes needed?
Fix: #6604
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
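Existing unit tests (`test_gvfs_with_local.py`, `TestGvfsBase.java`) were updated to call the extracted utilities; CI passes.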
Co-authored-by: mchades <[email protected]>
---
clients/client-python/gravitino/filesystem/gvfs.py | 146 +++++-------------
.../gravitino/filesystem/gvfs_utils.py | 165 +++++++++++++++++++++
.../tests/unittests/test_gvfs_with_local.py | 118 +++++++--------
.../hadoop/GravitinoVirtualFileSystem.java | 68 ++-------
.../hadoop/GravitinoVirtualFileSystemUtils.java | 117 ++++++++++++++-
.../filesystem/hadoop/GravitinoMockServerBase.java | 3 +-
.../gravitino/filesystem/hadoop/TestGvfsBase.java | 118 +++++++++------
7 files changed, 457 insertions(+), 278 deletions(-)
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py
b/clients/client-python/gravitino/filesystem/gvfs.py
index 8c1d89c757..1d4d680a24 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,8 +14,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import importlib
import logging
import os
+import re
import sys
# Disable C0302: Too many lines in module
@@ -24,45 +26,42 @@ import time
from enum import Enum
from pathlib import PurePosixPath
from typing import Dict, Tuple, List
-import re
-import importlib
-import fsspec
+import fsspec
from cachetools import TTLCache, LRUCache
from fsspec import AbstractFileSystem
-from fsspec.implementations.local import LocalFileSystem
from fsspec.implementations.arrow import ArrowFSWrapper
+from fsspec.implementations.local import LocalFileSystem
from fsspec.utils import infer_storage_options
-
from readerwriterlock import rwlock
from gravitino.api.catalog import Catalog
+from gravitino.api.credential.adls_token_credential import ADLSTokenCredential
+from gravitino.api.credential.azure_account_key_credential import (
+ AzureAccountKeyCredential,
+)
from gravitino.api.credential.credential import Credential
+from gravitino.api.credential.gcs_token_credential import GCSTokenCredential
+from gravitino.api.credential.oss_secret_key_credential import
OSSSecretKeyCredential
+from gravitino.api.credential.oss_token_credential import OSSTokenCredential
+from gravitino.api.credential.s3_secret_key_credential import
S3SecretKeyCredential
+from gravitino.api.credential.s3_token_credential import S3TokenCredential
from gravitino.audit.caller_context import CallerContext, CallerContextHolder
from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
from gravitino.audit.fileset_data_operation import FilesetDataOperation
from gravitino.audit.internal_client_type import InternalClientType
-from gravitino.auth.default_oauth2_token_provider import
DefaultOAuth2TokenProvider
-from gravitino.auth.oauth2_token_provider import OAuth2TokenProvider
-from gravitino.auth.simple_auth_provider import SimpleAuthProvider
-from gravitino.client.generic_fileset import GenericFileset
from gravitino.client.fileset_catalog import FilesetCatalog
-from gravitino.client.gravitino_client import GravitinoClient
+from gravitino.client.generic_fileset import GenericFileset
from gravitino.exceptions.base import (
GravitinoRuntimeException,
)
from gravitino.filesystem.gvfs_config import GVFSConfig
-from gravitino.name_identifier import NameIdentifier
-
-from gravitino.api.credential.adls_token_credential import ADLSTokenCredential
-from gravitino.api.credential.azure_account_key_credential import (
- AzureAccountKeyCredential,
+from gravitino.filesystem.gvfs_utils import (
+ create_client,
+ extract_identifier,
+ get_sub_path_from_virtual_path,
)
-from gravitino.api.credential.gcs_token_credential import GCSTokenCredential
-from gravitino.api.credential.oss_secret_key_credential import
OSSSecretKeyCredential
-from gravitino.api.credential.oss_token_credential import OSSTokenCredential
-from gravitino.api.credential.s3_secret_key_credential import
S3SecretKeyCredential
-from gravitino.api.credential.s3_token_credential import S3TokenCredential
+from gravitino.name_identifier import NameIdentifier
logger = logging.getLogger(__name__)
@@ -128,46 +127,7 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args for super filesystem
"""
self._metalake = metalake_name
- auth_type = (
- GVFSConfig.SIMPLE_AUTH_TYPE
- if options is None
- else options.get(GVFSConfig.AUTH_TYPE, GVFSConfig.SIMPLE_AUTH_TYPE)
- )
- if auth_type == GVFSConfig.SIMPLE_AUTH_TYPE:
- self._client = GravitinoClient(
- uri=server_uri,
- metalake_name=metalake_name,
- auth_data_provider=SimpleAuthProvider(),
- )
- elif auth_type == GVFSConfig.OAUTH2_AUTH_TYPE:
- oauth2_server_uri = options.get(GVFSConfig.OAUTH2_SERVER_URI)
- self._check_auth_config(
- auth_type, GVFSConfig.OAUTH2_SERVER_URI, oauth2_server_uri
- )
-
- oauth2_credential = options.get(GVFSConfig.OAUTH2_CREDENTIAL)
- self._check_auth_config(
- auth_type, GVFSConfig.OAUTH2_CREDENTIAL, oauth2_credential
- )
-
- oauth2_path = options.get(GVFSConfig.OAUTH2_PATH)
- self._check_auth_config(auth_type, GVFSConfig.OAUTH2_PATH,
oauth2_path)
-
- oauth2_scope = options.get(GVFSConfig.OAUTH2_SCOPE)
- self._check_auth_config(auth_type, GVFSConfig.OAUTH2_SCOPE,
oauth2_scope)
-
- oauth2_token_provider: OAuth2TokenProvider =
DefaultOAuth2TokenProvider(
- oauth2_server_uri, oauth2_credential, oauth2_path, oauth2_scope
- )
- self._client = GravitinoClient(
- uri=server_uri,
- metalake_name=metalake_name,
- auth_data_provider=oauth2_token_provider,
- )
- else:
- raise GravitinoRuntimeException(
- f"Authentication type {auth_type} is not supported."
- )
+ self._client = create_client(options, server_uri, metalake_name)
cache_size = (
GVFSConfig.DEFAULT_CACHE_SIZE
if options is None
@@ -216,10 +176,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
pre_process_path: str = self._pre_process_path(path)
- identifier: NameIdentifier = self._extract_identifier(pre_process_path)
- sub_path: str = self._get_sub_path_from_virtual_path(
- identifier, pre_process_path
+ identifier: NameIdentifier = extract_identifier(
+ self._metalake, pre_process_path
)
+ sub_path: str = get_sub_path_from_virtual_path(identifier,
pre_process_path)
storage_location: str = actual_path[: len(actual_path) - len(sub_path)]
# return entries with details
if detail:
@@ -259,10 +219,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
pre_process_path: str = self._pre_process_path(path)
- identifier: NameIdentifier = self._extract_identifier(pre_process_path)
- sub_path: str = self._get_sub_path_from_virtual_path(
- identifier, pre_process_path
+ identifier: NameIdentifier = extract_identifier(
+ self._metalake, pre_process_path
)
+ sub_path: str = get_sub_path_from_virtual_path(identifier,
pre_process_path)
storage_location: str = actual_path[: len(actual_path) - len(sub_path)]
actual_info: Dict = context_pair.filesystem().info(
self._strip_storage_protocol(storage_type, actual_path)
@@ -294,8 +254,8 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
"""
src_path = self._pre_process_path(path1)
dst_path = self._pre_process_path(path2)
- src_identifier: NameIdentifier = self._extract_identifier(src_path)
- dst_identifier: NameIdentifier = self._extract_identifier(dst_path)
+ src_identifier: NameIdentifier = extract_identifier(self._metalake,
src_path)
+ dst_identifier: NameIdentifier = extract_identifier(self._metalake,
dst_path)
if src_identifier != dst_identifier:
raise GravitinoRuntimeException(
f"Destination file path identifier: `{dst_identifier}` should
be same with src file path "
@@ -329,8 +289,8 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
"""
src_path = self._pre_process_path(path1)
dst_path = self._pre_process_path(path2)
- src_identifier: NameIdentifier = self._extract_identifier(src_path)
- dst_identifier: NameIdentifier = self._extract_identifier(dst_path)
+ src_identifier: NameIdentifier = extract_identifier(self._metalake,
src_path)
+ dst_identifier: NameIdentifier = extract_identifier(self._metalake,
dst_path)
if src_identifier != dst_identifier:
raise GravitinoRuntimeException(
f"Destination file path identifier: `{dst_identifier}`"
@@ -705,7 +665,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:return A fileset context pair
"""
virtual_path: str = self._pre_process_path(virtual_path)
- identifier: NameIdentifier = self._extract_identifier(virtual_path)
+ identifier: NameIdentifier = extract_identifier(self._metalake,
virtual_path)
catalog_ident: NameIdentifier = NameIdentifier.of(
self._metalake, identifier.namespace().level(1)
)
@@ -714,7 +674,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
raise GravitinoRuntimeException(
f"Loaded fileset catalog: {catalog_ident} is null."
)
- sub_path: str = self._get_sub_path_from_virtual_path(identifier,
virtual_path)
+ sub_path: str = get_sub_path_from_virtual_path(identifier,
virtual_path)
context = {
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION:
operation.name,
FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE:
InternalClientType.PYTHON_GVFS.name,
@@ -739,25 +699,6 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
),
)
- def _extract_identifier(self, path):
- """Extract the fileset identifier from the path.
- :param path: The virtual fileset path
- :return The fileset identifier
- """
- if path is None:
- raise GravitinoRuntimeException(
- "path which need be extracted cannot be null or empty."
- )
-
- match = self._identifier_pattern.match(path)
- if match and len(match.groups()) == 3:
- return NameIdentifier.of(
- self._metalake, match.group(1), match.group(2), match.group(3)
- )
- raise GravitinoRuntimeException(
- f"path: `{path}` doesn't contains valid identifier."
- )
-
@staticmethod
def _get_virtual_location(identifier: NameIdentifier):
"""Get the virtual location of the fileset.
@@ -815,14 +756,6 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
f"Storage type doesn't support now. Path:{path}"
)
- @staticmethod
- def _get_sub_path_from_virtual_path(identifier: NameIdentifier,
virtual_path: str):
- return virtual_path[
- len(
-
f"fileset/{identifier.namespace().level(1)}/{identifier.namespace().level(2)}/{identifier.name()}"
- ) :
- ]
-
@staticmethod
def _strip_storage_protocol(storage_type: StorageType, path: str):
"""Strip the storage protocol from the path.
@@ -867,7 +800,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
if storage_type == StorageType.LOCAL:
return path[len(f"{StorageType.LOCAL.value}:") :]
- ## We need to remove the protocol and accout from the path, for
instance,
+ ## We need to remove the protocol and account from the path, for
instance,
# the path can be converted from 'abfss://container@account/path' to
# 'container/path'.
if storage_type == StorageType.ABS:
@@ -885,19 +818,6 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
f"Storage type:{storage_type} doesn't support now."
)
- @staticmethod
- def _check_auth_config(auth_type: str, config_key: str, config_value: str):
- """Check if the config value is null.
- :param auth_type: The auth type
- :param config_key: The config key
- :param config_value: The config value
- """
- if config_value is None:
- raise GravitinoRuntimeException(
- f"{config_key} should not be null"
- f" if {GVFSConfig.AUTH_TYPE} is set to {auth_type}."
- )
-
def _get_fileset_catalog(self, catalog_ident: NameIdentifier):
read_lock = self._catalog_cache_lock.gen_rlock()
try:
diff --git a/clients/client-python/gravitino/filesystem/gvfs_utils.py
b/clients/client-python/gravitino/filesystem/gvfs_utils.py
new file mode 100644
index 0000000000..b3b7f8f385
--- /dev/null
+++ b/clients/client-python/gravitino/filesystem/gvfs_utils.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import re
+from typing import Dict
+
+from gravitino.client.gravitino_client import GravitinoClient
+from gravitino.auth.default_oauth2_token_provider import
DefaultOAuth2TokenProvider
+from gravitino.auth.oauth2_token_provider import OAuth2TokenProvider
+from gravitino.auth.simple_auth_provider import SimpleAuthProvider
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from gravitino.name_identifier import NameIdentifier
+from gravitino.exceptions.base import GravitinoRuntimeException
+
+_identifier_pattern = re.compile(
+ "^(?:gvfs://)?fileset/([^/]+)/([^/]+)/([^/]+)(?:/[^/]+)*/?$"
+)
+
+
+def _check_auth_config(auth_type: str, config_key: str, config_value: str):
+ """Check if the config value is null.
+ :param auth_type: The auth type
+ :param config_key: The config key
+ :param config_value: The config value
+ """
+ if config_value is None:
+ raise GravitinoRuntimeException(
+ f"{config_key} should not be null"
+ f" if {GVFSConfig.AUTH_TYPE} is set to {auth_type}."
+ )
+
+
+def create_client(
+ options: Dict[str, str],
+ server_uri: str,
+ metalake_name: str,
+ request_headers: dict = None,
+):
+ """Create the Gravitino client.
+ :param options: The options
+ :param server_uri: The server URI
+ :param metalake_name: The metalake name
+ :param request_headers: The request headers
+ :return The Gravitino client
+ """
+ auth_type = (
+ GVFSConfig.SIMPLE_AUTH_TYPE
+ if options is None
+ else options.get(GVFSConfig.AUTH_TYPE, GVFSConfig.SIMPLE_AUTH_TYPE)
+ )
+
+ if auth_type == GVFSConfig.SIMPLE_AUTH_TYPE:
+ return GravitinoClient(
+ uri=server_uri,
+ metalake_name=metalake_name,
+ auth_data_provider=SimpleAuthProvider(),
+ )
+
+ if auth_type == GVFSConfig.OAUTH2_AUTH_TYPE:
+ oauth2_server_uri = options.get(GVFSConfig.OAUTH2_SERVER_URI)
+ _check_auth_config(auth_type, GVFSConfig.OAUTH2_SERVER_URI,
oauth2_server_uri)
+
+ oauth2_credential = options.get(GVFSConfig.OAUTH2_CREDENTIAL)
+ _check_auth_config(auth_type, GVFSConfig.OAUTH2_CREDENTIAL,
oauth2_credential)
+
+ oauth2_path = options.get(GVFSConfig.OAUTH2_PATH)
+ _check_auth_config(auth_type, GVFSConfig.OAUTH2_PATH, oauth2_path)
+
+ oauth2_scope = options.get(GVFSConfig.OAUTH2_SCOPE)
+ _check_auth_config(auth_type, GVFSConfig.OAUTH2_SCOPE, oauth2_scope)
+
+ oauth2_token_provider: OAuth2TokenProvider =
DefaultOAuth2TokenProvider(
+ oauth2_server_uri, oauth2_credential, oauth2_path, oauth2_scope
+ )
+
+ return GravitinoClient(
+ uri=server_uri,
+ metalake_name=metalake_name,
+ auth_data_provider=oauth2_token_provider,
+ )
+
+ raise GravitinoRuntimeException(
+ f"Authentication type {auth_type} is not supported."
+ )
+
+
+def extract_identifier(metalake_name: str, path: str):
+ """Extract the fileset identifier from the path.
+ :param metalake_name: The metalake name of the fileset.
+ :param path: The virtual fileset path, format is
+
[gvfs://fileset]/{fileset_catalog}/{fileset_schema}/{fileset_name}[/sub_path]
+ :return The fileset identifier
+ """
+ if not metalake_name or metalake_name.isspace():
+ raise ValueError("Virtual path cannot be null or empty.")
+
+ if not path or (isinstance(path, str) and path.isspace()):
+ raise GravitinoRuntimeException(
+ "path which need be extracted cannot be null or empty."
+ )
+
+ match = _identifier_pattern.match(str(path))
+ if match and len(match.groups()) == 3:
+ return NameIdentifier.of(
+ metalake_name, match.group(1), match.group(2), match.group(3)
+ )
+ raise GravitinoRuntimeException(
+ f"path: `{path}` doesn't contains valid identifier."
+ )
+
+
+def get_sub_path_from_virtual_path(identifier: NameIdentifier, virtual_path:
str):
+ """Get the sub path from the virtual path.
+ :param identifier: The identifier of the fileset.
+ :param virtual_path: The virtual fileset path, format is
+ fileset/{fileset_catalog}/{fileset_schema}/{fileset_name}[/sub_path]
+ :return The sub path.
+ """
+ if not virtual_path or virtual_path.isspace():
+ raise ValueError("Virtual path cannot be null or empty.")
+
+ gvfs_path_prefix = "fileset/"
+ if not virtual_path.startswith(gvfs_path_prefix):
+ raise ValueError(f"Virtual path should start with
'{gvfs_path_prefix}'")
+
+ prefix = to_gvfs_path_prefix(identifier)
+ if not virtual_path.startswith(prefix):
+ raise ValueError(
+ f"Virtual path '{virtual_path}' doesn't match fileset identifier
'{identifier}'"
+ )
+
+ return virtual_path[len(prefix) :]
+
+
+def to_gvfs_path_prefix(identifier: NameIdentifier):
+ """Convert the fileset identifier to the virtual path prefix.
+ :param identifier: The name identifier of the fileset.
+ :return The virtual path prefix, format is:
fileset/{fileset_catalog}/{fileset_schema}/{fileset_name}
+ """
+ if len(identifier.namespace().levels()) != 3:
+ raise ValueError(
+ f"The namespace of the identifier should have 3 levels, but got
{len(identifier.namespace().levels())}"
+ )
+
+ if not identifier.name():
+ raise ValueError("The identifier name should not be null or empty.")
+
+ return (
+ f"fileset/{identifier.namespace().level(1)}"
+ f"/{identifier.namespace().level(2)}"
+ f"/{identifier.name()}"
+ )
diff --git a/clients/client-python/tests/unittests/test_gvfs_with_local.py
b/clients/client-python/tests/unittests/test_gvfs_with_local.py
index 7ee935e929..a63d3b47f3 100644
--- a/clients/client-python/tests/unittests/test_gvfs_with_local.py
+++ b/clients/client-python/tests/unittests/test_gvfs_with_local.py
@@ -40,6 +40,7 @@ from gravitino.exceptions.base import (
BadRequestException,
)
from gravitino.filesystem.gvfs_config import GVFSConfig
+from gravitino.filesystem.gvfs_utils import extract_identifier
from tests.unittests import mock_base
from tests.unittests.auth.mock_base import (
mock_jwt,
@@ -58,6 +59,8 @@ def generate_unique_random_string(length):
@mock_base.mock_data
class TestLocalFilesystem(unittest.TestCase):
+ _metalake_name: str = "metalake_demo"
+ _server_uri = "http://localhost:9090"
_local_base_dir_path: str = "file:/tmp/fileset"
_fileset_dir: str = (
f"{_local_base_dir_path}/{generate_unique_random_string(10)}/fileset_catalog/tmp"
@@ -86,8 +89,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(fileset_storage_location))
options = {GVFSConfig.CACHE_SIZE: 1,
GVFSConfig.CACHE_EXPIRED_TIME: 1}
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
options=options,
skip_instance_cache=True,
)
@@ -104,8 +107,8 @@ class TestLocalFilesystem(unittest.TestCase):
user = "test_gvfs"
os.environ["user.name"] = user
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
options=options,
skip_instance_cache=True,
)
@@ -152,8 +155,8 @@ class TestLocalFilesystem(unittest.TestCase):
local_fs.touch(sub_file_path)
self.assertTrue(local_fs.exists(sub_file_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
options=fs_options,
skip_instance_cache=True,
)
@@ -167,8 +170,8 @@ class TestLocalFilesystem(unittest.TestCase):
):
with self.assertRaises(IllegalArgumentException):
gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
options=fs_options,
skip_instance_cache=True,
)
@@ -180,8 +183,8 @@ class TestLocalFilesystem(unittest.TestCase):
):
with self.assertRaises(BadRequestException):
gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
options=fs_options,
skip_instance_cache=True,
)
@@ -204,8 +207,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_file_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
self.assertTrue(fs.exists(fileset_virtual_location))
@@ -248,8 +251,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_file_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -290,8 +293,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_file_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -330,8 +333,8 @@ class TestLocalFilesystem(unittest.TestCase):
f.write(b"test_file_1")
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -382,8 +385,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(another_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -452,8 +455,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -504,8 +507,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -551,8 +554,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -598,8 +601,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -646,8 +649,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -695,8 +698,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -733,8 +736,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -764,8 +767,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -799,8 +802,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -851,8 +854,8 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
@@ -899,8 +902,8 @@ class TestLocalFilesystem(unittest.TestCase):
def test_convert_actual_path(self, *mock_methods):
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
storage_location = "hdfs://localhost:8090/fileset/test_f1"
@@ -923,8 +926,8 @@ class TestLocalFilesystem(unittest.TestCase):
# test convert actual local path
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
storage_location = "file:/tmp/fileset/test_f1"
@@ -981,8 +984,8 @@ class TestLocalFilesystem(unittest.TestCase):
def test_convert_info(self, *mock_methods):
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
# test actual path not start with storage location
@@ -1013,8 +1016,8 @@ class TestLocalFilesystem(unittest.TestCase):
# test convert actual local path
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
# test actual path not start with storage location
@@ -1044,21 +1047,18 @@ class TestLocalFilesystem(unittest.TestCase):
)
def test_extract_identifier(self, *mock_methods):
- fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
- skip_instance_cache=True,
- )
with self.assertRaises(GravitinoRuntimeException):
- fs._extract_identifier(path=None)
+ extract_identifier(self._metalake_name, path=None)
invalid_path = "s3://bucket_1/test_catalog/schema/fileset/ttt"
with self.assertRaises(GravitinoRuntimeException):
- fs._extract_identifier(path=invalid_path)
+ extract_identifier(self._metalake_name, path=invalid_path)
valid_path = "fileset/test_catalog/schema/fileset/ttt"
- identifier: NameIdentifier = fs._extract_identifier(path=valid_path)
- self.assertEqual("metalake_demo", identifier.namespace().level(0))
+ identifier: NameIdentifier = extract_identifier(
+ self._metalake_name, path=valid_path
+ )
+ self.assertEqual(self._metalake_name, identifier.namespace().level(0))
self.assertEqual("test_catalog", identifier.namespace().level(1))
self.assertEqual("schema", identifier.namespace().level(2))
self.assertEqual("fileset", identifier.name())
@@ -1168,8 +1168,8 @@ class TestLocalFilesystem(unittest.TestCase):
actual_path = fileset_storage_location
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090",
- metalake_name="metalake_demo",
+ server_uri=self._server_uri,
+ metalake_name=self._metalake_name,
skip_instance_cache=True,
)
with patch(
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index 10000b3da2..92dea08b75 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -19,6 +19,9 @@
package org.apache.gravitino.filesystem.hadoop;
import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CURRENT_LOCATION_NAME;
+import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.extractIdentifier;
+import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.getConfigMap;
+import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.getSubPathFromGvfsPath;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -42,8 +45,6 @@ import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
@@ -98,12 +99,6 @@ public class GravitinoVirtualFileSystem extends FileSystem {
private Cache<Pair<NameIdentifier, String>, FileSystem>
internalFileSystemCache;
private ScheduledThreadPoolExecutor internalFileSystemCleanScheduler;
- // The pattern is used to match gvfs path. The scheme prefix
(gvfs://fileset) is optional.
- // The following path can be match:
- // gvfs://fileset/fileset_catalog/fileset_schema/fileset1/file.txt
- // /fileset_catalog/fileset_schema/fileset1/sub_dir/
- private static final Pattern IDENTIFIER_PATTERN =
-
Pattern.compile("^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?>/[^/]+)*/?$");
private static final String SLASH = "/";
private final Map<String, FileSystemProvider> fileSystemProvidersMap =
Maps.newHashMap();
private String currentLocationEnvVar;
@@ -258,26 +253,10 @@ public class GravitinoVirtualFileSystem extends
FileSystem {
return fileStatus;
}
- @VisibleForTesting
- NameIdentifier extractIdentifier(URI virtualUri) {
- String virtualPath = virtualUri.toString();
- Preconditions.checkArgument(
- StringUtils.isNotBlank(virtualPath),
- "Uri which need be extracted cannot be null or empty.");
-
- Matcher matcher = IDENTIFIER_PATTERN.matcher(virtualPath);
- Preconditions.checkArgument(
- matcher.matches() && matcher.groupCount() == 3,
- "URI %s doesn't contains valid identifier",
- virtualPath);
-
- return NameIdentifier.of(metalakeName, matcher.group(1), matcher.group(2),
matcher.group(3));
- }
-
private FilesetContextPair getFilesetContext(Path virtualPath,
FilesetDataOperation operation) {
- NameIdentifier identifier = extractIdentifier(virtualPath.toUri());
+ NameIdentifier identifier = extractIdentifier(metalakeName,
virtualPath.toString());
String virtualPathString = virtualPath.toString();
- String subPath = getSubPathFromVirtualPath(identifier, virtualPathString);
+ String subPath = getSubPathFromGvfsPath(identifier, virtualPathString);
NameIdentifier catalogIdent = NameIdentifier.of(metalakeName,
identifier.namespace().level(1));
FilesetCatalog filesetCatalog =
@@ -402,31 +381,6 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
}
}
- private Map<String, String> getConfigMap(Configuration configuration) {
- Map<String, String> maps = Maps.newHashMap();
- configuration.forEach(entry -> maps.put(entry.getKey(), entry.getValue()));
- return maps;
- }
-
- private String getSubPathFromVirtualPath(NameIdentifier identifier, String
virtualPathString) {
- return
virtualPathString.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)
- ? virtualPathString.substring(
- String.format(
- "%s/%s/%s/%s",
-
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX,
- identifier.namespace().level(1),
- identifier.namespace().level(2),
- identifier.name())
- .length())
- : virtualPathString.substring(
- String.format(
- "/%s/%s/%s",
- identifier.namespace().level(1),
- identifier.namespace().level(2),
- identifier.name())
- .length());
- }
-
@Override
public URI getUri() {
return this.uri;
@@ -484,8 +438,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
public boolean rename(Path src, Path dst) throws IOException {
// Fileset identifier is not allowed to be renamed, only its
subdirectories can be renamed
// which not in the storage location of the fileset;
- NameIdentifier srcIdentifier = extractIdentifier(src.toUri());
- NameIdentifier dstIdentifier = extractIdentifier(dst.toUri());
+ NameIdentifier srcIdentifier = extractIdentifier(metalakeName,
src.toString());
+ NameIdentifier dstIdentifier = extractIdentifier(metalakeName,
dst.toString());
Preconditions.checkArgument(
srcIdentifier.equals(dstIdentifier),
"Destination path fileset identifier: %s should be same with src path
fileset identifier: %s.",
@@ -510,8 +464,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
public FileStatus getFileStatus(Path path) throws IOException {
FilesetContextPair context = getFilesetContext(path,
FilesetDataOperation.GET_FILE_STATUS);
FileStatus fileStatus =
context.getFileSystem().getFileStatus(context.getActualFileLocation());
- NameIdentifier identifier = extractIdentifier(path.toUri());
- String subPath = getSubPathFromVirtualPath(identifier, path.toString());
+ NameIdentifier identifier = extractIdentifier(metalakeName,
path.toString());
+ String subPath = getSubPathFromGvfsPath(identifier, path.toString());
String storageLocation =
context
.getActualFileLocation()
@@ -526,8 +480,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
FilesetContextPair context = getFilesetContext(path,
FilesetDataOperation.LIST_STATUS);
FileStatus[] fileStatusResults =
context.getFileSystem().listStatus(context.getActualFileLocation());
- NameIdentifier identifier = extractIdentifier(path.toUri());
- String subPath = getSubPathFromVirtualPath(identifier, path.toString());
+ NameIdentifier identifier = extractIdentifier(metalakeName,
path.toString());
+ String subPath = getSubPathFromGvfsPath(identifier, path.toString());
String storageLocation =
context
.getActualFileLocation()
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
index 8a0d1d8743..181843cfd5 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
@@ -20,8 +20,13 @@
package org.apache.gravitino.filesystem.hadoop;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import java.io.File;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.client.KerberosTokenProvider;
@@ -30,6 +35,27 @@ import org.apache.hadoop.conf.Configuration;
/** Utility class for Gravitino Virtual File System. */
public class GravitinoVirtualFileSystemUtils {
+ // The pattern is used to match a gvfs path. The scheme prefix (gvfs://fileset) is optional.
+ // Groups 1, 2 and 3 capture the catalog, schema and fileset name. Any further non-empty path
+ // segments (the sub path) and a single trailing slash are allowed but not captured; empty
+ // segments such as "fileset1//" do not match because every segment is "[^/]+".
+ // The following paths can be matched:
+ // gvfs://fileset/fileset_catalog/fileset_schema/fileset1/file.txt
+ // /fileset_catalog/fileset_schema/fileset1/sub_dir/
+ // The sub-path segments are wrapped in an atomic group "(?>...)", so the engine cannot
+ // backtrack into a segment once it has matched; presumably this is to keep very deep paths
+ // (thousands of segments) cheap to match -- NOTE(review): confirm the original motivation.
+ private static final Pattern IDENTIFIER_PATTERN =
+
Pattern.compile("^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?>/[^/]+)*/?$");
+
+ /**
+ * Transform the Hadoop configuration to a map.
+ *
+ * <p>Each value is read back through {@link Configuration#get(String)} instead of being taken
+ * from the iterated entry, so that variable references in the values are expanded.
+ *
+ * @param configuration The Hadoop configuration.
+ * @return A new mutable map from property name to (expanded) property value.
+ */
+ public static Map<String, String> getConfigMap(Configuration configuration) {
+ Map<String, String> maps = Maps.newHashMap();
+ // Don't use entry.getValue() directly in the lambda, because it cannot
+ // handle variable expansion in the Configuration values.
+ configuration.forEach(entry -> maps.put(entry.getKey(), configuration.get(entry.getKey())));
+ return maps;
+ }
+
/**
* Get Gravitino client by the configuration.
*
@@ -37,6 +63,16 @@ public class GravitinoVirtualFileSystemUtils {
* @return The Gravitino client.
*/
public static GravitinoClient createClient(Configuration configuration) {
+ return createClient(getConfigMap(configuration));
+ }
+
+ /**
+ * Get Gravitino client by the configuration.
+ *
+ * @param configuration The configuration for the Gravitino client.
+ * @return The Gravitino client.
+ */
+ public static GravitinoClient createClient(Map<String, String>
configuration) {
// initialize the Gravitino client
String serverUri =
configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
@@ -48,7 +84,7 @@ public class GravitinoVirtualFileSystemUtils {
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
String authType =
- configuration.get(
+ configuration.getOrDefault(
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY,
GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE);
if
(authType.equalsIgnoreCase(GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE))
{
@@ -140,6 +176,85 @@ public class GravitinoVirtualFileSystemUtils {
}
}
+ /**
+ * Extract the full identifier of the fileset from the virtual path.
+ *
+ * <p>The fileset name is returned verbatim; percent-encoded characters (e.g. {@code %2F}) are
+ * not decoded.
+ *
+ * @param metalakeName The metalake name of the fileset.
+ * @param gvfsPath The virtual path, format is
+ * [gvfs://fileset]/{fileset_catalog}/{fileset_schema}/{fileset_name}[/sub_path]
+ * @return The full identifier of the fileset.
+ * @throws IllegalArgumentException If the metalake name or the path is null/blank, or the path
+ * doesn't contain a valid identifier.
+ */
+ public static NameIdentifier extractIdentifier(String metalakeName, String gvfsPath)
+ throws IllegalArgumentException {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(metalakeName), "The metalake name cannot be null or empty.");
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(gvfsPath),
+ "The gvfsPath which need be extracted cannot be null or empty.");
+
+ Matcher matcher = IDENTIFIER_PATTERN.matcher(gvfsPath);
+ // groupCount() is fixed by the compiled pattern (always 3 here), so only the match result
+ // itself needs to be checked.
+ Preconditions.checkArgument(
+ matcher.matches(), "URI %s doesn't contains valid identifier", gvfsPath);
+
+ // Groups 1..3 are catalog, schema and fileset name.
+ return NameIdentifier.of(metalakeName, matcher.group(1), matcher.group(2), matcher.group(3));
+ }
+
+ /**
+ * Get the sub path from the virtual path.
+ *
+ * @param identifier The identifier of the fileset (namespace: metalake, catalog, schema).
+ * @param gvfsPath The virtual path, format is
+ * [gvfs://fileset]/{fileset_catalog}/{fileset_schema}/{fileset_name}[/sub_path]
+ * @return The sub path: an empty string if the path points at the fileset itself, otherwise a
+ * string starting with '/'.
+ * @throws IllegalArgumentException If the path is blank, the identifier is null or not a
+ * 3-level fileset identifier, or the virtual path doesn't match the identifier.
+ */
+ public static String getSubPathFromGvfsPath(NameIdentifier identifier, String gvfsPath)
+ throws IllegalArgumentException {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(gvfsPath), "Virtual path cannot be null or empty.");
+
+ // Reject null explicitly so callers get the documented IllegalArgumentException, not an NPE.
+ Preconditions.checkArgument(identifier != null, "The identifier cannot be null.");
+
+ Preconditions.checkArgument(
+ identifier.namespace().levels().length == 3,
+ "The identifier should have 3 levels, but got %s",
+ identifier.namespace().levels().length);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier.name()),
+ "The identifier name should not be null or empty.");
+
+ boolean withScheme =
+ gvfsPath.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX);
+ Preconditions.checkArgument(
+ withScheme || gvfsPath.startsWith("/"),
+ "Virtual path should start with '%s' or '/'",
+ GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX);
+
+ // "[gvfs://fileset]/{catalog}/{schema}/{fileset}" -- the scheme part is present only when the
+ // input path carries it. Namespace level 0 is the metalake, which is not part of the path.
+ String prefix =
+ String.format(
+ "%s/%s/%s/%s",
+ withScheme ? GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX : "",
+ identifier.namespace().level(1),
+ identifier.namespace().level(2),
+ identifier.name());
+
+ // The prefix must end on a path-segment boundary; a plain startsWith would accept
+ // "/catalog/schema/fileset10/file" for fileset "fileset1" and return "0/file".
+ boolean matchesIdentifier =
+ gvfsPath.startsWith(prefix)
+ && (gvfsPath.length() == prefix.length() || gvfsPath.charAt(prefix.length()) == '/');
+ Preconditions.checkArgument(
+ matchesIdentifier,
+ "Virtual path '%s' doesn't match fileset identifier '%s'",
+ gvfsPath,
+ identifier);
+
+ return gvfsPath.substring(prefix.length());
+ }
+
private static void checkAuthConfig(String authType, String configKey,
String configValue) {
Preconditions.checkArgument(
StringUtils.isNotBlank(configValue),
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java
index 531cb37eb1..0a4e1cd908 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java
@@ -41,6 +41,7 @@ import org.apache.gravitino.dto.responses.MetalakeResponse;
import org.apache.gravitino.dto.responses.VersionResponse;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.rest.RESTUtils;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.Method;
import org.junit.jupiter.api.AfterAll;
@@ -184,7 +185,7 @@ public abstract class GravitinoMockServerBase {
String filesetPath =
String.format(
"/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s",
- metalakeName, catalogName, schemaName, filesetName);
+ metalakeName, catalogName, schemaName,
RESTUtils.encodeString(filesetName));
Map<String, String> locations =
location == null
? Collections.emptyMap()
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
index 6403a0200e..be7a7d35f5 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.filesystem.hadoop;
import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
+import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.extractIdentifier;
import static org.apache.hc.core5.http.HttpStatus.SC_OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -30,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
-import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.dto.AuditDTO;
import org.apache.gravitino.dto.credential.CredentialDTO;
@@ -62,8 +63,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
-import org.testcontainers.shaded.org.apache.commons.lang3.tuple.Pair;
public class TestGvfsBase extends GravitinoMockServerBase {
protected static final String GVFS_IMPL_CLASS =
GravitinoVirtualFileSystem.class.getName();
@@ -221,16 +222,20 @@ public class TestGvfsBase extends GravitinoMockServerBase
{
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testCreate(boolean withScheme) throws IOException {
- String filesetName = "testCreate";
+ @CsvSource({
+ "true, testCreate",
+ "false, testCreate",
+ "true, testCreate%2Fabc",
+ "false, testCreate%2Fabc"
+ })
+ public void testCreate(boolean withScheme, String filesetName) throws
IOException {
Path managedFilesetPath =
FileSystemTestUtils.createFilesetPath(catalogName, schemaName,
filesetName, true);
Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName,
schemaName, filesetName);
String locationPath =
String.format(
"/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
- metalakeName, catalogName, schemaName, filesetName);
+ metalakeName, catalogName, schemaName,
RESTUtils.encodeString(filesetName));
try (FileSystem gravitinoFileSystem =
managedFilesetPath.getFileSystem(conf);
FileSystem localFileSystem = localPath.getFileSystem(conf)) {
FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -272,17 +277,21 @@ public class TestGvfsBase extends GravitinoMockServerBase
{
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @CsvSource({
+ "true, testAppend",
+ "false, testAppend",
+ "true, testAppend%2Fabc",
+ "false, testAppend%2Fabc"
+ })
@Disabled("Append operation is not supported in LocalFileSystem. We can't
test it now.")
- public void testAppend(boolean withScheme) throws IOException {
- String filesetName = "testAppend";
+ public void testAppend(boolean withScheme, String filesetName) throws
IOException {
Path managedFilesetPath =
FileSystemTestUtils.createFilesetPath(catalogName, schemaName,
filesetName, true);
Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName,
schemaName, filesetName);
String locationPath =
String.format(
"/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
- metalakeName, catalogName, schemaName, filesetName);
+ metalakeName, catalogName, schemaName,
RESTUtils.encodeString(filesetName));
try (FileSystem gravitinoFileSystem =
managedFilesetPath.getFileSystem(conf);
FileSystem localFileSystem = localPath.getFileSystem(conf)) {
FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -355,16 +364,20 @@ public class TestGvfsBase extends GravitinoMockServerBase
{
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testRename(boolean withScheme) throws IOException {
- String filesetName = "testRename";
+ @CsvSource({
+ "true, testRename",
+ "false, testRename",
+ "true, testRename%2Fabc",
+ "false, testRename%2Fabc"
+ })
+ public void testRename(boolean withScheme, String filesetName) throws
IOException {
Path managedFilesetPath =
FileSystemTestUtils.createFilesetPath(catalogName, schemaName,
filesetName, true);
Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName,
schemaName, filesetName);
String locationPath =
String.format(
"/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
- metalakeName, catalogName, schemaName, filesetName);
+ metalakeName, catalogName, schemaName,
RESTUtils.encodeString(filesetName));
try (FileSystem gravitinoFileSystem =
managedFilesetPath.getFileSystem(conf);
FileSystem localFileSystem = localPath.getFileSystem(conf)) {
FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -433,16 +446,20 @@ public class TestGvfsBase extends GravitinoMockServerBase
{
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testDelete(boolean withScheme) throws IOException {
- String filesetName = "testDelete";
+ @CsvSource({
+ "true, testDelete",
+ "false, testDelete",
+ "true, testDelete%2Fabc",
+ "false, testDelete%2Fabc"
+ })
+ public void testDelete(boolean withScheme, String filesetName) throws
IOException {
Path managedFilesetPath =
FileSystemTestUtils.createFilesetPath(catalogName, schemaName,
filesetName, true);
Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName,
schemaName, filesetName);
String locationPath =
String.format(
"/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
- metalakeName, catalogName, schemaName, filesetName);
+ metalakeName, catalogName, schemaName,
RESTUtils.encodeString(filesetName));
try (FileSystem gravitinoFileSystem =
managedFilesetPath.getFileSystem(conf);
FileSystem localFileSystem = localPath.getFileSystem(conf)) {
FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -482,16 +499,16 @@ public class TestGvfsBase extends GravitinoMockServerBase
{
}
}
- @Test
- public void testGetStatus() throws IOException {
- String filesetName = "testGetStatus";
+ @ParameterizedTest
+ @ValueSource(strings = {"testGetFileStatus", "testGetFileStatus%2Fabc"})
+ public void testGetStatus(String filesetName) throws IOException {
Path managedFilesetPath =
FileSystemTestUtils.createFilesetPath(catalogName, schemaName,
filesetName, true);
Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName,
schemaName, filesetName);
String locationPath =
String.format(
"/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
- metalakeName, catalogName, schemaName, filesetName);
+ metalakeName, catalogName, schemaName,
RESTUtils.encodeString(filesetName));
try (FileSystem gravitinoFileSystem =
managedFilesetPath.getFileSystem(conf);
FileSystem localFileSystem = localPath.getFileSystem(conf)) {
FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -520,16 +537,16 @@ public class TestGvfsBase extends GravitinoMockServerBase
{
}
}
- @Test
- public void testListStatus() throws IOException {
- String filesetName = "testListStatus";
+ @ParameterizedTest
+ @ValueSource(strings = {"testListStatus", "testListStatus%2Fabc"})
+ public void testListStatus(String filesetName) throws IOException {
Path managedFilesetPath =
FileSystemTestUtils.createFilesetPath(catalogName, schemaName,
filesetName, true);
Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName,
schemaName, filesetName);
String locationPath =
String.format(
"/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
- metalakeName, catalogName, schemaName, filesetName);
+ metalakeName, catalogName, schemaName,
RESTUtils.encodeString(filesetName));
try (FileSystem gravitinoFileSystem =
managedFilesetPath.getFileSystem(conf);
FileSystem localFileSystem = localPath.getFileSystem(conf)) {
FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -576,16 +593,16 @@ public class TestGvfsBase extends GravitinoMockServerBase
{
}
}
- @Test
- public void testMkdirs() throws IOException {
- String filesetName = "testMkdirs";
+ @ParameterizedTest
+ @ValueSource(strings = {"testMkdirs", "testMkdirs%2Fabc"})
+ public void testMkdirs(String filesetName) throws IOException {
Path managedFilesetPath =
FileSystemTestUtils.createFilesetPath(catalogName, schemaName,
filesetName, true);
Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName,
schemaName, filesetName);
String locationPath =
String.format(
"/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
- metalakeName, catalogName, schemaName, filesetName);
+ metalakeName, catalogName, schemaName,
RESTUtils.encodeString(filesetName));
try (FileSystem gravitinoFileSystem =
managedFilesetPath.getFileSystem(conf);
FileSystem localFileSystem = localPath.getFileSystem(conf)) {
FileSystemTestUtils.mkdirs(localPath, localFileSystem);
@@ -632,40 +649,41 @@ public class TestGvfsBase extends GravitinoMockServerBase
{
try (GravitinoVirtualFileSystem fs =
(GravitinoVirtualFileSystem) managedFilesetPath.getFileSystem(conf)) {
NameIdentifier identifier =
- fs.extractIdentifier(new
URI("gvfs://fileset/catalog1/schema1/fileset1"));
+ extractIdentifier(metalakeName,
"gvfs://fileset/catalog1/schema1/fileset1");
assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1",
"fileset1"), identifier);
NameIdentifier identifier2 =
- fs.extractIdentifier(new
URI("gvfs://fileset/catalog1/schema1/fileset1/"));
+ extractIdentifier(metalakeName,
"gvfs://fileset/catalog1/schema1/fileset1/");
assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1",
"fileset1"), identifier2);
NameIdentifier identifier3 =
- fs.extractIdentifier(new
URI("gvfs://fileset/catalog1/schema1/fileset1/files"));
+ extractIdentifier(metalakeName,
"gvfs://fileset/catalog1/schema1/fileset1/files");
assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1",
"fileset1"), identifier3);
NameIdentifier identifier4 =
- fs.extractIdentifier(new
URI("gvfs://fileset/catalog1/schema1/fileset1/dir/dir"));
+ extractIdentifier(metalakeName,
"gvfs://fileset/catalog1/schema1/fileset1/dir/dir");
assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1",
"fileset1"), identifier4);
NameIdentifier identifier5 =
- fs.extractIdentifier(new
URI("gvfs://fileset/catalog1/schema1/fileset1/dir/dir/"));
+ extractIdentifier(metalakeName,
"gvfs://fileset/catalog1/schema1/fileset1/dir/dir/");
assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1",
"fileset1"), identifier5);
- NameIdentifier identifier6 = fs.extractIdentifier(new
URI("/catalog1/schema1/fileset1"));
+ NameIdentifier identifier6 = extractIdentifier(metalakeName,
"/catalog1/schema1/fileset1");
assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1",
"fileset1"), identifier6);
- NameIdentifier identifier7 = fs.extractIdentifier(new
URI("/catalog1/schema1/fileset1/"));
+ NameIdentifier identifier7 = extractIdentifier(metalakeName,
"/catalog1/schema1/fileset1/");
assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1",
"fileset1"), identifier7);
- NameIdentifier identifier8 = fs.extractIdentifier(new
URI("/catalog1/schema1/fileset1/dir"));
+ NameIdentifier identifier8 =
+ extractIdentifier(metalakeName, "/catalog1/schema1/fileset1/dir");
assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1",
"fileset1"), identifier8);
NameIdentifier identifier9 =
- fs.extractIdentifier(new URI("/catalog1/schema1/fileset1/dir/dir/"));
+ extractIdentifier(metalakeName,
"/catalog1/schema1/fileset1/dir/dir/");
assertEquals(NameIdentifier.of(metalakeName, "catalog1", "schema1",
"fileset1"), identifier9);
NameIdentifier identifier10 =
- fs.extractIdentifier(new URI("/catalog1/schema1/fileset1/dir/dir"));
+ extractIdentifier(metalakeName,
"/catalog1/schema1/fileset1/dir/dir");
assertEquals(
NameIdentifier.of(metalakeName, "catalog1", "schema1", "fileset1"),
identifier10);
@@ -673,29 +691,35 @@ public class TestGvfsBase extends GravitinoMockServerBase
{
for (int i = 0; i < 1500; i++) {
longUri.append("/dir");
}
- NameIdentifier identifier11 = fs.extractIdentifier(new
URI(longUri.toString()));
+ NameIdentifier identifier11 = extractIdentifier(metalakeName,
longUri.toString());
assertEquals(
NameIdentifier.of(metalakeName, "catalog1", "schema1", "fileset1"),
identifier11);
- NameIdentifier identifier12 = fs.extractIdentifier(new
URI(longUri.delete(0, 14).toString()));
+ NameIdentifier identifier12 =
+ extractIdentifier(metalakeName, longUri.delete(0, 14).toString());
assertEquals(
NameIdentifier.of(metalakeName, "catalog1", "schema1", "fileset1"),
identifier12);
+ NameIdentifier identifier13 =
+ extractIdentifier(metalakeName,
"gvfs://fileset/catalog1/schema1/abc%2Fdef%2Fghi");
+ assertEquals(
+ NameIdentifier.of(metalakeName, "catalog1", "schema1",
"abc%2Fdef%2Fghi"), identifier13);
+
assertThrows(
IllegalArgumentException.class,
- () -> fs.extractIdentifier(new URI("gvfs://fileset/catalog1/")));
+ () -> extractIdentifier(metalakeName, "gvfs://fileset/catalog1/"));
assertThrows(
IllegalArgumentException.class,
- () -> fs.extractIdentifier(new
URI("hdfs://fileset/catalog1/schema1/fileset1")));
+ () -> extractIdentifier(metalakeName,
"hdfs://fileset/catalog1/schema1/fileset1"));
assertThrows(
IllegalArgumentException.class,
- () -> fs.extractIdentifier(new URI("/catalog1/schema1/")));
+ () -> extractIdentifier(metalakeName, "/catalog1/schema1/"));
assertThrows(
IllegalArgumentException.class,
- () -> fs.extractIdentifier(new
URI("gvfs://fileset/catalog1/schema1/fileset1//")));
+ () -> extractIdentifier(metalakeName,
"gvfs://fileset/catalog1/schema1/fileset1//"));
assertThrows(
IllegalArgumentException.class,
- () -> fs.extractIdentifier(new
URI("/catalog1/schema1/fileset1/dir//")));
+ () -> extractIdentifier(metalakeName,
"/catalog1/schema1/fileset1/dir//"));
}
}