This is an automated email from the ASF dual-hosted git repository.
mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 74250f72b8 [#7771] improvement(fileset-catalog): Introduce the cache
system for filesystem in FilesetCatalogOperations (#7782)
74250f72b8 is described below
commit 74250f72b80086426cc6e6be4dd3989be80484e7
Author: Mini Yu <[email protected]>
AuthorDate: Tue Aug 5 11:20:18 2025 +0800
[#7771] improvement(fileset-catalog): Introduce the cache system for
filesystem in FilesetCatalogOperations (#7782)
### What changes were proposed in this pull request?
Add a Caffeine cache for FileSystem instances to avoid repeatedly creating
and closing file systems.
### Why are the changes needed?
Creating and closing a file system instance is an expensive operation, so we
should avoid doing it repeatedly.
Fix: #7771
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
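Existing tests, plus a new check in TestFilesetCatalogOperations that two
paths with the same scheme and authority get the same cached FileSystem.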
---
.../catalog/fileset/FilesetCatalogOperations.java | 164 +++++++++++++++------
.../fileset/TestFilesetCatalogOperations.java | 28 ++++
2 files changed, 147 insertions(+), 45 deletions(-)
diff --git a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
index 9fa992d333..d49c47c288 100644
--- a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
+++ b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
@@ -18,7 +18,6 @@
*/
package org.apache.gravitino.catalog.fileset;
-import static
org.apache.gravitino.catalog.fileset.FilesetCatalogPropertiesMetadata.CACHE_VALUE_NOT_SET;
import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
import static org.apache.gravitino.file.Fileset.PROPERTY_CATALOG_PLACEHOLDER;
@@ -52,6 +51,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
@@ -96,6 +96,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.slf4j.Logger;
@@ -119,7 +120,7 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
@VisibleForTesting Map<String, Path> catalogStorageLocations;
- private Map<String, String> conf;
+ @VisibleForTesting Map<String, String> conf;
private CatalogInfo catalogInfo;
@@ -129,8 +130,77 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
private boolean disableFSOps;
+ @VisibleForTesting ScheduledThreadPoolExecutor scheduler;
+ @VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
+
FilesetCatalogOperations(EntityStore store) {
this.store = store;
+    // A single daemon thread that is handed to Caffeine as its Scheduler via the
+    // .scheduler(...) call below.
+    this.scheduler =
+        new ScheduledThreadPoolExecutor(
+            1,
+            new ThreadFactoryBuilder()
+                .setNameFormat("file-system-cache-for-fileset-%d")
+                .setDaemon(true)
+                .build());
+
+    // Cached FileSystem instances expire one hour after their last access. Whenever an entry
+    // leaves the cache, the removed instance is closed; close failures are only logged.
+    this.fileSystemCache =
+        Caffeine.newBuilder()
+            .expireAfterAccess(1, TimeUnit.HOURS)
+            .removalListener(
+                (key, removed, removalCause) -> {
+                  FileSystem evictedFs = (FileSystem) removed;
+                  try {
+                    evictedFs.close();
+                  } catch (IOException e) {
+                    LOG.warn("Failed to close FileSystem instance in cache", e);
+                  }
+                })
+            .scheduler(Scheduler.forScheduledExecutorService(this.scheduler))
+            .build();
+  }
+
+  /**
+   * Key of the FileSystem cache. Two keys are equal when they have the same (nullable) URI scheme,
+   * the same (nullable) URI authority, equal configuration maps, and the same short user name
+   * captured from {@link UserGroupInformation#getCurrentUser()} when the key was built.
+   */
+  static class FileSystemCacheKey {
+    // When the path has no scheme (such as 'file', 'hdfs', etc.), both scheme and authority are
+    // null.
+    @Nullable private final String scheme;
+    @Nullable private final String authority;
+    // NOTE(review): stored by reference and used in equals/hashCode; if the caller mutates this map
+    // after the key is inserted into the cache, the entry can no longer be found. Confirm callers
+    // never mutate the catalog conf map.
+    private final Map<String, String> conf;
+    private final String currentUser;
+
+    /**
+     * @param scheme URI scheme of the path, or null for a path without a scheme
+     * @param authority URI authority of the path, or null when absent
+     * @param conf configuration map the FileSystem is created with
+     * @throws RuntimeException if the current Hadoop user cannot be determined
+     */
+    FileSystemCacheKey(String scheme, String authority, Map<String, String> conf) {
+      this.scheme = scheme;
+      this.authority = authority;
+      this.conf = conf;
+
+      // The caller's identity is part of the key, so different users get different instances.
+      try {
+        this.currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to get current user", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof FileSystemCacheKey)) {
+        return false;
+      }
+      FileSystemCacheKey that = (FileSystemCacheKey) o;
+      // Null-safe comparison of each field against the SAME field of the other key; comparing a
+      // null scheme against anything other than that.scheme breaks equals symmetry and can
+      // match a schemeless key to a key with a real scheme.
+      return conf.equals(that.conf)
+          && (scheme == null ? that.scheme == null : scheme.equals(that.scheme))
+          && (authority == null ? that.authority == null : authority.equals(that.authority))
+          && currentUser.equals(that.currentUser);
+    }
+
+    @Override
+    public int hashCode() {
+      // Uses exactly the fields compared in equals, so equal keys have equal hash codes.
+      int result = conf.hashCode();
+      result = 31 * result + (scheme == null ? 0 : scheme.hashCode());
+      result = 31 * result + (authority == null ? 0 : authority.hashCode());
+      result = 31 * result + currentUser.hashCode();
+      return result;
+    }
}
public FilesetCatalogOperations() {
@@ -242,9 +312,12 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
}
String actualPath = getFileLocation(filesetIdent, subPath, locationName);
- Path formalizedPath = formalizePath(new Path(actualPath), conf);
- FileSystem fs = getFileSystem(formalizedPath, conf);
+ FileSystem fileSystem = getFileSystemWithCache(new Path(actualPath), conf);
+ Path formalizedPath =
+ new Path(actualPath).makeQualified(fileSystem.getUri(),
fileSystem.getWorkingDirectory());
+
+ FileSystem fs = getFileSystemWithCache(formalizedPath, conf);
if (!fs.exists(formalizedPath)) {
throw new IllegalArgumentException(
String.format(
@@ -367,10 +440,13 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
try {
// formalize the path to avoid path without scheme, uri, authority,
etc.
for (Map.Entry<String, Path> entry : filesetPaths.entrySet()) {
- Path formalizePath = formalizePath(entry.getValue(), conf);
- filesetPathsBuilder.put(entry.getKey(), formalizePath);
- FileSystem fs = getFileSystem(formalizePath, conf);
+ FileSystem tmpFs = getFileSystemWithCache(entry.getValue(), conf);
+ Path formalizePath =
+ entry.getValue().makeQualified(tmpFs.getUri(),
tmpFs.getWorkingDirectory());
+
+ filesetPathsBuilder.put(entry.getKey(), formalizePath);
+ FileSystem fs = getFileSystemWithCache(formalizePath, conf);
if (!fs.exists(formalizePath)) {
if (!fs.mkdirs(formalizePath)) {
throw new RuntimeException(
@@ -536,7 +612,7 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
storageLocations.forEach(
(locationName, location) -> {
try {
- FileSystem fs = getFileSystem(location, conf);
+ FileSystem fs = getFileSystemWithCache(location, conf);
if (fs.exists(location)) {
if (!fs.delete(location, true)) {
LOG.warn(
@@ -692,7 +768,7 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
(locationName, schemaPath) -> {
if (schemaPath != null &&
!containsPlaceholder(schemaPath.toString())) {
try {
- FileSystem fs = getFileSystem(schemaPath, conf);
+ FileSystem fs = getFileSystemWithCache(schemaPath, conf);
if (!fs.exists(schemaPath)) {
if (!fs.mkdirs(schemaPath)) {
// Fail the operation when failed to create the schema path.
@@ -797,7 +873,7 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
(locationName, location) -> {
try {
Path filesetPath = new Path(location);
- FileSystem fs = getFileSystem(filesetPath, conf);
+ FileSystem fs =
getFileSystemWithCache(filesetPath, conf);
if (fs.exists(filesetPath)) {
if (!fs.delete(filesetPath, true)) {
LOG.warn(
@@ -827,7 +903,7 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
schemaPaths.forEach(
(locationName, schemaPath) -> {
try {
- FileSystem fs = getFileSystem(schemaPath, conf);
+ FileSystem fs = getFileSystemWithCache(schemaPath, conf);
if (fs.exists(schemaPath)) {
FileStatus[] statuses = fs.listStatus(schemaPath);
if (statuses.length == 0) {
@@ -896,42 +972,23 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
@Override
public void close() throws IOException {
- // do nothing
+ // Shut down the executor used as the FileSystem cache's Caffeine scheduler, then close every
+ // FileSystem instance currently held in the cache. Close failures are logged, not rethrown.
- }
-
- private Cache<NameIdentifier, FilesetImpl>
initializeFilesetCache(Map<String, String> config) {
- Caffeine<Object, Object> cacheBuilder =
- Caffeine.newBuilder()
- .removalListener(
- (k, v, c) -> LOG.info("Evicting fileset {} from cache due to
{}", k, c))
- .scheduler(
- Scheduler.forScheduledExecutorService(
- new ScheduledThreadPoolExecutor(
- 1,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("fileset-cleaner-%d")
- .build())));
-
- Long cacheEvictionIntervalInMs =
- (Long)
- propertiesMetadata
- .catalogPropertiesMetadata()
- .getOrDefault(
- config,
FilesetCatalogPropertiesMetadata.FILESET_CACHE_EVICTION_INTERVAL_MS);
- if (cacheEvictionIntervalInMs != CACHE_VALUE_NOT_SET) {
- cacheBuilder.expireAfterAccess(cacheEvictionIntervalInMs,
TimeUnit.MILLISECONDS);
+ if (scheduler != null) {
+ scheduler.shutdownNow();
}
- Long cacheMaxSize =
- (Long)
- propertiesMetadata
- .catalogPropertiesMetadata()
- .getOrDefault(config,
FilesetCatalogPropertiesMetadata.FILESET_CACHE_MAX_SIZE);
- if (cacheMaxSize != CACHE_VALUE_NOT_SET) {
- cacheBuilder.maximumSize(cacheMaxSize);
+ // NOTE(review): entries are closed here but not invalidated, so the cache still holds the
+ // closed instances afterwards; cleanUp() presumably only runs pending maintenance (per the
+ // Caffeine docs). Confirm this object is never used for FileSystem lookups after close().
+ if (fileSystemCache != null) {
+ fileSystemCache
+ .asMap()
+ .forEach(
+ (k, v) -> {
+ try {
+ v.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close FileSystem instance in cache", e);
+ }
+ });
+ fileSystemCache.cleanUp();
}
-
- return cacheBuilder.build();
}
private void validateLocationHierarchy(
@@ -1248,10 +1305,27 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
&& !CallerContext.CallerContextHolder.get().context().isEmpty();
}
+  /**
+   * Returns a FileSystem for {@code path}, reusing the cached instance registered under the same
+   * scheme, authority, configuration map and current user when one exists.
+   *
+   * @param path the path whose URI scheme and authority select the cache entry; on a cache miss
+   *     the FileSystem is created for this path
+   * @param conf the configuration map, used both in the cache key and to create the FileSystem
+   * @return the cached or newly created FileSystem
+   * @throws GravitinoRuntimeException if creating the FileSystem fails
+   */
+  @VisibleForTesting
+  FileSystem getFileSystemWithCache(Path path, Map<String, String> conf) {
+    String scheme = path.toUri().getScheme();
+    String authority = path.toUri().getAuthority();
+    return fileSystemCache.get(
+        new FileSystemCacheKey(scheme, authority, conf),
+        cacheKey -> {
+          try {
+            return getFileSystem(path, conf);
+          } catch (IOException e) {
+            // The conf map is deliberately left out of the message: catalog configuration may
+            // carry storage credentials (TODO confirm which properties reach this map), and
+            // exception messages typically end up in logs and client responses.
+            throw new GravitinoRuntimeException(
+                e, "Failed to get FileSystem for fileset: path: %s", path);
+          }
+        });
+  }
+
private boolean checkSingleFile(Fileset fileset, String locationName) {
try {
Path locationPath = new
Path(fileset.storageLocations().get(locationName));
- return getFileSystem(locationPath,
conf).getFileStatus(locationPath).isFile();
+ FileSystem fileSystem = getFileSystemWithCache(locationPath, conf);
+ return fileSystem.getFileStatus(locationPath).isFile();
} catch (FileNotFoundException e) {
// We should always return false here, same with the logic in
`FileSystem.isFile(Path f)`.
return false;
diff --git a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
index 8c4fd4cdd8..a98e54bd8e 100644
--- a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
+++ b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
@@ -44,6 +44,8 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.File;
@@ -56,6 +58,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
@@ -1426,6 +1430,21 @@ public class TestFilesetCatalogOperations {
try (FilesetCatalogOperations mockOps =
Mockito.mock(FilesetCatalogOperations.class)) {
mockOps.hadoopConf = new Configuration();
+ mockOps.conf = Maps.newHashMap();
+ mockOps.scheduler = new ScheduledThreadPoolExecutor(1);
+ mockOps.fileSystemCache =
+ Caffeine.newBuilder()
+ .expireAfterAccess(1000 * 60 * 60 /* 1 hour */,
TimeUnit.MILLISECONDS)
+ .removalListener(
+ (ignored, value, cause) -> {
+ try {
+ ((FileSystem) value).close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ })
+
.scheduler(Scheduler.forScheduledExecutorService(mockOps.scheduler))
+ .build();
when(mockOps.loadFileset(filesetIdent)).thenReturn(mockFileset);
when(mockOps.getConf()).thenReturn(Maps.newHashMap());
String subPath = "/test/test.parquet";
@@ -1433,9 +1452,18 @@ public class TestFilesetCatalogOperations {
when(mockOps.getFileLocation(filesetIdent, subPath,
null)).thenCallRealMethod();
when(mockOps.getFileSystem(Mockito.any(), Mockito.any()))
.thenReturn(FileSystem.getLocal(new Configuration()));
+ when(mockOps.getFileSystemWithCache(Mockito.any(),
Mockito.any())).thenCallRealMethod();
String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
Assertions.assertEquals(
String.format("%s%s", mockFileset.storageLocation(),
subPath.substring(1)), fileLocation);
+
+ FileSystem fs1 =
+ mockOps.getFileSystemWithCache(new
Path("file:///dir1/subdir/file1"), mockOps.getConf());
+
+ FileSystem fs2 =
+ mockOps.getFileSystemWithCache(new
Path("file:///dir1/subdir/file2"), mockOps.getConf());
+
+ Assertions.assertSame(fs1, fs2);
}
}