This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 89f939543b [#6953] feat(GVFS): Refactor GVFS to handle fileset not existed scenario (#6954)
89f939543b is described below

commit 89f939543bad793011d3f7c7a48305695817e072
Author: mchades <[email protected]>
AuthorDate: Wed Apr 16 14:11:07 2025 +0800

    [#6953] feat(GVFS): Refactor GVFS to handle fileset not existed scenario (#6954)
    
    ### What changes were proposed in this pull request?
    
    Refactor GVFS to handle the scenario where the fileset (or its catalog)
    has not been created, and align its behavior with file system semantics.
    
    ### Why are the changes needed?
    
    Fix: #6953
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    tests added
    
    Co-authored-by: Jerry Shao <[email protected]>
---
 clients/client-python/gravitino/filesystem/gvfs.py | 133 ++++-
 .../tests/unittests/test_gvfs_without_fileset.py   | 112 +++++
 .../hadoop/FilesetPathNotFoundException.java       |  39 ++
 .../hadoop/GravitinoVirtualFileSystem.java         | 539 ++++++++++++---------
 .../GravitinoVirtualFileSystemConfiguration.java   |   6 +
 .../gravitino/filesystem/hadoop/TestGvfsBase.java  | 142 +++---
 6 files changed, 667 insertions(+), 304 deletions(-)

diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py
index 1d4d680a24..8b59739135 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -25,8 +25,7 @@ import sys
 import time
 from enum import Enum
 from pathlib import PurePosixPath
-from typing import Dict, Tuple, List
-
+from typing import Dict, Tuple, List, Optional
 import fsspec
 from cachetools import TTLCache, LRUCache
 from fsspec import AbstractFileSystem
@@ -54,6 +53,9 @@ from gravitino.client.fileset_catalog import FilesetCatalog
 from gravitino.client.generic_fileset import GenericFileset
 from gravitino.exceptions.base import (
     GravitinoRuntimeException,
+    NoSuchCatalogException,
+    CatalogNotInUseException,
+    NoSuchFilesetException,
 )
 from gravitino.filesystem.gvfs_config import GVFSConfig
 from gravitino.filesystem.gvfs_utils import (
@@ -79,6 +81,10 @@ class StorageType(Enum):
     ABS = "abfss"
 
 
+class FilesetPathNotFoundError(FileNotFoundError):
+    """Exception raised when the catalog, schema or fileset is not found in the GVFS path."""
+
+
 class FilesetContextPair:
     """A context object that holds the information about the actual file 
location and the file system which used in
     the GravitinoVirtualFileSystem's operations.
@@ -173,6 +179,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, FilesetDataOperation.LIST_STATUS
         )
+        self._throw_fileset_path_not_found_error_if(
+            context_pair is None, path, FilesetDataOperation.LIST_STATUS
+        )
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         pre_process_path: str = self._pre_process_path(path)
@@ -216,6 +226,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, FilesetDataOperation.GET_FILE_STATUS
         )
+        self._throw_fileset_path_not_found_error_if(
+            context_pair is None, path, FilesetDataOperation.GET_FILE_STATUS
+        )
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         pre_process_path: str = self._pre_process_path(path)
@@ -240,6 +254,9 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, FilesetDataOperation.EXISTS
         )
+        if context_pair is None:
+            return False
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         return context_pair.filesystem().exists(
@@ -264,6 +281,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         src_context_pair: FilesetContextPair = self._get_fileset_context(
             src_path, FilesetDataOperation.COPY_FILE
         )
+        self._throw_fileset_path_not_found_error_if(
+            src_context_pair is None, src_path, FilesetDataOperation.COPY_FILE
+        )
+
         src_actual_path = src_context_pair.actual_file_location()
 
         dst_context_pair: FilesetContextPair = self._get_fileset_context(
@@ -296,11 +317,17 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
                 f"Destination file path identifier: `{dst_identifier}`"
                 f" should be same with src file path identifier: 
`{src_identifier}`."
             )
+
         src_context_pair: FilesetContextPair = self._get_fileset_context(
             src_path, FilesetDataOperation.RENAME
         )
+        self._throw_fileset_path_not_found_error_if(
+            src_context_pair is None, src_path, FilesetDataOperation.RENAME
+        )
+
         src_actual_path = src_context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(src_actual_path)
+
         dst_context_pair: FilesetContextPair = self._get_fileset_context(
             dst_path, FilesetDataOperation.RENAME
         )
@@ -350,6 +377,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, FilesetDataOperation.DELETE
         )
+        self._throw_fileset_path_not_found_error_if(
+            context_pair is None, path, FilesetDataOperation.DELETE
+        )
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         fs = context_pair.filesystem()
@@ -371,6 +402,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, FilesetDataOperation.DELETE
         )
+        self._throw_fileset_path_not_found_error_if(
+            context_pair is None, path, FilesetDataOperation.DELETE
+        )
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         context_pair.filesystem().rm_file(
@@ -386,6 +421,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, FilesetDataOperation.DELETE
         )
+        self._throw_fileset_path_not_found_error_if(
+            context_pair is None, path, FilesetDataOperation.DELETE
+        )
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         context_pair.filesystem().rmdir(
@@ -416,9 +455,20 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             data_operation = FilesetDataOperation.OPEN_AND_APPEND
         else:
             data_operation = FilesetDataOperation.OPEN
+
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, data_operation
         )
+        if context_pair is None:
+            if mode in ("w", "wb", "x", "xb", "a", "ab"):
+                raise OSError(
+                    f"Fileset is not found for path: {path} for operation OPEN. This "
+                    f"may be caused by fileset related metadata not found or not in use "
+                    f"in Gravitino,"
+                )
+
+            raise FilesetPathNotFoundError(f"Path {path} not found for operation OPEN.")
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         return context_pair.filesystem().open(
@@ -441,6 +491,13 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, FilesetDataOperation.MKDIRS
         )
+        if context_pair is None:
+            raise OSError(
+                f"Fileset is not found for path: {path} for operation MKDIRS. 
This "
+                f"may be caused by fileset related metadata not found or not 
in use "
+                f"in Gravitino,"
+            )
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         context_pair.filesystem().mkdir(
@@ -457,6 +514,13 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, FilesetDataOperation.MKDIRS
         )
+        if context_pair is None:
+            raise OSError(
+                f"Fileset is not found for path: {path} for operation MKDIRS. 
This "
+                f"may be caused by fileset related metadata not found or not 
in use "
+                f"in Gravitino,"
+            )
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         context_pair.filesystem().makedirs(
@@ -473,6 +537,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, FilesetDataOperation.CREATED_TIME
         )
+        self._throw_fileset_path_not_found_error_if(
+            context_pair is None, path, FilesetDataOperation.CREATED_TIME
+        )
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         if storage_type == StorageType.LOCAL:
@@ -491,6 +559,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, FilesetDataOperation.MODIFIED_TIME
         )
+        self._throw_fileset_path_not_found_error_if(
+            context_pair is None, path, FilesetDataOperation.MODIFIED_TIME
+        )
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         return context_pair.filesystem().modified(
@@ -508,6 +580,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         context_pair: FilesetContextPair = self._get_fileset_context(
             path, FilesetDataOperation.CAT_FILE
         )
+        self._throw_fileset_path_not_found_error_if(
+            context_pair is None, path, FilesetDataOperation.CAT_FILE
+        )
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         return context_pair.filesystem().cat_file(
@@ -531,9 +607,14 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             raise GravitinoRuntimeException(
                 "Doesn't support copy a remote gvfs file to an another remote 
file."
             )
+
         context_pair: FilesetContextPair = self._get_fileset_context(
             rpath, FilesetDataOperation.GET_FILE
         )
+        self._throw_fileset_path_not_found_error_if(
+            context_pair is None, rpath, FilesetDataOperation.GET_FILE
+        )
+
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
         context_pair.filesystem().get_file(
@@ -658,7 +739,9 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             "mtime": last_modified,
         }
 
-    def _get_fileset_context(self, virtual_path: str, operation: 
FilesetDataOperation):
+    def _get_fileset_context(
+        self, virtual_path: str, operation: FilesetDataOperation
+    ) -> Optional[FilesetContextPair]:
         """Get a fileset context from the cache or the Gravitino server
         :param virtual_path: The virtual path
         :param operation: The data operation
@@ -669,7 +752,17 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         catalog_ident: NameIdentifier = NameIdentifier.of(
             self._metalake, identifier.namespace().level(1)
         )
-        fileset_catalog = self._get_fileset_catalog(catalog_ident)
+
+        try:
+            fileset_catalog = self._get_fileset_catalog(catalog_ident)
+        except (NoSuchCatalogException, CatalogNotInUseException):
+            logger.warning(
+                "Cannot get fileset catalog by identifier: %s",
+                catalog_ident,
+                exc_info=True,
+            )
+            return None
+
         if fileset_catalog is None:
             raise GravitinoRuntimeException(
                 f"Loaded fileset catalog: {catalog_ident} is null."
@@ -681,13 +774,23 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         }
         caller_context: CallerContext = CallerContext(context)
         CallerContextHolder.set(caller_context)
-        actual_file_location: (
-            str
-        ) = fileset_catalog.as_fileset_catalog().get_file_location(
-            NameIdentifier.of(identifier.namespace().level(2), 
identifier.name()),
-            sub_path,
-            self._current_location_name,
-        )
+
+        try:
+            actual_file_location: (
+                str
+            ) = fileset_catalog.as_fileset_catalog().get_file_location(
+                NameIdentifier.of(identifier.namespace().level(2), 
identifier.name()),
+                sub_path,
+                self._current_location_name,
+            )
+        except NoSuchFilesetException:
+            logger.warning(
+                "Cannot get file location by identifier: %s, sub_path: %s",
+                identifier,
+                sub_path,
+                exc_info=True,
+            )
+            return None
 
         return FilesetContextPair(
             actual_file_location,
@@ -1165,5 +1268,13 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         )
         return time.time() * 1000 + (expire_time - time.time() * 1000) * ratio
 
+    def _throw_fileset_path_not_found_error_if(
+        self, condition: bool, path: str, op: FilesetDataOperation
+    ):
+        if condition:
+            raise FilesetPathNotFoundError(
+                f"Path [{path}] not found for operation [{op}]"
+            )
+
 
 fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
diff --git a/clients/client-python/tests/unittests/test_gvfs_without_fileset.py 
b/clients/client-python/tests/unittests/test_gvfs_without_fileset.py
new file mode 100644
index 0000000000..bc72b224fd
--- /dev/null
+++ b/clients/client-python/tests/unittests/test_gvfs_without_fileset.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest.mock import patch
+
+from gravitino.filesystem.gvfs import (
+    GravitinoVirtualFileSystem,
+    FilesetPathNotFoundError,
+)
+
+
+class TestGVFSWithoutFileset(unittest.TestCase):
+
+    @patch("gravitino.filesystem.gvfs.create_client")
+    @patch.object(GravitinoVirtualFileSystem, "_get_fileset_context")
+    def test_when_fileset_not_created(
+        self, mock_get_fileset_context, mock_create_client
+    ):
+        mock_create_client.return_value = None
+        mock_get_fileset_context.return_value = None
+
+        fs = GravitinoVirtualFileSystem(
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
+        )
+
+        self.assertRaises(
+            FilesetPathNotFoundError, fs.ls, "fileset/test_catalog/schema/fileset"
+        )
+        self.assertRaises(
+            FilesetPathNotFoundError, fs.info, "fileset/test_catalog/schema/fileset"
+        )
+
+        self.assertFalse(fs.exists("fileset/test_catalog/schema/fileset"))
+
+        with self.assertRaises(FileNotFoundError):
+            fs.cp_file(
+                "fileset/test_catalog/schema/fileset/a",
+                "fileset/test_catalog/schema/fileset/b",
+            )
+
+        self.assertRaises(
+            FilesetPathNotFoundError, fs.rm, 
"fileset/test_catalog/schema/fileset/a"
+        )
+        self.assertRaises(
+            FilesetPathNotFoundError,
+            fs.rm_file,
+            "fileset/test_catalog/schema/fileset/a",
+        )
+        self.assertRaises(
+            FilesetPathNotFoundError, fs.rmdir, 
"fileset/test_catalog/schema/fileset/a"
+        )
+
+        self.assertRaises(OSError, fs.open, "fileset/test_catalog/schema/fileset", "w")
+        self.assertRaises(OSError, fs.open, "fileset/test_catalog/schema/fileset", "wb")
+        self.assertRaises(OSError, fs.open, "fileset/test_catalog/schema/fileset", "a")
+        self.assertRaises(OSError, fs.open, "fileset/test_catalog/schema/fileset", "ab")
+        self.assertRaises(OSError, fs.open, "fileset/test_catalog/schema/fileset", "x")
+        self.assertRaises(OSError, fs.open, "fileset/test_catalog/schema/fileset", "xb")
+
+        self.assertRaises(
+            FilesetPathNotFoundError,
+            fs.open,
+            "fileset/test_catalog/schema/fileset/a",
+            "r",
+        )
+        self.assertRaises(
+            FilesetPathNotFoundError,
+            fs.open,
+            "fileset/test_catalog/schema/fileset/a",
+            "rb",
+        )
+
+        self.assertRaises(OSError, fs.mkdir, 
"fileset/test_catalog/schema/fileset/a")
+        self.assertRaises(
+            OSError, fs.makedirs, "fileset/test_catalog/schema/fileset/a/b"
+        )
+
+        self.assertRaises(
+            FilesetPathNotFoundError,
+            fs.created,
+            "fileset/test_catalog/schema/fileset/a",
+        )
+        self.assertRaises(
+            FilesetPathNotFoundError,
+            fs.modified,
+            "fileset/test_catalog/schema/fileset/a",
+        )
+
+        self.assertRaises(
+            FilesetPathNotFoundError,
+            fs.cat_file,
+            "fileset/test_catalog/schema/fileset/a",
+        )
+
+        with self.assertRaises(FileNotFoundError):
+            fs.get("fileset/test_catalog/schema/fileset/a", "file://tmp/a")
diff --git a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetPathNotFoundException.java b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetPathNotFoundException.java
new file mode 100644
index 0000000000..f9f26808fb
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetPathNotFoundException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.filesystem.hadoop;
+
+import java.io.FileNotFoundException;
+
+/** Exception thrown when the catalog, schema or fileset not existed for a given GVFS path. */
+public class FilesetPathNotFoundException extends FileNotFoundException {
+
+  /** Creates a new FilesetPathNotFoundException instance. */
+  public FilesetPathNotFoundException() {
+    super();
+  }
+
+  /**
+   * Creates a new FilesetPathNotFoundException instance with the given 
message.
+   *
+   * @param message The message of the exception.
+   */
+  public FilesetPathNotFoundException(String message) {
+    super(message);
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index 92dea08b75..dd2763933e 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -28,6 +28,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Scheduler;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -61,7 +62,10 @@ import 
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProv
 import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
 import org.apache.gravitino.client.GravitinoClient;
 import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.exceptions.CatalogNotInUseException;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetCatalog;
 import org.apache.gravitino.storage.AzureProperties;
@@ -87,7 +91,8 @@ import org.slf4j.LoggerFactory;
  * users to access the underlying storage.
  */
 public class GravitinoVirtualFileSystem extends FileSystem {
-  private static final Logger Logger = 
LoggerFactory.getLogger(GravitinoVirtualFileSystem.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoVirtualFileSystem.class);
+
   private Path workingDirectory;
   private URI uri;
   private GravitinoClient client;
@@ -98,6 +103,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
   // identifier has four levels, the first level is metalake name.
   private Cache<Pair<NameIdentifier, String>, FileSystem> 
internalFileSystemCache;
   private ScheduledThreadPoolExecutor internalFileSystemCleanScheduler;
+  private long defaultBlockSize;
 
   private static final String SLASH = "/";
   private final Map<String, FileSystemProvider> fileSystemProvidersMap = 
Maps.newHashMap();
@@ -167,6 +173,10 @@ public class GravitinoVirtualFileSystem extends FileSystem 
{
 
     this.workingDirectory = new Path(name);
     this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
+    this.defaultBlockSize =
+        configuration.getLong(
+            GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_BLOCK_SIZE,
+            
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_BLOCK_SIZE_DEFAULT);
 
     setConf(configuration);
     super.initialize(uri, getConf());
@@ -177,59 +187,6 @@ public class GravitinoVirtualFileSystem extends FileSystem 
{
     return internalFileSystemCache;
   }
 
-  private void initializeFileSystemCache(int maxCapacity, long 
expireAfterAccess) {
-    // Since Caffeine does not ensure that removalListener will be involved 
after expiration
-    // We use a scheduler with one thread to clean up expired clients.
-    this.internalFileSystemCleanScheduler =
-        new ScheduledThreadPoolExecutor(1, 
newDaemonThreadFactory("gvfs-filesystem-cache-cleaner"));
-    Caffeine<Object, Object> cacheBuilder =
-        Caffeine.newBuilder()
-            .maximumSize(maxCapacity)
-            
.scheduler(Scheduler.forScheduledExecutorService(internalFileSystemCleanScheduler))
-            .removalListener(
-                (key, value, cause) -> {
-                  FileSystem fs = (FileSystem) value;
-                  if (fs != null) {
-                    try {
-                      fs.close();
-                    } catch (IOException e) {
-                      Logger.error("Cannot close the file system for fileset: 
{}", key, e);
-                    }
-                  }
-                });
-    if (expireAfterAccess > 0) {
-      cacheBuilder.expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS);
-    }
-    this.internalFileSystemCache = cacheBuilder.build();
-  }
-
-  private void initializeCatalogCache() {
-    // Since Caffeine does not ensure that removalListener will be involved 
after expiration
-    // We use a scheduler with one thread to clean up expired clients.
-    this.catalogCleanScheduler =
-        new ScheduledThreadPoolExecutor(1, 
newDaemonThreadFactory("gvfs-catalog-cache-cleaner"));
-    // In most scenarios, it will not read so many catalog filesets at the 
same time, so we can just
-    // set a default value for this cache.
-    this.catalogCache =
-        Caffeine.newBuilder()
-            .maximumSize(100)
-            
.scheduler(Scheduler.forScheduledExecutorService(catalogCleanScheduler))
-            .build();
-  }
-
-  private ThreadFactory newDaemonThreadFactory(String name) {
-    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name + 
"-%d").build();
-  }
-
-  private String getVirtualLocation(NameIdentifier identifier, boolean 
withScheme) {
-    return String.format(
-        "%s/%s/%s/%s",
-        withScheme ? 
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX : "",
-        identifier.namespace().level(1),
-        identifier.namespace().level(2),
-        identifier.name());
-  }
-
   @VisibleForTesting
   FileStatus convertFileStatusPathPrefix(
       FileStatus fileStatus, String actualPrefix, String virtualPrefix) {
@@ -239,7 +196,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
         "Path %s doesn't start with prefix \"%s\".",
         filePath,
         actualPrefix);
-    // if the storage location is end with "/",
+    // if the storage location ends with "/",
     // we should truncate this to avoid replace issues.
     Path path =
         new Path(
@@ -253,134 +210,6 @@ public class GravitinoVirtualFileSystem extends 
FileSystem {
     return fileStatus;
   }
 
-  private FilesetContextPair getFilesetContext(Path virtualPath, 
FilesetDataOperation operation) {
-    NameIdentifier identifier = extractIdentifier(metalakeName, 
virtualPath.toString());
-    String virtualPathString = virtualPath.toString();
-    String subPath = getSubPathFromGvfsPath(identifier, virtualPathString);
-
-    NameIdentifier catalogIdent = NameIdentifier.of(metalakeName, 
identifier.namespace().level(1));
-    FilesetCatalog filesetCatalog =
-        catalogCache.get(
-            catalogIdent, ident -> 
client.loadCatalog(catalogIdent.name()).asFilesetCatalog());
-    Catalog catalog = (Catalog) filesetCatalog;
-    Preconditions.checkArgument(
-        filesetCatalog != null, String.format("Loaded fileset catalog: %s is 
null.", catalogIdent));
-
-    Map<String, String> contextMap = Maps.newHashMap();
-    contextMap.put(
-        FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
-        InternalClientType.HADOOP_GVFS.name());
-    contextMap.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, 
operation.name());
-    CallerContext callerContext = 
CallerContext.builder().withContext(contextMap).build();
-    CallerContext.CallerContextHolder.set(callerContext);
-
-    String actualFileLocation =
-        filesetCatalog.getFileLocation(
-            NameIdentifier.of(identifier.namespace().level(2), 
identifier.name()),
-            subPath,
-            currentLocationName);
-
-    Path filePath = new Path(actualFileLocation);
-    URI uri = filePath.toUri();
-    // we cache the fs for the same scheme, so we can reuse it
-    String scheme = uri.getScheme();
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(scheme), "Scheme of the actual file location 
cannot be null.");
-    FileSystem fs =
-        internalFileSystemCache.get(
-            Pair.of(identifier, currentLocationName),
-            ident -> {
-              try {
-                FileSystemProvider provider = 
fileSystemProvidersMap.get(scheme);
-                if (provider == null) {
-                  throw new GravitinoRuntimeException(
-                      "Unsupported file system scheme: %s for %s.",
-                      scheme, 
GravitinoVirtualFileSystemConfiguration.GVFS_SCHEME);
-                }
-
-                // Reset the FileSystem service loader to make sure the 
FileSystem will reload the
-                // service file systems, this is a temporary solution to fix 
the issue
-                // https://github.com/apache/gravitino/issues/5609
-                resetFileSystemServiceLoader(scheme);
-
-                Map<String, String> necessaryPropertyFromCatalog =
-                    catalog.properties().entrySet().stream()
-                        .filter(
-                            property ->
-                                
CATALOG_NECESSARY_PROPERTIES_TO_KEEP.contains(property.getKey()))
-                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
-
-                Map<String, String> totalProperty = 
Maps.newHashMap(necessaryPropertyFromCatalog);
-                totalProperty.putAll(getConfigMap(getConf()));
-
-                totalProperty.putAll(getCredentialProperties(provider, 
catalog, identifier));
-
-                return provider.getFileSystem(filePath, totalProperty);
-              } catch (IOException ioe) {
-                throw new GravitinoRuntimeException(
-                    ioe,
-                    "Exception occurs when create new FileSystem for actual 
uri: %s, msg: %s",
-                    uri,
-                    ioe.getMessage());
-              }
-            });
-
-    return new FilesetContextPair(new Path(actualFileLocation), fs);
-  }
-
-  private Map<String, String> getCredentialProperties(
-      FileSystemProvider fileSystemProvider, Catalog catalog, NameIdentifier 
filesetIdentifier) {
-    // Do not support credential vending, we do not need to add any credential 
properties.
-    if (!(fileSystemProvider instanceof SupportsCredentialVending)) {
-      return ImmutableMap.of();
-    }
-
-    ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
-    try {
-      Fileset fileset =
-          catalog
-              .asFilesetCatalog()
-              .loadFileset(
-                  NameIdentifier.of(
-                      filesetIdentifier.namespace().level(2), 
filesetIdentifier.name()));
-      Credential[] credentials = 
fileset.supportsCredentials().getCredentials();
-      if (credentials.length > 0) {
-        mapBuilder.put(
-            GravitinoFileSystemCredentialsProvider.GVFS_CREDENTIAL_PROVIDER,
-            
DefaultGravitinoFileSystemCredentialsProvider.class.getCanonicalName());
-        mapBuilder.put(
-            GravitinoFileSystemCredentialsProvider.GVFS_NAME_IDENTIFIER,
-            filesetIdentifier.toString());
-
-        SupportsCredentialVending supportsCredentialVending =
-            (SupportsCredentialVending) fileSystemProvider;
-        
mapBuilder.putAll(supportsCredentialVending.getFileSystemCredentialConf(credentials));
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return mapBuilder.build();
-  }
-
-  private void resetFileSystemServiceLoader(String fsScheme) {
-    try {
-      Map<String, Class<? extends FileSystem>> serviceFileSystems =
-          (Map<String, Class<? extends FileSystem>>)
-              FieldUtils.getField(FileSystem.class, "SERVICE_FILE_SYSTEMS", 
true).get(null);
-
-      if (serviceFileSystems.containsKey(fsScheme)) {
-        return;
-      }
-
-      // Set this value to false so that FileSystem will reload the service 
file systems when
-      // needed.
-      FieldUtils.getField(FileSystem.class, "FILE_SYSTEMS_LOADED", 
true).set(null, false);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   @Override
   public URI getUri() {
     return this.uri;
@@ -393,15 +222,25 @@ public class GravitinoVirtualFileSystem extends 
FileSystem {
 
   @Override
   public synchronized void setWorkingDirectory(Path newDir) {
-    FilesetContextPair context = getFilesetContext(newDir, 
FilesetDataOperation.SET_WORKING_DIR);
-    
context.getFileSystem().setWorkingDirectory(context.getActualFileLocation());
+    Optional<FilesetContextPair> context =
+        getFilesetContext(newDir, FilesetDataOperation.SET_WORKING_DIR);
+    try {
+      throwFilesetPathNotFoundExceptionIf(
+          () -> !context.isPresent(), newDir, 
FilesetDataOperation.SET_WORKING_DIR);
+    } catch (FilesetPathNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
+    
context.get().getFileSystem().setWorkingDirectory(context.get().getActualFileLocation());
     this.workingDirectory = newDir;
   }
 
   @Override
   public FSDataInputStream open(Path path, int bufferSize) throws IOException {
-    FilesetContextPair context = getFilesetContext(path, 
FilesetDataOperation.OPEN);
-    return context.getFileSystem().open(context.getActualFileLocation(), 
bufferSize);
+    Optional<FilesetContextPair> context = getFilesetContext(path, 
FilesetDataOperation.OPEN);
+    throwFilesetPathNotFoundExceptionIf(
+        () -> !context.isPresent(), path, FilesetDataOperation.OPEN);
+    return 
context.get().getFileSystem().open(context.get().getActualFileLocation(), 
bufferSize);
   }
 
   @Override
@@ -414,11 +253,21 @@ public class GravitinoVirtualFileSystem extends 
FileSystem {
       long blockSize,
       Progressable progress)
       throws IOException {
-    FilesetContextPair context = getFilesetContext(path, 
FilesetDataOperation.CREATE);
+    Optional<FilesetContextPair> context = getFilesetContext(path, 
FilesetDataOperation.CREATE);
+    if (!context.isPresent()) {
+      throw new IOException(
+          "Fileset is not found for path: "
+              + path
+              + " for operation CREATE. "
+              + "This may be caused by fileset related metadata not found or 
not in use in "
+              + "Gravitino, please check the fileset metadata in Gravitino.");
+    }
+
     return context
+        .get()
         .getFileSystem()
         .create(
-            context.getActualFileLocation(),
+            context.get().getActualFileLocation(),
             permission,
             overwrite,
             bufferSize,
@@ -430,8 +279,13 @@ public class GravitinoVirtualFileSystem extends FileSystem 
{
   @Override
   public FSDataOutputStream append(Path path, int bufferSize, Progressable 
progress)
       throws IOException {
-    FilesetContextPair context = getFilesetContext(path, 
FilesetDataOperation.APPEND);
-    return context.getFileSystem().append(context.getActualFileLocation(), 
bufferSize, progress);
+    Optional<FilesetContextPair> context = getFilesetContext(path, 
FilesetDataOperation.APPEND);
+    throwFilesetPathNotFoundExceptionIf(
+        () -> !context.isPresent(), path, FilesetDataOperation.APPEND);
+    return context
+        .get()
+        .getFileSystem()
+        .append(context.get().getActualFileLocation(), bufferSize, progress);
   }
 
   @Override
@@ -442,51 +296,75 @@ public class GravitinoVirtualFileSystem extends 
FileSystem {
     NameIdentifier dstIdentifier = extractIdentifier(metalakeName, 
dst.toString());
     Preconditions.checkArgument(
         srcIdentifier.equals(dstIdentifier),
-        "Destination path fileset identifier: %s should be same with src path 
fileset identifier: %s.",
+        "Destination path fileset identifier: %s should be same with src path "
+            + "fileset identifier: %s.",
         srcIdentifier,
         dstIdentifier);
 
-    FilesetContextPair srcContext = getFilesetContext(src, 
FilesetDataOperation.RENAME);
-    FilesetContextPair dstContext = getFilesetContext(dst, 
FilesetDataOperation.RENAME);
+    Optional<FilesetContextPair> srcContext = getFilesetContext(src, 
FilesetDataOperation.RENAME);
+    throwFilesetPathNotFoundExceptionIf(
+        () -> !srcContext.isPresent(), src, FilesetDataOperation.RENAME);
+
+    Optional<FilesetContextPair> dstContext = getFilesetContext(dst, 
FilesetDataOperation.RENAME);
 
+    // The src and dst paths share the same fileset identifier (checked above), 
so if the src context
+    // is present, the dst context must be present as well.
     return srcContext
+        .get()
         .getFileSystem()
-        .rename(srcContext.getActualFileLocation(), 
dstContext.getActualFileLocation());
+        .rename(srcContext.get().getActualFileLocation(), 
dstContext.get().getActualFileLocation());
   }
 
   @Override
   public boolean delete(Path path, boolean recursive) throws IOException {
-    FilesetContextPair context = getFilesetContext(path, 
FilesetDataOperation.DELETE);
-    return context.getFileSystem().delete(context.getActualFileLocation(), 
recursive);
+    Optional<FilesetContextPair> context = getFilesetContext(path, 
FilesetDataOperation.DELETE);
+    if (context.isPresent()) {
+      return 
context.get().getFileSystem().delete(context.get().getActualFileLocation(), 
recursive);
+    } else {
+      return false;
+    }
   }
 
   @Override
   public FileStatus getFileStatus(Path path) throws IOException {
-    FilesetContextPair context = getFilesetContext(path, 
FilesetDataOperation.GET_FILE_STATUS);
-    FileStatus fileStatus = 
context.getFileSystem().getFileStatus(context.getActualFileLocation());
+    Optional<FilesetContextPair> context =
+        getFilesetContext(path, FilesetDataOperation.GET_FILE_STATUS);
+    throwFilesetPathNotFoundExceptionIf(
+        () -> !context.isPresent(), path, 
FilesetDataOperation.GET_FILE_STATUS);
+
+    FileStatus fileStatus =
+        
context.get().getFileSystem().getFileStatus(context.get().getActualFileLocation());
     NameIdentifier identifier = extractIdentifier(metalakeName, 
path.toString());
     String subPath = getSubPathFromGvfsPath(identifier, path.toString());
     String storageLocation =
         context
+            .get()
             .getActualFileLocation()
             .toString()
-            .substring(0, context.getActualFileLocation().toString().length() 
- subPath.length());
+            .substring(
+                0, context.get().getActualFileLocation().toString().length() - 
subPath.length());
     return convertFileStatusPathPrefix(
         fileStatus, storageLocation, getVirtualLocation(identifier, true));
   }
 
   @Override
   public FileStatus[] listStatus(Path path) throws IOException {
-    FilesetContextPair context = getFilesetContext(path, 
FilesetDataOperation.LIST_STATUS);
+    Optional<FilesetContextPair> context =
+        getFilesetContext(path, FilesetDataOperation.LIST_STATUS);
+    throwFilesetPathNotFoundExceptionIf(
+        () -> !context.isPresent(), path, FilesetDataOperation.LIST_STATUS);
+
     FileStatus[] fileStatusResults =
-        context.getFileSystem().listStatus(context.getActualFileLocation());
+        
context.get().getFileSystem().listStatus(context.get().getActualFileLocation());
     NameIdentifier identifier = extractIdentifier(metalakeName, 
path.toString());
     String subPath = getSubPathFromGvfsPath(identifier, path.toString());
     String storageLocation =
         context
+            .get()
             .getActualFileLocation()
             .toString()
-            .substring(0, context.getActualFileLocation().toString().length() 
- subPath.length());
+            .substring(
+                0, context.get().getActualFileLocation().toString().length() - 
subPath.length());
     return Arrays.stream(fileStatusResults)
         .map(
             fileStatus ->
@@ -497,20 +375,35 @@ public class GravitinoVirtualFileSystem extends 
FileSystem {
 
   @Override
   public boolean mkdirs(Path path, FsPermission permission) throws IOException 
{
-    FilesetContextPair context = getFilesetContext(path, 
FilesetDataOperation.MKDIRS);
-    return context.getFileSystem().mkdirs(context.getActualFileLocation(), 
permission);
+    Optional<FilesetContextPair> context = getFilesetContext(path, 
FilesetDataOperation.MKDIRS);
+    if (!context.isPresent()) {
+      throw new IOException(
+          "Fileset is not found for path: "
+              + path
+              + " for operation MKDIRS. "
+              + "This may be caused by fileset related metadata not found or 
not in use in "
+              + "Gravitino, please check the fileset metadata in Gravitino.");
+    }
+
+    return 
context.get().getFileSystem().mkdirs(context.get().getActualFileLocation(), 
permission);
   }
 
   @Override
   public short getDefaultReplication(Path f) {
-    FilesetContextPair context = getFilesetContext(f, 
FilesetDataOperation.GET_DEFAULT_REPLICATION);
-    return 
context.getFileSystem().getDefaultReplication(context.getActualFileLocation());
+    Optional<FilesetContextPair> context =
+        getFilesetContext(f, FilesetDataOperation.GET_DEFAULT_REPLICATION);
+    return context
+        .map(c -> 
c.getFileSystem().getDefaultReplication(c.getActualFileLocation()))
+        .orElse((short) 1);
   }
 
   @Override
   public long getDefaultBlockSize(Path f) {
-    FilesetContextPair context = getFilesetContext(f, 
FilesetDataOperation.GET_DEFAULT_BLOCK_SIZE);
-    return 
context.getFileSystem().getDefaultBlockSize(context.getActualFileLocation());
+    Optional<FilesetContextPair> context =
+        getFilesetContext(f, FilesetDataOperation.GET_DEFAULT_BLOCK_SIZE);
+    return context
+        .map(c -> 
c.getFileSystem().getDefaultBlockSize(c.getActualFileLocation()))
+        .orElse(defaultBlockSize);
   }
 
   @Override
@@ -520,7 +413,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
       try {
         tokenList.addAll(Arrays.asList(fileSystem.addDelegationTokens(renewer, 
credentials)));
       } catch (IOException e) {
-        Logger.warn("Failed to add delegation tokens for filesystem: {}", 
fileSystem.getUri(), e);
+        LOG.warn("Failed to add delegation tokens for filesystem: {}", 
fileSystem.getUri(), e);
       }
     }
     return tokenList.stream().distinct().toArray(Token[]::new);
@@ -551,6 +444,46 @@ public class GravitinoVirtualFileSystem extends FileSystem 
{
     super.close();
   }
 
+  private void initializeFileSystemCache(int maxCapacity, long 
expireAfterAccess) {
+    // Since Caffeine does not ensure that removalListener will be invoked 
after expiration,
+    // we use a scheduler with one thread to clean up expired file systems.
+    this.internalFileSystemCleanScheduler =
+        new ScheduledThreadPoolExecutor(1, 
newDaemonThreadFactory("gvfs-filesystem-cache-cleaner"));
+    Caffeine<Object, Object> cacheBuilder =
+        Caffeine.newBuilder()
+            .maximumSize(maxCapacity)
+            
.scheduler(Scheduler.forScheduledExecutorService(internalFileSystemCleanScheduler))
+            .removalListener(
+                (key, value, cause) -> {
+                  FileSystem fs = (FileSystem) value;
+                  if (fs != null) {
+                    try {
+                      fs.close();
+                    } catch (IOException e) {
+                      LOG.error("Cannot close the file system for fileset: 
{}", key, e);
+                    }
+                  }
+                });
+    if (expireAfterAccess > 0) {
+      cacheBuilder.expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS);
+    }
+    this.internalFileSystemCache = cacheBuilder.build();
+  }
+
+  private void initializeCatalogCache() {
+    // Since Caffeine does not ensure that removalListener will be invoked 
after expiration,
+    // we use a scheduler with one thread to clean up expired catalog entries.
+    this.catalogCleanScheduler =
+        new ScheduledThreadPoolExecutor(1, 
newDaemonThreadFactory("gvfs-catalog-cache-cleaner"));
+    // In most scenarios, it will not read so many catalog filesets at the 
same time, so we can just
+    // set a default value for this cache.
+    this.catalogCache =
+        Caffeine.newBuilder()
+            .maximumSize(100)
+            
.scheduler(Scheduler.forScheduledExecutorService(catalogCleanScheduler))
+            .build();
+  }
+
   private String initCurrentLocationName(Configuration configuration) {
     // get from configuration first, otherwise use the env variable
     // if both are not set, return null which means use the default location
@@ -558,21 +491,157 @@ public class GravitinoVirtualFileSystem extends 
FileSystem {
         .orElse(System.getenv(currentLocationEnvVar));
   }
 
-  private static class FilesetContextPair {
-    private final Path actualFileLocation;
-    private final FileSystem fileSystem;
+  private ThreadFactory newDaemonThreadFactory(String name) {
+    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name + 
"-%d").build();
+  }
 
-    public FilesetContextPair(Path actualFileLocation, FileSystem fileSystem) {
-      this.actualFileLocation = actualFileLocation;
-      this.fileSystem = fileSystem;
+  private String getVirtualLocation(NameIdentifier identifier, boolean 
withScheme) {
+    return String.format(
+        "%s/%s/%s/%s",
+        withScheme ? 
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX : "",
+        identifier.namespace().level(1),
+        identifier.namespace().level(2),
+        identifier.name());
+  }
+
+  private Optional<FilesetContextPair> getFilesetContext(
+      Path virtualPath, FilesetDataOperation operation) {
+    NameIdentifier identifier = extractIdentifier(metalakeName, 
virtualPath.toString());
+    String virtualPathString = virtualPath.toString();
+    String subPath = getSubPathFromGvfsPath(identifier, virtualPathString);
+    NameIdentifier catalogIdent = NameIdentifier.of(metalakeName, 
identifier.namespace().level(1));
+
+    FilesetCatalog filesetCatalog;
+    try {
+      filesetCatalog =
+          catalogCache.get(
+              catalogIdent, ident -> 
client.loadCatalog(catalogIdent.name()).asFilesetCatalog());
+      Preconditions.checkArgument(
+          filesetCatalog != null,
+          String.format("Loaded fileset catalog: %s is null.", catalogIdent));
+    } catch (NoSuchCatalogException | CatalogNotInUseException e) {
+      LOG.warn("Cannot get fileset catalog by identifier: {}", catalogIdent, 
e);
+      return Optional.empty();
     }
 
-    public Path getActualFileLocation() {
-      return actualFileLocation;
+    Map<String, String> contextMap = Maps.newHashMap();
+    contextMap.put(
+        FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
+        InternalClientType.HADOOP_GVFS.name());
+    contextMap.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, 
operation.name());
+    CallerContext callerContext = 
CallerContext.builder().withContext(contextMap).build();
+    CallerContext.CallerContextHolder.set(callerContext);
+
+    String actualFileLocation;
+    try {
+      actualFileLocation =
+          filesetCatalog.getFileLocation(
+              NameIdentifier.of(identifier.namespace().level(2), 
identifier.name()),
+              subPath,
+              currentLocationName);
+    } catch (NoSuchFilesetException e) {
+      LOG.warn("Cannot get file location by identifier: {}, sub_path {}", 
identifier, subPath, e);
+      return Optional.empty();
     }
 
-    public FileSystem getFileSystem() {
-      return fileSystem;
+    Path filePath = new Path(actualFileLocation);
+    URI uri = filePath.toUri();
+    // the fs is cached per (fileset identifier, location name), so it can be reused
+    String scheme = uri.getScheme();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(scheme), "Scheme of the actual file location 
cannot be null.");
+    FileSystem fs =
+        internalFileSystemCache.get(
+            Pair.of(identifier, currentLocationName),
+            ident -> {
+              try {
+                FileSystemProvider provider = 
fileSystemProvidersMap.get(scheme);
+                if (provider == null) {
+                  throw new GravitinoRuntimeException(
+                      "Unsupported file system scheme: %s for %s.",
+                      scheme, 
GravitinoVirtualFileSystemConfiguration.GVFS_SCHEME);
+                }
+
+                // Reset the FileSystem service loader to make sure the 
FileSystem will reload the
+                // service file systems, this is a temporary solution to fix 
the issue
+                // https://github.com/apache/gravitino/issues/5609
+                resetFileSystemServiceLoader(scheme);
+
+                Catalog catalog = (Catalog) filesetCatalog;
+                Map<String, String> necessaryPropertyFromCatalog =
+                    catalog.properties().entrySet().stream()
+                        .filter(
+                            property ->
+                                
CATALOG_NECESSARY_PROPERTIES_TO_KEEP.contains(property.getKey()))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+                Map<String, String> totalProperty = 
Maps.newHashMap(necessaryPropertyFromCatalog);
+                totalProperty.putAll(getConfigMap(getConf()));
+                totalProperty.putAll(getCredentialProperties(provider, 
catalog, identifier));
+                return provider.getFileSystem(filePath, totalProperty);
+
+              } catch (IOException ioe) {
+                throw new GravitinoRuntimeException(
+                    ioe,
+                    "Exception occurs when create new FileSystem for actual 
uri: %s, msg: %s",
+                    uri,
+                    ioe.getMessage());
+              }
+            });
+
+    return Optional.of(new FilesetContextPair(new Path(actualFileLocation), 
fs));
+  }
+
+  private Map<String, String> getCredentialProperties(
+      FileSystemProvider fileSystemProvider, Catalog catalog, NameIdentifier 
filesetIdentifier) {
+    // The provider does not support credential vending, so there are no credential 
properties to add.
+    if (!(fileSystemProvider instanceof SupportsCredentialVending)) {
+      return ImmutableMap.of();
+    }
+
+    ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
+    try {
+      Fileset fileset =
+          catalog
+              .asFilesetCatalog()
+              .loadFileset(
+                  NameIdentifier.of(
+                      filesetIdentifier.namespace().level(2), 
filesetIdentifier.name()));
+      Credential[] credentials = 
fileset.supportsCredentials().getCredentials();
+      if (credentials.length > 0) {
+        mapBuilder.put(
+            GravitinoFileSystemCredentialsProvider.GVFS_CREDENTIAL_PROVIDER,
+            
DefaultGravitinoFileSystemCredentialsProvider.class.getCanonicalName());
+        mapBuilder.put(
+            GravitinoFileSystemCredentialsProvider.GVFS_NAME_IDENTIFIER,
+            filesetIdentifier.toString());
+
+        SupportsCredentialVending supportsCredentialVending =
+            (SupportsCredentialVending) fileSystemProvider;
+        
mapBuilder.putAll(supportsCredentialVending.getFileSystemCredentialConf(credentials));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return mapBuilder.build();
+  }
+
+  private void resetFileSystemServiceLoader(String fsScheme) {
+    try {
+      Map<String, Class<? extends FileSystem>> serviceFileSystems =
+          (Map<String, Class<? extends FileSystem>>)
+              FieldUtils.getField(FileSystem.class, "SERVICE_FILE_SYSTEMS", 
true).get(null);
+
+      if (serviceFileSystems.containsKey(fsScheme)) {
+        return;
+      }
+
+      // Set this value to false so that FileSystem will reload the service 
file systems when
+      // needed.
+      FieldUtils.getField(FileSystem.class, "FILE_SYSTEMS_LOADED", 
true).set(null, false);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }
 
@@ -595,4 +664,34 @@ public class GravitinoVirtualFileSystem extends FileSystem 
{
             });
     return resultMap;
   }
+
+  private void throwFilesetPathNotFoundExceptionIf(
+      Supplier<Boolean> condition, Path path, FilesetDataOperation op)
+      throws FilesetPathNotFoundException {
+    if (condition.get()) {
+      throw new FilesetPathNotFoundException(
+          String.format(
+              "Path [%s] not found for operation [%s] because of fileset and 
related "
+                  + "metadata not existed in Gravitino",
+              path, op));
+    }
+  }
+
+  private static class FilesetContextPair {
+    private final Path actualFileLocation;
+    private final FileSystem fileSystem;
+
+    public FilesetContextPair(Path actualFileLocation, FileSystem fileSystem) {
+      this.actualFileLocation = actualFileLocation;
+      this.fileSystem = fileSystem;
+    }
+
+    public Path getActualFileLocation() {
+      return actualFileLocation;
+    }
+
+    public FileSystem getFileSystem() {
+      return fileSystem;
+    }
+  }
 }
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
index 544cc0ccad..658eb617f6 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
@@ -116,5 +116,11 @@ public class GravitinoVirtualFileSystemConfiguration {
   public static final String 
FS_GRAVITINO_CURRENT_LOCATION_NAME_ENV_VAR_DEFAULT =
       "CURRENT_LOCATION_NAME";
 
+  /** The configuration key for the block size of the GVFS file. */
+  public static final String FS_GRAVITINO_BLOCK_SIZE = 
"fs.gravitino.block.size";
+
+  /** The default block size of the GVFS file. */
+  public static final long FS_GRAVITINO_BLOCK_SIZE_DEFAULT = 32 * 1024 * 1024;
+
   private GravitinoVirtualFileSystemConfiguration() {}
 }
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
index be7a7d35f5..31d7d078cf 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.filesystem.hadoop;
 
 import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
 import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.extractIdentifier;
+import static org.apache.hc.core5.http.HttpStatus.SC_NOT_FOUND;
 import static org.apache.hc.core5.http.HttpStatus.SC_OK;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -47,8 +48,10 @@ import org.apache.gravitino.dto.AuditDTO;
 import org.apache.gravitino.dto.credential.CredentialDTO;
 import org.apache.gravitino.dto.file.FilesetDTO;
 import org.apache.gravitino.dto.responses.CredentialResponse;
+import org.apache.gravitino.dto.responses.ErrorResponse;
 import org.apache.gravitino.dto.responses.FileLocationResponse;
 import org.apache.gravitino.dto.responses.FilesetResponse;
+import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.rest.RESTUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -148,12 +151,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       FileLocationResponse fileLocationResponse = new 
FileLocationResponse(localPath.toString());
       Map<String, String> queryParams = new HashMap<>();
       queryParams.put("sub_path", RESTUtils.encodeString(""));
-      try {
-        buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
-        buildMockResourceForCredential(filesetName, localPath.toString());
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+      buildMockResourceForCredential(filesetName, localPath.toString());
 
       FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem);
       FileSystem proxyLocalFs =
@@ -195,13 +194,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       FileLocationResponse fileLocationResponse = new 
FileLocationResponse(localPath1.toString());
       Map<String, String> queryParams = new HashMap<>();
       queryParams.put("sub_path", RESTUtils.encodeString(""));
-      try {
-        buildMockResource(
-            Method.GET, locationPath1, queryParams, null, 
fileLocationResponse, SC_OK);
-        buildMockResourceForCredential("fileset1", localPath1.toString());
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath1, queryParams, null, 
fileLocationResponse, SC_OK);
+      buildMockResourceForCredential("fileset1", localPath1.toString());
       FileSystemTestUtils.mkdirs(filesetPath1, fs);
 
       // expired by time
@@ -244,12 +238,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       FileLocationResponse fileLocationResponse = new 
FileLocationResponse(localPath + "/test.txt");
       Map<String, String> queryParams = new HashMap<>();
       queryParams.put("sub_path", RESTUtils.encodeString("/test.txt"));
-      try {
-        buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
-        buildMockResourceForCredential(filesetName, localPath + "/test.txt");
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+      buildMockResourceForCredential(filesetName, localPath + "/test.txt");
 
       Path localFilePath = new Path(localPath + "/test.txt");
       assertFalse(localFileSystem.exists(localFilePath));
@@ -301,12 +291,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       FileLocationResponse fileLocationResponse = new 
FileLocationResponse(localPath + "/test.txt");
       Map<String, String> queryParams = new HashMap<>();
       queryParams.put("sub_path", RESTUtils.encodeString("/test.txt"));
-      try {
-        buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
-        buildMockResourceForCredential(filesetName, localPath + "/test.txt");
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+      buildMockResourceForCredential(filesetName, localPath + "/test.txt");
 
       Path appendFile = new Path(managedFilesetPath + "/test.txt");
       Path localAppendFile = new Path(localPath + "/test.txt");
@@ -388,23 +374,14 @@ public class TestGvfsBase extends GravitinoMockServerBase 
{
           new FileLocationResponse(localPath + "/rename_src");
       Map<String, String> queryParams = new HashMap<>();
       queryParams.put("sub_path", RESTUtils.encodeString("/rename_src"));
-      try {
-        buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
 
       FileLocationResponse fileLocationResponse1 =
           new FileLocationResponse(localPath + "/rename_dst2");
       Map<String, String> queryParams1 = new HashMap<>();
       queryParams1.put("sub_path", RESTUtils.encodeString("/rename_dst2"));
-      try {
-        buildMockResource(
-            Method.GET, locationPath, queryParams1, null, 
fileLocationResponse1, SC_OK);
-        buildMockResourceForCredential(filesetName, localPath + 
"/rename_dst2");
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath, queryParams1, null, 
fileLocationResponse1, SC_OK);
+      buildMockResourceForCredential(filesetName, localPath + "/rename_dst2");
 
       Path srcLocalRenamePath = new Path(localPath + "/rename_src");
       localFileSystem.mkdirs(srcLocalRenamePath);
@@ -470,12 +447,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
           new FileLocationResponse(localPath + "/test_delete");
       Map<String, String> queryParams = new HashMap<>();
       queryParams.put("sub_path", RESTUtils.encodeString("/test_delete"));
-      try {
-        buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
-        buildMockResourceForCredential(filesetName, localPath + 
"/test_delete");
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+      buildMockResourceForCredential(filesetName, localPath + "/test_delete");
 
       Path dirPath = new Path(managedFilesetPath + "/test_delete");
       Path localDirPath = new Path(localPath + "/test_delete");
@@ -517,12 +490,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       FileLocationResponse fileLocationResponse = new 
FileLocationResponse(localPath.toString());
       Map<String, String> queryParams = new HashMap<>();
       queryParams.put("sub_path", RESTUtils.encodeString(""));
-      try {
-        buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
-        buildMockResourceForCredential(filesetName, localPath.toString());
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+      buildMockResourceForCredential(filesetName, localPath.toString());
 
       FileStatus gravitinoStatus = 
gravitinoFileSystem.getFileStatus(managedFilesetPath);
       FileStatus localStatus = localFileSystem.getFileStatus(localPath);
@@ -562,12 +531,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       FileLocationResponse fileLocationResponse = new 
FileLocationResponse(localPath.toString());
       Map<String, String> queryParams = new HashMap<>();
       queryParams.put("sub_path", RESTUtils.encodeString(""));
-      try {
-        buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
-        buildMockResourceForCredential(filesetName, localPath.toString());
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+      buildMockResourceForCredential(filesetName, localPath.toString());
 
       List<FileStatus> gravitinoStatuses =
           new 
ArrayList<>(Arrays.asList(gravitinoFileSystem.listStatus(managedFilesetPath)));
@@ -613,12 +578,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
           new FileLocationResponse(localPath + "/test_mkdirs");
       Map<String, String> queryParams = new HashMap<>();
       queryParams.put("sub_path", RESTUtils.encodeString("/test_mkdirs"));
-      try {
-        buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
-        buildMockResourceForCredential(filesetName, localPath + 
"/test_mkdirs");
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+      buildMockResourceForCredential(filesetName, localPath + "/test_mkdirs");
 
       Path subDirPath = new Path(managedFilesetPath + "/test_mkdirs");
       Path localDirPath = new Path(localPath + "/test_mkdirs");
@@ -739,12 +700,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       FileLocationResponse fileLocationResponse = new 
FileLocationResponse(localPath.toString());
       Map<String, String> queryParams = new HashMap<>();
       queryParams.put("sub_path", RESTUtils.encodeString(""));
-      try {
-        buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
-        buildMockResourceForCredential(filesetName, localPath.toString());
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+      buildMockResourceForCredential(filesetName, localPath.toString());
 
       assertEquals(1, fs.getDefaultReplication(managedFilesetPath));
     }
@@ -766,12 +723,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       FileLocationResponse fileLocationResponse = new 
FileLocationResponse(localPath.toString());
       Map<String, String> queryParams = new HashMap<>();
       queryParams.put("sub_path", RESTUtils.encodeString(""));
-      try {
-        buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
-        buildMockResourceForCredential(filesetName, localPath.toString());
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+      buildMockResourceForCredential(filesetName, localPath.toString());
 
       assertEquals(32 * 1024 * 1024, 
fs.getDefaultBlockSize(managedFilesetPath));
     }
@@ -795,4 +748,47 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       assertEquals(expectedPath, convertedStatus.getPath());
     }
   }
+
+  @Test
+  public void testWhenFilesetNotCreated() throws IOException { // Checks how each GVFS call behaves when the fileset location lookup is mocked to fail with 404.
+    String filesetName = "testWhenFilesetNotCreated";
+    Path managedFilesetPath =
+        FileSystemTestUtils.createFilesetPath(catalogName, schemaName, 
filesetName, true);
+    Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, 
schemaName, filesetName);
+    String locationPath =
+        String.format(
+            "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location",
+            metalakeName, catalogName, schemaName, filesetName);
+    try (GravitinoVirtualFileSystem fs =
+        (GravitinoVirtualFileSystem) managedFilesetPath.getFileSystem(conf)) {
+
+      Map<String, String> queryParams = new HashMap<>();
+      queryParams.put("sub_path", RESTUtils.encodeString(""));
+      ErrorResponse errResp =
+          ErrorResponse.notFound(NoSuchFilesetException.class.getSimpleName(), 
"fileset not found");
+      buildMockResource(Method.GET, locationPath, queryParams, null, errResp, 
SC_NOT_FOUND); // the location GET is mocked to answer SC_NOT_FOUND with a NoSuchFilesetException error body
+      buildMockResourceForCredential(filesetName, localPath.toString());
+
+      Path testPath = new Path(managedFilesetPath + "/test.txt");
+      assertThrows(RuntimeException.class, () -> 
fs.setWorkingDirectory(testPath)); // setWorkingDirectory declares no IOException, so the failure is expected as a RuntimeException
+      assertThrows(FilesetPathNotFoundException.class, () -> 
fs.open(testPath));
+      assertThrows(IOException.class, () -> fs.create(testPath)); // create is only required to fail with some IOException
+      assertThrows(FilesetPathNotFoundException.class, () -> 
fs.append(testPath));
+
+      Path testPath1 = new Path(managedFilesetPath + "/test1.txt");
+      assertThrows(FilesetPathNotFoundException.class, () -> 
fs.rename(testPath, testPath1));
+
+      assertFalse(fs.delete(testPath, true)); // delete must return false rather than throw
+
+      assertThrows(FilesetPathNotFoundException.class, () -> 
fs.getFileStatus(testPath));
+      assertThrows(FilesetPathNotFoundException.class, () -> 
fs.listStatus(testPath));
+
+      assertThrows(IOException.class, () -> fs.mkdirs(testPath)); // mkdirs, like create, is only required to fail with some IOException
+
+      assertEquals(1, fs.getDefaultReplication(testPath)); // the default-value queries below must still return a value instead of throwing
+      assertEquals(
+          
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_BLOCK_SIZE_DEFAULT,
+          fs.getDefaultBlockSize(testPath));
+    }
+  }
 }

Reply via email to