This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.7 by this push:
     new 1d7e9d656 [#5220] improvement(hadoop-catalog): Optimize the name 
properties keys for Hadoop catalog. (#5372)
1d7e9d656 is described below

commit 1d7e9d656181fc0cedbc6b023a11b41cd7c5b38e
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 31 12:00:34 2024 +0800

    [#5220] improvement(hadoop-catalog): Optimize the name properties keys for 
Hadoop catalog. (#5372)
    
    ### What changes were proposed in this pull request?
    
    Replace the property keys that use the `gravitino.bypass` prefix with
    more elegant ones.
    
    ### Why are the changes needed?
    
    For better user experience.
    
    Fix: #5220
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    Existing UTs and ITs.
    
    Co-authored-by: Qi Yu <[email protected]>
---
 bundles/aliyun-bundle/build.gradle.kts             |  3 +
 .../gravitino/oss/fs/OSSFileSystemProvider.java    | 34 +++++++-
 bundles/aws-bundle/build.gradle.kts                |  3 +
 .../gravitino/s3/fs/S3FileSystemProvider.java      | 24 +++++-
 bundles/gcp-bundle/build.gradle.kts                |  3 +
 .../gravitino/gcs/fs/GCSFileSystemProvider.java    | 17 ++--
 .../{OSSProperties.java => GCSProperties.java}     | 14 ++--
 .../apache/gravitino/storage/OSSProperties.java    |  2 +-
 .../org/apache/gravitino/storage/S3Properties.java |  3 +
 catalogs/catalog-hadoop/build.gradle.kts           |  4 +
 .../catalog/hadoop/fs/FileSystemProvider.java      | 14 ++++
 .../catalog/hadoop/fs/FileSystemUtils.java         | 59 ++++++++++++++
 .../hadoop/TestHadoopCatalogOperations.java        | 34 ++++++++
 .../catalog/hadoop/fs/TestFileSystemUtils.java     | 91 ++++++++++++++++++++++
 .../integration/test/HadoopGCSCatalogIT.java       | 71 ++++++++++++++++-
 .../integration/test/HadoopOSSCatalogIT.java       | 83 ++++++++++++++++++--
 .../hadoop/integration/test/HadoopS3CatalogIT.java | 78 +++++++++++++++++--
 .../gravitino/filesystem/gvfs_config.py            | 10 +--
 .../tests/integration/test_gvfs_with_gcs.py        |  3 +-
 .../tests/integration/test_gvfs_with_oss.py        |  7 +-
 .../tests/integration/test_gvfs_with_s3.py         |  6 +-
 clients/filesystem-hadoop3/build.gradle.kts        |  3 +
 .../hadoop/GravitinoVirtualFileSystem.java         | 16 +---
 .../test/GravitinoVirtualFileSystemGCSIT.java      | 19 +++--
 .../test/GravitinoVirtualFileSystemOSSIT.java      | 39 ++++++----
 .../test/GravitinoVirtualFileSystemS3IT.java       | 27 ++++---
 26 files changed, 567 insertions(+), 100 deletions(-)

diff --git a/bundles/aliyun-bundle/build.gradle.kts 
b/bundles/aliyun-bundle/build.gradle.kts
index 5858147e2..29676d8fa 100644
--- a/bundles/aliyun-bundle/build.gradle.kts
+++ b/bundles/aliyun-bundle/build.gradle.kts
@@ -35,6 +35,9 @@ dependencies {
   // 
org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:323)
   // org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3611)
   implementation(libs.commons.lang)
+  implementation(project(":catalogs:catalog-common")) {
+    exclude("*")
+  }
 }
 
 tasks.withType(ShadowJar::class.java) {
diff --git 
a/bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
 
b/bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
index 97bce16f0..b47d25335 100644
--- 
a/bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
+++ 
b/bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
@@ -18,23 +18,49 @@
  */
 package org.apache.gravitino.oss.fs;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.storage.OSSProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
+import org.apache.hadoop.fs.aliyun.oss.Constants;
 
 public class OSSFileSystemProvider implements FileSystemProvider {
+
+  private static final String OSS_FILESYSTEM_IMPL = "fs.oss.impl";
+
+  // This map maintains the mapping relationship between the OSS properties in 
Gravitino and
+  // the Hadoop properties. Through this map, users can customize the OSS 
properties in Gravitino
+  // and map them to the corresponding Hadoop properties.
+  // For example, User can use oss-endpoint to set the endpoint of OSS 
'fs.oss.endpoint' in
+  // Gravitino.
+  // GCS and S3 also have similar mapping relationship.
+
+  @VisibleForTesting
+  public static final Map<String, String> GRAVITINO_KEY_TO_OSS_HADOOP_KEY =
+      ImmutableMap.of(
+          OSSProperties.GRAVITINO_OSS_ENDPOINT, Constants.ENDPOINT_KEY,
+          OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, Constants.ACCESS_KEY_ID,
+          OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, 
Constants.ACCESS_KEY_SECRET);
+
   @Override
   public FileSystem getFileSystem(Path path, Map<String, String> config) 
throws IOException {
     Configuration configuration = new Configuration();
-    config.forEach(
-        (k, v) -> {
-          configuration.set(k.replace("gravitino.bypass.", ""), v);
-        });
 
+    Map<String, String> hadoopConfMap =
+        FileSystemUtils.toHadoopConfigMap(config, 
GRAVITINO_KEY_TO_OSS_HADOOP_KEY);
+    // OSS do not use service loader to load the file system, so we need to 
set the impl class
+    if (!hadoopConfMap.containsKey(OSS_FILESYSTEM_IMPL)) {
+      hadoopConfMap.put(OSS_FILESYSTEM_IMPL, 
AliyunOSSFileSystem.class.getCanonicalName());
+    }
+
+    hadoopConfMap.forEach(configuration::set);
     return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
   }
 
diff --git a/bundles/aws-bundle/build.gradle.kts 
b/bundles/aws-bundle/build.gradle.kts
index e1723d7af..0036b5eea 100644
--- a/bundles/aws-bundle/build.gradle.kts
+++ b/bundles/aws-bundle/build.gradle.kts
@@ -35,6 +35,9 @@ dependencies {
   implementation(libs.aws.policy)
   implementation(libs.aws.sts)
   implementation(libs.hadoop3.aws)
+  implementation(project(":catalogs:catalog-common")) {
+    exclude("*")
+  }
 }
 
 tasks.withType(ShadowJar::class.java) {
diff --git 
a/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
 
b/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
index 4ab1ca242..b61e9d14f 100644
--- 
a/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
+++ 
b/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
@@ -19,23 +19,39 @@
 
 package org.apache.gravitino.s3.fs;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.storage.S3Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 
 public class S3FileSystemProvider implements FileSystemProvider {
+
+  @VisibleForTesting
+  public static final Map<String, String> GRAVITINO_KEY_TO_S3_HADOOP_KEY =
+      ImmutableMap.of(
+          S3Properties.GRAVITINO_S3_ENDPOINT, Constants.ENDPOINT,
+          S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, Constants.ACCESS_KEY,
+          S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, Constants.SECRET_KEY);
+
   @Override
   public FileSystem getFileSystem(Path path, Map<String, String> config) 
throws IOException {
     Configuration configuration = new Configuration();
-    config.forEach(
-        (k, v) -> {
-          configuration.set(k.replace("gravitino.bypass.", ""), v);
-        });
+    Map<String, String> hadoopConfMap =
+        FileSystemUtils.toHadoopConfigMap(config, 
GRAVITINO_KEY_TO_S3_HADOOP_KEY);
 
+    if (!hadoopConfMap.containsKey(Constants.AWS_CREDENTIALS_PROVIDER)) {
+      configuration.set(
+          Constants.AWS_CREDENTIALS_PROVIDER, 
Constants.ASSUMED_ROLE_CREDENTIALS_DEFAULT);
+    }
+    hadoopConfMap.forEach(configuration::set);
     return S3AFileSystem.newInstance(path.toUri(), configuration);
   }
 
diff --git a/bundles/gcp-bundle/build.gradle.kts 
b/bundles/gcp-bundle/build.gradle.kts
index 4ff29b845..14488ec27 100644
--- a/bundles/gcp-bundle/build.gradle.kts
+++ b/bundles/gcp-bundle/build.gradle.kts
@@ -36,6 +36,9 @@ dependencies {
   // runtime used
   implementation(libs.commons.logging)
   implementation(libs.hadoop3.gcs)
+  implementation(project(":catalogs:catalog-common")) {
+    exclude("*")
+  }
   implementation(libs.google.auth.http)
   implementation(libs.google.auth.credentials)
 }
diff --git 
a/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
 
b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
index 74a70f083..a07ff3d6e 100644
--- 
a/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
+++ 
b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
@@ -19,9 +19,13 @@
 package org.apache.gravitino.gcs.fs;
 
 import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.storage.GCSProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,15 +34,18 @@ import org.slf4j.LoggerFactory;
 
 public class GCSFileSystemProvider implements FileSystemProvider {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GCSFileSystemProvider.class);
+  private static final String GCS_SERVICE_ACCOUNT_JSON_FILE =
+      "fs.gs.auth.service.account.json.keyfile";
+
+  @VisibleForTesting
+  public static final Map<String, String> GRAVITINO_KEY_TO_GCS_HADOOP_KEY =
+      ImmutableMap.of(GCSProperties.GCS_SERVICE_ACCOUNT_JSON_PATH, 
GCS_SERVICE_ACCOUNT_JSON_FILE);
 
   @Override
   public FileSystem getFileSystem(Path path, Map<String, String> config) 
throws IOException {
     Configuration configuration = new Configuration();
-    config.forEach(
-        (k, v) -> {
-          configuration.set(k.replace("gravitino.bypass.", ""), v);
-        });
-
+    FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_GCS_HADOOP_KEY)
+        .forEach(configuration::set);
     LOGGER.info("Creating GCS file system with config: {}", config);
     return GoogleHadoopFileSystem.newInstance(path.toUri(), configuration);
   }
diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/OSSProperties.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/GCSProperties.java
similarity index 61%
copy from 
catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/OSSProperties.java
copy to 
catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/GCSProperties.java
index 8ceae5b82..ca8599584 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/OSSProperties.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/GCSProperties.java
@@ -16,17 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.gravitino.storage;
 
-// Defines the unified OSS properties for different catalogs and connectors.
-public class OSSProperties {
+public class GCSProperties {
 
-  // The endpoint of Aliyun OSS service.
-  public static final String GRAVITINO_OSS_ENDPOINT = "oss-endpoint";
-  // The static access key ID used to access OSS data.
-  public static final String GRAVITINO_OSS_ACCESS_KEY_ID = "oss-access-key-id";
-  // The static access key secret used to access OSS data.
-  public static final String GRAVITINO_OSS_ACCESS_KEY_SECRET = 
"oss-access-key-secret";
+  // The path of service account JSON file of Google Cloud Storage.
+  public static final String GCS_SERVICE_ACCOUNT_JSON_PATH = 
"gcs-service-account-file";
 
-  private OSSProperties() {}
+  private GCSProperties() {}
 }
diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/OSSProperties.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/OSSProperties.java
index 8ceae5b82..3885eb360 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/OSSProperties.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/OSSProperties.java
@@ -26,7 +26,7 @@ public class OSSProperties {
   // The static access key ID used to access OSS data.
   public static final String GRAVITINO_OSS_ACCESS_KEY_ID = "oss-access-key-id";
   // The static access key secret used to access OSS data.
-  public static final String GRAVITINO_OSS_ACCESS_KEY_SECRET = 
"oss-access-key-secret";
+  public static final String GRAVITINO_OSS_ACCESS_KEY_SECRET = 
"oss-secret-access-key";
 
   private OSSProperties() {}
 }
diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/S3Properties.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/S3Properties.java
index af37ae690..2dbe67649 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/S3Properties.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/S3Properties.java
@@ -35,5 +35,8 @@ public class S3Properties {
   // S3 external id
   public static final String GRAVITINO_S3_EXTERNAL_ID = "s3-external-id";
 
+  // The S3 credentials provider class name.
+  public static final String GRAVITINO_S3_CREDS_PROVIDER = "s3-creds-provider";
+
   private S3Properties() {}
 }
diff --git a/catalogs/catalog-hadoop/build.gradle.kts 
b/catalogs/catalog-hadoop/build.gradle.kts
index 62a48656c..c925d1b92 100644
--- a/catalogs/catalog-hadoop/build.gradle.kts
+++ b/catalogs/catalog-hadoop/build.gradle.kts
@@ -36,6 +36,10 @@ dependencies {
     exclude(group = "*")
   }
 
+  implementation(project(":catalogs:catalog-common")) {
+    exclude(group = "*")
+  }
+
   compileOnly(libs.guava)
 
   implementation(libs.hadoop3.common) {
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
index 5bee821e5..9c1979345 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
@@ -32,6 +32,20 @@ import org.apache.hadoop.fs.Path;
  */
 public interface FileSystemProvider {
 
+  /**
+   * The prefix of the configuration key that should be bypassed when setting 
the configuration to
+   * the FileSystem instance.
+   *
+   * <p>For example, if the configuration key passed to {@link
+   * FileSystemProvider#getFileSystem(Path, Map)} 
'gravitino.bypass.fs.s3a.endpoint', the prefix
+   * 'gravitino.bypass.' should be removed when setting the configuration to 
the FileSystem
+   * instance.
+   *
+   * <p>User can use this prefix to pass the configuration item that has not 
been defined in
+   * Gravitino.
+   */
+  String GRAVITINO_BYPASS = "gravitino.bypass.";
+
   /**
    * Get the FileSystem instance according to the configuration map and file 
path.
    *
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
index 3a959ff37..3ed307aa0 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.catalog.hadoop.fs;
 
 import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_HDFS_FS_PROVIDER;
 import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_LOCAL_FS_PROVIDER;
+import static 
org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITINO_BYPASS;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -101,4 +102,62 @@ public class FileSystemUtils {
                         "File system provider with name '%s' not found in the 
file system provider list.",
                         fileSystemProviderName)));
   }
+
+  /**
+   * Convert the Gravitino configuration to Hadoop configuration.
+   *
+   * <p>Predefined keys have the highest priority. If the key does not exist 
in the predefined keys,
+   * it will be set to the configuration. Keys with prefixes 
'gravitino.bypass' has the lowest
+   * priority.
+   *
+   * <p>Consider the following example:
+   *
+   * <pre>
+   * config:
+   *  k1=v1
+   *  gravitino.bypass.k1=v2
+   *  custom-k1=v3
+   * predefinedKeys:
+   *  custom-k1=k1
+   * then the result will be:
+   *  k1=v3
+   * </pre>
+   *
+   * @param config Gravitino configuration
+   * @return Hadoop configuration Map
+   */
+  public static Map<String, String> toHadoopConfigMap(
+      Map<String, String> config, Map<String, String> predefinedKeys) {
+    Map<String, String> result = Maps.newHashMap();
+
+    // First, add those keys that start with 'gravitino.bypass' to the result 
map as it has the
+    // lowest priority.
+    config.forEach(
+        (k, v) -> {
+          if (k.startsWith(GRAVITINO_BYPASS)) {
+            String key = k.replace(GRAVITINO_BYPASS, "");
+            result.put(key, v);
+          }
+        });
+
+    // Then add those keys that are not in the predefined keys and not start 
with 'gravitino.bypass'
+    // to the result map.
+    config.forEach(
+        (k, v) -> {
+          if (!predefinedKeys.containsKey(k) && 
!k.startsWith(GRAVITINO_BYPASS)) {
+            result.put(k, v);
+          }
+        });
+
+    // Last, add those keys that are in the predefined keys to the result map.
+    config.forEach(
+        (k, v) -> {
+          if (predefinedKeys.containsKey(k)) {
+            String key = predefinedKeys.get(k);
+            result.put(key, v);
+          }
+        });
+
+    return result;
+  }
 }
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index 9b5b61f27..9575a1313 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.File;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Map;
@@ -796,6 +797,39 @@ public class TestHadoopCatalogOperations {
                 ImmutableMap.of()));
   }
 
+  @Test
+  void testTrailSlash() throws IOException {
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+
+      String location = "hdfs://localhost:9000";
+      Map<String, String> catalogProperties = Maps.newHashMap();
+      catalogProperties.put(HadoopCatalogPropertiesMetadata.LOCATION, 
location);
+
+      ops.initialize(catalogProperties, randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
+
+      String schemaName = "schema1024";
+      NameIdentifier nameIdentifier = NameIdentifierUtil.ofSchema("m1", "c1", 
schemaName);
+
+      Map<String, String> schemaProperties = Maps.newHashMap();
+      schemaProperties.put(HadoopCatalogPropertiesMetadata.LOCATION, 
"hdfs://localhost:9000/user1");
+      StringIdentifier stringId = 
StringIdentifier.fromId(idGenerator.nextId());
+      schemaProperties =
+          Maps.newHashMap(StringIdentifier.newPropertiesWithId(stringId, 
schemaProperties));
+
+      Map<String, String> finalSchemaProperties = schemaProperties;
+
+      // If not fixed by #5296, this method will throw 
java.lang.IllegalArgumentException:
+      // java.net.URISyntaxException: Relative path in absolute URI: 
hdfs://localhost:9000schema1024
+      // After #5296, this method will throw java.lang.RuntimeException: 
Failed to create
+      // schema m1.c1.schema1024 location hdfs://localhost:9000/user1
+      Exception exception =
+          Assertions.assertThrows(
+              Exception.class,
+              () -> ops.createSchema(nameIdentifier, "comment", 
finalSchemaProperties));
+      Assertions.assertTrue(exception.getCause() instanceof ConnectException);
+    }
+  }
+
   @Test
   public void testGetFileLocation() throws IOException {
     String schemaName = "schema1024";
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/fs/TestFileSystemUtils.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/fs/TestFileSystemUtils.java
new file mode 100644
index 000000000..b4e0809b6
--- /dev/null
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/fs/TestFileSystemUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class TestFileSystemUtils {
+  @ParameterizedTest
+  @MethodSource("mapArguments")
+  void testToHadoopConfigMap(
+      Map<String, String> confMap,
+      Map<String, String> toHadoopConf,
+      Map<String, String> predefineKeys) {
+    Map<String, String> result = FileSystemUtils.toHadoopConfigMap(confMap, 
predefineKeys);
+    Assertions.assertEquals(toHadoopConf, result);
+  }
+
+  private static Stream<Arguments> mapArguments() {
+    return Stream.of(
+        Arguments.of(
+            ImmutableMap.of(
+                "fs.s3a.endpoint", "v1",
+                "fs.s3a.impl", "v2"),
+            ImmutableMap.of("fs.s3a.endpoint", "v1", "fs.s3a.impl", "v2"),
+            ImmutableMap.of()),
+        Arguments.of(
+            ImmutableMap.of(
+                "gravitino.bypass.fs.s3a.endpoint", "v1",
+                "fs.s3a.impl", "v2"),
+            ImmutableMap.of("fs.s3a.endpoint", "v1", "fs.s3a.impl", "v2"),
+            ImmutableMap.of()),
+        Arguments.of(
+            ImmutableMap.of(
+                "fs.s3a.endpoint", "v1",
+                "gravitino.bypass.fs.s3a.endpoint", "v2",
+                "fs.s3a.impl", "v2"),
+            ImmutableMap.of("fs.s3a.endpoint", "v1", "fs.s3a.impl", "v2"),
+            ImmutableMap.of()),
+        Arguments.of(
+            ImmutableMap.of(
+                "s3a-endpoint", "v1",
+                "fs.s3a.impl", "v2"),
+            ImmutableMap.of("fs.s3a.endpoint", "v1", "fs.s3a.impl", "v2"),
+            ImmutableMap.of("s3a-endpoint", "fs.s3a.endpoint")),
+        Arguments.of(
+            ImmutableMap.of(
+                "s3a-endpoint", "v1",
+                "fs.s3a.impl", "v2",
+                "gravitino.bypass.fs.s3a.endpoint", "v3"),
+            ImmutableMap.of("fs.s3a.endpoint", "v1", "fs.s3a.impl", "v2"),
+            ImmutableMap.of("s3a-endpoint", "fs.s3a.endpoint")),
+        Arguments.of(
+            ImmutableMap.of(
+                "s3a-endpoint", "v1",
+                "fs.s3a.impl", "v2",
+                "fs.s3a.endpoint", "v3"),
+            ImmutableMap.of("fs.s3a.endpoint", "v1", "fs.s3a.impl", "v2"),
+            ImmutableMap.of("s3a-endpoint", "fs.s3a.endpoint")),
+        Arguments.of(
+            ImmutableMap.of(
+                "s3a-endpoint", "v1",
+                "fs.s3a.impl", "v2",
+                "fs.s3a.endpoint", "v3",
+                "gravitino.bypass.fs.s3a.endpoint", "v4"),
+            ImmutableMap.of("fs.s3a.endpoint", "v1", "fs.s3a.impl", "v2"),
+            ImmutableMap.of("s3a-endpoint", "fs.s3a.endpoint")));
+  }
+}
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
index cca13b770..2e52e74bb 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
@@ -19,19 +19,26 @@
 package org.apache.gravitino.catalog.hadoop.integration.test;
 
 import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+import static 
org.apache.gravitino.storage.GCSProperties.GCS_SERVICE_ACCOUNT_JSON_PATH;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Map;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
 
 @Tag("gravitino-docker-test")
 @Disabled(
@@ -95,10 +102,8 @@ public class HadoopGCSCatalogIT extends HadoopCatalogIT {
 
   protected void createCatalog() {
     Map<String, String> map = Maps.newHashMap();
-    map.put("gravitino.bypass.fs.gs.auth.service.account.enable", "true");
-    map.put("gravitino.bypass.fs.gs.auth.service.account.json.keyfile", 
SERVICE_ACCOUNT_FILE);
+    map.put(GCS_SERVICE_ACCOUNT_JSON_PATH, SERVICE_ACCOUNT_FILE);
     map.put(FILESYSTEM_PROVIDERS, "gcs");
-
     metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider, 
"comment", map);
 
     catalog = metalake.loadCatalog(catalogName);
@@ -107,4 +112,64 @@ public class HadoopGCSCatalogIT extends HadoopCatalogIT {
   protected String generateLocation(String filesetName) {
     return String.format("%s/%s", defaultBaseLocation, filesetName);
   }
+
+  @Test
+  public void testCreateSchemaAndFilesetWithSpecialLocation() {
+    String localCatalogName = GravitinoITUtils.genRandomName("local_catalog");
+
+    String ossLocation = String.format("gs://%s", BUCKET_NAME);
+    Map<String, String> catalogProps = Maps.newHashMap();
+    catalogProps.put("location", ossLocation);
+    catalogProps.put(GCS_SERVICE_ACCOUNT_JSON_PATH, SERVICE_ACCOUNT_FILE);
+    catalogProps.put(FILESYSTEM_PROVIDERS, "gcs");
+
+    Catalog localCatalog =
+        metalake.createCatalog(
+            localCatalogName, Catalog.Type.FILESET, provider, "comment", 
catalogProps);
+    Assertions.assertEquals(ossLocation, 
localCatalog.properties().get("location"));
+
+    // Create schema without specifying location.
+    Schema localSchema =
+        localCatalog
+            .asSchemas()
+            .createSchema("local_schema", "comment", ImmutableMap.of("key1", 
"val1"));
+
+    Fileset localFileset =
+        localCatalog
+            .asFilesetCatalog()
+            .createFileset(
+                NameIdentifier.of(localSchema.name(), "local_fileset"),
+                "fileset comment",
+                Fileset.Type.MANAGED,
+                null,
+                ImmutableMap.of("k1", "v1"));
+    Assertions.assertEquals(
+        ossLocation + "/local_schema/local_fileset", 
localFileset.storageLocation());
+
+    // Delete schema
+    localCatalog.asSchemas().dropSchema(localSchema.name(), true);
+
+    // Create schema with specifying location.
+    Map<String, String> schemaProps = ImmutableMap.of("location", ossLocation);
+    Schema localSchema2 =
+        localCatalog.asSchemas().createSchema("local_schema2", "comment", 
schemaProps);
+    Assertions.assertEquals(ossLocation, 
localSchema2.properties().get("location"));
+
+    Fileset localFileset2 =
+        localCatalog
+            .asFilesetCatalog()
+            .createFileset(
+                NameIdentifier.of(localSchema2.name(), "local_fileset2"),
+                "fileset comment",
+                Fileset.Type.MANAGED,
+                null,
+                ImmutableMap.of("k1", "v1"));
+    Assertions.assertEquals(ossLocation + "/local_fileset2", 
localFileset2.storageLocation());
+
+    // Delete schema
+    localCatalog.asSchemas().dropSchema(localSchema2.name(), true);
+
+    // Delete catalog
+    metalake.dropCatalog(localCatalogName, true);
+  }
 }
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopOSSCatalogIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopOSSCatalogIT.java
index 0bd077399..b6c3ac722 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopOSSCatalogIT.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopOSSCatalogIT.java
@@ -21,24 +21,32 @@ package 
org.apache.gravitino.catalog.hadoop.integration.test;
 import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Map;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.storage.OSSProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Disabled(
     "Disabled due to we don't have a real OSS account to test. If you have a 
GCP account,"
-        + "please change the configuration(BUCKET_NAME, OSS_ACCESS_KEY, 
OSS_SECRET_KEY, OSS_ENDPOINT) and enable this test.")
+        + "please change the configuration(BUCKET_NAME, OSS_ACCESS_KEY, 
OSS_SECRET_KEY, "
+        + "OSS_ENDPOINT) and enable this test.")
 public class HadoopOSSCatalogIT extends HadoopCatalogIT {
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopOSSCatalogIT.class);
   public static final String BUCKET_NAME = "YOUR_BUCKET";
@@ -81,8 +89,8 @@ public class HadoopOSSCatalogIT extends HadoopCatalogIT {
   public void stop() throws IOException {
     Catalog catalog = metalake.loadCatalog(catalogName);
     catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.dropCatalog(catalogName);
-    client.dropMetalake(metalakeName);
+    metalake.dropCatalog(catalogName, true);
+    client.dropMetalake(metalakeName, true);
 
     try {
       closer.close();
@@ -114,10 +122,9 @@ public class HadoopOSSCatalogIT extends HadoopCatalogIT {
 
   protected void createCatalog() {
     Map<String, String> map = Maps.newHashMap();
-    map.put("gravitino.bypass.fs.oss.accessKeyId", OSS_ACCESS_KEY);
-    map.put("gravitino.bypass.fs.oss.accessKeySecret", OSS_SECRET_KEY);
-    map.put("gravitino.bypass.fs.oss.endpoint", OSS_ENDPOINT);
-    map.put("gravitino.bypass.fs.oss.impl", 
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+    map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY);
+    map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, OSS_SECRET_KEY);
+    map.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, OSS_ENDPOINT);
     map.put(FILESYSTEM_PROVIDERS, "oss");
 
     metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider, 
"comment", map);
@@ -128,4 +135,66 @@ public class HadoopOSSCatalogIT extends HadoopCatalogIT {
   protected String generateLocation(String filesetName) {
     return String.format("%s/%s", defaultBaseLocation, filesetName);
   }
+
+  @Test
+  public void testCreateSchemaAndFilesetWithSpecialLocation() {
+    String localCatalogName = GravitinoITUtils.genRandomName("local_catalog");
+
+    String ossLocation = String.format("oss://%s", BUCKET_NAME);
+    Map<String, String> catalogProps = Maps.newHashMap();
+    catalogProps.put("location", ossLocation);
+    catalogProps.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, OSS_ENDPOINT);
+    catalogProps.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, 
OSS_ACCESS_KEY);
+    catalogProps.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, 
OSS_SECRET_KEY);
+    catalogProps.put(FILESYSTEM_PROVIDERS, "oss");
+
+    Catalog localCatalog =
+        metalake.createCatalog(
+            localCatalogName, Catalog.Type.FILESET, provider, "comment", 
catalogProps);
+    Assertions.assertEquals(ossLocation, 
localCatalog.properties().get("location"));
+
+    // Create schema without specifying location.
+    Schema localSchema =
+        localCatalog
+            .asSchemas()
+            .createSchema("local_schema", "comment", ImmutableMap.of("key1", 
"val1"));
+
+    Fileset localFileset =
+        localCatalog
+            .asFilesetCatalog()
+            .createFileset(
+                NameIdentifier.of(localSchema.name(), "local_fileset"),
+                "fileset comment",
+                Fileset.Type.MANAGED,
+                null,
+                ImmutableMap.of("k1", "v1"));
+    Assertions.assertEquals(
+        ossLocation + "/local_schema/local_fileset", 
localFileset.storageLocation());
+
+    // Delete schema
+    localCatalog.asSchemas().dropSchema(localSchema.name(), true);
+
+    // Create schema with an explicitly specified location.
+    Map<String, String> schemaProps = ImmutableMap.of("location", ossLocation);
+    Schema localSchema2 =
+        localCatalog.asSchemas().createSchema("local_schema2", "comment", 
schemaProps);
+    Assertions.assertEquals(ossLocation, 
localSchema2.properties().get("location"));
+
+    Fileset localFileset2 =
+        localCatalog
+            .asFilesetCatalog()
+            .createFileset(
+                NameIdentifier.of(localSchema2.name(), "local_fileset2"),
+                "fileset comment",
+                Fileset.Type.MANAGED,
+                null,
+                ImmutableMap.of("k1", "v1"));
+    Assertions.assertEquals(ossLocation + "/local_fileset2", 
localFileset2.storageLocation());
+
+    // Delete schema
+    localCatalog.asSchemas().dropSchema(localSchema2.name(), true);
+
+    // Delete catalog
+    metalake.dropCatalog(localCatalogName, true);
+  }
 }
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java
index 90b441392..e79901ae0 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog.hadoop.integration.test;
 import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.net.URI;
@@ -28,14 +29,20 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.file.Fileset;
 import 
org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.storage.S3Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
@@ -170,12 +177,9 @@ public class HadoopS3CatalogIT extends HadoopCatalogIT {
 
   protected void createCatalog() {
     Map<String, String> map = Maps.newHashMap();
-    map.put("gravitino.bypass.fs.s3a.access.key", accessKey);
-    map.put("gravitino.bypass.fs.s3a.secret.key", secretKey);
-    map.put("gravitino.bypass.fs.s3a.endpoint", s3Endpoint);
-    map.put(
-        "gravitino.bypass.fs.s3a.aws.credentials.provider",
-        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+    map.put(S3Properties.GRAVITINO_S3_ENDPOINT, s3Endpoint);
+    map.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, accessKey);
+    map.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, secretKey);
     map.put(FILESYSTEM_PROVIDERS, "s3");
 
     metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider, 
"comment", map);
@@ -186,4 +190,66 @@ public class HadoopS3CatalogIT extends HadoopCatalogIT {
   protected String generateLocation(String filesetName) {
     return String.format("%s/%s", defaultBaseLocation, filesetName);
   }
+
+  @Test
+  public void testCreateSchemaAndFilesetWithSpecialLocation() {
+    String localCatalogName = GravitinoITUtils.genRandomName("local_catalog");
+
+    String s3Location = String.format("s3a://%s", bucketName);
+    Map<String, String> catalogProps = Maps.newHashMap();
+    catalogProps.put("location", s3Location);
+    catalogProps.put(S3Properties.GRAVITINO_S3_ENDPOINT, s3Endpoint);
+    catalogProps.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, accessKey);
+    catalogProps.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, secretKey);
+    catalogProps.put(FILESYSTEM_PROVIDERS, "s3");
+
+    Catalog localCatalog =
+        metalake.createCatalog(
+            localCatalogName, Catalog.Type.FILESET, provider, "comment", 
catalogProps);
+    Assertions.assertEquals(s3Location, 
localCatalog.properties().get("location"));
+
+    // Create schema without specifying location.
+    Schema localSchema =
+        localCatalog
+            .asSchemas()
+            .createSchema("local_schema", "comment", ImmutableMap.of("key1", 
"val1"));
+
+    Fileset localFileset =
+        localCatalog
+            .asFilesetCatalog()
+            .createFileset(
+                NameIdentifier.of(localSchema.name(), "local_fileset"),
+                "fileset comment",
+                Fileset.Type.MANAGED,
+                null,
+                ImmutableMap.of("k1", "v1"));
+    Assertions.assertEquals(
+        s3Location + "/local_schema/local_fileset", 
localFileset.storageLocation());
+
+    // Delete schema
+    localCatalog.asSchemas().dropSchema(localSchema.name(), true);
+
+    // Create schema with an explicitly specified location.
+    Map<String, String> schemaProps = ImmutableMap.of("location", s3Location);
+    Schema localSchema2 =
+        localCatalog.asSchemas().createSchema("local_schema2", "comment", 
schemaProps);
+    Assertions.assertEquals(s3Location, 
localSchema2.properties().get("location"));
+
+    Fileset localFileset2 =
+        localCatalog
+            .asFilesetCatalog()
+            .createFileset(
+                NameIdentifier.of(localSchema2.name(), "local_fileset2"),
+                "fileset comment",
+                Fileset.Type.MANAGED,
+                null,
+                ImmutableMap.of("k1", "v1"));
+    Assertions.assertEquals(s3Location + "/local_fileset2", 
localFileset2.storageLocation());
+
+    // Delete schema
+    localCatalog.asSchemas().dropSchema(localSchema2.name(), true);
+
+    // Delete catalog
+    metalake.dropCatalog(localCatalogName, true);
+  }
 }
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py 
b/clients/client-python/gravitino/filesystem/gvfs_config.py
index 00ae8c641..c8e22e95f 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -32,12 +32,12 @@ class GVFSConfig:
     OAUTH2_PATH = "oauth2_path"
     OAUTH2_SCOPE = "oauth2_scope"
 
-    GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE = "gcs_service_account_key_path"
+    GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE = "gcs_service_account_file"
 
-    GVFS_FILESYSTEM_S3_ACCESS_KEY = "s3_access_key"
-    GVFS_FILESYSTEM_S3_SECRET_KEY = "s3_secret_key"
+    GVFS_FILESYSTEM_S3_ACCESS_KEY = "s3_access_key_id"
+    GVFS_FILESYSTEM_S3_SECRET_KEY = "s3_secret_access_key"
     GVFS_FILESYSTEM_S3_ENDPOINT = "s3_endpoint"
 
-    GVFS_FILESYSTEM_OSS_ACCESS_KEY = "oss_access_key"
-    GVFS_FILESYSTEM_OSS_SECRET_KEY = "oss_secret_key"
+    GVFS_FILESYSTEM_OSS_ACCESS_KEY = "oss_access_key_id"
+    GVFS_FILESYSTEM_OSS_SECRET_KEY = "oss_secret_access_key"
     GVFS_FILESYSTEM_OSS_ENDPOINT = "oss_endpoint"
diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs.py 
b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
index 54a2cfd07..95951459b 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_gcs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
@@ -109,8 +109,7 @@ class TestGvfsWithGCS(TestGvfsWithHDFS):
             comment="",
             properties={
                 "filesystem-providers": "gcs",
-                "gravitino.bypass.fs.gs.auth.service.account.enable": "true",
-                "gravitino.bypass.fs.gs.auth.service.account.json.keyfile": 
cls.key_file,
+                "gcs-service-account-file": cls.key_file,
             },
         )
         catalog.as_schemas().create_schema(
diff --git a/clients/client-python/tests/integration/test_gvfs_with_oss.py 
b/clients/client-python/tests/integration/test_gvfs_with_oss.py
index 95b385ea9..7b709b495 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_oss.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_oss.py
@@ -115,10 +115,9 @@ class TestGvfsWithOSS(TestGvfsWithHDFS):
             comment="",
             properties={
                 "filesystem-providers": "oss",
-                "gravitino.bypass.fs.oss.accessKeyId": cls.oss_access_key,
-                "gravitino.bypass.fs.oss.accessKeySecret": cls.oss_secret_key,
-                "gravitino.bypass.fs.oss.endpoint": cls.oss_endpoint,
-                "gravitino.bypass.fs.oss.impl": 
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem",
+                "oss-access-key-id": cls.oss_access_key,
+                "oss-secret-access-key": cls.oss_secret_key,
+                "oss-endpoint": cls.oss_endpoint,
             },
         )
         catalog.as_schemas().create_schema(
diff --git a/clients/client-python/tests/integration/test_gvfs_with_s3.py 
b/clients/client-python/tests/integration/test_gvfs_with_s3.py
index 5758a7e65..ec059a88f 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_s3.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_s3.py
@@ -113,9 +113,9 @@ class TestGvfsWithS3(TestGvfsWithHDFS):
             comment="",
             properties={
                 "filesystem-providers": "s3",
-                "gravitino.bypass.fs.s3a.access.key": cls.s3_access_key,
-                "gravitino.bypass.fs.s3a.secret.key": cls.s3_secret_key,
-                "gravitino.bypass.fs.s3a.endpoint": cls.s3_endpoint,
+                "s3-access-key-id": cls.s3_access_key,
+                "s3-secret-access-key": cls.s3_secret_key,
+                "s3-endpoint": cls.s3_endpoint,
             },
         )
         catalog.as_schemas().create_schema(
diff --git a/clients/filesystem-hadoop3/build.gradle.kts 
b/clients/filesystem-hadoop3/build.gradle.kts
index 7f21c700d..9836c3514 100644
--- a/clients/filesystem-hadoop3/build.gradle.kts
+++ b/clients/filesystem-hadoop3/build.gradle.kts
@@ -29,6 +29,9 @@ dependencies {
   implementation(project(":catalogs:catalog-hadoop")) {
     exclude(group = "*")
   }
+  implementation(project(":catalogs:catalog-common")) {
+    exclude(group = "*")
+  }
 
   implementation(libs.caffeine)
 
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index 05e769667..aaa81ab55 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -19,7 +19,6 @@
 package org.apache.gravitino.filesystem.hadoop;
 
 import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_FILESYSTEM_PROVIDERS;
-import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.GVFS_CONFIG_PREFIX;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -87,7 +86,6 @@ public class GravitinoVirtualFileSystem extends FileSystem {
       
Pattern.compile("^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?>/[^/]+)*/?$");
   private static final String SLASH = "/";
   private final Map<String, FileSystemProvider> fileSystemProvidersMap = 
Maps.newHashMap();
-  private static final String GRAVITINO_BYPASS_PREFIX = "gravitino.bypass.";
 
   @Override
   public void initialize(URI name, Configuration configuration) throws 
IOException {
@@ -385,13 +383,14 @@ public class GravitinoVirtualFileSystem extends 
FileSystem {
             scheme,
             str -> {
               try {
-                Map<String, String> maps = getConfigMap(getConf());
                 FileSystemProvider provider = 
fileSystemProvidersMap.get(scheme);
                 if (provider == null) {
                   throw new GravitinoRuntimeException(
                       "Unsupported file system scheme: %s for %s.",
                       scheme, 
GravitinoVirtualFileSystemConfiguration.GVFS_SCHEME);
                 }
+
+                Map<String, String> maps = getConfigMap(getConf());
                 return provider.getFileSystem(filePath, maps);
               } catch (IOException ioe) {
                 throw new GravitinoRuntimeException(
@@ -405,16 +404,7 @@ public class GravitinoVirtualFileSystem extends FileSystem 
{
 
   private Map<String, String> getConfigMap(Configuration configuration) {
     Map<String, String> maps = Maps.newHashMap();
-    configuration.forEach(
-        entry -> {
-          String key = entry.getKey();
-          if (key.startsWith(GRAVITINO_BYPASS_PREFIX)) {
-            maps.put(key.substring(GRAVITINO_BYPASS_PREFIX.length()), 
entry.getValue());
-          } else if (!key.startsWith(GVFS_CONFIG_PREFIX)) {
-            maps.put(key, entry.getValue());
-          }
-        });
-
+    configuration.forEach(entry -> maps.put(entry.getKey(), entry.getValue()));
     return maps;
   }
 
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
index 312236fe5..e35671770 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
@@ -27,7 +27,10 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.gcs.fs.GCSFileSystemProvider;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.storage.GCSProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -88,8 +91,7 @@ public class GravitinoVirtualFileSystemGCSIT extends 
GravitinoVirtualFileSystemI
     conf.set("fs.gravitino.client.metalake", metalakeName);
 
     // Pass this configuration to the real file system
-    conf.set("gravitino.bypass.fs.gs.auth.service.account.enable", "true");
-    conf.set("gravitino.bypass.fs.gs.auth.service.account.json.keyfile", 
SERVICE_ACCOUNT_FILE);
+    conf.set(GCSProperties.GCS_SERVICE_ACCOUNT_JSON_PATH, 
SERVICE_ACCOUNT_FILE);
     conf.set(FS_FILESYSTEM_PROVIDERS, "gcs");
   }
 
@@ -119,10 +121,15 @@ public class GravitinoVirtualFileSystemGCSIT extends 
GravitinoVirtualFileSystemI
    */
   protected Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
     Configuration gcsConf = new Configuration();
-    gvfsConf.forEach(
-        entry -> {
-          gcsConf.set(entry.getKey().replace("gravitino.bypass.", ""), 
entry.getValue());
-        });
+    Map<String, String> map = Maps.newHashMap();
+
+    gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+    Map<String, String> hadoopConfMap =
+        FileSystemUtils.toHadoopConfigMap(
+            map, GCSFileSystemProvider.GRAVITINO_KEY_TO_GCS_HADOOP_KEY);
+
+    hadoopConfMap.forEach(gcsConf::set);
 
     return gcsConf;
   }
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java
index 6a6557c6c..257181dbf 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java
@@ -27,7 +27,10 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.oss.fs.OSSFileSystemProvider;
+import org.apache.gravitino.storage.OSSProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -74,11 +77,9 @@ public class GravitinoVirtualFileSystemOSSIT extends 
GravitinoVirtualFileSystemI
 
     Map<String, String> properties = Maps.newHashMap();
     properties.put(FILESYSTEM_PROVIDERS, "oss");
-    properties.put("gravitino.bypass.fs.oss.accessKeyId", OSS_ACCESS_KEY);
-    properties.put("gravitino.bypass.fs.oss.accessKeySecret", OSS_SECRET_KEY);
-    properties.put("gravitino.bypass.fs.oss.endpoint", OSS_ENDPOINT);
-    properties.put(
-        "gravitino.bypass.fs.oss.impl", 
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+    properties.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY);
+    properties.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, 
OSS_SECRET_KEY);
+    properties.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, OSS_ENDPOINT);
 
     Catalog catalog =
         metalake.createCatalog(
@@ -95,11 +96,10 @@ public class GravitinoVirtualFileSystemOSSIT extends 
GravitinoVirtualFileSystemI
     conf.set("fs.gravitino.client.metalake", metalakeName);
 
     // Pass this configuration to the real file system
-    conf.set("gravitino.bypass.fs.oss.accessKeyId", OSS_ACCESS_KEY);
-    conf.set("gravitino.bypass.fs.oss.accessKeySecret", OSS_SECRET_KEY);
-    conf.set("gravitino.bypass.fs.oss.endpoint", OSS_ENDPOINT);
-    conf.set("gravitino.bypass.fs.oss.impl", 
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
-
+    conf.set(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY);
+    conf.set(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, OSS_SECRET_KEY);
+    conf.set(OSSProperties.GRAVITINO_OSS_ENDPOINT, OSS_ENDPOINT);
+    conf.set("fs.oss.impl", 
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
     conf.set(FS_FILESYSTEM_PROVIDERS, "oss");
   }
 
@@ -108,7 +108,7 @@ public class GravitinoVirtualFileSystemOSSIT extends 
GravitinoVirtualFileSystemI
     Catalog catalog = metalake.loadCatalog(catalogName);
     catalog.asSchemas().dropSchema(schemaName, true);
     metalake.dropCatalog(catalogName, true);
-    client.dropMetalake(metalakeName);
+    client.dropMetalake(metalakeName, true);
 
     if (client != null) {
       client.close();
@@ -128,13 +128,18 @@ public class GravitinoVirtualFileSystemOSSIT extends 
GravitinoVirtualFileSystemI
    * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original 
code.
    */
   protected Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
-    Configuration gcsConf = new Configuration();
-    gvfsConf.forEach(
-        entry -> {
-          gcsConf.set(entry.getKey().replace("gravitino.bypass.", ""), 
entry.getValue());
-        });
+    Configuration ossConf = new Configuration();
+    Map<String, String> map = Maps.newHashMap();
+
+    gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+    Map<String, String> hadoopConfMap =
+        FileSystemUtils.toHadoopConfigMap(
+            map, OSSFileSystemProvider.GRAVITINO_KEY_TO_OSS_HADOOP_KEY);
+
+    hadoopConfMap.forEach(ossConf::set);
 
-    return gcsConf;
+    return ossConf;
   }
 
   protected String genStorageLocation(String fileset) {
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java
index 2f9952f28..22c487288 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java
@@ -29,8 +29,11 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import 
org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.s3.fs.S3FileSystemProvider;
+import org.apache.gravitino.storage.S3Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -151,11 +154,9 @@ public class GravitinoVirtualFileSystemS3IT extends 
GravitinoVirtualFileSystemIT
     conf.set("fs.gravitino.client.metalake", metalakeName);
 
     // Pass this configuration to the real file system
-    conf.set("fs.s3a.access.key", accessKey);
-    conf.set("fs.s3a.secret.key", secretKey);
-    conf.set("fs.s3a.endpoint", s3Endpoint);
-    conf.set(
-        "fs.s3a.aws.credentials.provider", 
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+    conf.set(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, accessKey);
+    conf.set(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, secretKey);
+    conf.set(S3Properties.GRAVITINO_S3_ENDPOINT, s3Endpoint);
     conf.set(FS_FILESYSTEM_PROVIDERS, "s3");
   }
 
@@ -184,13 +185,17 @@ public class GravitinoVirtualFileSystemS3IT extends 
GravitinoVirtualFileSystemIT
    * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original 
code.
    */
   protected Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
-    Configuration gcsConf = new Configuration();
-    gvfsConf.forEach(
-        entry -> {
-          gcsConf.set(entry.getKey().replace("gravitino.bypass.", ""), 
entry.getValue());
-        });
+    Configuration s3Conf = new Configuration();
+    Map<String, String> map = Maps.newHashMap();
 
-    return gcsConf;
+    gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+    Map<String, String> hadoopConfMap =
+        FileSystemUtils.toHadoopConfigMap(map, 
S3FileSystemProvider.GRAVITINO_KEY_TO_S3_HADOOP_KEY);
+
+    hadoopConfMap.forEach(s3Conf::set);
+
+    return s3Conf;
   }
 
   protected String genStorageLocation(String fileset) {

Reply via email to