This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 93cdbc259 [#5074] feat(hadoop-catalog): Support GCS fileset.  (#5079)
93cdbc259 is described below

commit 93cdbc259b1b4bcd4bda2d1b34d011f19368d2dd
Author: Qi Yu <[email protected]>
AuthorDate: Thu Oct 17 20:06:35 2024 +0800

    [#5074] feat(hadoop-catalog): Support GCS fileset.  (#5079)
    
    ### What changes were proposed in this pull request?
    
    1. Add a bundled jar for Hadoop GCS support.
    2. Support GCS in Hadoop catalog.
    
    ### Why are the changes needed?
    
    There is strong user demand for Fileset support on GCS storage.
    
    Fix: #5074
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    Manually; please see: HadoopGCSCatalogIT
---
 build.gradle.kts                                   |   4 +-
 bundles/build.gradle.kts                           |  22 +++
 bundles/gcp-bundle/build.gradle.kts                |  46 ++++++
 .../gravitino/gcs/fs/GCSFileSystemProvider.java    |  19 +--
 ....gravitino.catalog.hadoop.fs.FileSystemProvider |  20 +++
 catalogs/catalog-hadoop/build.gradle.kts           |   2 +
 .../catalog/hadoop/fs/HDFSFileSystemProvider.java  |   3 +-
 .../catalog/hadoop/fs/LocalFileSystemProvider.java |   3 +-
 .../hadoop/integration/test/HadoopCatalogIT.java   |  57 ++++---
 .../integration/test/HadoopGCSCatalogIT.java       |  97 ++++++++++++
 clients/filesystem-hadoop3/build.gradle.kts        |   1 +
 .../GravitinoVirtualFileSystemConfiguration.java   |  23 ++-
 .../test/GravitinoVirtualFileSystemGCSIT.java      | 170 +++++++++++++++++++++
 .../test/GravitinoVirtualFileSystemIT.java         |  86 +++++++----
 gradle/libs.versions.toml                          |   2 +
 .../gravitino/integration/test/util/ITUtils.java   |   1 +
 settings.gradle.kts                                |   1 +
 17 files changed, 479 insertions(+), 78 deletions(-)

diff --git a/build.gradle.kts b/build.gradle.kts
index 6db5f00cc..9733a1791 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -745,7 +745,7 @@ tasks {
       if (!it.name.startsWith("catalog") &&
         !it.name.startsWith("authorization") &&
         !it.name.startsWith("client") && !it.name.startsWith("filesystem") && 
!it.name.startsWith("spark") && !it.name.startsWith("iceberg") && it.name != 
"trino-connector" &&
-        it.name != "integration-test" && it.name != "hive-metastore-common" && 
!it.name.startsWith("flink")
+        it.name != "integration-test" && it.name != "hive-metastore-common" && 
!it.name.startsWith("flink") && it.name != "gcp-bundle"
       ) {
         from(it.configurations.runtimeClasspath)
         into("distribution/package/libs")
@@ -764,7 +764,7 @@ tasks {
         !it.name.startsWith("integration-test") &&
         !it.name.startsWith("flink") &&
         !it.name.startsWith("trino-connector") &&
-        it.name != "hive-metastore-common"
+        it.name != "hive-metastore-common" && it.name != "gcp-bundle"
       ) {
         dependsOn("${it.name}:build")
         from("${it.name}/build/libs")
diff --git a/bundles/build.gradle.kts b/bundles/build.gradle.kts
new file mode 100644
index 000000000..043fbfec6
--- /dev/null
+++ b/bundles/build.gradle.kts
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+tasks.all {
+    enabled = false
+}
\ No newline at end of file
diff --git a/bundles/gcp-bundle/build.gradle.kts 
b/bundles/gcp-bundle/build.gradle.kts
new file mode 100644
index 000000000..9433a6004
--- /dev/null
+++ b/bundles/gcp-bundle/build.gradle.kts
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+
+plugins {
+  `maven-publish`
+  id("java")
+  alias(libs.plugins.shadow)
+}
+
+dependencies {
+  compileOnly(project(":catalogs:catalog-hadoop"))
+  compileOnly(libs.hadoop3.common)
+  implementation(libs.hadoop3.gcs)
+}
+
+tasks.withType(ShadowJar::class.java) {
+  isZip64 = true
+  configurations = listOf(project.configurations.runtimeClasspath.get())
+  archiveClassifier.set("")
+}
+
+tasks.jar {
+  dependsOn(tasks.named("shadowJar"))
+  archiveClassifier.set("empty")
+}
+
+tasks.compileJava {
+  dependsOn(":catalogs:catalog-hadoop:runtimeJars")
+}
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
 
b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
similarity index 70%
copy from 
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
copy to 
bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
index 70e44c76f..919baa03b 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
+++ 
b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
@@ -16,38 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.hadoop.fs;
-
-import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_LOCAL_FS_PROVIDER;
-import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+package org.apache.gravitino.gcs.fs;
 
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
 import java.io.IOException;
 import java.util.Map;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 
-public class LocalFileSystemProvider implements FileSystemProvider {
-
+public class GCSFileSystemProvider implements FileSystemProvider {
   @Override
   public FileSystem getFileSystem(Path path, Map<String, String> config) 
throws IOException {
     Configuration configuration = new Configuration();
     config.forEach(
         (k, v) -> {
-          configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
+          configuration.set(k.replace("gravitino.bypass.", ""), v);
         });
 
-    return LocalFileSystem.newInstance(path.toUri(), configuration);
+    return GoogleHadoopFileSystem.newInstance(path.toUri(), configuration);
   }
 
   @Override
   public String scheme() {
-    return "file";
+    return "gs";
   }
 
   @Override
   public String name() {
-    return BUILTIN_LOCAL_FS_PROVIDER;
+    return "gcs";
   }
 }
diff --git 
a/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
 
b/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
new file mode 100644
index 000000000..8a65be70f
--- /dev/null
+++ 
b/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.gravitino.gcs.fs.GCSFileSystemProvider
\ No newline at end of file
diff --git a/catalogs/catalog-hadoop/build.gradle.kts 
b/catalogs/catalog-hadoop/build.gradle.kts
index 940289347..9ff3cc0e3 100644
--- a/catalogs/catalog-hadoop/build.gradle.kts
+++ b/catalogs/catalog-hadoop/build.gradle.kts
@@ -73,6 +73,7 @@ dependencies {
   testImplementation(project(":integration-test-common", "testArtifacts"))
   testImplementation(project(":server"))
   testImplementation(project(":server-common"))
+  testImplementation(project(":bundles:gcp-bundle"))
 
   testImplementation(libs.minikdc)
   testImplementation(libs.hadoop3.minicluster)
@@ -86,6 +87,7 @@ dependencies {
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.testcontainers)
   testImplementation(libs.testcontainers.mysql)
+  testImplementation(libs.hadoop3.gcs)
 
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
index 7c9ceebdd..c7c2fd393 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
@@ -27,7 +27,6 @@ import javax.annotation.Nonnull;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 
 public class HDFSFileSystemProvider implements FileSystemProvider {
 
@@ -39,7 +38,7 @@ public class HDFSFileSystemProvider implements 
FileSystemProvider {
         (k, v) -> {
           configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
         });
-    return DistributedFileSystem.newInstance(path.toUri(), configuration);
+    return FileSystem.newInstance(path.toUri(), configuration);
   }
 
   @Override
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
index 70e44c76f..e940e2bb6 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 
 public class LocalFileSystemProvider implements FileSystemProvider {
@@ -38,7 +37,7 @@ public class LocalFileSystemProvider implements 
FileSystemProvider {
           configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
         });
 
-    return LocalFileSystem.newInstance(path.toUri(), configuration);
+    return FileSystem.newInstance(path.toUri(), configuration);
   }
 
   @Override
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
index 644b98cb9..76d17ff01 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
@@ -59,26 +59,24 @@ import org.slf4j.LoggerFactory;
 @Tag("gravitino-docker-test")
 public class HadoopCatalogIT extends BaseIT {
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopCatalogIT.class);
-  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+  protected static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
 
-  public static final String metalakeName =
-      GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
-  public static final String catalogName =
-      GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
+  public String metalakeName = 
GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
+  public String catalogName = 
GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
   public static final String SCHEMA_PREFIX = "CatalogFilesetIT_schema";
-  public static final String schemaName = 
GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
-  private static final String provider = "hadoop";
-  private static GravitinoMetalake metalake;
-  private static Catalog catalog;
-  private static FileSystem hdfs;
-  private static String defaultBaseLocation;
+  public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+  protected static final String provider = "hadoop";
+  protected static GravitinoMetalake metalake;
+  protected static Catalog catalog;
+  protected static FileSystem fileSystem;
+  protected static String defaultBaseLocation;
 
   @BeforeAll
   public void setup() throws IOException {
     containerSuite.startHiveContainer();
     Configuration conf = new Configuration();
     conf.set("fs.defaultFS", defaultBaseLocation());
-    hdfs = FileSystem.get(conf);
+    fileSystem = FileSystem.get(conf);
 
     createMetalake();
     createCatalog();
@@ -91,8 +89,8 @@ public class HadoopCatalogIT extends BaseIT {
     catalog.asSchemas().dropSchema(schemaName, true);
     metalake.dropCatalog(catalogName);
     client.dropMetalake(metalakeName);
-    if (hdfs != null) {
-      hdfs.close();
+    if (fileSystem != null) {
+      fileSystem.close();
     }
 
     try {
@@ -102,7 +100,7 @@ public class HadoopCatalogIT extends BaseIT {
     }
   }
 
-  private void createMetalake() {
+  protected void createMetalake() {
     GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
     Assertions.assertEquals(0, gravitinoMetalakes.length);
 
@@ -114,14 +112,14 @@ public class HadoopCatalogIT extends BaseIT {
     metalake = loadMetalake;
   }
 
-  private void createCatalog() {
+  protected void createCatalog() {
     metalake.createCatalog(
         catalogName, Catalog.Type.FILESET, provider, "comment", 
ImmutableMap.of());
 
     catalog = metalake.loadCatalog(catalogName);
   }
 
-  private void createSchema() {
+  protected void createSchema() {
     Map<String, String> properties = Maps.newHashMap();
     properties.put("key1", "val1");
     properties.put("key2", "val2");
@@ -137,7 +135,7 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertNotNull(loadSchema.properties().get("location"));
   }
 
-  private static void dropSchema() {
+  private void dropSchema() {
     catalog.asSchemas().dropSchema(schemaName, true);
     Assertions.assertFalse(catalog.asSchemas().schemaExists(schemaName));
   }
@@ -171,7 +169,7 @@ public class HadoopCatalogIT extends BaseIT {
     String filesetName = "test_create_fileset";
     String storageLocation = storageLocation(filesetName);
     Assertions.assertFalse(
-        hdfs.exists(new Path(storageLocation)), "storage location should not 
exists");
+        fileSystem.exists(new Path(storageLocation)), "storage location should 
not exists");
     Fileset fileset =
         createFileset(
             filesetName,
@@ -242,7 +240,7 @@ public class HadoopCatalogIT extends BaseIT {
     String filesetName = "test_create_fileset_with_chinese";
     String storageLocation = storageLocation(filesetName) + "/中文目录test";
     Assertions.assertFalse(
-        hdfs.exists(new Path(storageLocation)), "storage location should not 
exists");
+        fileSystem.exists(new Path(storageLocation)), "storage location should 
not exists");
     Fileset fileset =
         createFileset(
             filesetName,
@@ -285,7 +283,7 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals(1, fileset.properties().size());
     Assertions.assertEquals("v1", fileset.properties().get("k1"));
     Assertions.assertTrue(
-        hdfs.exists(new Path(storageLocation)), "storage location should be 
created");
+        fileSystem.exists(new Path(storageLocation)), "storage location should 
be created");
 
     // create fileset with storage location that not exist
     String filesetName2 = "test_external_fileset_no_exist";
@@ -349,7 +347,7 @@ public class HadoopCatalogIT extends BaseIT {
     String storageLocation = storageLocation(filesetName);
 
     Assertions.assertFalse(
-        hdfs.exists(new Path(storageLocation)), "storage location should not 
exists");
+        fileSystem.exists(new Path(storageLocation)), "storage location should 
not exists");
 
     createFileset(
         filesetName, "comment", Fileset.Type.MANAGED, storageLocation, 
ImmutableMap.of("k1", "v1"));
@@ -365,7 +363,7 @@ public class HadoopCatalogIT extends BaseIT {
         catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName, 
filesetName)),
         "fileset should not be exists");
     Assertions.assertFalse(
-        hdfs.exists(new Path(storageLocation)), "storage location should be 
dropped");
+        fileSystem.exists(new Path(storageLocation)), "storage location should 
be dropped");
   }
 
   @Test
@@ -392,7 +390,7 @@ public class HadoopCatalogIT extends BaseIT {
         catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName, 
filesetName)),
         "fileset should not be exists");
     Assertions.assertTrue(
-        hdfs.exists(new Path(storageLocation)), "storage location should not 
be dropped");
+        fileSystem.exists(new Path(storageLocation)), "storage location should 
not be dropped");
   }
 
   @Test
@@ -688,7 +686,7 @@ public class HadoopCatalogIT extends BaseIT {
     }
   }
 
-  private static String generateLocation(String filesetName) {
+  protected String generateLocation(String filesetName) {
     return String.format(
         "hdfs://%s:%d/user/hadoop/%s/%s/%s",
         containerSuite.getHiveContainer().getContainerIpAddress(),
@@ -707,7 +705,7 @@ public class HadoopCatalogIT extends BaseIT {
     if (storageLocation != null) {
       Path location = new Path(storageLocation);
       try {
-        hdfs.deleteOnExit(location);
+        fileSystem.deleteOnExit(location);
       } catch (IOException e) {
         LOG.warn("Failed to delete location: {}", location, e);
       }
@@ -724,10 +722,11 @@ public class HadoopCatalogIT extends BaseIT {
         catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName, 
filesetName)),
         "fileset should be exists");
     Assertions.assertTrue(
-        hdfs.exists(new Path(storageLocation(filesetName))), "storage location 
should be exists");
+        fileSystem.exists(new Path(storageLocation(filesetName))),
+        "storage location should be exists");
   }
 
-  private static String defaultBaseLocation() {
+  protected String defaultBaseLocation() {
     if (defaultBaseLocation == null) {
       defaultBaseLocation =
           String.format(
@@ -739,7 +738,7 @@ public class HadoopCatalogIT extends BaseIT {
     return defaultBaseLocation;
   }
 
-  private static String storageLocation(String filesetName) {
+  private String storageLocation(String filesetName) {
     return defaultBaseLocation() + "/" + filesetName;
   }
 }
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
new file mode 100644
index 000000000..74ae2a77c
--- /dev/null
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.integration.test;
+
+import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+
+@Tag("gravitino-docker-test")
+@Disabled(
+    "Disabled due to we don't have a real GCP account to test. If you have a 
GCP account,"
+        + "please change the configuration(YOUR_KEY_FILE, YOUR_BUCKET) and 
enable this test.")
+public class HadoopGCSCatalogIT extends HadoopCatalogIT {
+
+  public static final String BUCKET_NAME = "YOUR_BUCKET";
+  public static final String SERVICE_ACCOUNT_FILE = "YOUR_KEY_FILE";
+
+  @BeforeAll
+  public void setup() throws IOException {
+    metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
+    catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
+    schemaName = GravitinoITUtils.genRandomName("CatalogFilesetIT_schema");
+
+    schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+    Configuration conf = new Configuration();
+
+    conf.set("fs.gs.auth.service.account.enable", "true");
+    conf.set("fs.gs.auth.service.account.json.keyfile", SERVICE_ACCOUNT_FILE);
+    fileSystem = FileSystem.get(URI.create(String.format("gs://%s", 
BUCKET_NAME)), conf);
+
+    createMetalake();
+    createCatalog();
+    createSchema();
+  }
+
+  protected String defaultBaseLocation() {
+    if (defaultBaseLocation == null) {
+      try {
+        Path bucket =
+            new Path(
+                String.format(
+                    "gs://%s/%s", BUCKET_NAME, 
GravitinoITUtils.genRandomName("CatalogFilesetIT")));
+        if (!fileSystem.exists(bucket)) {
+          fileSystem.mkdirs(bucket);
+        }
+
+        defaultBaseLocation = bucket.toString();
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to create default base location", 
e);
+      }
+    }
+
+    return defaultBaseLocation;
+  }
+
+  protected void createCatalog() {
+    Map<String, String> map = Maps.newHashMap();
+    map.put("gravitino.bypass.fs.gs.auth.service.account.enable", "true");
+    map.put("gravitino.bypass.fs.gs.auth.service.account.json.keyfile", 
SERVICE_ACCOUNT_FILE);
+    map.put(FILESYSTEM_PROVIDERS, "gcs");
+
+    metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider, 
"comment", map);
+
+    catalog = metalake.loadCatalog(catalogName);
+  }
+
+  protected String generateLocation(String filesetName) {
+    return String.format("%s/%s", defaultBaseLocation, filesetName);
+  }
+}
diff --git a/clients/filesystem-hadoop3/build.gradle.kts 
b/clients/filesystem-hadoop3/build.gradle.kts
index aefac5f28..cae188818 100644
--- a/clients/filesystem-hadoop3/build.gradle.kts
+++ b/clients/filesystem-hadoop3/build.gradle.kts
@@ -39,6 +39,7 @@ dependencies {
   testImplementation(project(":server-common"))
   testImplementation(project(":clients:client-java"))
   testImplementation(project(":integration-test-common", "testArtifacts"))
+  testImplementation(project(":bundles:gcp-bundle"))
   testImplementation(libs.awaitility)
   testImplementation(libs.bundles.jetty)
   testImplementation(libs.bundles.jersey)
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
index cd1ecb92f..95ce4df2a 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
@@ -21,9 +21,18 @@ package org.apache.gravitino.filesystem.hadoop;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
 
 /** Configuration class for Gravitino Virtual File System. */
-class GravitinoVirtualFileSystemConfiguration {
+public class GravitinoVirtualFileSystemConfiguration {
+
+  /**
+   * The prefix of the Gravitino fileset URI. The URI of the Gravitino fileset 
should start with
+   * this prefix.
+   */
   public static final String GVFS_FILESET_PREFIX = "gvfs://fileset";
+
+  /** The scheme of the Gravitino Virtual File System. */
   public static final String GVFS_SCHEME = "gvfs";
+
+  /** The prefix of the Gravitino Virtual File System. */
   public static final String GVFS_CONFIG_PREFIX = "fs.gvfs.";
 
   /** The configuration key for the Gravitino server URI. */
@@ -42,8 +51,12 @@ class GravitinoVirtualFileSystemConfiguration {
    */
   public static final String FS_FILESYSTEM_PROVIDERS = 
"fs.gvfs.filesystem.providers";
 
+  /** The authentication type for simple authentication. */
   public static final String SIMPLE_AUTH_TYPE = "simple";
+  /** The authentication type for oauth2 authentication. */
   public static final String OAUTH2_AUTH_TYPE = "oauth2";
+
+  /** The authentication type for kerberos authentication. */
   public static final String KERBEROS_AUTH_TYPE = "kerberos";
   // oauth2
   /** The configuration key for the URI of the default OAuth server. */
@@ -74,6 +87,10 @@ class GravitinoVirtualFileSystemConfiguration {
   public static final String FS_GRAVITINO_FILESET_CACHE_MAX_CAPACITY_KEY =
       "fs.gravitino.fileset.cache.maxCapacity";
 
+  /**
+   * The default value for the maximum capacity of the Gravitino fileset 
cache. The default value is
+   * 20.
+   */
   public static final int FS_GRAVITINO_FILESET_CACHE_MAX_CAPACITY_DEFAULT = 20;
 
   /**
@@ -83,6 +100,10 @@ class GravitinoVirtualFileSystemConfiguration {
   public static final String 
FS_GRAVITINO_FILESET_CACHE_EVICTION_MILLS_AFTER_ACCESS_KEY =
       "fs.gravitino.fileset.cache.evictionMillsAfterAccess";
 
+  /**
+   * The default value for the eviction time of the Gravitino fileset cache, 
measured in mills after
+   * access.
+   */
   public static final long 
FS_GRAVITINO_FILESET_CACHE_EVICTION_MILLS_AFTER_ACCESS_DEFAULT =
       1000L * 60 * 60;
 
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
new file mode 100644
index 000000000..a42d1c4b7
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.integration.test.util.DownloaderUtils;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.integration.test.util.ITUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Disabled(
+    "Disabled because we don't have a real GCP account to test with. If you have a GCP account, "
+        + "please change the configuration (YOUR_KEY_FILE, YOUR_BUCKET) and enable this test.")
+public class GravitinoVirtualFileSystemGCSIT extends 
GravitinoVirtualFileSystemIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoVirtualFileSystemGCSIT.class);
+
+  public static final String BUCKET_NAME = "YOUR_BUCKET";
+  public static final String SERVICE_ACCOUNT_FILE = "YOUR_KEY_FILE";
+
+  @BeforeAll
+  public void startIntegrationTest() {
+    // Do nothing
+  }
+
+  @BeforeAll
+  public void startUp() throws Exception {
+    copyGCPJars();
+    // The GCP bundle jars need to be copied to the Gravitino server before it starts.
+    super.startIntegrationTest();
+
+    // This value can be tuned by the user; please change it accordingly.
+    defaultBockSize = 64 * 1024 * 1024;
+
+    metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
+    catalogName = GravitinoITUtils.genRandomName("catalog");
+    schemaName = GravitinoITUtils.genRandomName("schema");
+
+    Assertions.assertFalse(client.metalakeExists(metalakeName));
+    metalake = client.createMetalake(metalakeName, "metalake comment", 
Collections.emptyMap());
+    Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(FILESYSTEM_PROVIDERS, "gcs");
+    properties.put(
+        "gravitino.bypass.fs.gs.auth.service.account.json.keyfile", 
SERVICE_ACCOUNT_FILE);
+
+    Catalog catalog =
+        metalake.createCatalog(
+            catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", 
properties);
+    Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+    catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+    Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+
+    conf.set("fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
+    conf.set("fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs");
+    conf.set("fs.gvfs.impl.disable.cache", "true");
+    conf.set("fs.gravitino.server.uri", serverUri);
+    conf.set("fs.gravitino.client.metalake", metalakeName);
+
+    // Pass this configuration to the real file system
+    conf.set("gravitino.bypass.fs.gs.auth.service.account.enable", "true");
+    conf.set("gravitino.bypass.fs.gs.auth.service.account.json.keyfile", 
SERVICE_ACCOUNT_FILE);
+    conf.set(FS_FILESYSTEM_PROVIDERS, "gcs");
+  }
+
+  @AfterAll
+  public void tearDown() throws IOException {
+    Catalog catalog = metalake.loadCatalog(catalogName);
+    catalog.asSchemas().dropSchema(schemaName, true);
+    metalake.dropCatalog(catalogName);
+    client.dropMetalake(metalakeName);
+
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Exception in closing CloseableGroup", e);
+    }
+  }
+
+  /**
+   * Removes the `gravitino.bypass` prefix from the configuration and passes the result to the
+   * real file system. This method corresponds to
+   * org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem#getConfigMap(Configuration)
+   * in the original code.
+   */
+  protected Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+    Configuration gcsConf = new Configuration();
+    gvfsConf.forEach(
+        entry -> {
+          gcsConf.set(entry.getKey().replace("gravitino.bypass.", ""), 
entry.getValue());
+        });
+
+    return gcsConf;
+  }
+
+  protected String genStorageLocation(String fileset) {
+    return String.format("gs://%s/%s", BUCKET_NAME, fileset);
+  }
+
+  private static boolean isDeploy() {
+    String mode =
+        System.getProperty(ITUtils.TEST_MODE) == null
+            ? ITUtils.EMBEDDED_TEST_MODE
+            : System.getProperty(ITUtils.TEST_MODE);
+
+    return Objects.equals(mode, ITUtils.DEPLOY_TEST_MODE);
+  }
+
+  private void copyGCPJars() {
+    if (!isDeploy()) {
+      return;
+    }
+
+    String gravitinoHome = System.getenv("GRAVITINO_HOME");
+    String jarName = String.format("gravitino-gcp-bundle-%s.jar", 
System.getenv("PROJECT_VERSION"));
+    String gcsJars =
+        ITUtils.joinPath(
+            gravitinoHome, "..", "..", "bundles", "gcp-bundle", "build", 
"libs", jarName);
+    gcsJars = "file://" + gcsJars;
+    try {
+      if (!ITUtils.EMBEDDED_TEST_MODE.equals(testMode)) {
+        String hadoopLibDirs = ITUtils.joinPath(gravitinoHome, "catalogs", 
"hadoop", "libs");
+        DownloaderUtils.downloadFile(gcsJars, hadoopLibDirs);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Failed to copy the gcs dependency jars: %s", 
gcsJars), e);
+    }
+  }
+
+  @Disabled(
+      "GCS does not support append, java.io.IOException: The append operation 
is not supported")
+  public void testAppend() throws IOException {}
+}
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
index 9b6334e09..ced9a0b8b 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
@@ -57,14 +57,15 @@ import org.slf4j.LoggerFactory;
 public class GravitinoVirtualFileSystemIT extends BaseIT {
   private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoVirtualFileSystemIT.class);
   private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
-  private static final String metalakeName = 
GravitinoITUtils.genRandomName("gvfs_it_metalake");
-  private static final String catalogName = 
GravitinoITUtils.genRandomName("catalog");
-  private static final String schemaName = 
GravitinoITUtils.genRandomName("schema");
-  private static GravitinoMetalake metalake;
-  private static Configuration conf = new Configuration();
+  protected String metalakeName = 
GravitinoITUtils.genRandomName("gvfs_it_metalake");
+  protected String catalogName = GravitinoITUtils.genRandomName("catalog");
+  protected String schemaName = GravitinoITUtils.genRandomName("schema");
+  protected GravitinoMetalake metalake;
+  protected Configuration conf = new Configuration();
+  protected int defaultBockSize = 128 * 1024 * 1024;
 
   @BeforeAll
-  public void startUp() {
+  public void startUp() throws Exception {
     containerSuite.startHiveContainer();
     Assertions.assertFalse(client.metalakeExists(metalakeName));
     metalake = client.createMetalake(metalakeName, "metalake comment", 
Collections.emptyMap());
@@ -112,10 +113,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
     }
   }
 
+  protected Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+    return gvfsConf;
+  }
+
   @Test
   public void testCreate() throws IOException {
     // create fileset
-    String filesetName = "test_fileset_create";
+    String filesetName = GravitinoITUtils.genRandomName("test_fileset_create");
     NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
     Catalog catalog = metalake.loadCatalog(catalogName);
     String storageLocation = genStorageLocation(filesetName);
@@ -131,25 +136,28 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
 
     // test gvfs create
     Path hdfsPath = new Path(storageLocation);
-    try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+    try (FileSystem fs = 
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
       Assertions.assertTrue(fs.exists(hdfsPath));
       Path gvfsPath = genGvfsPath(filesetName);
       try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
         Assertions.assertTrue(gvfs.exists(gvfsPath));
         String fileName = "test.txt";
         Path createPath = new Path(gvfsPath + "/" + fileName);
-        gvfs.create(createPath);
+        // GCS requires the stream to be closed explicitly to actually create the file.
+        gvfs.create(createPath).close();
         Assertions.assertTrue(gvfs.exists(createPath));
         Assertions.assertTrue(gvfs.getFileStatus(createPath).isFile());
         Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" + 
fileName)));
       }
     }
+
+    catalog.asFilesetCatalog().dropFileset(filesetIdent);
   }
 
   @Test
   public void testAppend() throws IOException {
     // create fileset
-    String filesetName = "test_fileset_append";
+    String filesetName = GravitinoITUtils.genRandomName("test_fileset_append");
     NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
     Catalog catalog = metalake.loadCatalog(catalogName);
     String storageLocation = genStorageLocation(filesetName);
@@ -165,7 +173,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
 
     // test gvfs append
     Path hdfsPath = new Path(storageLocation);
-    try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+    try (FileSystem fs = 
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
       Assertions.assertTrue(fs.exists(hdfsPath));
       Path gvfsPath = genGvfsPath(filesetName);
       String fileName = "test.txt";
@@ -173,7 +181,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
 
       try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
         Assertions.assertTrue(gvfs.exists(gvfsPath));
-        gvfs.create(appendPath);
+        gvfs.create(appendPath).close();
         Assertions.assertTrue(gvfs.exists(appendPath));
         Assertions.assertTrue(gvfs.getFileStatus(appendPath).isFile());
         Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" + 
fileName)));
@@ -203,12 +211,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
         }
       }
     }
+
+    catalog.asFilesetCatalog().dropFileset(filesetIdent);
   }
 
   @Test
   public void testDelete() throws IOException {
     // create fileset
-    String filesetName = "test_fileset_delete";
+    String filesetName = GravitinoITUtils.genRandomName("test_fileset_delete");
     NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
     Catalog catalog = metalake.loadCatalog(catalogName);
     String storageLocation = genStorageLocation(filesetName);
@@ -224,14 +234,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
 
     // test gvfs delete
     Path hdfsPath = new Path(storageLocation);
-    try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+    try (FileSystem fs = 
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
       Assertions.assertTrue(fs.exists(hdfsPath));
       Path gvfsPath = genGvfsPath(filesetName);
       String fileName = "test.txt";
       Path deletePath = new Path(gvfsPath + "/" + fileName);
       try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
         Assertions.assertTrue(gvfs.exists(gvfsPath));
-        gvfs.create(deletePath);
+        gvfs.create(deletePath).close();
         Assertions.assertTrue(gvfs.exists(deletePath));
         Assertions.assertTrue(gvfs.getFileStatus(deletePath).isFile());
         Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" + 
fileName)));
@@ -242,12 +252,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
         Assertions.assertFalse(fs.exists(new Path(storageLocation + "/" + 
fileName)));
       }
     }
+
+    catalog.asFilesetCatalog().dropFileset(filesetIdent);
   }
 
   @Test
   public void testGetStatus() throws IOException {
     // create fileset
-    String filesetName = "test_fileset_get_status";
+    String filesetName = 
GravitinoITUtils.genRandomName("test_fileset_get_status");
     NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
     Catalog catalog = metalake.loadCatalog(catalogName);
     String storageLocation = genStorageLocation(filesetName);
@@ -263,14 +275,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
 
     // test gvfs get status
     Path hdfsPath = new Path(storageLocation);
-    try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+    try (FileSystem fs = 
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
       Assertions.assertTrue(fs.exists(hdfsPath));
       Path gvfsPath = genGvfsPath(filesetName);
       String fileName = "test.txt";
       Path statusPath = new Path(gvfsPath + "/" + fileName);
       try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
         Assertions.assertTrue(gvfs.exists(gvfsPath));
-        gvfs.create(statusPath);
+        gvfs.create(statusPath).close();
         Assertions.assertTrue(gvfs.exists(statusPath));
         Assertions.assertTrue(gvfs.getFileStatus(statusPath).isFile());
         Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" + 
fileName)));
@@ -284,12 +296,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
                 .replaceFirst(genGvfsPath(filesetName).toString(), 
storageLocation));
       }
     }
+
+    catalog.asFilesetCatalog().dropFileset(filesetIdent);
   }
 
   @Test
   public void testListStatus() throws IOException {
     // create fileset
-    String filesetName = "test_fileset_list_status";
+    String filesetName = 
GravitinoITUtils.genRandomName("test_fileset_list_status");
     NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
     Catalog catalog = metalake.loadCatalog(catalogName);
     String storageLocation = genStorageLocation(filesetName);
@@ -305,7 +319,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
 
     // test gvfs list status
     Path hdfsPath = new Path(storageLocation);
-    try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+    try (FileSystem fs = 
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
       Assertions.assertTrue(fs.exists(hdfsPath));
       Path gvfsPath = genGvfsPath(filesetName);
       for (int i = 0; i < 10; i++) {
@@ -313,7 +327,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
         Path statusPath = new Path(gvfsPath + "/" + fileName);
         try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
           Assertions.assertTrue(gvfs.exists(gvfsPath));
-          gvfs.create(statusPath);
+          gvfs.create(statusPath).close();
           Assertions.assertTrue(gvfs.exists(statusPath));
           Assertions.assertTrue(gvfs.getFileStatus(statusPath).isFile());
           Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" + 
fileName)));
@@ -340,12 +354,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
         }
       }
     }
+
+    catalog.asFilesetCatalog().dropFileset(filesetIdent);
   }
 
   @Test
   public void testMkdirs() throws IOException {
     // create fileset
-    String filesetName = "test_fileset_mkdirs";
+    String filesetName = GravitinoITUtils.genRandomName("test_fileset_mkdirs");
     NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
     Catalog catalog = metalake.loadCatalog(catalogName);
     String storageLocation = genStorageLocation(filesetName);
@@ -361,7 +377,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
 
     // test gvfs mkdirs
     Path hdfsPath = new Path(storageLocation);
-    try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+    try (FileSystem fs = 
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
       Assertions.assertTrue(fs.exists(hdfsPath));
       Path gvfsPath = genGvfsPath(filesetName);
       try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
@@ -374,12 +390,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
         Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" + 
dirName)));
       }
     }
+
+    catalog.asFilesetCatalog().dropFileset(filesetIdent);
   }
 
   @Test
   public void testRename() throws IOException {
     // create fileset
-    String filesetName = "test_fileset_rename";
+    String filesetName = GravitinoITUtils.genRandomName("test_fileset_rename");
     NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
     Catalog catalog = metalake.loadCatalog(catalogName);
     String storageLocation = genStorageLocation(filesetName);
@@ -395,7 +413,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
 
     // test gvfs rename
     Path hdfsPath = new Path(storageLocation);
-    try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+    try (FileSystem fs = 
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
       Assertions.assertTrue(fs.exists(hdfsPath));
       Path gvfsPath = genGvfsPath(filesetName);
       String srcName = "test_src";
@@ -420,11 +438,13 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
         Assertions.assertFalse(fs.exists(new Path(storageLocation + "/" + 
srcName)));
       }
     }
+
+    catalog.asFilesetCatalog().dropFileset(filesetIdent);
   }
 
   @Test
   public void testGetDefaultReplications() throws IOException {
-    String filesetName = "test_get_default_replications";
+    String filesetName = 
GravitinoITUtils.genRandomName("test_get_default_replications");
     NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
     Catalog catalog = metalake.loadCatalog(catalogName);
     String storageLocation = genStorageLocation(filesetName);
@@ -441,11 +461,13 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
     try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
       assertEquals(3, gvfs.getDefaultReplication(gvfsPath));
     }
+
+    catalog.asFilesetCatalog().dropFileset(filesetIdent);
   }
 
   @Test
   public void testGetDefaultBlockSizes() throws IOException {
-    String filesetName = "test_get_default_block_sizes";
+    String filesetName = 
GravitinoITUtils.genRandomName("test_get_default_block_sizes");
     NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
     Catalog catalog = metalake.loadCatalog(catalogName);
     String storageLocation = genStorageLocation(filesetName);
@@ -460,15 +482,17 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
     
Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(filesetIdent));
     Path gvfsPath = genGvfsPath(filesetName);
     try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
-      assertEquals(128 * 1024 * 1024, gvfs.getDefaultBlockSize(gvfsPath));
+      assertEquals(defaultBockSize, gvfs.getDefaultBlockSize(gvfsPath));
     }
+
+    catalog.asFilesetCatalog().dropFileset(filesetIdent);
   }
 
-  private String genStorageLocation(String fileset) {
+  protected String genStorageLocation(String fileset) {
     return String.format("%s/%s", baseHdfsPath(), fileset);
   }
 
-  private static String baseHdfsPath() {
+  private String baseHdfsPath() {
     return String.format(
         "hdfs://%s:%d/%s/%s",
         containerSuite.getHiveContainer().getContainerIpAddress(),
@@ -477,7 +501,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
         schemaName);
   }
 
-  private Path genGvfsPath(String fileset) {
+  protected Path genGvfsPath(String fileset) {
     return new Path(String.format("gvfs://fileset/%s/%s/%s", catalogName, 
schemaName, fileset));
   }
 }
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 09cfbea2c..af04c6314 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -32,6 +32,7 @@ airlift-resolver = "1.6"
 hive2 = "2.3.9"
 hadoop2 = "2.10.2"
 hadoop3 = "3.1.0"
+hadoop3-gcs = "1.9.4-hadoop3"
 hadoop-minikdc = "3.3.6"
 htrace-core4 = "4.1.0-incubating"
 httpclient5 = "5.2.1"
@@ -152,6 +153,7 @@ hadoop3-hdfs = { group = "org.apache.hadoop", name = 
"hadoop-hdfs", version.ref
 hadoop3-common = { group = "org.apache.hadoop", name = "hadoop-common", 
version.ref = "hadoop3"}
 hadoop3-client = { group = "org.apache.hadoop", name = "hadoop-client", 
version.ref = "hadoop3"}
 hadoop3-minicluster = { group = "org.apache.hadoop", name = 
"hadoop-minicluster", version.ref = "hadoop-minikdc"}
+hadoop3-gcs = { group = "com.google.cloud.bigdataoss", name = "gcs-connector", 
version.ref = "hadoop3-gcs"}
 htrace-core4 = { group = "org.apache.htrace", name = "htrace-core4", 
version.ref = "htrace-core4" }
 airlift-json = { group = "io.airlift", name = "json", version.ref = 
"airlift-json"}
 airlift-resolver = { group = "io.airlift.resolver", name = "resolver", 
version.ref = "airlift-resolver"}
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java
index e5454199f..9a6d7b130 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java
@@ -50,6 +50,7 @@ import org.junit.jupiter.api.Assertions;
 public class ITUtils {
   public static final String TEST_MODE = "testMode";
   public static final String EMBEDDED_TEST_MODE = "embedded";
+  public static final String DEPLOY_TEST_MODE = "deploy";
 
   public static String joinPath(String... dirs) {
     return String.join(File.separator, dirs);
diff --git a/settings.gradle.kts b/settings.gradle.kts
index e98f81d39..36d66504f 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -70,3 +70,4 @@ project(":spark-connector:spark-runtime-3.5").projectDir = 
file("spark-connector
 include("web:web", "web:integration-test")
 include("docs")
 include("integration-test-common")
+include(":bundles:gcp-bundle")

Reply via email to