This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 89f939543b [#6953] feat(GVFS): Refactor GVFS to handle fileset not
existed scenario (#6954)
89f939543b is described below
commit 89f939543bad793011d3f7c7a48305695817e072
Author: mchades <[email protected]>
AuthorDate: Wed Apr 16 14:11:07 2025 +0800
[#6953] feat(GVFS): Refactor GVFS to handle fileset not existed scenario
(#6954)
### What changes were proposed in this pull request?
Refactor GVFS to handle the scenario where the fileset has not been created,
and align its behavior with file system semantics.
### Why are the changes needed?
Fix: #6953
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Added unit tests.
Co-authored-by: Jerry Shao <[email protected]>
---
clients/client-python/gravitino/filesystem/gvfs.py | 133 ++++-
.../tests/unittests/test_gvfs_without_fileset.py | 112 +++++
.../hadoop/FilesetPathNotFoundException.java | 39 ++
.../hadoop/GravitinoVirtualFileSystem.java | 539 ++++++++++++---------
.../GravitinoVirtualFileSystemConfiguration.java | 6 +
.../gravitino/filesystem/hadoop/TestGvfsBase.java | 142 +++---
6 files changed, 667 insertions(+), 304 deletions(-)
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py
b/clients/client-python/gravitino/filesystem/gvfs.py
index 1d4d680a24..8b59739135 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -25,8 +25,7 @@ import sys
import time
from enum import Enum
from pathlib import PurePosixPath
-from typing import Dict, Tuple, List
-
+from typing import Dict, Tuple, List, Optional
import fsspec
from cachetools import TTLCache, LRUCache
from fsspec import AbstractFileSystem
@@ -54,6 +53,9 @@ from gravitino.client.fileset_catalog import FilesetCatalog
from gravitino.client.generic_fileset import GenericFileset
from gravitino.exceptions.base import (
GravitinoRuntimeException,
+ NoSuchCatalogException,
+ CatalogNotInUseException,
+ NoSuchFilesetException,
)
from gravitino.filesystem.gvfs_config import GVFSConfig
from gravitino.filesystem.gvfs_utils import (
@@ -79,6 +81,10 @@ class StorageType(Enum):
ABS = "abfss"
+class FilesetPathNotFoundError(FileNotFoundError):
+ """Exception raised when the catalog, schema or fileset is not found in
the GVFS path."""
+
+
class FilesetContextPair:
"""A context object that holds the information about the actual file
location and the file system which used in
the GravitinoVirtualFileSystem's operations.
@@ -173,6 +179,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
context_pair: FilesetContextPair = self._get_fileset_context(
path, FilesetDataOperation.LIST_STATUS
)
+ self._throw_fileset_path_not_found_error_if(
+ context_pair is None, path, FilesetDataOperation.LIST_STATUS
+ )
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
pre_process_path: str = self._pre_process_path(path)
@@ -216,6 +226,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
context_pair: FilesetContextPair = self._get_fileset_context(
path, FilesetDataOperation.GET_FILE_STATUS
)
+ self._throw_fileset_path_not_found_error_if(
+ context_pair is None, path, FilesetDataOperation.GET_FILE_STATUS
+ )
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
pre_process_path: str = self._pre_process_path(path)
@@ -240,6 +254,9 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
context_pair: FilesetContextPair = self._get_fileset_context(
path, FilesetDataOperation.EXISTS
)
+ if context_pair is None:
+ return False
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
return context_pair.filesystem().exists(
@@ -264,6 +281,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
src_context_pair: FilesetContextPair = self._get_fileset_context(
src_path, FilesetDataOperation.COPY_FILE
)
+ self._throw_fileset_path_not_found_error_if(
+ src_context_pair is None, src_path, FilesetDataOperation.COPY_FILE
+ )
+
src_actual_path = src_context_pair.actual_file_location()
dst_context_pair: FilesetContextPair = self._get_fileset_context(
@@ -296,11 +317,17 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
f"Destination file path identifier: `{dst_identifier}`"
f" should be same with src file path identifier:
`{src_identifier}`."
)
+
src_context_pair: FilesetContextPair = self._get_fileset_context(
src_path, FilesetDataOperation.RENAME
)
+ self._throw_fileset_path_not_found_error_if(
+ src_context_pair is None, src_path, FilesetDataOperation.RENAME
+ )
+
src_actual_path = src_context_pair.actual_file_location()
storage_type = self._recognize_storage_type(src_actual_path)
+
dst_context_pair: FilesetContextPair = self._get_fileset_context(
dst_path, FilesetDataOperation.RENAME
)
@@ -350,6 +377,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
context_pair: FilesetContextPair = self._get_fileset_context(
path, FilesetDataOperation.DELETE
)
+ self._throw_fileset_path_not_found_error_if(
+ context_pair is None, path, FilesetDataOperation.DELETE
+ )
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
fs = context_pair.filesystem()
@@ -371,6 +402,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
context_pair: FilesetContextPair = self._get_fileset_context(
path, FilesetDataOperation.DELETE
)
+ self._throw_fileset_path_not_found_error_if(
+ context_pair is None, path, FilesetDataOperation.DELETE
+ )
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
context_pair.filesystem().rm_file(
@@ -386,6 +421,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
context_pair: FilesetContextPair = self._get_fileset_context(
path, FilesetDataOperation.DELETE
)
+ self._throw_fileset_path_not_found_error_if(
+ context_pair is None, path, FilesetDataOperation.DELETE
+ )
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
context_pair.filesystem().rmdir(
@@ -416,9 +455,20 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
data_operation = FilesetDataOperation.OPEN_AND_APPEND
else:
data_operation = FilesetDataOperation.OPEN
+
context_pair: FilesetContextPair = self._get_fileset_context(
path, data_operation
)
+ if context_pair is None:
+ if mode in ("w", "wb", "x", "xb", "a", "ab"):
+ raise OSError(
+ f"Fileset is not found for path: {path} for operation
OPEN. This "
+ f"may be caused by fileset related metadata not found or
not in use "
+ f"in Gravitino,"
+ )
+
+ raise FilesetPathNotFoundError(f"Path {path} not found for
operation OPEN.")
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
return context_pair.filesystem().open(
@@ -441,6 +491,13 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
context_pair: FilesetContextPair = self._get_fileset_context(
path, FilesetDataOperation.MKDIRS
)
+ if context_pair is None:
+ raise OSError(
+ f"Fileset is not found for path: {path} for operation MKDIRS.
This "
+ f"may be caused by fileset related metadata not found or not
in use "
+ f"in Gravitino,"
+ )
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
context_pair.filesystem().mkdir(
@@ -457,6 +514,13 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
context_pair: FilesetContextPair = self._get_fileset_context(
path, FilesetDataOperation.MKDIRS
)
+ if context_pair is None:
+ raise OSError(
+ f"Fileset is not found for path: {path} for operation MKDIRS.
This "
+ f"may be caused by fileset related metadata not found or not
in use "
+ f"in Gravitino,"
+ )
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
context_pair.filesystem().makedirs(
@@ -473,6 +537,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
context_pair: FilesetContextPair = self._get_fileset_context(
path, FilesetDataOperation.CREATED_TIME
)
+ self._throw_fileset_path_not_found_error_if(
+ context_pair is None, path, FilesetDataOperation.CREATED_TIME
+ )
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
if storage_type == StorageType.LOCAL:
@@ -491,6 +559,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
context_pair: FilesetContextPair = self._get_fileset_context(
path, FilesetDataOperation.MODIFIED_TIME
)
+ self._throw_fileset_path_not_found_error_if(
+ context_pair is None, path, FilesetDataOperation.MODIFIED_TIME
+ )
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
return context_pair.filesystem().modified(
@@ -508,6 +580,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
context_pair: FilesetContextPair = self._get_fileset_context(
path, FilesetDataOperation.CAT_FILE
)
+ self._throw_fileset_path_not_found_error_if(
+ context_pair is None, path, FilesetDataOperation.CAT_FILE
+ )
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
return context_pair.filesystem().cat_file(
@@ -531,9 +607,14 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
raise GravitinoRuntimeException(
"Doesn't support copy a remote gvfs file to an another remote
file."
)
+
context_pair: FilesetContextPair = self._get_fileset_context(
rpath, FilesetDataOperation.GET_FILE
)
+ self._throw_fileset_path_not_found_error_if(
+ context_pair is None, rpath, FilesetDataOperation.GET_FILE
+ )
+
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
context_pair.filesystem().get_file(
@@ -658,7 +739,9 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
"mtime": last_modified,
}
- def _get_fileset_context(self, virtual_path: str, operation:
FilesetDataOperation):
+ def _get_fileset_context(
+ self, virtual_path: str, operation: FilesetDataOperation
+ ) -> Optional[FilesetContextPair]:
"""Get a fileset context from the cache or the Gravitino server
:param virtual_path: The virtual path
:param operation: The data operation
@@ -669,7 +752,17 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
catalog_ident: NameIdentifier = NameIdentifier.of(
self._metalake, identifier.namespace().level(1)
)
- fileset_catalog = self._get_fileset_catalog(catalog_ident)
+
+ try:
+ fileset_catalog = self._get_fileset_catalog(catalog_ident)
+ except (NoSuchCatalogException, CatalogNotInUseException):
+ logger.warning(
+ "Cannot get fileset catalog by identifier: %s",
+ catalog_ident,
+ exc_info=True,
+ )
+ return None
+
if fileset_catalog is None:
raise GravitinoRuntimeException(
f"Loaded fileset catalog: {catalog_ident} is null."
@@ -681,13 +774,23 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
}
caller_context: CallerContext = CallerContext(context)
CallerContextHolder.set(caller_context)
- actual_file_location: (
- str
- ) = fileset_catalog.as_fileset_catalog().get_file_location(
- NameIdentifier.of(identifier.namespace().level(2),
identifier.name()),
- sub_path,
- self._current_location_name,
- )
+
+ try:
+ actual_file_location: (
+ str
+ ) = fileset_catalog.as_fileset_catalog().get_file_location(
+ NameIdentifier.of(identifier.namespace().level(2),
identifier.name()),
+ sub_path,
+ self._current_location_name,
+ )
+ except NoSuchFilesetException:
+ logger.warning(
+ "Cannot get file location by identifier: %s, sub_path: %s",
+ identifier,
+ sub_path,
+ exc_info=True,
+ )
+ return None
return FilesetContextPair(
actual_file_location,
@@ -1165,5 +1268,13 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
)
return time.time() * 1000 + (expire_time - time.time() * 1000) * ratio
+ def _throw_fileset_path_not_found_error_if(
+ self, condition: bool, path: str, op: FilesetDataOperation
+ ):
+ if condition:
+ raise FilesetPathNotFoundError(
+ f"Path [{path}] not found for operation [{op}]"
+ )
+
fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
diff --git a/clients/client-python/tests/unittests/test_gvfs_without_fileset.py
b/clients/client-python/tests/unittests/test_gvfs_without_fileset.py
new file mode 100644
index 0000000000..bc72b224fd
--- /dev/null
+++ b/clients/client-python/tests/unittests/test_gvfs_without_fileset.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest.mock import patch
+
+from gravitino.filesystem.gvfs import (
+ GravitinoVirtualFileSystem,
+ FilesetPathNotFoundError,
+)
+
+
+class TestGVFSWithoutFileset(unittest.TestCase):
+
+ @patch("gravitino.filesystem.gvfs.create_client")
+ @patch.object(GravitinoVirtualFileSystem, "_get_fileset_context")
+ def test_when_fileset_not_created(
+ self, mock_get_fileset_context, mock_create_client
+ ):
+ mock_create_client.return_value = None
+ mock_get_fileset_context.return_value = None
+
+ fs = GravitinoVirtualFileSystem(
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
+ )
+
+ self.assertRaises(
+ FilesetPathNotFoundError, fs.ls,
"fileset/test_catalog/schema/fileset"
+ )
+ self.assertRaises(
+ FilesetPathNotFoundError, fs.info,
"fileset/test_catalog/schema/fileset"
+ )
+
+ self.assertFalse(fs.exists("fileset/test_catalog/schema/fileset"))
+
+ with self.assertRaises(FileNotFoundError):
+ fs.cp_file(
+ "fileset/test_catalog/schema/fileset/a",
+ "fileset/test_catalog/schema/fileset/b",
+ )
+
+ self.assertRaises(
+ FilesetPathNotFoundError, fs.rm,
"fileset/test_catalog/schema/fileset/a"
+ )
+ self.assertRaises(
+ FilesetPathNotFoundError,
+ fs.rm_file,
+ "fileset/test_catalog/schema/fileset/a",
+ )
+ self.assertRaises(
+ FilesetPathNotFoundError, fs.rmdir,
"fileset/test_catalog/schema/fileset/a"
+ )
+
+ self.assertRaises(OSError, fs.open,
"fileset/test_catalog/schema/fileset", "w")
+ self.assertRaises(OSError, fs.open,
"fileset/test_catalog/schema/fileset", "wb")
+ self.assertRaises(OSError, fs.open,
"fileset/test_catalog/schema/fileset", "a")
+ self.assertRaises(OSError, fs.open,
"fileset/test_catalog/schema/fileset", "ab")
+ self.assertRaises(OSError, fs.open,
"fileset/test_catalog/schema/fileset", "x")
+ self.assertRaises(OSError, fs.open,
"fileset/test_catalog/schema/fileset", "xb")
+
+ self.assertRaises(
+ FilesetPathNotFoundError,
+ fs.open,
+ "fileset/test_catalog/schema/fileset/a",
+ "r",
+ )
+ self.assertRaises(
+ FilesetPathNotFoundError,
+ fs.open,
+ "fileset/test_catalog/schema/fileset/a",
+ "rb",
+ )
+
+ self.assertRaises(OSError, fs.mkdir,
"fileset/test_catalog/schema/fileset/a")
+ self.assertRaises(
+ OSError, fs.makedirs, "fileset/test_catalog/schema/fileset/a/b"
+ )
+
+ self.assertRaises(
+ FilesetPathNotFoundError,
+ fs.created,
+ "fileset/test_catalog/schema/fileset/a",
+ )
+ self.assertRaises(
+ FilesetPathNotFoundError,
+ fs.modified,
+ "fileset/test_catalog/schema/fileset/a",
+ )
+
+ self.assertRaises(
+ FilesetPathNotFoundError,
+ fs.cat_file,
+ "fileset/test_catalog/schema/fileset/a",
+ )
+
+ with self.assertRaises(FileNotFoundError):
+ fs.get("fileset/test_catalog/schema/fileset/a", "file://tmp/a")
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetPathNotFoundException.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetPathNotFoundException.java
new file mode 100644
index 0000000000..f9f26808fb
--- /dev/null
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetPathNotFoundException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.filesystem.hadoop;
+
+import java.io.FileNotFoundException;
+
+/** Exception thrown when the catalog, schema or fileset does not exist for a given GVFS path. */
+public class FilesetPathNotFoundException extends FileNotFoundException {
+
+ /** Creates a new FilesetPathNotFoundException instance. */
+ public FilesetPathNotFoundException() {
+ super();
+ }
+
+ /**
+ * Creates a new FilesetPathNotFoundException instance with the given
message.
+ *
+ * @param message The message of the exception.
+ */
+ public FilesetPathNotFoundException(String message) {
+ super(message);
+ }
+}
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index 92dea08b75..dd2763933e 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -28,6 +28,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -61,7 +62,10 @@ import
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProv
import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.exceptions.CatalogNotInUseException;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchFilesetException;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetCatalog;
import org.apache.gravitino.storage.AzureProperties;
@@ -87,7 +91,8 @@ import org.slf4j.LoggerFactory;
* users to access the underlying storage.
*/
public class GravitinoVirtualFileSystem extends FileSystem {
- private static final Logger Logger =
LoggerFactory.getLogger(GravitinoVirtualFileSystem.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(GravitinoVirtualFileSystem.class);
+
private Path workingDirectory;
private URI uri;
private GravitinoClient client;
@@ -98,6 +103,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
// identifier has four levels, the first level is metalake name.
private Cache<Pair<NameIdentifier, String>, FileSystem>
internalFileSystemCache;
private ScheduledThreadPoolExecutor internalFileSystemCleanScheduler;
+ private long defaultBlockSize;
private static final String SLASH = "/";
private final Map<String, FileSystemProvider> fileSystemProvidersMap =
Maps.newHashMap();
@@ -167,6 +173,10 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
this.workingDirectory = new Path(name);
this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
+ this.defaultBlockSize =
+ configuration.getLong(
+ GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_BLOCK_SIZE,
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_BLOCK_SIZE_DEFAULT);
setConf(configuration);
super.initialize(uri, getConf());
@@ -177,59 +187,6 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
return internalFileSystemCache;
}
- private void initializeFileSystemCache(int maxCapacity, long
expireAfterAccess) {
- // Since Caffeine does not ensure that removalListener will be involved
after expiration
- // We use a scheduler with one thread to clean up expired clients.
- this.internalFileSystemCleanScheduler =
- new ScheduledThreadPoolExecutor(1,
newDaemonThreadFactory("gvfs-filesystem-cache-cleaner"));
- Caffeine<Object, Object> cacheBuilder =
- Caffeine.newBuilder()
- .maximumSize(maxCapacity)
-
.scheduler(Scheduler.forScheduledExecutorService(internalFileSystemCleanScheduler))
- .removalListener(
- (key, value, cause) -> {
- FileSystem fs = (FileSystem) value;
- if (fs != null) {
- try {
- fs.close();
- } catch (IOException e) {
- Logger.error("Cannot close the file system for fileset:
{}", key, e);
- }
- }
- });
- if (expireAfterAccess > 0) {
- cacheBuilder.expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS);
- }
- this.internalFileSystemCache = cacheBuilder.build();
- }
-
- private void initializeCatalogCache() {
- // Since Caffeine does not ensure that removalListener will be involved
after expiration
- // We use a scheduler with one thread to clean up expired clients.
- this.catalogCleanScheduler =
- new ScheduledThreadPoolExecutor(1,
newDaemonThreadFactory("gvfs-catalog-cache-cleaner"));
- // In most scenarios, it will not read so many catalog filesets at the
same time, so we can just
- // set a default value for this cache.
- this.catalogCache =
- Caffeine.newBuilder()
- .maximumSize(100)
-
.scheduler(Scheduler.forScheduledExecutorService(catalogCleanScheduler))
- .build();
- }
-
- private ThreadFactory newDaemonThreadFactory(String name) {
- return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name +
"-%d").build();
- }
-
- private String getVirtualLocation(NameIdentifier identifier, boolean
withScheme) {
- return String.format(
- "%s/%s/%s/%s",
- withScheme ?
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX : "",
- identifier.namespace().level(1),
- identifier.namespace().level(2),
- identifier.name());
- }
-
@VisibleForTesting
FileStatus convertFileStatusPathPrefix(
FileStatus fileStatus, String actualPrefix, String virtualPrefix) {
@@ -239,7 +196,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
"Path %s doesn't start with prefix \"%s\".",
filePath,
actualPrefix);
- // if the storage location is end with "/",
+ // if the storage location ends with "/",
// we should truncate this to avoid replace issues.
Path path =
new Path(
@@ -253,134 +210,6 @@ public class GravitinoVirtualFileSystem extends
FileSystem {
return fileStatus;
}
- private FilesetContextPair getFilesetContext(Path virtualPath,
FilesetDataOperation operation) {
- NameIdentifier identifier = extractIdentifier(metalakeName,
virtualPath.toString());
- String virtualPathString = virtualPath.toString();
- String subPath = getSubPathFromGvfsPath(identifier, virtualPathString);
-
- NameIdentifier catalogIdent = NameIdentifier.of(metalakeName,
identifier.namespace().level(1));
- FilesetCatalog filesetCatalog =
- catalogCache.get(
- catalogIdent, ident ->
client.loadCatalog(catalogIdent.name()).asFilesetCatalog());
- Catalog catalog = (Catalog) filesetCatalog;
- Preconditions.checkArgument(
- filesetCatalog != null, String.format("Loaded fileset catalog: %s is
null.", catalogIdent));
-
- Map<String, String> contextMap = Maps.newHashMap();
- contextMap.put(
- FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
- InternalClientType.HADOOP_GVFS.name());
- contextMap.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
operation.name());
- CallerContext callerContext =
CallerContext.builder().withContext(contextMap).build();
- CallerContext.CallerContextHolder.set(callerContext);
-
- String actualFileLocation =
- filesetCatalog.getFileLocation(
- NameIdentifier.of(identifier.namespace().level(2),
identifier.name()),
- subPath,
- currentLocationName);
-
- Path filePath = new Path(actualFileLocation);
- URI uri = filePath.toUri();
- // we cache the fs for the same scheme, so we can reuse it
- String scheme = uri.getScheme();
- Preconditions.checkArgument(
- StringUtils.isNotBlank(scheme), "Scheme of the actual file location
cannot be null.");
- FileSystem fs =
- internalFileSystemCache.get(
- Pair.of(identifier, currentLocationName),
- ident -> {
- try {
- FileSystemProvider provider =
fileSystemProvidersMap.get(scheme);
- if (provider == null) {
- throw new GravitinoRuntimeException(
- "Unsupported file system scheme: %s for %s.",
- scheme,
GravitinoVirtualFileSystemConfiguration.GVFS_SCHEME);
- }
-
- // Reset the FileSystem service loader to make sure the
FileSystem will reload the
- // service file systems, this is a temporary solution to fix
the issue
- // https://github.com/apache/gravitino/issues/5609
- resetFileSystemServiceLoader(scheme);
-
- Map<String, String> necessaryPropertyFromCatalog =
- catalog.properties().entrySet().stream()
- .filter(
- property ->
-
CATALOG_NECESSARY_PROPERTIES_TO_KEEP.contains(property.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
-
- Map<String, String> totalProperty =
Maps.newHashMap(necessaryPropertyFromCatalog);
- totalProperty.putAll(getConfigMap(getConf()));
-
- totalProperty.putAll(getCredentialProperties(provider,
catalog, identifier));
-
- return provider.getFileSystem(filePath, totalProperty);
- } catch (IOException ioe) {
- throw new GravitinoRuntimeException(
- ioe,
- "Exception occurs when create new FileSystem for actual
uri: %s, msg: %s",
- uri,
- ioe.getMessage());
- }
- });
-
- return new FilesetContextPair(new Path(actualFileLocation), fs);
- }
-
- private Map<String, String> getCredentialProperties(
- FileSystemProvider fileSystemProvider, Catalog catalog, NameIdentifier
filesetIdentifier) {
- // Do not support credential vending, we do not need to add any credential
properties.
- if (!(fileSystemProvider instanceof SupportsCredentialVending)) {
- return ImmutableMap.of();
- }
-
- ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
- try {
- Fileset fileset =
- catalog
- .asFilesetCatalog()
- .loadFileset(
- NameIdentifier.of(
- filesetIdentifier.namespace().level(2),
filesetIdentifier.name()));
- Credential[] credentials =
fileset.supportsCredentials().getCredentials();
- if (credentials.length > 0) {
- mapBuilder.put(
- GravitinoFileSystemCredentialsProvider.GVFS_CREDENTIAL_PROVIDER,
-
DefaultGravitinoFileSystemCredentialsProvider.class.getCanonicalName());
- mapBuilder.put(
- GravitinoFileSystemCredentialsProvider.GVFS_NAME_IDENTIFIER,
- filesetIdentifier.toString());
-
- SupportsCredentialVending supportsCredentialVending =
- (SupportsCredentialVending) fileSystemProvider;
-
mapBuilder.putAll(supportsCredentialVending.getFileSystemCredentialConf(credentials));
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return mapBuilder.build();
- }
-
- private void resetFileSystemServiceLoader(String fsScheme) {
- try {
- Map<String, Class<? extends FileSystem>> serviceFileSystems =
- (Map<String, Class<? extends FileSystem>>)
- FieldUtils.getField(FileSystem.class, "SERVICE_FILE_SYSTEMS",
true).get(null);
-
- if (serviceFileSystems.containsKey(fsScheme)) {
- return;
- }
-
- // Set this value to false so that FileSystem will reload the service
file systems when
- // needed.
- FieldUtils.getField(FileSystem.class, "FILE_SYSTEMS_LOADED",
true).set(null, false);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
@Override
public URI getUri() {
return this.uri;
@@ -393,15 +222,25 @@ public class GravitinoVirtualFileSystem extends
FileSystem {
@Override
public synchronized void setWorkingDirectory(Path newDir) {
- FilesetContextPair context = getFilesetContext(newDir,
FilesetDataOperation.SET_WORKING_DIR);
-
context.getFileSystem().setWorkingDirectory(context.getActualFileLocation());
+ Optional<FilesetContextPair> context =
+ getFilesetContext(newDir, FilesetDataOperation.SET_WORKING_DIR);
+ try {
+ throwFilesetPathNotFoundExceptionIf(
+ () -> !context.isPresent(), newDir,
FilesetDataOperation.SET_WORKING_DIR);
+ } catch (FilesetPathNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+
context.get().getFileSystem().setWorkingDirectory(context.get().getActualFileLocation());
this.workingDirectory = newDir;
}
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
- FilesetContextPair context = getFilesetContext(path,
FilesetDataOperation.OPEN);
- return context.getFileSystem().open(context.getActualFileLocation(),
bufferSize);
+ Optional<FilesetContextPair> context = getFilesetContext(path,
FilesetDataOperation.OPEN);
+ throwFilesetPathNotFoundExceptionIf(
+ () -> !context.isPresent(), path, FilesetDataOperation.OPEN);
+ return
context.get().getFileSystem().open(context.get().getActualFileLocation(),
bufferSize);
}
@Override
@@ -414,11 +253,21 @@ public class GravitinoVirtualFileSystem extends
FileSystem {
long blockSize,
Progressable progress)
throws IOException {
- FilesetContextPair context = getFilesetContext(path,
FilesetDataOperation.CREATE);
+ Optional<FilesetContextPair> context = getFilesetContext(path,
FilesetDataOperation.CREATE);
+ if (!context.isPresent()) {
+ throw new IOException(
+ "Fileset is not found for path: "
+ + path
+ + " for operation CREATE. "
+ + "This may be caused by fileset related metadata not found or
not in use in "
+ + "Gravitino, please check the fileset metadata in Gravitino.");
+ }
+
return context
+ .get()
.getFileSystem()
.create(
- context.getActualFileLocation(),
+ context.get().getActualFileLocation(),
permission,
overwrite,
bufferSize,
@@ -430,8 +279,13 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
@Override
public FSDataOutputStream append(Path path, int bufferSize, Progressable
progress)
throws IOException {
- FilesetContextPair context = getFilesetContext(path,
FilesetDataOperation.APPEND);
- return context.getFileSystem().append(context.getActualFileLocation(),
bufferSize, progress);
+ Optional<FilesetContextPair> context = getFilesetContext(path,
FilesetDataOperation.APPEND);
+ throwFilesetPathNotFoundExceptionIf(
+ () -> !context.isPresent(), path, FilesetDataOperation.APPEND);
+ return context
+ .get()
+ .getFileSystem()
+ .append(context.get().getActualFileLocation(), bufferSize, progress);
}
@Override
@@ -442,51 +296,75 @@ public class GravitinoVirtualFileSystem extends
FileSystem {
NameIdentifier dstIdentifier = extractIdentifier(metalakeName,
dst.toString());
Preconditions.checkArgument(
srcIdentifier.equals(dstIdentifier),
- "Destination path fileset identifier: %s should be same with src path
fileset identifier: %s.",
+ "Destination path fileset identifier: %s should be same with src path "
+ + "fileset identifier: %s.",
srcIdentifier,
dstIdentifier);
- FilesetContextPair srcContext = getFilesetContext(src,
FilesetDataOperation.RENAME);
- FilesetContextPair dstContext = getFilesetContext(dst,
FilesetDataOperation.RENAME);
+ Optional<FilesetContextPair> srcContext = getFilesetContext(src,
FilesetDataOperation.RENAME);
+ throwFilesetPathNotFoundExceptionIf(
+ () -> !srcContext.isPresent(), src, FilesetDataOperation.RENAME);
+
+ Optional<FilesetContextPair> dstContext = getFilesetContext(dst,
FilesetDataOperation.RENAME);
+ // Because src context and dst context are the same, so if src context is
present, dst context
+ // must be present.
return srcContext
+ .get()
.getFileSystem()
- .rename(srcContext.getActualFileLocation(),
dstContext.getActualFileLocation());
+ .rename(srcContext.get().getActualFileLocation(),
dstContext.get().getActualFileLocation());
}
@Override
public boolean delete(Path path, boolean recursive) throws IOException {
- FilesetContextPair context = getFilesetContext(path,
FilesetDataOperation.DELETE);
- return context.getFileSystem().delete(context.getActualFileLocation(),
recursive);
+ Optional<FilesetContextPair> context = getFilesetContext(path,
FilesetDataOperation.DELETE);
+ if (context.isPresent()) {
+ return
context.get().getFileSystem().delete(context.get().getActualFileLocation(),
recursive);
+ } else {
+ return false;
+ }
}
@Override
public FileStatus getFileStatus(Path path) throws IOException {
- FilesetContextPair context = getFilesetContext(path,
FilesetDataOperation.GET_FILE_STATUS);
- FileStatus fileStatus =
context.getFileSystem().getFileStatus(context.getActualFileLocation());
+ Optional<FilesetContextPair> context =
+ getFilesetContext(path, FilesetDataOperation.GET_FILE_STATUS);
+ throwFilesetPathNotFoundExceptionIf(
+ () -> !context.isPresent(), path,
FilesetDataOperation.GET_FILE_STATUS);
+
+ FileStatus fileStatus =
+
context.get().getFileSystem().getFileStatus(context.get().getActualFileLocation());
NameIdentifier identifier = extractIdentifier(metalakeName,
path.toString());
String subPath = getSubPathFromGvfsPath(identifier, path.toString());
String storageLocation =
context
+ .get()
.getActualFileLocation()
.toString()
- .substring(0, context.getActualFileLocation().toString().length()
- subPath.length());
+ .substring(
+ 0, context.get().getActualFileLocation().toString().length() -
subPath.length());
return convertFileStatusPathPrefix(
fileStatus, storageLocation, getVirtualLocation(identifier, true));
}
@Override
public FileStatus[] listStatus(Path path) throws IOException {
- FilesetContextPair context = getFilesetContext(path,
FilesetDataOperation.LIST_STATUS);
+ Optional<FilesetContextPair> context =
+ getFilesetContext(path, FilesetDataOperation.LIST_STATUS);
+ throwFilesetPathNotFoundExceptionIf(
+ () -> !context.isPresent(), path, FilesetDataOperation.LIST_STATUS);
+
FileStatus[] fileStatusResults =
- context.getFileSystem().listStatus(context.getActualFileLocation());
+
context.get().getFileSystem().listStatus(context.get().getActualFileLocation());
NameIdentifier identifier = extractIdentifier(metalakeName,
path.toString());
String subPath = getSubPathFromGvfsPath(identifier, path.toString());
String storageLocation =
context
+ .get()
.getActualFileLocation()
.toString()
- .substring(0, context.getActualFileLocation().toString().length()
- subPath.length());
+ .substring(
+ 0, context.get().getActualFileLocation().toString().length() -
subPath.length());
return Arrays.stream(fileStatusResults)
.map(
fileStatus ->
@@ -497,20 +375,35 @@ public class GravitinoVirtualFileSystem extends
FileSystem {
@Override
public boolean mkdirs(Path path, FsPermission permission) throws IOException
{
- FilesetContextPair context = getFilesetContext(path,
FilesetDataOperation.MKDIRS);
- return context.getFileSystem().mkdirs(context.getActualFileLocation(),
permission);
+ Optional<FilesetContextPair> context = getFilesetContext(path,
FilesetDataOperation.MKDIRS);
+ if (!context.isPresent()) {
+ throw new IOException(
+ "Fileset is not found for path: "
+ + path
+ + " for operation MKDIRS. "
+ + "This may be caused by fileset related metadata not found or
not in use in "
+ + "Gravitino, please check the fileset metadata in Gravitino.");
+ }
+
+ return
context.get().getFileSystem().mkdirs(context.get().getActualFileLocation(),
permission);
}
@Override
public short getDefaultReplication(Path f) {
- FilesetContextPair context = getFilesetContext(f,
FilesetDataOperation.GET_DEFAULT_REPLICATION);
- return
context.getFileSystem().getDefaultReplication(context.getActualFileLocation());
+ Optional<FilesetContextPair> context =
+ getFilesetContext(f, FilesetDataOperation.GET_DEFAULT_REPLICATION);
+ return context
+ .map(c ->
c.getFileSystem().getDefaultReplication(c.getActualFileLocation()))
+ .orElse((short) 1);
}
@Override
public long getDefaultBlockSize(Path f) {
- FilesetContextPair context = getFilesetContext(f,
FilesetDataOperation.GET_DEFAULT_BLOCK_SIZE);
- return
context.getFileSystem().getDefaultBlockSize(context.getActualFileLocation());
+ Optional<FilesetContextPair> context =
+ getFilesetContext(f, FilesetDataOperation.GET_DEFAULT_BLOCK_SIZE);
+ return context
+ .map(c ->
c.getFileSystem().getDefaultBlockSize(c.getActualFileLocation()))
+ .orElse(defaultBlockSize);
}
@Override
@@ -520,7 +413,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
try {
tokenList.addAll(Arrays.asList(fileSystem.addDelegationTokens(renewer,
credentials)));
} catch (IOException e) {
- Logger.warn("Failed to add delegation tokens for filesystem: {}",
fileSystem.getUri(), e);
+ LOG.warn("Failed to add delegation tokens for filesystem: {}",
fileSystem.getUri(), e);
}
}
return tokenList.stream().distinct().toArray(Token[]::new);
@@ -551,6 +444,46 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
super.close();
}
+ private void initializeFileSystemCache(int maxCapacity, long
expireAfterAccess) {
+ // Since Caffeine does not ensure that removalListener will be involved
after expiration
+ // We use a scheduler with one thread to clean up expired clients.
+ this.internalFileSystemCleanScheduler =
+ new ScheduledThreadPoolExecutor(1,
newDaemonThreadFactory("gvfs-filesystem-cache-cleaner"));
+ Caffeine<Object, Object> cacheBuilder =
+ Caffeine.newBuilder()
+ .maximumSize(maxCapacity)
+
.scheduler(Scheduler.forScheduledExecutorService(internalFileSystemCleanScheduler))
+ .removalListener(
+ (key, value, cause) -> {
+ FileSystem fs = (FileSystem) value;
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ LOG.error("Cannot close the file system for fileset:
{}", key, e);
+ }
+ }
+ });
+ if (expireAfterAccess > 0) {
+ cacheBuilder.expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS);
+ }
+ this.internalFileSystemCache = cacheBuilder.build();
+ }
+
+ private void initializeCatalogCache() {
+ // Since Caffeine does not ensure that removalListener will be involved
after expiration
+ // We use a scheduler with one thread to clean up expired clients.
+ this.catalogCleanScheduler =
+ new ScheduledThreadPoolExecutor(1,
newDaemonThreadFactory("gvfs-catalog-cache-cleaner"));
+ // In most scenarios, it will not read so many catalog filesets at the
same time, so we can just
+ // set a default value for this cache.
+ this.catalogCache =
+ Caffeine.newBuilder()
+ .maximumSize(100)
+
.scheduler(Scheduler.forScheduledExecutorService(catalogCleanScheduler))
+ .build();
+ }
+
private String initCurrentLocationName(Configuration configuration) {
// get from configuration first, otherwise use the env variable
// if both are not set, return null which means use the default location
@@ -558,21 +491,157 @@ public class GravitinoVirtualFileSystem extends
FileSystem {
.orElse(System.getenv(currentLocationEnvVar));
}
- private static class FilesetContextPair {
- private final Path actualFileLocation;
- private final FileSystem fileSystem;
+ private ThreadFactory newDaemonThreadFactory(String name) {
+ return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name +
"-%d").build();
+ }
- public FilesetContextPair(Path actualFileLocation, FileSystem fileSystem) {
- this.actualFileLocation = actualFileLocation;
- this.fileSystem = fileSystem;
+ private String getVirtualLocation(NameIdentifier identifier, boolean
withScheme) {
+ return String.format(
+ "%s/%s/%s/%s",
+ withScheme ?
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX : "",
+ identifier.namespace().level(1),
+ identifier.namespace().level(2),
+ identifier.name());
+ }
+
+ private Optional<FilesetContextPair> getFilesetContext(
+ Path virtualPath, FilesetDataOperation operation) {
+ NameIdentifier identifier = extractIdentifier(metalakeName,
virtualPath.toString());
+ String virtualPathString = virtualPath.toString();
+ String subPath = getSubPathFromGvfsPath(identifier, virtualPathString);
+ NameIdentifier catalogIdent = NameIdentifier.of(metalakeName,
identifier.namespace().level(1));
+
+ FilesetCatalog filesetCatalog;
+ try {
+ filesetCatalog =
+ catalogCache.get(
+ catalogIdent, ident ->
client.loadCatalog(catalogIdent.name()).asFilesetCatalog());
+ Preconditions.checkArgument(
+ filesetCatalog != null,
+ String.format("Loaded fileset catalog: %s is null.", catalogIdent));
+ } catch (NoSuchCatalogException | CatalogNotInUseException e) {
+ LOG.warn("Cannot get fileset catalog by identifier: {}", catalogIdent,
e);
+ return Optional.empty();
}
- public Path getActualFileLocation() {
- return actualFileLocation;
+ Map<String, String> contextMap = Maps.newHashMap();
+ contextMap.put(
+ FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
+ InternalClientType.HADOOP_GVFS.name());
+ contextMap.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
operation.name());
+ CallerContext callerContext =
CallerContext.builder().withContext(contextMap).build();
+ CallerContext.CallerContextHolder.set(callerContext);
+
+ String actualFileLocation;
+ try {
+ actualFileLocation =
+ filesetCatalog.getFileLocation(
+ NameIdentifier.of(identifier.namespace().level(2),
identifier.name()),
+ subPath,
+ currentLocationName);
+ } catch (NoSuchFilesetException e) {
+ LOG.warn("Cannot get file location by identifier: {}, sub_path {}",
identifier, subPath, e);
+ return Optional.empty();
}
- public FileSystem getFileSystem() {
- return fileSystem;
+ Path filePath = new Path(actualFileLocation);
+ URI uri = filePath.toUri();
+ // we cache the fs for the same scheme, so we can reuse it
+ String scheme = uri.getScheme();
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(scheme), "Scheme of the actual file location
cannot be null.");
+ FileSystem fs =
+ internalFileSystemCache.get(
+ Pair.of(identifier, currentLocationName),
+ ident -> {
+ try {
+ FileSystemProvider provider =
fileSystemProvidersMap.get(scheme);
+ if (provider == null) {
+ throw new GravitinoRuntimeException(
+ "Unsupported file system scheme: %s for %s.",
+ scheme,
GravitinoVirtualFileSystemConfiguration.GVFS_SCHEME);
+ }
+
+ // Reset the FileSystem service loader to make sure the
FileSystem will reload the
+ // service file systems, this is a temporary solution to fix
the issue
+ // https://github.com/apache/gravitino/issues/5609
+ resetFileSystemServiceLoader(scheme);
+
+ Catalog catalog = (Catalog) filesetCatalog;
+ Map<String, String> necessaryPropertyFromCatalog =
+ catalog.properties().entrySet().stream()
+ .filter(
+ property ->
+
CATALOG_NECESSARY_PROPERTIES_TO_KEEP.contains(property.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+
+ Map<String, String> totalProperty =
Maps.newHashMap(necessaryPropertyFromCatalog);
+ totalProperty.putAll(getConfigMap(getConf()));
+ totalProperty.putAll(getCredentialProperties(provider,
catalog, identifier));
+ return provider.getFileSystem(filePath, totalProperty);
+
+ } catch (IOException ioe) {
+ throw new GravitinoRuntimeException(
+ ioe,
+ "Exception occurs when create new FileSystem for actual
uri: %s, msg: %s",
+ uri,
+ ioe.getMessage());
+ }
+ });
+
+ return Optional.of(new FilesetContextPair(new Path(actualFileLocation),
fs));
+ }
+
+ private Map<String, String> getCredentialProperties(
+ FileSystemProvider fileSystemProvider, Catalog catalog, NameIdentifier
filesetIdentifier) {
+ // Do not support credential vending, we do not need to add any credential
properties.
+ if (!(fileSystemProvider instanceof SupportsCredentialVending)) {
+ return ImmutableMap.of();
+ }
+
+ ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
+ try {
+ Fileset fileset =
+ catalog
+ .asFilesetCatalog()
+ .loadFileset(
+ NameIdentifier.of(
+ filesetIdentifier.namespace().level(2),
filesetIdentifier.name()));
+ Credential[] credentials =
fileset.supportsCredentials().getCredentials();
+ if (credentials.length > 0) {
+ mapBuilder.put(
+ GravitinoFileSystemCredentialsProvider.GVFS_CREDENTIAL_PROVIDER,
+
DefaultGravitinoFileSystemCredentialsProvider.class.getCanonicalName());
+ mapBuilder.put(
+ GravitinoFileSystemCredentialsProvider.GVFS_NAME_IDENTIFIER,
+ filesetIdentifier.toString());
+
+ SupportsCredentialVending supportsCredentialVending =
+ (SupportsCredentialVending) fileSystemProvider;
+
mapBuilder.putAll(supportsCredentialVending.getFileSystemCredentialConf(credentials));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return mapBuilder.build();
+ }
+
+ private void resetFileSystemServiceLoader(String fsScheme) {
+ try {
+ Map<String, Class<? extends FileSystem>> serviceFileSystems =
+ (Map<String, Class<? extends FileSystem>>)
+ FieldUtils.getField(FileSystem.class, "SERVICE_FILE_SYSTEMS",
true).get(null);
+
+ if (serviceFileSystems.containsKey(fsScheme)) {
+ return;
+ }
+
+ // Set this value to false so that FileSystem will reload the service
file systems when
+ // needed.
+ FieldUtils.getField(FileSystem.class, "FILE_SYSTEMS_LOADED",
true).set(null, false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
@@ -595,4 +664,34 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
});
return resultMap;
}
+
+ private void throwFilesetPathNotFoundExceptionIf(
+ Supplier<Boolean> condition, Path path, FilesetDataOperation op)
+ throws FilesetPathNotFoundException {
+ if (condition.get()) {
+ throw new FilesetPathNotFoundException(
+ String.format(
+ "Path [%s] not found for operation [%s] because of fileset and
related "
+ + "metadata not existed in Gravitino",
+ path, op));
+ }
+ }
+
+ private static class FilesetContextPair {
+ private final Path actualFileLocation;
+ private final FileSystem fileSystem;
+
+ public FilesetContextPair(Path actualFileLocation, FileSystem fileSystem) {
+ this.actualFileLocation = actualFileLocation;
+ this.fileSystem = fileSystem;
+ }
+
+ public Path getActualFileLocation() {
+ return actualFileLocation;
+ }
+
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+ }
}
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
index 544cc0ccad..658eb617f6 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
@@ -116,5 +116,11 @@ public class GravitinoVirtualFileSystemConfiguration {
public static final String
FS_GRAVITINO_CURRENT_LOCATION_NAME_ENV_VAR_DEFAULT =
"CURRENT_LOCATION_NAME";
+ /** The configuration key for the block size of the GVFS file. */
+ public static final String FS_GRAVITINO_BLOCK_SIZE =
"fs.gravitino.block.size";
+
+ /** The default block size of the GVFS file. */
+ public static final long FS_GRAVITINO_BLOCK_SIZE_DEFAULT = 32 * 1024 * 1024;
+
private GravitinoVirtualFileSystemConfiguration() {}
}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
index be7a7d35f5..31d7d078cf 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.filesystem.hadoop;
import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.extractIdentifier;
+import static org.apache.hc.core5.http.HttpStatus.SC_NOT_FOUND;
import static org.apache.hc.core5.http.HttpStatus.SC_OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -47,8 +48,10 @@ import org.apache.gravitino.dto.AuditDTO;
import org.apache.gravitino.dto.credential.CredentialDTO;
import org.apache.gravitino.dto.file.FilesetDTO;
import org.apache.gravitino.dto.responses.CredentialResponse;
+import org.apache.gravitino.dto.responses.ErrorResponse;
import org.apache.gravitino.dto.responses.FileLocationResponse;
import org.apache.gravitino.dto.responses.FilesetResponse;
+import org.apache.gravitino.exceptions.NoSuchFilesetException;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.rest.RESTUtils;
import org.apache.hadoop.conf.Configuration;
@@ -148,12 +151,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
FileLocationResponse fileLocationResponse = new
FileLocationResponse(localPath.toString());
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString(""));
- try {
- buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
- buildMockResourceForCredential(filesetName, localPath.toString());
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath.toString());
FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem);
FileSystem proxyLocalFs =
@@ -195,13 +194,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
FileLocationResponse fileLocationResponse = new
FileLocationResponse(localPath1.toString());
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString(""));
- try {
- buildMockResource(
- Method.GET, locationPath1, queryParams, null,
fileLocationResponse, SC_OK);
- buildMockResourceForCredential("fileset1", localPath1.toString());
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath1, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential("fileset1", localPath1.toString());
FileSystemTestUtils.mkdirs(filesetPath1, fs);
// expired by time
@@ -244,12 +238,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
FileLocationResponse fileLocationResponse = new
FileLocationResponse(localPath + "/test.txt");
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString("/test.txt"));
- try {
- buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
- buildMockResourceForCredential(filesetName, localPath + "/test.txt");
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath + "/test.txt");
Path localFilePath = new Path(localPath + "/test.txt");
assertFalse(localFileSystem.exists(localFilePath));
@@ -301,12 +291,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
FileLocationResponse fileLocationResponse = new
FileLocationResponse(localPath + "/test.txt");
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString("/test.txt"));
- try {
- buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
- buildMockResourceForCredential(filesetName, localPath + "/test.txt");
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath + "/test.txt");
Path appendFile = new Path(managedFilesetPath + "/test.txt");
Path localAppendFile = new Path(localPath + "/test.txt");
@@ -388,23 +374,14 @@ public class TestGvfsBase extends GravitinoMockServerBase
{
new FileLocationResponse(localPath + "/rename_src");
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString("/rename_src"));
- try {
- buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
FileLocationResponse fileLocationResponse1 =
new FileLocationResponse(localPath + "/rename_dst2");
Map<String, String> queryParams1 = new HashMap<>();
queryParams1.put("sub_path", RESTUtils.encodeString("/rename_dst2"));
- try {
- buildMockResource(
- Method.GET, locationPath, queryParams1, null,
fileLocationResponse1, SC_OK);
- buildMockResourceForCredential(filesetName, localPath +
"/rename_dst2");
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath, queryParams1, null,
fileLocationResponse1, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath + "/rename_dst2");
Path srcLocalRenamePath = new Path(localPath + "/rename_src");
localFileSystem.mkdirs(srcLocalRenamePath);
@@ -470,12 +447,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
new FileLocationResponse(localPath + "/test_delete");
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString("/test_delete"));
- try {
- buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
- buildMockResourceForCredential(filesetName, localPath +
"/test_delete");
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath + "/test_delete");
Path dirPath = new Path(managedFilesetPath + "/test_delete");
Path localDirPath = new Path(localPath + "/test_delete");
@@ -517,12 +490,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
FileLocationResponse fileLocationResponse = new
FileLocationResponse(localPath.toString());
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString(""));
- try {
- buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
- buildMockResourceForCredential(filesetName, localPath.toString());
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath.toString());
FileStatus gravitinoStatus =
gravitinoFileSystem.getFileStatus(managedFilesetPath);
FileStatus localStatus = localFileSystem.getFileStatus(localPath);
@@ -562,12 +531,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
FileLocationResponse fileLocationResponse = new
FileLocationResponse(localPath.toString());
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString(""));
- try {
- buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
- buildMockResourceForCredential(filesetName, localPath.toString());
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath.toString());
List<FileStatus> gravitinoStatuses =
new
ArrayList<>(Arrays.asList(gravitinoFileSystem.listStatus(managedFilesetPath)));
@@ -613,12 +578,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
new FileLocationResponse(localPath + "/test_mkdirs");
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString("/test_mkdirs"));
- try {
- buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
- buildMockResourceForCredential(filesetName, localPath +
"/test_mkdirs");
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath + "/test_mkdirs");
Path subDirPath = new Path(managedFilesetPath + "/test_mkdirs");
Path localDirPath = new Path(localPath + "/test_mkdirs");
@@ -739,12 +700,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
FileLocationResponse fileLocationResponse = new
FileLocationResponse(localPath.toString());
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString(""));
- try {
- buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
- buildMockResourceForCredential(filesetName, localPath.toString());
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath.toString());
assertEquals(1, fs.getDefaultReplication(managedFilesetPath));
}
@@ -766,12 +723,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
FileLocationResponse fileLocationResponse = new
FileLocationResponse(localPath.toString());
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString(""));
- try {
- buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
- buildMockResourceForCredential(filesetName, localPath.toString());
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath.toString());
assertEquals(32 * 1024 * 1024,
fs.getDefaultBlockSize(managedFilesetPath));
}
@@ -795,4 +748,47 @@ public class TestGvfsBase extends GravitinoMockServerBase {
assertEquals(expectedPath, convertedStatus.getPath());
}
}
+
+ @Test
+ public void testWhenFilesetNotCreated() throws IOException {
+ String filesetName = "testWhenFilesetNotCreated";
+ Path managedFilesetPath =
+ FileSystemTestUtils.createFilesetPath(catalogName, schemaName,
filesetName, true);
+ Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName,
schemaName, filesetName);
+ String locationPath =
+ String.format(
+ "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
+ metalakeName, catalogName, schemaName, filesetName);
+ try (GravitinoVirtualFileSystem fs =
+ (GravitinoVirtualFileSystem) managedFilesetPath.getFileSystem(conf)) {
+
+ Map<String, String> queryParams = new HashMap<>();
+ queryParams.put("sub_path", RESTUtils.encodeString(""));
+ ErrorResponse errResp =
+ ErrorResponse.notFound(NoSuchFilesetException.class.getSimpleName(),
"fileset not found");
+ buildMockResource(Method.GET, locationPath, queryParams, null, errResp,
SC_NOT_FOUND);
+ buildMockResourceForCredential(filesetName, localPath.toString());
+
+ Path testPath = new Path(managedFilesetPath + "/test.txt");
+ assertThrows(RuntimeException.class, () ->
fs.setWorkingDirectory(testPath));
+ assertThrows(FilesetPathNotFoundException.class, () ->
fs.open(testPath));
+ assertThrows(IOException.class, () -> fs.create(testPath));
+ assertThrows(FilesetPathNotFoundException.class, () ->
fs.append(testPath));
+
+ Path testPath1 = new Path(managedFilesetPath + "/test1.txt");
+ assertThrows(FilesetPathNotFoundException.class, () ->
fs.rename(testPath, testPath1));
+
+ assertFalse(fs.delete(testPath, true));
+
+ assertThrows(FilesetPathNotFoundException.class, () ->
fs.getFileStatus(testPath));
+ assertThrows(FilesetPathNotFoundException.class, () ->
fs.listStatus(testPath));
+
+ assertThrows(IOException.class, () -> fs.mkdirs(testPath));
+
+ assertEquals(1, fs.getDefaultReplication(testPath));
+ assertEquals(
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_BLOCK_SIZE_DEFAULT,
+ fs.getDefaultBlockSize(testPath));
+ }
+ }
}