xloya commented on code in PR #5020:
URL: https://github.com/apache/gravitino/pull/5020#discussion_r1796789415


##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/FileSystemProvider.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.hadoop;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+public interface FileSystemProvider {

Review Comment:
   It's better to extend `AutoCloseable` and add a `close` method in the 
provider impl.
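
A minimal sketch of what this suggestion could look like. To keep it compilable without Hadoop on the classpath, `StubFileSystem` stands in for `org.apache.hadoop.fs.FileSystem`; `ClosableFileSystemProvider`, `StubProvider`, and `ProviderCloseDemo` are hypothetical names, not part of the PR.

```java
import java.util.Collections;
import java.util.Map;

// Stand-in for org.apache.hadoop.fs.FileSystem (assumption, for illustration only).
final class StubFileSystem implements AutoCloseable {
  private boolean closed;

  @Override
  public void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}

// Hypothetical shape of the provider interface once it extends AutoCloseable.
interface ClosableFileSystemProvider extends AutoCloseable {
  StubFileSystem getFileSystem(Map<String, String> config);

  // Narrowed so callers do not have to handle a checked exception.
  @Override
  void close();
}

// Hypothetical implementation that releases the file system it handed out.
final class StubProvider implements ClosableFileSystemProvider {
  private StubFileSystem fs;

  @Override
  public StubFileSystem getFileSystem(Map<String, String> config) {
    if (fs == null) {
      fs = new StubFileSystem();
    }
    return fs;
  }

  @Override
  public void close() {
    if (fs != null) {
      fs.close();
    }
  }
}

final class ProviderCloseDemo {
  // Returns true when the file system was closed together with its provider.
  static boolean openAndClose() {
    StubFileSystem fs;
    try (StubProvider provider = new StubProvider()) {
      fs = provider.getFileSystem(Collections.emptyMap());
    }
    return fs.isClosed();
  }
}
```

With this shape, callers can hold the provider in a try-with-resources block and the underlying file system is released when the block exits.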



##########
clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java:
##########
@@ -125,13 +136,33 @@ public void initialize(URI name, Configuration configuration) throws IOException
 
     initializeClient(configuration);
 
+    initializePluginFileSystem(configuration);
+
     this.workingDirectory = new Path(name);
     this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
 
     setConf(configuration);
     super.initialize(uri, getConf());
   }
 
+  private void initializePluginFileSystem(Configuration configuration) {
+    String fileSystemProviders = configuration.get("fs.gvfs.filesystem.providers");

Review Comment:
   I think you need to define this config in 
`GravitinoVirtualFileSystemConfiguration`. How about using 
`fs.gravitino.filesystem.providers` as the key?
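
A rough sketch of the idea, under assumptions: the constant name `FS_FILESYSTEM_PROVIDERS`, the holder class `GvfsConfigKeysSketch`, and the comma-separated value format are all hypothetical. Only the key string itself comes from this comment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for GravitinoVirtualFileSystemConfiguration.
final class GvfsConfigKeysSketch {
  // Key proposed in this review; the constant name is an assumption.
  static final String FS_FILESYSTEM_PROVIDERS = "fs.gravitino.filesystem.providers";

  private GvfsConfigKeysSketch() {}

  // Assumes the value is a comma-separated list of provider class names.
  static List<String> parseProviders(String raw) {
    List<String> providers = new ArrayList<>();
    if (raw == null) {
      return providers;
    }
    for (String part : raw.split(",")) {
      String trimmed = part.trim();
      if (!trimmed.isEmpty()) {
        providers.add(trimmed);
      }
    }
    return providers;
  }
}
```

The call site would then read `configuration.get(GvfsConfigKeysSketch.FS_FILESYSTEM_PROVIDERS)` instead of repeating the string literal.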



##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import org.apache.gravitino.catalog.hadoop.FileSystemProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class HDFSFileSystemProvider implements FileSystemProvider {
+
+  @Override
+  public FileSystem getFileSystem(Map<String, String> config) throws IOException {
+    Configuration configuration = new Configuration();
+    config.forEach(configuration::set);
+
+    String pathString = configuration.get("fs.defaultFS");
+    if (pathString == null) {
+      throw new IllegalArgumentException("The path should be specified.");
+    }
+
+    URI uri = new Path(pathString).toUri();
+    if (uri.getScheme() != null && !uri.getScheme().equals("hdfs")) {
+      throw new IllegalArgumentException("The path should be a HDFS path.");
+    }
+
+    // Should we call DistributedFileSystem to create file system instance explicitly? If we
+    // explicitly create a HDFS file system here, we can't reuse the file system cache in the
+    // FileSystem class.
+    String impl = configuration.get("fs.hdfs.impl");
+    if (impl == null) {
+      configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+    } else {
+      if (!impl.equals("org.apache.hadoop.hdfs.DistributedFileSystem")) {
+        throw new IllegalArgumentException(
+            "The HDFS file system implementation class should be 'org.apache.hadoop.hdfs.DistributedFileSystem'.");
+      }
+    }
+
+    try {
+      if (HDFSFileSystemProvider.class.getClassLoader().loadClass(configuration.get("fs.hdfs.impl"))
+          == null) {
+        throw new IllegalArgumentException(
+            "The HDFS file system implementation class is not found.");
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("The HDFS file system implementation class is not found.");
+    }
+
+    return FileSystem.newInstance(uri, configuration);

Review Comment:
   For GVFS, it's okay to use the `newInstance` method, because we need to 
guarantee that users cannot obtain the authenticated FileSystem through 
`FileSystem.get()`, which avoids unauthorized access. But for the Gravitino 
server, maybe we can reuse the FileSystem: it may run as a super user plus 
proxy user, so reusing it reduces the cost of rebuilding the FileSystem.
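
To make the trade-off concrete, here is a small sketch with stand-in types. In Hadoop, `FileSystem.get()` consults an internal cache while `FileSystem.newInstance()` always builds a fresh instance. The class `FsReuseSketch` and its `(user, uri)` cache key are hypothetical and only mimic that distinction; this is not Hadoop's cache implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// F stands in for org.apache.hadoop.fs.FileSystem (assumption, for illustration only).
final class FsReuseSketch<F> {
  private final Map<String, F> cache = new ConcurrentHashMap<>();
  private final Supplier<F> factory;

  FsReuseSketch(Supplier<F> factory) {
    this.factory = factory;
  }

  // Client-side (GVFS) path: every caller gets a private instance that
  // cannot be looked up again through a shared cache.
  F newInstance() {
    return factory.get();
  }

  // Server-side path: one instance per (user, uri) pair is built once and reused.
  F getOrCreate(String user, String uri) {
    return cache.computeIfAbsent(user + "@" + uri, key -> factory.get());
  }
}
```

For example, two `getOrCreate("alice", "hdfs://nn:8020")` calls return the same object, while two `newInstance()` calls return different ones.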



##########
clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java:
##########
@@ -385,6 +423,23 @@ private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperat
     return new FilesetContextPair(new Path(actualFileLocation), fs);
   }
 
+  private Map<String, String> getConfigMap(Configuration configuration, URI uri) {
+    Map<String, String> maps = Maps.newHashMap();
+    configuration.forEach(
+        entry -> {
+          String key = entry.getKey();
+          if (key.startsWith(GRAVITINO_BYPASS_PREFIX)) {
+            maps.put(key.substring(GRAVITINO_BYPASS_PREFIX.length()), entry.getValue());
+          } else if (!key.startsWith("fs.gvfs.")) {

Review Comment:
   Also define this prefix as a constant.
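
A possible shape for this, as a sketch only: the constant name `GVFS_CONFIG_PREFIX`, the value assumed for `GRAVITINO_BYPASS_PREFIX`, and the pass-through behavior of the `else` branch (which the hunk above does not show) are all assumptions. A plain `Map` replaces Hadoop's `Configuration` so the example compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;

final class GvfsConfigMapSketch {
  // Assumed value; the real constant is defined elsewhere in the PR.
  static final String GRAVITINO_BYPASS_PREFIX = "gravitino.bypass.";
  // Hypothetical name for the literal flagged in this comment.
  static final String GVFS_CONFIG_PREFIX = "fs.gvfs.";

  private GvfsConfigMapSketch() {}

  static Map<String, String> getConfigMap(Map<String, String> configuration) {
    Map<String, String> result = new HashMap<>();
    configuration.forEach(
        (key, value) -> {
          if (key.startsWith(GRAVITINO_BYPASS_PREFIX)) {
            // Strip the bypass prefix and forward the remaining key.
            result.put(key.substring(GRAVITINO_BYPASS_PREFIX.length()), value);
          } else if (!key.startsWith(GVFS_CONFIG_PREFIX)) {
            // Assumption: non-GVFS keys are passed through unchanged.
            result.put(key, value);
          }
        });
    return result;
  }
}
```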



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
