This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 74250f72b8 [#7771] improvement(fileset-catalog): Introduce the cache 
system for filesystem in FilesetCatalogOperations (#7782)
74250f72b8 is described below

commit 74250f72b80086426cc6e6be4dd3989be80484e7
Author: Mini Yu <[email protected]>
AuthorDate: Tue Aug 5 11:20:18 2025 +0800

    [#7771] improvement(fileset-catalog): Introduce the cache system for 
filesystem in FilesetCatalogOperations (#7782)
    
    ### What changes were proposed in this pull request?
    
    Use Caffeine to cache FileSystem instances so that they are not
    repeatedly created and closed.
    
    ### Why are the changes needed?
    
    
    Creating and closing a FileSystem instance is expensive, so repeating it
    on every fileset/schema operation should be avoided.
    
    Fix: #7771
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A.
    
    ### How was this patch tested?
    
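    Existing tests, plus a new check in TestFilesetCatalogOperations that two
    paths with the same scheme and authority get the same cached FileSystem.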
---
 .../catalog/fileset/FilesetCatalogOperations.java  | 164 +++++++++++++++------
 .../fileset/TestFilesetCatalogOperations.java      |  28 ++++
 2 files changed, 147 insertions(+), 45 deletions(-)

diff --git 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
index 9fa992d333..d49c47c288 100644
--- 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
@@ -18,7 +18,6 @@
  */
 package org.apache.gravitino.catalog.fileset;
 
-import static 
org.apache.gravitino.catalog.fileset.FilesetCatalogPropertiesMetadata.CACHE_VALUE_NOT_SET;
 import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
 import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
 import static org.apache.gravitino.file.Fileset.PROPERTY_CATALOG_PLACEHOLDER;
@@ -52,6 +51,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Entity;
@@ -96,6 +96,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionTimeoutException;
 import org.slf4j.Logger;
@@ -119,7 +120,7 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
 
   @VisibleForTesting Map<String, Path> catalogStorageLocations;
 
-  private Map<String, String> conf;
+  @VisibleForTesting Map<String, String> conf;
 
   private CatalogInfo catalogInfo;
 
@@ -129,8 +130,77 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
 
   private boolean disableFSOps;
 
+  @VisibleForTesting ScheduledThreadPoolExecutor scheduler;
+  @VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
+
   FilesetCatalogOperations(EntityStore store) {
     this.store = store;
+    this.scheduler =
+        new ScheduledThreadPoolExecutor(
+            1, // single daemon thread, used only as the cache's expiration scheduler
+            new ThreadFactoryBuilder()
+                .setNameFormat("file-system-cache-for-fileset-%d")
+                .setDaemon(true)
+                .build());
+
+    this.fileSystemCache =
+        Caffeine.newBuilder()
+            .expireAfterAccess(1, TimeUnit.HOURS) // idle entries are evicted, then closed below
+            .scheduler(Scheduler.forScheduledExecutorService(this.scheduler))
+            .removalListener(
+                (cacheKey, cachedFs, removalCause) -> {
+                  try {
+                    ((FileSystem) cachedFs).close();
+                  } catch (IOException closeError) {
+                    LOG.warn("Failed to close FileSystem instance in cache", closeError);
+                  }
+                })
+            .build();
+  }
+
+  static class FileSystemCacheKey { // identity: scheme + authority + conf + current user
+    // scheme and authority are both null when the path carries no scheme, e.g. "/tmp/a" as
+    // opposed to "file:///tmp/a" or "hdfs://nn:8020/tmp/a".
+    @Nullable private final String scheme;
+    @Nullable private final String authority;
+    private final Map<String, String> conf; // held by reference: must not be mutated once keyed
+    private final String currentUser;
+
+    FileSystemCacheKey(String scheme, String authority, Map<String, String> conf) {
+      this.scheme = scheme;
+      this.authority = authority;
+      this.conf = conf;
+
+      try {
+        this.currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to get current user", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof FileSystemCacheKey)) {
+        return false;
+      }
+      FileSystemCacheKey that = (FileSystemCacheKey) o;
+      return conf.equals(that.conf)
+          && (scheme == null ? that.scheme == null : scheme.equals(that.scheme))
+          && (authority == null ? that.authority == null : authority.equals(that.authority))
+          && currentUser.equals(that.currentUser);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = conf.hashCode();
+      result = 31 * result + (scheme == null ? 0 : scheme.hashCode());
+      result = 31 * result + (authority == null ? 0 : authority.hashCode());
+      result = 31 * result + currentUser.hashCode();
+      return result;
+    }
   }
 
   public FilesetCatalogOperations() {
@@ -242,9 +312,12 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
     }
 
     String actualPath = getFileLocation(filesetIdent, subPath, locationName);
-    Path formalizedPath = formalizePath(new Path(actualPath), conf);
 
-    FileSystem fs = getFileSystem(formalizedPath, conf);
+    FileSystem fileSystem = getFileSystemWithCache(new Path(actualPath), conf);
+    Path formalizedPath =
+        new Path(actualPath).makeQualified(fileSystem.getUri(), 
fileSystem.getWorkingDirectory());
+
+    FileSystem fs = getFileSystemWithCache(formalizedPath, conf);
     if (!fs.exists(formalizedPath)) {
       throw new IllegalArgumentException(
           String.format(
@@ -367,10 +440,13 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
       try {
         // formalize the path to avoid path without scheme, uri, authority, 
etc.
         for (Map.Entry<String, Path> entry : filesetPaths.entrySet()) {
-          Path formalizePath = formalizePath(entry.getValue(), conf);
-          filesetPathsBuilder.put(entry.getKey(), formalizePath);
 
-          FileSystem fs = getFileSystem(formalizePath, conf);
+          FileSystem tmpFs = getFileSystemWithCache(entry.getValue(), conf);
+          Path formalizePath =
+              entry.getValue().makeQualified(tmpFs.getUri(), 
tmpFs.getWorkingDirectory());
+
+          filesetPathsBuilder.put(entry.getKey(), formalizePath);
+          FileSystem fs = getFileSystemWithCache(formalizePath, conf);
           if (!fs.exists(formalizePath)) {
             if (!fs.mkdirs(formalizePath)) {
               throw new RuntimeException(
@@ -536,7 +612,7 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
         storageLocations.forEach(
             (locationName, location) -> {
               try {
-                FileSystem fs = getFileSystem(location, conf);
+                FileSystem fs = getFileSystemWithCache(location, conf);
                 if (fs.exists(location)) {
                   if (!fs.delete(location, true)) {
                     LOG.warn(
@@ -692,7 +768,7 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
         (locationName, schemaPath) -> {
           if (schemaPath != null && 
!containsPlaceholder(schemaPath.toString())) {
             try {
-              FileSystem fs = getFileSystem(schemaPath, conf);
+              FileSystem fs = getFileSystemWithCache(schemaPath, conf);
               if (!fs.exists(schemaPath)) {
                 if (!fs.mkdirs(schemaPath)) {
                   // Fail the operation when failed to create the schema path.
@@ -797,7 +873,7 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
                           (locationName, location) -> {
                             try {
                               Path filesetPath = new Path(location);
-                              FileSystem fs = getFileSystem(filesetPath, conf);
+                              FileSystem fs = 
getFileSystemWithCache(filesetPath, conf);
                               if (fs.exists(filesetPath)) {
                                 if (!fs.delete(filesetPath, true)) {
                                   LOG.warn(
@@ -827,7 +903,7 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
         schemaPaths.forEach(
             (locationName, schemaPath) -> {
               try {
-                FileSystem fs = getFileSystem(schemaPath, conf);
+                FileSystem fs = getFileSystemWithCache(schemaPath, conf);
                 if (fs.exists(schemaPath)) {
                   FileStatus[] statuses = fs.listStatus(schemaPath);
                   if (statuses.length == 0) {
@@ -896,42 +972,23 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
   @Override
   public void close() throws IOException {
-    // do nothing
-  }
-
-  private Cache<NameIdentifier, FilesetImpl> initializeFilesetCache(Map<String, String> config) {
-    Caffeine<Object, Object> cacheBuilder =
-        Caffeine.newBuilder()
-            .removalListener(
-                (k, v, c) -> LOG.info("Evicting fileset {} from cache due to {}", k, c))
-            .scheduler(
-                Scheduler.forScheduledExecutorService(
-                    new ScheduledThreadPoolExecutor(
-                        1,
-                        new ThreadFactoryBuilder()
-                            .setDaemon(true)
-                            .setNameFormat("fileset-cleaner-%d")
-                            .build())));
-
-    Long cacheEvictionIntervalInMs =
-        (Long)
-            propertiesMetadata
-                .catalogPropertiesMetadata()
-                .getOrDefault(
-                    config, FilesetCatalogPropertiesMetadata.FILESET_CACHE_EVICTION_INTERVAL_MS);
-    if (cacheEvictionIntervalInMs != CACHE_VALUE_NOT_SET) {
-      cacheBuilder.expireAfterAccess(cacheEvictionIntervalInMs, TimeUnit.MILLISECONDS);
+    // Stop the cache's expiration scheduler, then close every FileSystem still held in the cache.
+    if (scheduler != null) {
+      scheduler.shutdownNow();
     }

-    Long cacheMaxSize =
-        (Long)
-            propertiesMetadata
-                .catalogPropertiesMetadata()
-                .getOrDefault(config, FilesetCatalogPropertiesMetadata.FILESET_CACHE_MAX_SIZE);
-    if (cacheMaxSize != CACHE_VALUE_NOT_SET) {
-      cacheBuilder.maximumSize(cacheMaxSize);
+    if (fileSystemCache != null) {
+      fileSystemCache
+          .asMap()
+          .forEach(
+              (k, v) -> {
+                try {
+                  v.close();
+                } catch (IOException e) {
+                  LOG.warn("Failed to close FileSystem instance in cache", e);
+                }
+              });
+      fileSystemCache.cleanUp();
     }
-
-    return cacheBuilder.build();
   }
 
   private void validateLocationHierarchy(
@@ -1248,10 +1305,27 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
         && !CallerContext.CallerContextHolder.get().context().isEmpty();
   }
 
+  @VisibleForTesting
+  FileSystem getFileSystemWithCache(Path path, Map<String, String> conf) {
+    String scheme = path.toUri().getScheme();
+    String authority = path.toUri().getAuthority();
+    return fileSystemCache.get(
+        new FileSystemCacheKey(scheme, authority, conf),
+        cacheKey -> {
+          try {
+            return getFileSystem(path, conf); // built from the first path that misses this key
+          } catch (IOException e) {
+            throw new GravitinoRuntimeException( // conf omitted: it may carry credentials
+                e, "Failed to get FileSystem for fileset: path: %s", path);
+          }
+        });
+  }
+
   private boolean checkSingleFile(Fileset fileset, String locationName) {
     try {
       Path locationPath = new 
Path(fileset.storageLocations().get(locationName));
-      return getFileSystem(locationPath, 
conf).getFileStatus(locationPath).isFile();
+      FileSystem fileSystem = getFileSystemWithCache(locationPath, conf);
+      return fileSystem.getFileStatus(locationPath).isFile();
     } catch (FileNotFoundException e) {
       // We should always return false here, same with the logic in 
`FileSystem.isFile(Path f)`.
       return false;
diff --git 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
index 8c4fd4cdd8..a98e54bd8e 100644
--- 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
@@ -44,6 +44,8 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.File;
@@ -56,6 +58,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
@@ -1426,6 +1430,21 @@ public class TestFilesetCatalogOperations {
 
     try (FilesetCatalogOperations mockOps = 
Mockito.mock(FilesetCatalogOperations.class)) {
       mockOps.hadoopConf = new Configuration();
+      mockOps.conf = Maps.newHashMap();
+      mockOps.scheduler = new ScheduledThreadPoolExecutor(1);
+      mockOps.fileSystemCache =
+          Caffeine.newBuilder()
+              .expireAfterAccess(1000 * 60 * 60 /* 1 hour */, 
TimeUnit.MILLISECONDS)
+              .removalListener(
+                  (ignored, value, cause) -> {
+                    try {
+                      ((FileSystem) value).close();
+                    } catch (IOException e) {
+                      // Ignore
+                    }
+                  })
+              
.scheduler(Scheduler.forScheduledExecutorService(mockOps.scheduler))
+              .build();
       when(mockOps.loadFileset(filesetIdent)).thenReturn(mockFileset);
       when(mockOps.getConf()).thenReturn(Maps.newHashMap());
       String subPath = "/test/test.parquet";
@@ -1433,9 +1452,18 @@ public class TestFilesetCatalogOperations {
       when(mockOps.getFileLocation(filesetIdent, subPath, 
null)).thenCallRealMethod();
       when(mockOps.getFileSystem(Mockito.any(), Mockito.any()))
           .thenReturn(FileSystem.getLocal(new Configuration()));
+      when(mockOps.getFileSystemWithCache(Mockito.any(), 
Mockito.any())).thenCallRealMethod();
       String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
       Assertions.assertEquals(
           String.format("%s%s", mockFileset.storageLocation(), 
subPath.substring(1)), fileLocation);
+
+      FileSystem fs1 =
+          mockOps.getFileSystemWithCache(new 
Path("file:///dir1/subdir/file1"), mockOps.getConf());
+
+      FileSystem fs2 =
+          mockOps.getFileSystemWithCache(new 
Path("file:///dir1/subdir/file2"), mockOps.getConf());
+
+      Assertions.assertSame(fs1, fs2);
     }
   }
 

Reply via email to