jerryshao commented on code in PR #4320:
URL: https://github.com/apache/gravitino/pull/4320#discussion_r1766911569


##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java:
##########
@@ -369,11 +373,51 @@ public String getFileLocation(NameIdentifier ident, 
String subPath)
 
     Fileset fileset = loadFileset(ident);
 
+    boolean isMountSingleFile = checkMountsSingleFile(fileset);

Review Comment:
   Maybe we can rename these to `isSingleFile` and `checkSingleFile`; the 
current naming feels a little odd.



##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java:
##########
@@ -369,11 +373,51 @@ public String getFileLocation(NameIdentifier ident, 
String subPath)
 
     Fileset fileset = loadFileset(ident);
 
+    boolean isMountSingleFile = checkMountsSingleFile(fileset);
+    if (isMountSingleFile) {
+      // if the storage location is a single file, it cannot have sub path to 
access.
+      Preconditions.checkArgument(
+          StringUtils.isBlank(processedSubPath),
+          "Sub path should always be blank, because the fileset only mounts a 
single file.");
+    }
+
+    // do checks for some data operations.
+    CallerContext callerContext = CallerContext.CallerContextHolder.get();
+    if (callerContext != null
+        && callerContext.context() != null
+        && !callerContext.context().isEmpty()) {
+      Map<String, String> contextMap = 
CallerContext.CallerContextHolder.get().context();
+      String operation = 
contextMap.get(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION);
+      if (StringUtils.isNotBlank(operation)) {
+        Preconditions.checkArgument(
+            FilesetDataOperation.checkValid(operation),
+            String.format("The data operation: %s is not valid.", operation));
+        FilesetDataOperation dataOperation = 
FilesetDataOperation.valueOf(operation);
+        if (dataOperation == FilesetDataOperation.RENAME) {
+          // Fileset only mounts a single file, the storage location of the 
fileset cannot be
+          // renamed;
+          // Otherwise the metadata in the Gravitino server may be 
inconsistent.
+          Preconditions.checkArgument(
+              StringUtils.isNotBlank(processedSubPath)
+                  && processedSubPath.startsWith(SLASH)
+                  && processedSubPath.length() > 1,
+              "subPath cannot be blank when need to rename a file or a 
directory.");
+          Preconditions.checkArgument(
+              !isMountSingleFile,
+              String.format(
+                  "Cannot rename the fileset: %s which only mounts to a single 
file.", ident));
+        }
+      }
+    }

Review Comment:
   The same applies here: we can use either `if...else` only or `checkArgument` 
only.



##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java:
##########
@@ -672,4 +716,18 @@ static Path formalizePath(Path path, Configuration 
configuration) throws IOExcep
     FileSystem defaultFs = FileSystem.get(configuration);
     return path.makeQualified(defaultFs.getUri(), 
defaultFs.getWorkingDirectory());
   }
+
+  private boolean checkMountsSingleFile(Fileset fileset) {
+    try {
+      Path locationPath = new Path(fileset.storageLocation());
+      return 
locationPath.getFileSystem(hadoopConf).getFileStatus(locationPath).isFile();
+    } catch (FileNotFoundException e) {
+      // We should always return false here, same with the logic in 
`FileSystem.isFile(Path f)`.
+      return false;

Review Comment:
   I'm not sure about returning `false` here. Can you explain why it should be 
`false`?



##########
clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java:
##########
@@ -381,51 +339,66 @@ NameIdentifier extractIdentifier(URI virtualUri) {
     return NameIdentifier.of(metalakeName, matcher.group(1), matcher.group(2), 
matcher.group(3));
   }
 
-  private FilesetContext getFilesetContext(Path virtualPath) {
+  private FilesetContextPair getFilesetContext(Path virtualPath, 
FilesetDataOperation operation) {
     NameIdentifier identifier = extractIdentifier(virtualPath.toUri());
-    Pair<Fileset, FileSystem> pair = filesetCache.get(identifier, 
this::constructNewFilesetPair);
-    Preconditions.checkState(
-        pair != null,
-        "Cannot get the pair of fileset instance and actual file system for 
%s",
-        identifier);
-    Path actualPath = getActualPathByIdentifier(identifier, pair, virtualPath);
-    return FilesetContext.builder()
-        .withIdentifier(identifier)
-        .withFileset(pair.getLeft())
-        .withFileSystem(pair.getRight())
-        .withActualPath(actualPath)
-        .build();
-  }
+    String virtualPathString = virtualPath.toString();
+    String subPath = getSubPathFromVirtualPath(identifier, virtualPathString);
 
-  private Pair<Fileset, FileSystem> constructNewFilesetPair(NameIdentifier 
identifier) {
-    // Always create a new file system instance for the fileset.
-    // Therefore, users cannot bypass gvfs and use `FileSystem.get()` to 
directly obtain the
-    // FileSystem
-    try {
-      Fileset fileset = loadFileset(identifier);
-      URI storageUri = URI.create(fileset.storageLocation());
-      FileSystem actualFileSystem = FileSystem.newInstance(storageUri, 
getConf());
-      Preconditions.checkState(actualFileSystem != null, "Cannot get the 
actual file system");
-      return Pair.of(fileset, actualFileSystem);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot create file system for fileset: %s, exception: %s",
-              identifier, e.getMessage()),
-          e);
-    } catch (RuntimeException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot load fileset: %s from the server. exception: %s",
-              identifier, e.getMessage()));
-    }
+    NameIdentifier catalogIdent = NameIdentifier.of(metalakeName, 
identifier.namespace().level(1));
+    FilesetCatalog filesetCatalog =
+        catalogCache.get(
+            catalogIdent, ident -> 
client.loadCatalog(catalogIdent.name()).asFilesetCatalog());

Review Comment:
   Will there be an authorization issue if we cache the catalog locally?



##########
clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java:
##########
@@ -65,8 +69,10 @@ public class GravitinoVirtualFileSystem extends FileSystem {
   private URI uri;
   private GravitinoClient client;
   private String metalakeName;
-  private Cache<NameIdentifier, Pair<Fileset, FileSystem>> filesetCache;
-  private ScheduledThreadPoolExecutor scheduler;
+  private Cache<NameIdentifier, FilesetCatalog> catalogCache;

Review Comment:
   Why do we need this cache?



##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java:
##########
@@ -369,11 +373,51 @@ public String getFileLocation(NameIdentifier ident, 
String subPath)
 
     Fileset fileset = loadFileset(ident);
 
+    boolean isMountSingleFile = checkMountsSingleFile(fileset);
+    if (isMountSingleFile) {
+      // if the storage location is a single file, it cannot have sub path to 
access.
+      Preconditions.checkArgument(
+          StringUtils.isBlank(processedSubPath),
+          "Sub path should always be blank, because the fileset only mounts a 
single file.");
+    }

Review Comment:
   This code mixes `if...else` with `checkArgument`. I think we can use 
`checkArgument` only, which would be easier to read.
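
   A minimal sketch of the `checkArgument`-only form suggested above. The class 
`SubPathCheck` and its local `checkArgument` and `isBlank` helpers are 
hypothetical stand-ins for Guava's `Preconditions.checkArgument` and 
commons-lang3's `StringUtils.isBlank`, so the snippet compiles on its own; the 
flag name `isSingleFile` assumes the rename proposed earlier in this review.

```java
// Hypothetical, self-contained sketch; not the code in the PR.
final class SubPathCheck {
  // Stand-in for Guava's Preconditions.checkArgument(boolean, Object).
  static void checkArgument(boolean expression, String message) {
    if (!expression) {
      throw new IllegalArgumentException(message);
    }
  }

  // Stand-in for StringUtils.isBlank: true for null, empty, or whitespace-only input.
  static boolean isBlank(String s) {
    return s == null || s.chars().allMatch(Character::isWhitespace);
  }

  // A single precondition replaces the `if` wrapper: the check passes when the
  // fileset is not a single file, or when the sub path is blank.
  static void validateSubPath(boolean isSingleFile, String processedSubPath) {
    checkArgument(
        !isSingleFile || isBlank(processedSubPath),
        "Sub path should always be blank, because the fileset only mounts a single file.");
  }
}
```

   The trade-off is that the condition becomes an implication 
(`!isSingleFile || ...`), which some readers find less direct than the `if`.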



##########
clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java:
##########
@@ -65,8 +69,10 @@ public class GravitinoVirtualFileSystem extends FileSystem {
   private URI uri;
   private GravitinoClient client;
   private String metalakeName;
-  private Cache<NameIdentifier, Pair<Fileset, FileSystem>> filesetCache;
-  private ScheduledThreadPoolExecutor scheduler;
+  private Cache<NameIdentifier, FilesetCatalog> catalogCache;
+  private ScheduledThreadPoolExecutor catalogCleanScheduler;
+  private Cache<String, FileSystem> internalFileSystemCache;

Review Comment:
   What is this cache used for?



##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java:
##########
@@ -369,11 +373,51 @@ public String getFileLocation(NameIdentifier ident, 
String subPath)
 
     Fileset fileset = loadFileset(ident);
 
+    boolean isMountSingleFile = checkMountsSingleFile(fileset);
+    if (isMountSingleFile) {
+      // if the storage location is a single file, it cannot have sub path to 
access.
+      Preconditions.checkArgument(
+          StringUtils.isBlank(processedSubPath),
+          "Sub path should always be blank, because the fileset only mounts a 
single file.");
+    }
+
+    // do checks for some data operations.
+    CallerContext callerContext = CallerContext.CallerContextHolder.get();
+    if (callerContext != null
+        && callerContext.context() != null
+        && !callerContext.context().isEmpty()) {
+      Map<String, String> contextMap = 
CallerContext.CallerContextHolder.get().context();
+      String operation = 
contextMap.get(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION);
+      if (StringUtils.isNotBlank(operation)) {
+        Preconditions.checkArgument(
+            FilesetDataOperation.checkValid(operation),
+            String.format("The data operation: %s is not valid.", operation));

Review Comment:
   Is it overkill to throw an exception here when the operation is unexpected?
   
   I think a warning log should be enough for an illegal header; we can 
continue the main logic even if the header check fails. What do you think?
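
   A rough sketch of the log-and-continue alternative, under stated 
assumptions: `DataOperationHeader`, its `Operation` enum, and `parse` are 
hypothetical names, and only `RENAME` is taken from the diff (the other enum 
value is illustrative). It uses `java.util.logging` so that it runs without 
extra dependencies; the real code would use the project's own logger.

```java
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical sketch; not the PR's FilesetDataOperation.
final class DataOperationHeader {
  private static final Logger LOG = Logger.getLogger(DataOperationHeader.class.getName());

  // RENAME appears in the diff; OPEN is an illustrative placeholder.
  enum Operation {
    OPEN,
    RENAME
  }

  // Returns the parsed operation, or empty when the header is blank or invalid.
  // An invalid value only produces a warning, so the caller can continue.
  static Optional<Operation> parse(String header) {
    if (header == null || header.trim().isEmpty()) {
      return Optional.empty();
    }
    try {
      return Optional.of(Operation.valueOf(header));
    } catch (IllegalArgumentException e) {
      LOG.warning("Ignoring invalid data operation header: " + header);
      return Optional.empty();
    }
  }
}
```

   With this shape, the rename-specific checks would run only when `parse` 
returns `RENAME`, and an unknown header would no longer fail the request.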



##########
clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java:
##########
@@ -381,51 +339,66 @@ NameIdentifier extractIdentifier(URI virtualUri) {
     return NameIdentifier.of(metalakeName, matcher.group(1), matcher.group(2), 
matcher.group(3));
   }
 
-  private FilesetContext getFilesetContext(Path virtualPath) {
+  private FilesetContextPair getFilesetContext(Path virtualPath, 
FilesetDataOperation operation) {
     NameIdentifier identifier = extractIdentifier(virtualPath.toUri());
-    Pair<Fileset, FileSystem> pair = filesetCache.get(identifier, 
this::constructNewFilesetPair);
-    Preconditions.checkState(
-        pair != null,
-        "Cannot get the pair of fileset instance and actual file system for 
%s",
-        identifier);
-    Path actualPath = getActualPathByIdentifier(identifier, pair, virtualPath);
-    return FilesetContext.builder()
-        .withIdentifier(identifier)
-        .withFileset(pair.getLeft())
-        .withFileSystem(pair.getRight())
-        .withActualPath(actualPath)
-        .build();
-  }
+    String virtualPathString = virtualPath.toString();
+    String subPath = getSubPathFromVirtualPath(identifier, virtualPathString);
 
-  private Pair<Fileset, FileSystem> constructNewFilesetPair(NameIdentifier 
identifier) {
-    // Always create a new file system instance for the fileset.
-    // Therefore, users cannot bypass gvfs and use `FileSystem.get()` to 
directly obtain the
-    // FileSystem
-    try {
-      Fileset fileset = loadFileset(identifier);
-      URI storageUri = URI.create(fileset.storageLocation());
-      FileSystem actualFileSystem = FileSystem.newInstance(storageUri, 
getConf());
-      Preconditions.checkState(actualFileSystem != null, "Cannot get the 
actual file system");
-      return Pair.of(fileset, actualFileSystem);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot create file system for fileset: %s, exception: %s",
-              identifier, e.getMessage()),
-          e);
-    } catch (RuntimeException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot load fileset: %s from the server. exception: %s",
-              identifier, e.getMessage()));
-    }
+    NameIdentifier catalogIdent = NameIdentifier.of(metalakeName, 
identifier.namespace().level(1));
+    FilesetCatalog filesetCatalog =
+        catalogCache.get(
+            catalogIdent, ident -> 
client.loadCatalog(catalogIdent.name()).asFilesetCatalog());
+    Preconditions.checkArgument(
+        filesetCatalog != null, String.format("Loaded fileset catalog: %s is 
null.", catalogIdent));
+
+    // set the thread local audit info
+    Map<String, String> contextMap = Maps.newHashMap();
+    contextMap.put(
+        FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
+        InternalClientType.HADOOP_GVFS.name());
+    contextMap.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, 
operation.name());
+    CallerContext callerContext = 
CallerContext.builder().withContext(contextMap).build();
+    CallerContext.CallerContextHolder.set(callerContext);
+
+    String actualFileLocation =
+        filesetCatalog.getFileLocation(
+            NameIdentifier.of(identifier.namespace().level(2), 
identifier.name()), subPath);
+
+    URI uri = new Path(actualFileLocation).toUri();
+    // we cache the fs for the same scheme, so we can reuse it
+    FileSystem fs =

Review Comment:
   I think the Hadoop client also caches the file system. Do we need to do this 
on our side?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
