This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 5168460b78 [#9506][followup] improvement(authz): Add GCS integration 
tests for fileset credential vending (#9552)
5168460b78 is described below

commit 5168460b78ad415ee8902b61ed8e022fd772e09f
Author: roryqi <[email protected]>
AuthorDate: Fri Dec 26 10:08:48 2025 +0800

    [#9506][followup] improvement(authz): Add GCS integration tests for fileset 
credential vending (#9552)
    
    ### What changes were proposed in this pull request?
    
    Add GCS integration tests for fileset credential vending
    
    ### Why are the changes needed?
    
    This is a follow up PR to add more integration tests.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add more tests. I verified them in the local machine.
---
 ...stractFileSystemCredentialAuthorizationIT.java} | 213 +++++++++------------
 .../FileSystemGCSCredentialAuthorizationIT.java    |  99 ++++++++++
 .../FileSystemS3CredentialAuthorizationIT.java     | 121 ++++++++++++
 .../GravitinoVirtualFileSystemGCSCredentialIT.java |   7 +-
 4 files changed, 316 insertions(+), 124 deletions(-)

diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialAuthorizationIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/AbstractFileSystemCredentialAuthorizationIT.java
similarity index 54%
rename from 
clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialAuthorizationIT.java
rename to 
clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/AbstractFileSystemCredentialAuthorizationIT.java
index ed83990358..5050edbe1b 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialAuthorizationIT.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/AbstractFileSystemCredentialAuthorizationIT.java
@@ -1,20 +1,20 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.gravitino.filesystem.hadoop.integration.test;
@@ -37,19 +37,16 @@ import org.apache.gravitino.Configs;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.Privileges;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.authorization.SecurableObjects;
-import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import org.apache.gravitino.client.GravitinoAdminClient;
 import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.credential.CredentialConstants;
-import org.apache.gravitino.credential.S3TokenCredential;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
-import org.apache.gravitino.s3.fs.S3FileSystemProvider;
-import org.apache.gravitino.storage.S3Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -59,41 +56,31 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledIf;
-import org.junit.platform.commons.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * GVFS integration test that verifies credential vending with authorization 
enabled on S3. <br>
- * - READ_FILESET: can read via GVFS but cannot write. <br>
- * - WRITE_FILESET: can read and write via GVFS.
+ * Base class for GVFS credential vending authorization tests across cloud 
providers. Subclasses
+ * supply provider-specific configuration and environment guards.
  */
-@EnabledIf(value = "s3IsConfigured", disabledReason = "s3 with credential is 
not prepared")
-public class GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends 
BaseIT {
+abstract class AbstractFileSystemCredentialAuthorizationIT extends BaseIT {
   private static final Logger LOG =
-      
LoggerFactory.getLogger(GravitinoVirtualFileSystemS3CredentialAuthorizationIT.class);
+      
LoggerFactory.getLogger(AbstractFileSystemCredentialAuthorizationIT.class);
 
-  public static final String BUCKET_NAME = 
System.getenv("S3_BUCKET_NAME_FOR_CREDENTIAL");
-  public static final String S3_ACCESS_KEY = 
System.getenv("S3_ACCESS_KEY_ID_FOR_CREDENTIAL");
-  public static final String S3_SECRET_KEY = 
System.getenv("S3_SECRET_ACCESS_KEY_FOR_CREDENTIAL");
-  public static final String S3_ENDPOINT = 
System.getenv("S3_ENDPOINT_FOR_CREDENTIAL");
-  public static final String S3_REGION = 
System.getenv("S3_REGION_FOR_CREDENTIAL");
-  public static final String S3_ROLE_ARN = 
System.getenv("S3_ROLE_ARN_FOR_CREDENTIAL");
+  protected static final String SUPER_USER = "gravitino_admin";
+  protected static final String NORMAL_USER = "normal_user";
 
-  private static final String SUPER_USER = "gravitino_admin";
-  private static final String NORMAL_USER = "normal_user";
-  private static final String ROLE_NAME = "gvfs_s3_credential_auth_role";
+  protected GravitinoMetalake metalake;
+  protected GravitinoAdminClient adminClient;
+  protected GravitinoAdminClient normalUserClient;
+  protected Configuration gvfsConf;
 
-  private GravitinoMetalake metalake;
-  private GravitinoAdminClient adminClient;
-  private GravitinoAdminClient normalUserClient;
-  private Configuration gvfsConf;
-
-  private String metalakeName;
-  private String catalogName;
-  private String schemaName;
+  protected String metalakeName;
+  protected String catalogName;
+  protected String schemaName;
+  protected String roleName;
 
+  // Public lifecycle
   @BeforeAll
   public void startIntegrationTest() {
     // Prevent BaseIT from starting before we inject auth configs.
@@ -101,7 +88,7 @@ public class 
GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends BaseI
 
   @BeforeAll
   public void startUp() throws Exception {
-    copyBundleJarsToHadoop("aws-bundle");
+    copyBundleJarsToHadoop(providerBundleName());
 
     Map<String, String> configs = Maps.newHashMap();
     configs.put(Configs.ENABLE_AUTHORIZATION.getKey(), "true");
@@ -114,33 +101,26 @@ public class 
GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends BaseI
     adminClient = 
GravitinoAdminClient.builder(serverUri).withSimpleAuth(SUPER_USER).build();
     normalUserClient = 
GravitinoAdminClient.builder(serverUri).withSimpleAuth(NORMAL_USER).build();
 
-    metalakeName = GravitinoITUtils.genRandomName("gvfs_auth_metalake");
+    metalakeName = GravitinoITUtils.genRandomName("gvfs_cloud_auth_metalake");
     catalogName = GravitinoITUtils.genRandomName("catalog");
     schemaName = GravitinoITUtils.genRandomName("schema");
+    roleName = providerRoleName();
 
     metalake = adminClient.createMetalake(metalakeName, "metalake comment", 
Collections.emptyMap());
     Assertions.assertTrue(adminClient.metalakeExists(metalakeName));
     metalake.addUser(NORMAL_USER);
 
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY);
-    properties.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY);
-    properties.put(S3Properties.GRAVITINO_S3_ENDPOINT, S3_ENDPOINT);
-    properties.put(
-        "gravitino.bypass.fs.s3a.aws.credentials.provider",
-        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
-    properties.put(FILESYSTEM_PROVIDERS, "s3");
-    properties.put(S3Properties.GRAVITINO_S3_REGION, S3_REGION);
-    properties.put(S3Properties.GRAVITINO_S3_ROLE_ARN, S3_ROLE_ARN);
-    properties.put(
-        CredentialConstants.CREDENTIAL_PROVIDERS, 
S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE);
-
-    Catalog catalog =
-        metalake.createCatalog(
-            catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", 
properties);
+    Map<String, String> properties = new HashMap<>(catalogBaseProperties());
+    properties.put(FILESYSTEM_PROVIDERS, providerName());
+    properties.put(CredentialConstants.CREDENTIAL_PROVIDERS, 
credentialProviderType());
+    metalake.createCatalog(
+        catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", 
properties);
     Assertions.assertTrue(metalake.catalogExists(catalogName));
-    catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
-    Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+    metalake
+        .loadCatalog(catalogName)
+        .asSchemas()
+        .createSchema(schemaName, "schema comment", properties);
+    
Assertions.assertTrue(metalake.loadCatalog(catalogName).asSchemas().schemaExists(schemaName));
 
     List<SecurableObject> securableObjects = new ArrayList<>();
     SecurableObject catalogObject =
@@ -150,8 +130,8 @@ public class 
GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends BaseI
         SecurableObjects.ofSchema(
             catalogObject, schemaName, 
ImmutableList.of(Privileges.UseSchema.allow()));
     securableObjects.add(schemaObject);
-    metalake.createRole(ROLE_NAME, new HashMap<>(), securableObjects);
-    metalake.grantRolesToUser(ImmutableList.of(ROLE_NAME), NORMAL_USER);
+    metalake.createRole(roleName, new HashMap<>(), securableObjects);
+    metalake.grantRolesToUser(ImmutableList.of(roleName), NORMAL_USER);
 
     gvfsConf = new Configuration();
     gvfsConf.set(
@@ -161,9 +141,6 @@ public class 
GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends BaseI
     gvfsConf.set("fs.gravitino.server.uri", serverUri);
     gvfsConf.set("fs.gravitino.client.metalake", metalakeName);
     gvfsConf.set("fs.gravitino.enableCredentialVending", "true");
-    gvfsConf.set(S3Properties.GRAVITINO_S3_ENDPOINT, S3_ENDPOINT);
-    gvfsConf.set(S3Properties.GRAVITINO_S3_REGION, S3_REGION);
-    gvfsConf.set(S3Properties.GRAVITINO_S3_ROLE_ARN, S3_ROLE_ARN);
   }
 
   @AfterAll
@@ -198,7 +175,7 @@ public class 
GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends BaseI
 
   @Test
   void testCredentialVendingWithReadPrivilegeAllowsReadOnly() throws 
IOException {
-    String filesetName = GravitinoITUtils.genRandomName("gvfs_auth_read");
+    String filesetName = GravitinoITUtils.genRandomName(providerPrefix() + 
"_auth_read");
     NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
     String storageLocation = genStorageLocation(filesetName);
 
@@ -210,22 +187,12 @@ public class 
GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends BaseI
             "fileset comment",
             Fileset.Type.MANAGED,
             storageLocation,
-            ImmutableMap.of(
-                CredentialConstants.CREDENTIAL_PROVIDERS,
-                S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE));
+            filesetProperties());
 
-    // Seed a file so list/open works.
-    Path realPath = new Path(storageLocation);
-    try (FileSystem realFs =
-        
realPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(gvfsConf))) {
-      realFs.mkdirs(realPath);
-      try (FSDataOutputStream out = realFs.create(new Path(realPath, 
"seed.txt"), true)) {
-        out.write("seed".getBytes(StandardCharsets.UTF_8));
-      }
-    }
+    seedFile(storageLocation, "seed.txt");
 
     metalake.grantPrivilegesToRole(
-        ROLE_NAME,
+        roleName,
         MetadataObjects.of(
             ImmutableList.of(catalogName, schemaName, filesetName), 
MetadataObject.Type.FILESET),
         ImmutableList.of(Privileges.ReadFileset.allow()));
@@ -244,16 +211,12 @@ public class 
GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends BaseI
             Assertions.assertThrows(IOException.class, () -> 
gvfs.create(denyWrite, true).close());
         String msg = ioe.getMessage() == null ? "" : 
ioe.getMessage().toLowerCase();
         Assertions.assertTrue(
-            msg.contains("accessdenied"),
-            "Expected write to be denied due to missing WRITE_FILESET 
privilege, but got: "
+            msg.contains("forbidden") || msg.contains("accessdenied") || 
msg.contains("permission"),
+            "Expected auth failure (forbidden/access denied) due to missing 
WRITE_FILESET privilege, but got: "
                 + ioe.getMessage());
       }
     } finally {
-      metalake.revokePrivilegesFromRole(
-          ROLE_NAME,
-          MetadataObjects.of(
-              ImmutableList.of(catalogName, schemaName, filesetName), 
MetadataObject.Type.FILESET),
-          ImmutableSet.of(Privileges.ReadFileset.allow()));
+      revokeRolePrivilege(filesetIdent, Privileges.ReadFileset.allow());
       if (originalUser != null) {
         System.setProperty("user.name", originalUser);
       }
@@ -263,7 +226,7 @@ public class 
GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends BaseI
 
   @Test
   void testCredentialVendingWithWritePrivilegeAllowsReadAndWrite() throws 
IOException {
-    String filesetName = GravitinoITUtils.genRandomName("gvfs_auth_write");
+    String filesetName = GravitinoITUtils.genRandomName(providerPrefix() + 
"_auth_write");
     NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
     String storageLocation = genStorageLocation(filesetName);
 
@@ -275,12 +238,10 @@ public class 
GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends BaseI
             "fileset comment",
             Fileset.Type.MANAGED,
             storageLocation,
-            ImmutableMap.of(
-                CredentialConstants.CREDENTIAL_PROVIDERS,
-                S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE));
+            filesetProperties());
 
     metalake.grantPrivilegesToRole(
-        ROLE_NAME,
+        roleName,
         MetadataObjects.of(
             ImmutableList.of(catalogName, schemaName, filesetName), 
MetadataObject.Type.FILESET),
         ImmutableList.of(Privileges.WriteFileset.allow()));
@@ -300,11 +261,7 @@ public class 
GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends BaseI
         }
       }
     } finally {
-      metalake.revokePrivilegesFromRole(
-          ROLE_NAME,
-          MetadataObjects.of(
-              ImmutableList.of(catalogName, schemaName, filesetName), 
MetadataObject.Type.FILESET),
-          ImmutableSet.of(Privileges.WriteFileset.allow()));
+      revokeRolePrivilege(filesetIdent, Privileges.WriteFileset.allow());
       if (originalUser != null) {
         System.setProperty("user.name", originalUser);
       }
@@ -312,32 +269,46 @@ public class 
GravitinoVirtualFileSystemS3CredentialAuthorizationIT extends BaseI
     }
   }
 
-  private Configuration convertGvfsConfigToRealFileSystemConfig(Configuration 
gvfsConf) {
-    Configuration s3Conf = new Configuration();
-    Map<String, String> map = Maps.newHashMap();
-    gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
-    map.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY);
-    map.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY);
-    Map<String, String> hadoopConfMap =
-        FileSystemUtils.toHadoopConfigMap(map, 
S3FileSystemProvider.GRAVITINO_KEY_TO_S3_HADOOP_KEY);
-    hadoopConfMap.forEach(s3Conf::set);
-    return s3Conf;
+  protected void seedFile(String storageLocation, String fileName) throws 
IOException {
+    Path realPath = new Path(storageLocation);
+    try (FileSystem realFs =
+        realPath.getFileSystem(
+            convertGvfsConfigToRealFileSystemConfig(new 
Configuration(gvfsConf)))) {
+      realFs.mkdirs(realPath);
+      try (FSDataOutputStream out = realFs.create(new Path(realPath, 
fileName), true)) {
+        out.write("seed".getBytes(StandardCharsets.UTF_8));
+      }
+    }
   }
 
-  private String genStorageLocation(String fileset) {
-    return String.format("s3a://%s/%s", BUCKET_NAME, fileset);
+  protected void revokeRolePrivilege(NameIdentifier filesetIdent, Privilege 
privilege) {
+    metalake.revokePrivilegesFromRole(
+        roleName,
+        MetadataObjects.of(
+            ImmutableList.of(catalogName, schemaName, filesetIdent.name()),
+            MetadataObject.Type.FILESET),
+        ImmutableSet.of(privilege));
   }
 
-  private Path genGvfsPath(String fileset) {
-    return new Path(String.format("gvfs://fileset/%s/%s/%s", catalogName, 
schemaName, fileset));
+  protected Map<String, String> filesetProperties() {
+    return ImmutableMap.of(CredentialConstants.CREDENTIAL_PROVIDERS, 
credentialProviderType());
   }
 
-  private static boolean s3IsConfigured() {
-    return 
StringUtils.isNotBlank(System.getenv("S3_ACCESS_KEY_ID_FOR_CREDENTIAL"))
-        && 
StringUtils.isNotBlank(System.getenv("S3_SECRET_ACCESS_KEY_FOR_CREDENTIAL"))
-        && StringUtils.isNotBlank(System.getenv("S3_ENDPOINT_FOR_CREDENTIAL"))
-        && 
StringUtils.isNotBlank(System.getenv("S3_BUCKET_NAME_FOR_CREDENTIAL"))
-        && StringUtils.isNotBlank(System.getenv("S3_REGION_FOR_CREDENTIAL"))
-        && StringUtils.isNotBlank(System.getenv("S3_ROLE_ARN_FOR_CREDENTIAL"));
-  }
+  protected abstract String providerName();
+
+  protected abstract String providerBundleName();
+
+  protected abstract String credentialProviderType();
+
+  protected abstract Map<String, String> catalogBaseProperties();
+
+  protected abstract String genStorageLocation(String fileset);
+
+  protected abstract Path genGvfsPath(String fileset);
+
+  protected abstract Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf);
+
+  protected abstract String providerPrefix();
+
+  protected abstract String providerRoleName();
 }
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FileSystemGCSCredentialAuthorizationIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FileSystemGCSCredentialAuthorizationIT.java
new file mode 100644
index 0000000000..5bf78888ee
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FileSystemGCSCredentialAuthorizationIT.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.credential.GCSTokenCredential;
+import org.apache.gravitino.gcs.fs.GCSFileSystemProvider;
+import org.apache.gravitino.storage.GCSProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.platform.commons.util.StringUtils;
+
+@EnabledIf(value = "isGCPConfigured", disabledReason = "GCP is not configured")
+public class FileSystemGCSCredentialAuthorizationIT
+    extends AbstractFileSystemCredentialAuthorizationIT {
+  public static final String BUCKET_NAME = 
System.getenv("GCS_BUCKET_NAME_FOR_CREDENTIAL");
+  public static final String SERVICE_ACCOUNT_FILE =
+      System.getenv("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL");
+
+  @Override
+  protected String providerName() {
+    return "gcs";
+  }
+
+  @Override
+  protected String providerBundleName() {
+    return "gcp-bundle";
+  }
+
+  @Override
+  protected String credentialProviderType() {
+    return GCSTokenCredential.GCS_TOKEN_CREDENTIAL_TYPE;
+  }
+
+  @Override
+  protected Map<String, String> catalogBaseProperties() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE, 
SERVICE_ACCOUNT_FILE);
+    return properties;
+  }
+
+  @Override
+  protected String genStorageLocation(String fileset) {
+    return String.format("gs://%s/dir1/dir2/%s", BUCKET_NAME, fileset);
+  }
+
+  @Override
+  protected Path genGvfsPath(String fileset) {
+    return new Path(String.format("gvfs://fileset/%s/%s/%s", catalogName, 
schemaName, fileset));
+  }
+
+  @Override
+  protected Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+    Configuration gcsConf = new Configuration();
+    Map<String, String> map = Maps.newHashMap();
+    map.put(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE, 
SERVICE_ACCOUNT_FILE);
+    gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+    Map<String, String> hadoopConfMap =
+        FileSystemUtils.toHadoopConfigMap(
+            map, GCSFileSystemProvider.GRAVITINO_KEY_TO_GCS_HADOOP_KEY);
+    hadoopConfMap.forEach(gcsConf::set);
+    return gcsConf;
+  }
+
+  @Override
+  protected String providerPrefix() {
+    return "gvfs_gcs";
+  }
+
+  @Override
+  protected String providerRoleName() {
+    return "gvfs_gcs_credential_auth_role";
+  }
+
+  protected static boolean isGCPConfigured() {
+    return 
StringUtils.isNotBlank(System.getenv("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("GCS_BUCKET_NAME_FOR_CREDENTIAL"));
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FileSystemS3CredentialAuthorizationIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FileSystemS3CredentialAuthorizationIT.java
new file mode 100644
index 0000000000..9ba793a693
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FileSystemS3CredentialAuthorizationIT.java
@@ -0,0 +1,121 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.credential.S3TokenCredential;
+import org.apache.gravitino.s3.fs.S3FileSystemProvider;
+import org.apache.gravitino.storage.S3Properties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.platform.commons.util.StringUtils;
+
+/**
+ * GVFS integration test that verifies credential vending with authorization 
enabled on S3. <br>
+ * - READ_FILESET: can read via GVFS but cannot write. <br>
+ * - WRITE_FILESET: can read and write via GVFS.
+ */
+@EnabledIf(value = "s3IsConfigured", disabledReason = "s3 with credential is 
not prepared")
+public class FileSystemS3CredentialAuthorizationIT
+    extends AbstractFileSystemCredentialAuthorizationIT {
+
+  public static final String BUCKET_NAME = 
System.getenv("S3_BUCKET_NAME_FOR_CREDENTIAL");
+  public static final String S3_ACCESS_KEY = 
System.getenv("S3_ACCESS_KEY_ID_FOR_CREDENTIAL");
+  public static final String S3_SECRET_KEY = 
System.getenv("S3_SECRET_ACCESS_KEY_FOR_CREDENTIAL");
+  public static final String S3_ENDPOINT = 
System.getenv("S3_ENDPOINT_FOR_CREDENTIAL");
+  public static final String S3_REGION = 
System.getenv("S3_REGION_FOR_CREDENTIAL");
+  public static final String S3_ROLE_ARN = 
System.getenv("S3_ROLE_ARN_FOR_CREDENTIAL");
+
+  @Override
+  protected String providerName() {
+    return "s3";
+  }
+
+  @Override
+  protected String providerBundleName() {
+    return "aws-bundle";
+  }
+
+  @Override
+  protected String credentialProviderType() {
+    return S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE;
+  }
+
+  @Override
+  protected Map<String, String> catalogBaseProperties() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY);
+    properties.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY);
+    properties.put(S3Properties.GRAVITINO_S3_ENDPOINT, S3_ENDPOINT);
+    properties.put(
+        "gravitino.bypass.fs.s3a.aws.credentials.provider",
+        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+    properties.put(S3Properties.GRAVITINO_S3_REGION, S3_REGION);
+    properties.put(S3Properties.GRAVITINO_S3_ROLE_ARN, S3_ROLE_ARN);
+    return properties;
+  }
+
+  @Override
+  protected String genStorageLocation(String fileset) {
+    return String.format("s3a://%s/%s", BUCKET_NAME, fileset);
+  }
+
+  @Override
+  protected Path genGvfsPath(String fileset) {
+    return new Path(String.format("gvfs://fileset/%s/%s/%s", catalogName, 
schemaName, fileset));
+  }
+
+  @Override
+  protected Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+    Configuration s3Conf = new Configuration();
+    Map<String, String> map = Maps.newHashMap();
+    gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+    map.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY);
+    map.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY);
+    map.put(S3Properties.GRAVITINO_S3_REGION, S3_REGION);
+    map.put(S3Properties.GRAVITINO_S3_ENDPOINT, S3_ENDPOINT);
+    Map<String, String> hadoopConfMap =
+        FileSystemUtils.toHadoopConfigMap(map, 
S3FileSystemProvider.GRAVITINO_KEY_TO_S3_HADOOP_KEY);
+    hadoopConfMap.forEach(s3Conf::set);
+    return s3Conf;
+  }
+
+  @Override
+  protected String providerPrefix() {
+    return "gvfs_s3";
+  }
+
+  @Override
+  protected String providerRoleName() {
+    return "gvfs_s3_credential_auth_role";
+  }
+
+  protected static boolean s3IsConfigured() {
+    return 
StringUtils.isNotBlank(System.getenv("S3_ACCESS_KEY_ID_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("S3_SECRET_ACCESS_KEY_FOR_CREDENTIAL"))
+        && StringUtils.isNotBlank(System.getenv("S3_ENDPOINT_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("S3_BUCKET_NAME_FOR_CREDENTIAL"))
+        && StringUtils.isNotBlank(System.getenv("S3_REGION_FOR_CREDENTIAL"))
+        && StringUtils.isNotBlank(System.getenv("S3_ROLE_ARN_FOR_CREDENTIAL"));
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java
index fcf523feae..f8b062b7af 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java
@@ -91,9 +91,7 @@ public class GravitinoVirtualFileSystemGCSCredentialIT 
extends GravitinoVirtualF
     conf.set("fs.gvfs.impl.disable.cache", "true");
     conf.set("fs.gravitino.server.uri", serverUri);
     conf.set("fs.gravitino.client.metalake", metalakeName);
-
-    // Pass this configuration to the real file system
-    conf.set(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE, 
SERVICE_ACCOUNT_FILE);
+    conf.set("fs.gravitino.enableCredentialVending", "true");
   }
 
   @AfterAll
@@ -124,6 +122,9 @@ public class GravitinoVirtualFileSystemGCSCredentialIT 
extends GravitinoVirtualF
     Configuration gcsConf = new Configuration();
     Map<String, String> map = Maps.newHashMap();
 
+    // Pass this configuration to the real file system
+    map.put(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE, 
SERVICE_ACCOUNT_FILE);
+
     gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
 
     Map<String, String> hadoopConfMap =

Reply via email to