xloya commented on code in PR #4320:
URL: https://github.com/apache/gravitino/pull/4320#discussion_r1768310821
##########
clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java:
##########
@@ -65,8 +69,10 @@ public class GravitinoVirtualFileSystem extends FileSystem {
private URI uri;
private GravitinoClient client;
private String metalakeName;
- private Cache<NameIdentifier, Pair<Fileset, FileSystem>> filesetCache;
- private ScheduledThreadPoolExecutor scheduler;
+ private Cache<NameIdentifier, FilesetCatalog> catalogCache;
Review Comment:
Currently, to obtain Fileset metadata we must first load the Catalog and then call `asFilesetCatalog().getFileLocation()`. Without caching the Catalog here, every file operation would cost two RPCs. Since changes to a Catalog are infrequent, those extra requests are largely avoidable.
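The saving described above can be sketched with a plain `ConcurrentHashMap` (the PR itself uses a Caffeine cache; `loadCatalog` below is a hypothetical stand-in for the client RPC, and the counter only exists to make the saving visible):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class CatalogCacheSketch {
    // Counts simulated loadCatalog RPCs so the saving is observable.
    static final AtomicInteger rpcCount = new AtomicInteger();

    // Stand-in for the remote catalog handle returned by the client.
    static class FilesetCatalog {
        final String name;
        FilesetCatalog(String name) { this.name = name; }
    }

    static final Map<String, FilesetCatalog> catalogCache = new ConcurrentHashMap<>();

    // Hypothetical stand-in for client.loadCatalog(name).asFilesetCatalog().
    static FilesetCatalog loadCatalog(String catalogName) {
        rpcCount.incrementAndGet();
        return new FilesetCatalog(catalogName);
    }

    static FilesetCatalog getCatalog(String catalogName) {
        // Only the first access pays the RPC; later file operations reuse the handle.
        return catalogCache.computeIfAbsent(catalogName, CatalogCacheSketch::loadCatalog);
    }

    public static void main(String[] args) {
        getCatalog("hadoop_catalog");
        getCatalog("hadoop_catalog");
        getCatalog("hadoop_catalog");
        System.out.println("rpcCount=" + rpcCount.get()); // one load, two cache hits
    }
}
```

A real implementation would also need expiry/invalidation (which Caffeine provides), since a plain map never observes catalog changes.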
##########
clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java:
##########
@@ -381,51 +339,66 @@ NameIdentifier extractIdentifier(URI virtualUri) {
     return NameIdentifier.of(metalakeName, matcher.group(1), matcher.group(2), matcher.group(3));
   }

-  private FilesetContext getFilesetContext(Path virtualPath) {
+  private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperation operation) {
     NameIdentifier identifier = extractIdentifier(virtualPath.toUri());
-    Pair<Fileset, FileSystem> pair = filesetCache.get(identifier, this::constructNewFilesetPair);
-    Preconditions.checkState(
-        pair != null,
-        "Cannot get the pair of fileset instance and actual file system for %s",
-        identifier);
-    Path actualPath = getActualPathByIdentifier(identifier, pair, virtualPath);
-    return FilesetContext.builder()
-        .withIdentifier(identifier)
-        .withFileset(pair.getLeft())
-        .withFileSystem(pair.getRight())
-        .withActualPath(actualPath)
-        .build();
-  }
+    String virtualPathString = virtualPath.toString();
+    String subPath = getSubPathFromVirtualPath(identifier, virtualPathString);

-  private Pair<Fileset, FileSystem> constructNewFilesetPair(NameIdentifier identifier) {
-    // Always create a new file system instance for the fileset.
-    // Therefore, users cannot bypass gvfs and use `FileSystem.get()` to directly obtain the
-    // FileSystem
-    try {
-      Fileset fileset = loadFileset(identifier);
-      URI storageUri = URI.create(fileset.storageLocation());
-      FileSystem actualFileSystem = FileSystem.newInstance(storageUri, getConf());
-      Preconditions.checkState(actualFileSystem != null, "Cannot get the actual file system");
-      return Pair.of(fileset, actualFileSystem);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot create file system for fileset: %s, exception: %s",
-              identifier, e.getMessage()),
-          e);
-    } catch (RuntimeException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot load fileset: %s from the server. exception: %s",
-              identifier, e.getMessage()));
-    }
+    NameIdentifier catalogIdent = NameIdentifier.of(metalakeName, identifier.namespace().level(1));
+    FilesetCatalog filesetCatalog =
+        catalogCache.get(
+            catalogIdent, ident -> client.loadCatalog(catalogIdent.name()).asFilesetCatalog());
Review Comment:
If catalog permissions changed frequently, this would be a concern. In our scenario, however, the catalog's read permission rarely changes: only read access is needed here, and we basically grant read permission on this catalog to all users. I think we could remove the cache, but only once the client supports calling `getFileLocation()` directly, instead of loading the Catalog and then calling `getFileLocation()` on every operation.
##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java:
##########
@@ -369,11 +373,51 @@ public String getFileLocation(NameIdentifier ident, String subPath)
     Fileset fileset = loadFileset(ident);
+    boolean isMountSingleFile = checkMountsSingleFile(fileset);
+    if (isMountSingleFile) {
+      // if the storage location is a single file, it cannot have sub path to access.
+      Preconditions.checkArgument(
+          StringUtils.isBlank(processedSubPath),
+          "Sub path should always be blank, because the fileset only mounts a single file.");
+    }
+
+    // do checks for some data operations.
+    CallerContext callerContext = CallerContext.CallerContextHolder.get();
+    if (callerContext != null
+        && callerContext.context() != null
+        && !callerContext.context().isEmpty()) {
+      Map<String, String> contextMap = CallerContext.CallerContextHolder.get().context();
+      String operation = contextMap.get(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION);
+      if (StringUtils.isNotBlank(operation)) {
+        Preconditions.checkArgument(
+            FilesetDataOperation.checkValid(operation),
+            String.format("The data operation: %s is not valid.", operation));
Review Comment:
Okay, I think it's acceptable.
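The `checkValid(operation)` guard in the hunk above can be sketched as a plain enum-name check (the enum values here are an illustrative subset, not the PR's actual `FilesetDataOperation` list):

```java
public class OperationCheckSketch {
    // Illustrative subset of fileset data operations; the real enum has more values.
    enum FilesetDataOperation { OPEN, CREATE, APPEND, DELETE, RENAME }

    // Mirrors a checkValid(String)-style guard: true iff the header names a known operation.
    static boolean checkValid(String operation) {
        try {
            FilesetDataOperation.valueOf(operation);
            return true;
        } catch (IllegalArgumentException e) {
            // Unknown or mis-cased names are rejected rather than throwing to the caller.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(checkValid("OPEN"));  // known operation
        System.out.println(checkValid("open"));  // case-sensitive, rejected
        System.out.println(checkValid("BOGUS")); // unknown, rejected
    }
}
```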
##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java:
##########
@@ -672,4 +716,18 @@ static Path formalizePath(Path path, Configuration configuration) throws IOExcep
     FileSystem defaultFs = FileSystem.get(configuration);
     return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
   }
+
+  private boolean checkMountsSingleFile(Fileset fileset) {
+    try {
+      Path locationPath = new Path(fileset.storageLocation());
+      return locationPath.getFileSystem(hadoopConf).getFileStatus(locationPath).isFile();
+    } catch (FileNotFoundException e) {
+      // We should always return false here, same with the logic in `FileSystem.isFile(Path f)`.
+      return false;
Review Comment:
This follows the implementation of Hadoop FileSystem's `isFile()` method. Since that method is deprecated in Hadoop 3, its implementation is copied here directly.
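The contract being copied is: stat the path, and treat a missing path as "not a file" instead of propagating the error. A dependency-free `java.nio` analogue of that logic (names here are illustrative, not the PR's code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

public class MountCheckSketch {
    // Analogue of checkMountsSingleFile(): true only if the storage location
    // exists and is a regular file, never a directory.
    static boolean mountsSingleFile(Path storageLocation) {
        try {
            return Files.readAttributes(storageLocation, BasicFileAttributes.class)
                .isRegularFile();
        } catch (IOException e) {
            // Same contract as the deprecated FileSystem.isFile(Path):
            // a missing path is simply "not a file", not an error.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("fileset", ".dat");
        Path dir = Files.createTempDirectory("fileset");
        System.out.println(mountsSingleFile(file)); // regular file
        System.out.println(mountsSingleFile(dir));  // directory, not a single file
        System.out.println(mountsSingleFile(Path.of("/no/such/location"))); // missing
    }
}
```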

##########
clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java:
##########
@@ -381,51 +339,66 @@ NameIdentifier extractIdentifier(URI virtualUri) {
     return NameIdentifier.of(metalakeName, matcher.group(1), matcher.group(2), matcher.group(3));
   }

-  private FilesetContext getFilesetContext(Path virtualPath) {
+  private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperation operation) {
     NameIdentifier identifier = extractIdentifier(virtualPath.toUri());
-    Pair<Fileset, FileSystem> pair = filesetCache.get(identifier, this::constructNewFilesetPair);
-    Preconditions.checkState(
-        pair != null,
-        "Cannot get the pair of fileset instance and actual file system for %s",
-        identifier);
-    Path actualPath = getActualPathByIdentifier(identifier, pair, virtualPath);
-    return FilesetContext.builder()
-        .withIdentifier(identifier)
-        .withFileset(pair.getLeft())
-        .withFileSystem(pair.getRight())
-        .withActualPath(actualPath)
-        .build();
-  }
+    String virtualPathString = virtualPath.toString();
+    String subPath = getSubPathFromVirtualPath(identifier, virtualPathString);

-  private Pair<Fileset, FileSystem> constructNewFilesetPair(NameIdentifier identifier) {
-    // Always create a new file system instance for the fileset.
-    // Therefore, users cannot bypass gvfs and use `FileSystem.get()` to directly obtain the
-    // FileSystem
-    try {
-      Fileset fileset = loadFileset(identifier);
-      URI storageUri = URI.create(fileset.storageLocation());
-      FileSystem actualFileSystem = FileSystem.newInstance(storageUri, getConf());
-      Preconditions.checkState(actualFileSystem != null, "Cannot get the actual file system");
-      return Pair.of(fileset, actualFileSystem);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot create file system for fileset: %s, exception: %s",
-              identifier, e.getMessage()),
-          e);
-    } catch (RuntimeException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot load fileset: %s from the server. exception: %s",
-              identifier, e.getMessage()));
-    }
+    NameIdentifier catalogIdent = NameIdentifier.of(metalakeName, identifier.namespace().level(1));
+    FilesetCatalog filesetCatalog =
+        catalogCache.get(
+            catalogIdent, ident -> client.loadCatalog(catalogIdent.name()).asFilesetCatalog());
+    Preconditions.checkArgument(
+        filesetCatalog != null, String.format("Loaded fileset catalog: %s is null.", catalogIdent));
+
+    // set the thread local audit info
+    Map<String, String> contextMap = Maps.newHashMap();
+    contextMap.put(
+        FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
+        InternalClientType.HADOOP_GVFS.name());
+    contextMap.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, operation.name());
+    CallerContext callerContext = CallerContext.builder().withContext(contextMap).build();
+    CallerContext.CallerContextHolder.set(callerContext);
+
+    String actualFileLocation =
+        filesetCatalog.getFileLocation(
+            NameIdentifier.of(identifier.namespace().level(2), identifier.name()), subPath);
+
+    URI uri = new Path(actualFileLocation).toUri();
+    // we cache the fs for the same scheme, so we can reuse it
+    FileSystem fs =
Review Comment:
I think we should maintain our own FileSystem cache rather than rely on Hadoop's. With Hadoop's shared cache, a user could call `FileSystem.get()` and obtain an already-authenticated FileSystem for storage resources that do not belong to them. That is also why we use `FileSystem.newInstance(storageUri, getConf())` here: it guarantees a fresh FileSystem instance each time, so the authenticated instance can never be retrieved through `FileSystem.get()`.
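The two pieces of the argument above — a GVFS-private cache keyed by URI scheme, populated by new instances that are never registered in a shared cache — can be sketched like this (`FileSystemHandle` is an illustrative stand-in for `org.apache.hadoop.fs.FileSystem`):

```java
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FsCacheSketch {
    // Illustrative stand-in for org.apache.hadoop.fs.FileSystem.
    static class FileSystemHandle {
        final String scheme;
        FileSystemHandle(String scheme) { this.scheme = scheme; }
    }

    // GVFS-private cache keyed by URI scheme, deliberately separate from Hadoop's
    // shared FileSystem cache, so FileSystem.get() can never hand these out.
    private final Map<String, FileSystemHandle> internalFileSystemCache = new ConcurrentHashMap<>();

    FileSystemHandle getFs(URI storageUri) {
        return internalFileSystemCache.computeIfAbsent(
            storageUri.getScheme(),
            // Stand-in for FileSystem.newInstance(storageUri, getConf()), which
            // always constructs a fresh instance instead of registering one globally.
            scheme -> new FileSystemHandle(scheme));
    }

    public static void main(String[] args) {
        FsCacheSketch gvfs = new FsCacheSketch();
        FileSystemHandle a = gvfs.getFs(URI.create("hdfs://nn1/fileset/a"));
        FileSystemHandle b = gvfs.getFs(URI.create("hdfs://nn2/fileset/b"));
        // Same scheme -> same instance, but only within this GVFS instance.
        System.out.println(a == b);
    }
}
```

Note the trade-off: keying by scheme alone would merge clusters with different authorities; the sketch keeps the PR's stated "cache by scheme" design rather than resolving that question.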
##########
clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java:
##########
@@ -65,8 +69,10 @@ public class GravitinoVirtualFileSystem extends FileSystem {
private URI uri;
private GravitinoClient client;
private String metalakeName;
- private Cache<NameIdentifier, Pair<Fileset, FileSystem>> filesetCache;
- private ScheduledThreadPoolExecutor scheduler;
+ private Cache<NameIdentifier, FilesetCatalog> catalogCache;
+ private ScheduledThreadPoolExecutor catalogCleanScheduler;
+ private Cache<String, FileSystem> internalFileSystemCache;
Review Comment:
The cache here ensures that the created FileSystem can be cleaned up at the
same time when GVFS is closed, otherwise we need to rely on the Hadoop
FileSystem mechanism to close it. In addition, we use the method of creating
new instances for accessing the underlying storage FileSystem in GVFS, see:
https://github.com/apache/gravitino/blob/main/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java#L407.
The reason for not using Hadoop's FileSystem cache is that in a multi-tenant
scenario, an unauthorized user may obtain the authenticated FileSystem through
`FileSystem.get()`, and this user may not have authorization for the
corresponding storage.
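The cleanup guarantee described above amounts to a `close()` that drains the private cache and closes each instance it created (`FileSystemHandle` is again an illustrative stand-in, not the PR's type):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GvfsCloseSketch implements Closeable {
    // Illustrative stand-in for a cached storage FileSystem instance.
    static class FileSystemHandle implements Closeable {
        boolean closed;
        @Override public void close() { closed = true; }
    }

    final Map<String, FileSystemHandle> internalFileSystemCache = new ConcurrentHashMap<>();

    @Override
    public void close() throws IOException {
        // Closing GVFS closes every FileSystem it created, instead of relying on
        // Hadoop's shutdown hook for instances kept in the shared cache.
        for (FileSystemHandle fs : internalFileSystemCache.values()) {
            fs.close();
        }
        internalFileSystemCache.clear();
    }

    public static void main(String[] args) throws IOException {
        GvfsCloseSketch gvfs = new GvfsCloseSketch();
        FileSystemHandle fs = new FileSystemHandle();
        gvfs.internalFileSystemCache.put("hdfs", fs);
        gvfs.close();
        System.out.println(fs.closed); // true: GVFS owns the lifecycle
    }
}
```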
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]