This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch branch-1.3
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-1.3 by this push:
     new c606d4f9cb [Cherry-pick to branch-1.3] [#11263] feat(authz): Extend 
credential vending to Hive/Iceberg/Glue/JDBC catalog types (#11264) (#11554)
c606d4f9cb is described below

commit c606d4f9cb03f51c9633038cdd97cedefdced0c0
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 10 18:01:33 2026 +0800

    [Cherry-pick to branch-1.3] [#11263] feat(authz): Extend credential vending 
to Hive/Iceberg/Glue/JDBC catalog types (#11264) (#11554)
    
    **Cherry-pick Information:**
    - Original commit: 90c5bbc96cbc7df678f4bc5e73429111e9c741e0
    - Target branch: `branch-1.3`
    - Status: ✅ Clean cherry-pick (no conflicts)
    
    Co-authored-by: Yuhui <[email protected]>
---
 .../gravitino/credential/JdbcCredential.java       |  3 +-
 catalogs/catalog-glue/build.gradle.kts             |  1 +
 .../apache/gravitino/catalog/glue/GlueCatalog.java | 31 +++++++
 .../glue/GlueCatalogPropertiesMetadata.java        |  4 +-
 .../hive/HiveCatalogPropertiesMetadata.java        |  8 ++
 .../catalog/hive/TestHiveCatalogOperations.java    |  2 +-
 .../apache/gravitino/catalog/jdbc/JdbcCatalog.java | 64 ++-------------
 .../catalog/jdbc/TestJdbcCatalogCredential.java    | 32 ++++++++
 .../catalog/lakehouse/iceberg/IcebergCatalog.java  | 35 +++-----
 .../iceberg/IcebergCatalogPropertiesMetadata.java  | 47 ++++-------
 .../catalog/lakehouse/paimon/PaimonCatalog.java    | 31 +------
 .../credential/CredentialPropertyUtils.java        | 85 +++++++++++++++++++
 .../cloud/storage/AzurePropertiesMetadata.java     | 52 ++++++++++++
 .../cloud/storage/GCSPropertiesMetadata.java       | 44 ++++++++++
 .../cloud/storage/OSSPropertiesMetadata.java       | 52 ++++++++++++
 .../cloud/storage/S3PropertiesMetadata.java        | 52 ++++++++++++
 .../apache/gravitino/connector/BaseCatalog.java    | 70 +++++++++++++---
 .../credential/JdbcCredentialProvider.java         |  2 +-
 .../credential/TestJdbcCredentialProvider.java     |  9 ++-
 .../flink/connector/hive/GravitinoHiveCatalog.java | 45 +++++++++++
 .../connector/iceberg/GravitinoIcebergCatalog.java | 46 +++++++++--
 .../connector/paimon/GravitinoPaimonCatalog.java   | 94 +++++++++++++++++++---
 .../iceberg/GravitinoIcebergCatalogFlink118.java   | 11 +--
 .../iceberg/GravitinoIcebergCatalogFlink119.java   | 10 +--
 .../iceberg/GravitinoIcebergCatalogFlink120.java   | 10 +--
 .../provider/DynamicIcebergConfigProvider.java     | 35 +++++++-
 spark-connector/spark-common/build.gradle.kts      |  4 +
 .../spark/connector/glue/GravitinoGlueCatalog.java | 61 +++++++++++++-
 .../spark/connector/hive/GravitinoHiveCatalog.java | 36 ++++++++-
 .../connector/iceberg/GravitinoIcebergCatalog.java |  3 +
 .../connector/paimon/GravitinoPaimonCatalog.java   |  3 +
 .../connector/glue/TestGravitinoGlueCatalog.java   | 42 ++++++++++
 .../catalog/glue/GlueConnectorAdapter.java         | 27 ++++---
 .../catalog/hive/HiveConnectorAdapter.java         | 17 ++++
 .../iceberg/IcebergCatalogPropertyConverter.java   | 30 +++++++
 .../catalog/iceberg/IcebergConnectorAdapter.java   |  6 +-
 .../catalog/glue/TestGlueConnectorAdapter.java     |  6 +-
 37 files changed, 880 insertions(+), 230 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/credential/JdbcCredential.java 
b/api/src/main/java/org/apache/gravitino/credential/JdbcCredential.java
index af19110d7e..8c640bcf5b 100644
--- a/api/src/main/java/org/apache/gravitino/credential/JdbcCredential.java
+++ b/api/src/main/java/org/apache/gravitino/credential/JdbcCredential.java
@@ -107,8 +107,7 @@ public class JdbcCredential implements Credential {
 
   private void validate(String jdbcUser, String jdbcPassword, long 
expireTimeInMs) {
     Preconditions.checkArgument(StringUtils.isNotBlank(jdbcUser), "JDBC user 
should not be empty");
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(jdbcPassword), "JDBC password should not be 
empty");
+    Preconditions.checkArgument(jdbcPassword != null, "JDBC password should 
not be null");
     // JDBC credentials are static (no server-issued expiry). expireTimeInMs 
must always be 0.
     Preconditions.checkArgument(
         expireTimeInMs == 0, "The expiration time of JdbcCredential should be 
0");
diff --git a/catalogs/catalog-glue/build.gradle.kts 
b/catalogs/catalog-glue/build.gradle.kts
index da14ad95bb..477db09503 100644
--- a/catalogs/catalog-glue/build.gradle.kts
+++ b/catalogs/catalog-glue/build.gradle.kts
@@ -44,6 +44,7 @@ dependencies {
   implementation(libs.iceberg.api)
   implementation(libs.iceberg.aws)
   implementation(libs.iceberg.core)
+  runtimeOnly(project(":bundles:aws"))
   implementation(libs.slf4j.api)
 
   annotationProcessor(libs.lombok)
diff --git 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalog.java
 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalog.java
index 471bfccbc5..8222c0d71b 100644
--- 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalog.java
+++ 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalog.java
@@ -18,11 +18,14 @@
  */
 package org.apache.gravitino.catalog.glue;
 
+import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.PropertiesMetadata;
 import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.storage.S3Properties;
 
 /**
  * Implementation of an AWS Glue Data Catalog connector in Apache Gravitino.
@@ -81,4 +84,32 @@ public class GlueCatalog extends BaseCatalog<GlueCatalog> {
   public PropertiesMetadata tablePropertiesMetadata() throws 
UnsupportedOperationException {
     return TABLE_PROPERTIES_METADATA;
   }
+
+  @Override
+  public Map<String, String> propertiesWithCredentialProviders() {
+    Map<String, String> props = super.propertiesWithCredentialProviders();
+    // super() skips addCatalogSpecificCredentialProviders() when 
credential-providers is already
+    // set, so the aws-* → s3-* key mapping never runs. Apply it 
unconditionally here so that
+    // S3SecretKeyProvider.initialize() can read s3-access-key-id regardless 
of how the catalog
+    // was configured.
+    String accessKeyId = props.get(GlueConstants.AWS_ACCESS_KEY_ID);
+    String secretAccessKey = props.get(GlueConstants.AWS_SECRET_ACCESS_KEY);
+    if (StringUtils.isNotBlank(accessKeyId) && 
StringUtils.isNotBlank(secretAccessKey)) {
+      props.putIfAbsent(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, accessKeyId);
+      props.putIfAbsent(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, 
secretAccessKey);
+    }
+    return props;
+  }
+
+  @Override
+  protected void addCatalogSpecificCredentialProviders(
+      Map<String, String> properties, List<String> credentialProviders) {
+    String accessKeyId = properties.get(GlueConstants.AWS_ACCESS_KEY_ID);
+    String secretAccessKey = 
properties.get(GlueConstants.AWS_SECRET_ACCESS_KEY);
+    if (StringUtils.isNotBlank(accessKeyId) && 
StringUtils.isNotBlank(secretAccessKey)) {
+      properties.putIfAbsent(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, 
accessKeyId);
+      properties.putIfAbsent(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, 
secretAccessKey);
+    }
+    addStorageCredentialProviders(properties, credentialProviders);
+  }
 }
diff --git 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogPropertiesMetadata.java
 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogPropertiesMetadata.java
index 16c5bf5b3c..47e5bf31c6 100644
--- 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogPropertiesMetadata.java
+++ 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogPropertiesMetadata.java
@@ -65,7 +65,7 @@ public class GlueCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetadata
                       + " When omitted the default credential chain is used.",
                   false /* immutable */,
                   null /* defaultValue */,
-                  false /* hidden */))
+                  true /* hidden */))
           .put(
               AWS_SECRET_ACCESS_KEY,
               stringOptionalPropertyEntry(
@@ -74,7 +74,7 @@ public class GlueCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetadata
                       + " When omitted the default credential chain is used.",
                   false /* immutable */,
                   null /* defaultValue */,
-                  false /* hidden */))
+                  true /* hidden */))
           .put(
               AWS_GLUE_ENDPOINT,
               stringOptionalPropertyEntry(
diff --git 
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogPropertiesMetadata.java
 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogPropertiesMetadata.java
index 2577bdf944..754da95f78 100644
--- 
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogPropertiesMetadata.java
+++ 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogPropertiesMetadata.java
@@ -23,6 +23,10 @@ import static 
org.apache.gravitino.catalog.hive.HiveConstants.HIVE_DEFAULT_CATAL
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import org.apache.gravitino.cloud.storage.AzurePropertiesMetadata;
+import org.apache.gravitino.cloud.storage.GCSPropertiesMetadata;
+import org.apache.gravitino.cloud.storage.OSSPropertiesMetadata;
+import org.apache.gravitino.cloud.storage.S3PropertiesMetadata;
 import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 import org.apache.gravitino.hive.ClientPropertiesMetadata;
@@ -123,6 +127,10 @@ public class HiveCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetadata
                   DEFAULT_LIST_ALL_TABLES,
                   false /* hidden */,
                   false /* reserved */))
+          .putAll(S3PropertiesMetadata.PROPERTY_ENTRIES)
+          .putAll(OSSPropertiesMetadata.PROPERTY_ENTRIES)
+          .putAll(AzurePropertiesMetadata.PROPERTY_ENTRIES)
+          .putAll(GCSPropertiesMetadata.PROPERTY_ENTRIES)
           .putAll(CLIENT_PROPERTIES_METADATA.propertyEntries())
           .build();
 
diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
index fbc1e8c54b..e0b600fb69 100644
--- 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
@@ -88,7 +88,7 @@ class TestHiveCatalogOperations {
     Map<String, PropertyEntry<?>> propertyEntryMap =
         HIVE_PROPERTIES_METADATA.catalogPropertiesMetadata().propertyEntries();
 
-    Assertions.assertEquals(18, propertyEntryMap.size());
+    Assertions.assertEquals(25, propertyEntryMap.size());
     Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS));
     
Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE));
     
Assertions.assertTrue(propertyEntryMap.containsKey(BaseCatalog.CATALOG_OPERATION_IMPL));
diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalog.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalog.java
index 1f6492d90b..55f4b957ed 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalog.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalog.java
@@ -18,14 +18,10 @@
  */
 package org.apache.gravitino.catalog.jdbc;
 
-import com.google.common.collect.Maps;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.Config;
-import org.apache.gravitino.Configs;
-import org.apache.gravitino.GravitinoEnv;
-import org.apache.gravitino.annotation.Evolving;
 import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
 import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
@@ -37,7 +33,6 @@ import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.PropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 import org.apache.gravitino.connector.capability.Capability;
-import org.apache.gravitino.credential.CredentialConstants;
 import org.apache.gravitino.credential.JdbcCredential;
 
 /** Implementation of an Jdbc catalog in Gravitino. */
@@ -124,61 +119,12 @@ public abstract class JdbcCatalog extends 
BaseCatalog<JdbcCatalog> {
   }
 
   @Override
-  @Evolving
-  public Map<String, String> propertiesWithCredentialProviders() {
-    // Use raw entity properties so that hidden credentials 
(jdbc-user/jdbc-password) are visible
-    // to the credential manager even after they are marked hidden in the 
properties metadata.
-    Map<String, String> properties = Maps.newHashMap(entity().getProperties());
-    return applyDefaultCredentialProviders(properties);
-  }
-
-  /**
-   * Returns catalog properties, optionally re-adding hidden JDBC credentials 
for backward
-   * compatibility with connectors that do not support credential vending. The 
backfill behavior is
-   * controlled by the server-level config {@code
-   * gravitino.catalog.credential.backfillToProperties}; it is disabled by 
default and should only
-   * be enabled during rolling upgrades.
-   *
-   * @return the catalog properties map, with credentials backfilled if the 
server config is set
-   */
-  @Override
-  public Map<String, String> properties() {
-    Map<String, String> props = super.properties();
-    if (!shouldBackfillCredential()) {
-      return props;
-    }
-    // Backfill hidden credentials for backward compatibility with connectors 
that do not support
-    // credential vending.
-    Map<String, String> rawProps = entity().getProperties();
-    Map<String, String> result = Maps.newHashMap(props);
-    String user = rawProps.get(JdbcConfig.USERNAME.getKey());
-    String password = rawProps.get(JdbcConfig.PASSWORD.getKey());
-    if (user != null) {
-      result.put(JdbcConfig.USERNAME.getKey(), user);
-    }
-    if (password != null) {
-      result.put(JdbcConfig.PASSWORD.getKey(), password);
-    }
-    return result;
-  }
-
-  private boolean shouldBackfillCredential() {
-    Config serverConfig = GravitinoEnv.getInstance().config();
-    return serverConfig != null
-        && serverConfig.get(Configs.CATALOG_CREDENTIAL_BACKFILL_TO_PROPERTIES);
-  }
-
-  private Map<String, String> applyDefaultCredentialProviders(Map<String, 
String> properties) {
-    if 
(StringUtils.isNotBlank(properties.get(CredentialConstants.CREDENTIAL_PROVIDERS)))
 {
-      return properties;
-    }
-
+  protected void addCatalogSpecificCredentialProviders(
+      Map<String, String> properties, List<String> credentialProviders) {
     String jdbcUser = properties.get(JdbcConfig.USERNAME.getKey());
     String jdbcPassword = properties.get(JdbcConfig.PASSWORD.getKey());
-    if (StringUtils.isNotBlank(jdbcUser) && 
StringUtils.isNotBlank(jdbcPassword)) {
-      properties.put(CredentialConstants.CREDENTIAL_PROVIDERS, 
JdbcCredential.JDBC_CREDENTIAL_TYPE);
+    if (StringUtils.isNotBlank(jdbcUser) && jdbcPassword != null) {
+      credentialProviders.add(JdbcCredential.JDBC_CREDENTIAL_TYPE);
     }
-
-    return properties;
   }
 }
diff --git 
a/catalogs/catalog-jdbc-common/src/test/java/org/apache/gravitino/catalog/jdbc/TestJdbcCatalogCredential.java
 
b/catalogs/catalog-jdbc-common/src/test/java/org/apache/gravitino/catalog/jdbc/TestJdbcCatalogCredential.java
index 3e81356b71..cedf2e6ee5 100644
--- 
a/catalogs/catalog-jdbc-common/src/test/java/org/apache/gravitino/catalog/jdbc/TestJdbcCatalogCredential.java
+++ 
b/catalogs/catalog-jdbc-common/src/test/java/org/apache/gravitino/catalog/jdbc/TestJdbcCatalogCredential.java
@@ -109,6 +109,38 @@ public class TestJdbcCatalogCredential {
     Assertions.assertEquals(JdbcCredential.JDBC_CREDENTIAL_TYPE, 
credentialProviders);
   }
 
+  @Test
+  void testJdbcCatalogDefaultCredentialProvidersWithEmptyPassword() {
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+    // Empty-string password (e.g. StarRocks default) should still register 
the credential provider
+    Map<String, String> jdbcProps = Maps.newHashMap();
+    jdbcProps.put(JdbcConfig.JDBC_URL.getKey(), 
"jdbc:mysql://localhost:9030/");
+    jdbcProps.put(JdbcConfig.JDBC_DRIVER.getKey(), "com.mysql.cj.jdbc.Driver");
+    jdbcProps.put(JdbcConfig.USERNAME.getKey(), "root");
+    jdbcProps.put(JdbcConfig.PASSWORD.getKey(), "");
+
+    CatalogEntity jdbcEntity =
+        CatalogEntity.builder()
+            .withId(7L)
+            .withName("jdbc-catalog-empty-password")
+            .withNamespace(Namespace.of("metalake"))
+            .withType(TestableJdbcCatalog.Type.RELATIONAL)
+            .withProvider("jdbc-starrocks")
+            .withAuditInfo(auditInfo)
+            .withProperties(jdbcProps)
+            .build();
+
+    TestableJdbcCatalog jdbcCatalog = new TestableJdbcCatalog();
+    jdbcCatalog.withCatalogConf(jdbcProps).withCatalogEntity(jdbcEntity);
+    Map<String, String> properties = 
jdbcCatalog.propertiesWithCredentialProviders();
+
+    String credentialProviders = 
properties.get(CredentialConstants.CREDENTIAL_PROVIDERS);
+    Assertions.assertNotNull(credentialProviders);
+    Assertions.assertEquals(JdbcCredential.JDBC_CREDENTIAL_TYPE, 
credentialProviders);
+  }
+
   @Test
   void testJdbcCatalogNoCredentialProvidersWithoutPassword() {
     AuditInfo auditInfo =
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java
 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java
index 6b6b375596..f7dc4acc06 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java
@@ -18,8 +18,6 @@
  */
 package org.apache.gravitino.catalog.lakehouse.iceberg;
 
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
@@ -93,40 +91,25 @@ public class IcebergCatalog extends 
BaseCatalog<IcebergCatalog> {
   @Override
   @Evolving
   public Map<String, String> propertiesWithCredentialProviders() {
-    Map<String, String> properties = 
Maps.newHashMap(super.propertiesWithCredentialProviders());
-    // Iceberg is security-first: the vended s3:ListBucket statement keeps the 
bare location prefix
-    // disabled so a credential cannot enumerate sibling keys sharing the 
location prefix. This is
-    // determined by the catalog type and is not meant to be configured by 
users.
-    properties.put(CredentialConstants.S3_CREDENTIAL_LIST_LOCATION_PREFIX, 
"false");
-    return applyDefaultCredentialProviders(properties);
+    Map<String, String> props = super.propertiesWithCredentialProviders();
+    // Iceberg is security-first: disable s3:ListBucket on bare location 
prefix so a vended
+    // credential cannot enumerate sibling keys. This is catalog-type policy, 
not user-configurable.
+    props.put(CredentialConstants.S3_CREDENTIAL_LIST_LOCATION_PREFIX, "false");
+    return props;
   }
 
-  private Map<String, String> applyDefaultCredentialProviders(Map<String, 
String> properties) {
-    // If credential providers already set, return as is
-    if 
(StringUtils.isNotBlank(properties.get(CredentialConstants.CREDENTIAL_PROVIDERS)))
 {
-      return properties;
-    }
-
-    List<String> credentialProviders = new ArrayList<>();
-
-    // Add JDBC credential provider if backend is JDBC and 
jdbc-user/jdbc-password are set
+  @Override
+  protected void addCatalogSpecificCredentialProviders(
+      Map<String, String> properties, List<String> credentialProviders) {
     String catalogBackend = properties.get(IcebergConstants.CATALOG_BACKEND);
     if (catalogBackend != null
         && IcebergCatalogBackend.JDBC.name().equalsIgnoreCase(catalogBackend)) 
{
       String jdbcUser = properties.get(IcebergConstants.GRAVITINO_JDBC_USER);
       String jdbcPassword = 
properties.get(IcebergConstants.GRAVITINO_JDBC_PASSWORD);
-      if (StringUtils.isNotBlank(jdbcUser) && 
StringUtils.isNotBlank(jdbcPassword)) {
+      if (StringUtils.isNotBlank(jdbcUser) && jdbcPassword != null) {
         credentialProviders.add(JdbcCredential.JDBC_CREDENTIAL_TYPE);
       }
     }
-
     addStorageCredentialProviders(properties, credentialProviders);
-
-    if (!credentialProviders.isEmpty()) {
-      properties.put(
-          CredentialConstants.CREDENTIAL_PROVIDERS, String.join(",", 
credentialProviders));
-    }
-
-    return properties;
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
index 2f7900ce8b..7b1466a341 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
@@ -29,14 +29,15 @@ import com.google.common.collect.Maps;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.gravitino.cloud.storage.AzurePropertiesMetadata;
+import org.apache.gravitino.cloud.storage.GCSPropertiesMetadata;
+import org.apache.gravitino.cloud.storage.OSSPropertiesMetadata;
+import org.apache.gravitino.cloud.storage.S3PropertiesMetadata;
 import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
 import 
org.apache.gravitino.iceberg.common.authentication.kerberos.KerberosConfig;
 import org.apache.gravitino.iceberg.common.cache.LocalTableMetadataCache;
-import org.apache.gravitino.storage.AzureProperties;
-import org.apache.gravitino.storage.OSSProperties;
-import org.apache.gravitino.storage.S3Properties;
 
 public class IcebergCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetadata {
   public static final String CATALOG_BACKEND = 
IcebergConstants.CATALOG_BACKEND;
@@ -89,41 +90,17 @@ public class IcebergCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetad
                 null /* defaultValue */,
                 false /* hidden */),
             stringOptionalPropertyEntry(
-                S3Properties.GRAVITINO_S3_ACCESS_KEY_ID,
-                "s3 access key ID",
+                GRAVITINO_JDBC_USER,
+                "JDBC user for Iceberg JDBC backend",
                 false /* immutable */,
                 null /* defaultValue */,
-                false /* hidden */),
-            stringOptionalPropertyEntry(
-                S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY,
-                "s3 secret access key",
-                false /* immutable */,
-                null /* defaultValue */,
-                false /* hidden */),
-            stringOptionalPropertyEntry(
-                OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID,
-                "OSS access key ID",
-                false /* immutable */,
-                null /* defaultValue */,
-                false /* hidden */),
+                true /* hidden */),
             stringOptionalPropertyEntry(
-                OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET,
-                "OSS access key secret",
+                GRAVITINO_JDBC_PASSWORD,
+                "JDBC password for Iceberg JDBC backend",
                 false /* immutable */,
                 null /* defaultValue */,
-                false /* hidden */),
-            stringOptionalPropertyEntry(
-                AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
-                "Azure storage account name",
-                false /* immutable */,
-                null /* defaultValue */,
-                false /* hidden */),
-            stringOptionalPropertyEntry(
-                AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
-                "Azure storage account key",
-                false /* immutable */,
-                null /* defaultValue */,
-                false /* hidden */),
+                true /* hidden */),
             stringOptionalPropertyEntry(
                 IcebergConstants.TABLE_METADATA_CACHE_IMPL,
                 "Table metadata cache implementation. Set to empty 
string(\"\") if "
@@ -165,6 +142,10 @@ public class IcebergCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetad
                 false /* hidden */));
     HashMap<String, PropertyEntry<?>> result = Maps.newHashMap();
     result.putAll(Maps.uniqueIndex(propertyEntries, PropertyEntry::getName));
+    result.putAll(S3PropertiesMetadata.PROPERTY_ENTRIES);
+    result.putAll(OSSPropertiesMetadata.PROPERTY_ENTRIES);
+    result.putAll(AzurePropertiesMetadata.PROPERTY_ENTRIES);
+    result.putAll(GCSPropertiesMetadata.PROPERTY_ENTRIES);
     result.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
     result.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES);
     PROPERTIES_METADATA = ImmutableMap.copyOf(result);
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java
index 34c327bc41..3bf8d7e244 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java
@@ -18,18 +18,14 @@
  */
 package org.apache.gravitino.catalog.lakehouse.paimon;
 
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
-import org.apache.gravitino.annotation.Evolving;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.PropertiesMetadata;
 import org.apache.gravitino.connector.capability.Capability;
-import org.apache.gravitino.credential.CredentialConstants;
 import org.apache.gravitino.credential.JdbcCredential;
 import org.apache.gravitino.rel.ViewCatalog;
 
@@ -92,38 +88,17 @@ public class PaimonCatalog extends 
BaseCatalog<PaimonCatalog> {
   }
 
   @Override
-  @Evolving
-  public Map<String, String> propertiesWithCredentialProviders() {
-    Map<String, String> properties = 
Maps.newHashMap(super.propertiesWithCredentialProviders());
-    return applyDefaultCredentialProviders(properties);
-  }
-
-  private Map<String, String> applyDefaultCredentialProviders(Map<String, 
String> properties) {
-    // If credential providers already set, return as is
-    if 
(StringUtils.isNotBlank(properties.get(CredentialConstants.CREDENTIAL_PROVIDERS)))
 {
-      return properties;
-    }
-
-    List<String> credentialProviders = new ArrayList<>();
-
-    // Add JDBC credential provider if backend is JDBC and 
jdbc-user/jdbc-password are set
+  protected void addCatalogSpecificCredentialProviders(
+      Map<String, String> properties, List<String> credentialProviders) {
     String catalogBackend = properties.get(PaimonConstants.CATALOG_BACKEND);
     if (catalogBackend != null
         && PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(catalogBackend)) {
       String jdbcUser = properties.get(PaimonConstants.GRAVITINO_JDBC_USER);
       String jdbcPassword = 
properties.get(PaimonConstants.GRAVITINO_JDBC_PASSWORD);
-      if (StringUtils.isNotBlank(jdbcUser) && 
StringUtils.isNotBlank(jdbcPassword)) {
+      if (StringUtils.isNotBlank(jdbcUser) && jdbcPassword != null) {
         credentialProviders.add(JdbcCredential.JDBC_CREDENTIAL_TYPE);
       }
     }
-
     addStorageCredentialProviders(properties, credentialProviders);
-
-    if (!credentialProviders.isEmpty()) {
-      properties.put(
-          CredentialConstants.CREDENTIAL_PROVIDERS, String.join(",", 
credentialProviders));
-    }
-
-    return properties;
   }
 }
diff --git 
a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
 
b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
index d9543b93d3..734c991d28 100644
--- 
a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
+++ 
b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
@@ -27,12 +27,17 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import org.apache.gravitino.Catalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Helper class to generate specific credential properties for different table 
format and engine.
  */
 public class CredentialPropertyUtils {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(CredentialPropertyUtils.class);
+
   @VisibleForTesting static final String ICEBERG_S3_ACCESS_KEY_ID = 
"s3.access-key-id";
   @VisibleForTesting static final String ICEBERG_S3_SECRET_ACCESS_KEY = 
"s3.secret-access-key";
   @VisibleForTesting static final String ICEBERG_S3_TOKEN = "s3.session-token";
@@ -76,6 +81,14 @@ public class CredentialPropertyUtils {
   static final String ICEBERG_GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT =
       "gcs.oauth2.refresh-credentials-endpoint";
 
+  @VisibleForTesting static final String ICEBERG_JDBC_USER = "jdbc.user";
+  @VisibleForTesting static final String ICEBERG_JDBC_PASSWORD = 
"jdbc.password";
+
+  private static final String PAIMON_S3_ACCESS_KEY = "s3.access-key";
+  private static final String PAIMON_S3_SECRET_KEY = "s3.secret-key";
+  private static final String PAIMON_OSS_ACCESS_KEY_ID = "fs.oss.accessKeyId";
+  private static final String PAIMON_OSS_ACCESS_KEY_SECRET = 
"fs.oss.accessKeySecret";
+
   private static Map<String, String> icebergCredentialPropertyMap =
       ImmutableMap.<String, String>builder()
           .put(GCSTokenCredential.GCS_TOKEN_NAME, ICEBERG_GCS_TOKEN)
@@ -100,6 +113,78 @@ public class CredentialPropertyUtils {
           .put(AwsIrsaCredential.SESSION_TOKEN, ICEBERG_S3_TOKEN)
           .build();
 
+  /**
+   * Returns credentials from the given catalog, or an empty array if the 
catalog does not support
+   * credential vending.
+   *
+   * @param catalog the Gravitino catalog client
+   * @return credentials, never null
+   */
+  public static Credential[] getCredentials(Catalog catalog) {
+    try {
+      return catalog.supportsCredentials().getCredentials();
+    } catch (UnsupportedOperationException e) {
+      LOG.debug("Catalog does not support credential vending, skipping 
injection", e);
+      return new Credential[0];
+    }
+  }
+
+  /**
+   * Injects vended credentials into Iceberg catalog properties. JDBC 
credentials are applied
+   * directly; all other credential types are converted via {@link
+   * #toIcebergProperties(Credential)}.
+   *
+   * @param credentials the credentials to apply
+   * @param props the mutable properties map to update
+   */
+  public static void applyIcebergCredentials(Credential[] credentials, 
Map<String, String> props) {
+    for (Credential credential : credentials) {
+      if (credential instanceof JdbcCredential) {
+        JdbcCredential jdbc = (JdbcCredential) credential;
+        props.put(ICEBERG_JDBC_USER, jdbc.jdbcUser());
+        props.put(ICEBERG_JDBC_PASSWORD, jdbc.jdbcPassword());
+      } else {
+        Map<String, String> converted = toIcebergProperties(credential);
+        if (converted.isEmpty()) {
+          LOG.warn(
+              "Received unrecognized credential type '{}' for Iceberg catalog, 
skipping",
+              credential.getClass().getName());
+        } else {
+          props.putAll(converted);
+        }
+      }
+    }
+  }
+
+  /**
+   * Injects vended credentials into Paimon catalog properties. Supports JDBC, 
S3, and OSS
+   * credential types.
+   *
+   * @param credentials the credentials to apply
+   * @param props the mutable properties map to update
+   */
+  public static void applyPaimonCredentials(Credential[] credentials, 
Map<String, String> props) {
+    for (Credential credential : credentials) {
+      if (credential instanceof JdbcCredential) {
+        JdbcCredential jdbc = (JdbcCredential) credential;
+        props.put(ICEBERG_JDBC_USER, jdbc.jdbcUser());
+        props.put(ICEBERG_JDBC_PASSWORD, jdbc.jdbcPassword());
+      } else if (credential instanceof S3SecretKeyCredential) {
+        S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+        props.put(PAIMON_S3_ACCESS_KEY, s3.accessKeyId());
+        props.put(PAIMON_S3_SECRET_KEY, s3.secretAccessKey());
+      } else if (credential instanceof OSSSecretKeyCredential) {
+        OSSSecretKeyCredential oss = (OSSSecretKeyCredential) credential;
+        props.put(PAIMON_OSS_ACCESS_KEY_ID, oss.accessKeyId());
+        props.put(PAIMON_OSS_ACCESS_KEY_SECRET, oss.secretAccessKey());
+      } else {
+        LOG.warn(
+            "Received unrecognized credential type '{}' for Paimon catalog, 
skipping",
+            credential.getClass().getName());
+      }
+    }
+  }
+
   /**
    * Transforms a specific credential into a map of Iceberg properties.
    *
diff --git 
a/core/src/main/java/org/apache/gravitino/cloud/storage/AzurePropertiesMetadata.java
 
b/core/src/main/java/org/apache/gravitino/cloud/storage/AzurePropertiesMetadata.java
new file mode 100644
index 0000000000..c6adb5a2d3
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/cloud/storage/AzurePropertiesMetadata.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.cloud.storage;
+
+import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.AzureProperties;
+
+/** Shared Azure credential {@link PropertyEntry} definitions for catalog 
properties metadata. */
+public class AzurePropertiesMetadata {
+
+  public static final Map<String, PropertyEntry<?>> PROPERTY_ENTRIES =
+      ImmutableMap.<String, PropertyEntry<?>>builder()
+          .put(
+              AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
+              stringOptionalPropertyEntry(
+                  AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
+                  "Azure storage account name",
+                  false /* immutable */,
+                  null /* defaultValue */,
+                  true /* hidden */))
+          .put(
+              AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
+              stringOptionalPropertyEntry(
+                  AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
+                  "Azure storage account key",
+                  false /* immutable */,
+                  null /* defaultValue */,
+                  true /* hidden */))
+          .build();
+
+  private AzurePropertiesMetadata() {}
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/cloud/storage/GCSPropertiesMetadata.java
 
b/core/src/main/java/org/apache/gravitino/cloud/storage/GCSPropertiesMetadata.java
new file mode 100644
index 0000000000..36a66e0972
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/cloud/storage/GCSPropertiesMetadata.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.cloud.storage;
+
+import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.GCSProperties;
+
+/** Shared GCS credential {@link PropertyEntry} definitions for catalog 
properties metadata. */
+public class GCSPropertiesMetadata {
+
+  public static final Map<String, PropertyEntry<?>> PROPERTY_ENTRIES =
+      ImmutableMap.<String, PropertyEntry<?>>builder()
+          .put(
+              GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE,
+              stringOptionalPropertyEntry(
+                  GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE,
+                  "GCS service account file path",
+                  false /* immutable */,
+                  null /* defaultValue */,
+                  false /* hidden */))
+          .build();
+
+  private GCSPropertiesMetadata() {}
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/cloud/storage/OSSPropertiesMetadata.java
 
b/core/src/main/java/org/apache/gravitino/cloud/storage/OSSPropertiesMetadata.java
new file mode 100644
index 0000000000..43e1897786
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/cloud/storage/OSSPropertiesMetadata.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.cloud.storage;
+
+import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.OSSProperties;
+
+/** Shared OSS credential {@link PropertyEntry} definitions for catalog 
properties metadata. */
+public class OSSPropertiesMetadata {
+
+  public static final Map<String, PropertyEntry<?>> PROPERTY_ENTRIES =
+      ImmutableMap.<String, PropertyEntry<?>>builder()
+          .put(
+              OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID,
+              stringOptionalPropertyEntry(
+                  OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID,
+                  "OSS access key ID",
+                  false /* immutable */,
+                  null /* defaultValue */,
+                  true /* hidden */))
+          .put(
+              OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET,
+              stringOptionalPropertyEntry(
+                  OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET,
+                  "OSS access key secret",
+                  false /* immutable */,
+                  null /* defaultValue */,
+                  true /* hidden */))
+          .build();
+
+  private OSSPropertiesMetadata() {}
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/cloud/storage/S3PropertiesMetadata.java
 
b/core/src/main/java/org/apache/gravitino/cloud/storage/S3PropertiesMetadata.java
new file mode 100644
index 0000000000..8879d4b551
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/cloud/storage/S3PropertiesMetadata.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.cloud.storage;
+
+import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.S3Properties;
+
+/** Shared S3 credential {@link PropertyEntry} definitions for catalog 
properties metadata. */
+public class S3PropertiesMetadata {
+
+  public static final Map<String, PropertyEntry<?>> PROPERTY_ENTRIES =
+      ImmutableMap.<String, PropertyEntry<?>>builder()
+          .put(
+              S3Properties.GRAVITINO_S3_ACCESS_KEY_ID,
+              stringOptionalPropertyEntry(
+                  S3Properties.GRAVITINO_S3_ACCESS_KEY_ID,
+                  "S3 access key ID",
+                  false /* immutable */,
+                  null /* defaultValue */,
+                  true /* hidden */))
+          .put(
+              S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY,
+              stringOptionalPropertyEntry(
+                  S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY,
+                  "S3 secret access key",
+                  false /* immutable */,
+                  null /* defaultValue */,
+                  true /* hidden */))
+          .build();
+
+  private S3PropertiesMetadata() {}
+}
diff --git a/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java 
b/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
index 947474008f..4ee0b9c794 100644
--- a/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
+++ b/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -31,18 +32,24 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Audit;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.CatalogProvider;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.annotation.Evolving;
 import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
 import org.apache.gravitino.connector.authorization.BaseAuthorization;
 import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.credential.AzureAccountKeyCredential;
 import org.apache.gravitino.credential.CatalogCredentialManager;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.credential.GCSTokenCredential;
 import org.apache.gravitino.credential.OSSSecretKeyCredential;
 import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.exceptions.CatalogNotInUseException;
 import org.apache.gravitino.exceptions.MetalakeNotInUseException;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.storage.AzureProperties;
+import org.apache.gravitino.storage.GCSProperties;
 import org.apache.gravitino.storage.OSSProperties;
 import org.apache.gravitino.storage.S3Properties;
 import org.apache.gravitino.utils.IsolatedClassLoader;
@@ -451,30 +458,62 @@ public abstract class BaseCatalog<T extends BaseCatalog>
         }
       }
     }
-    return properties;
+    if (!shouldBackfillCredential()) {
+      return properties;
+    }
+    Map<String, String> result = Maps.newHashMap(properties);
+    result.putAll(propertiesWithCredentialProviders());
+    return result;
   }
 
   /**
-   * Retrieves the properties of the catalog including credential providers. 
Subclasses should
-   * override this method to inject auto-detected credential provider names 
into the properties map
-   * before the {@link CatalogCredentialManager} is initialized. The default 
implementation returns
-   * {@link #properties()} unchanged.
+   * Retrieves the properties of the catalog including credential providers. 
Detects storage and
+   * catalog-specific credential providers from the raw entity properties 
(including hidden ones)
+   * and injects them before {@link CatalogCredentialManager} is initialized. 
Subclasses may
+   * override {@link #addCatalogSpecificCredentialProviders} to add additional 
providers.
    *
-   * @return A map of properties including credential providers.
+   * @return A map of raw properties with credential providers set.
    */
   public Map<String, String> propertiesWithCredentialProviders() {
-    return properties();
+    Map<String, String> props = Maps.newHashMap(entity().getProperties());
+    if 
(StringUtils.isNotBlank(props.get(CredentialConstants.CREDENTIAL_PROVIDERS))) {
+      return props;
+    }
+    List<String> credentialProviders = new ArrayList<>();
+    addCatalogSpecificCredentialProviders(props, credentialProviders);
+    if (!credentialProviders.isEmpty()) {
+      props.put(CredentialConstants.CREDENTIAL_PROVIDERS, String.join(",", 
credentialProviders));
+    }
+    return props;
   }
 
   /**
-   * Detects storage credential providers (S3, OSS, Azure) from catalog 
properties and appends them
-   * to the provided list. Subclasses can call this method in their {@link
-   * #propertiesWithCredentialProviders()} implementation to avoid duplicating 
storage credential
-   * detection logic.
+   * Detects credential providers for this catalog type and appends them to 
{@code
+   * credentialProviders}. The default implementation calls {@link
+   * #addStorageCredentialProviders(Map, List)} to detect S3/OSS/Azure/GCS 
credentials. Subclasses
+   * override this to add catalog-specific providers (e.g., JDBC).
    *
-   * @param properties The catalog properties map to scan for storage 
credentials.
-   * @param credentialProviders The list to append detected storage credential 
providers to.
+   * @param properties the raw catalog properties
+   * @param credentialProviders the list to append detected provider names to
    */
+  protected void addCatalogSpecificCredentialProviders(
+      Map<String, String> properties, List<String> credentialProviders) {
+    addStorageCredentialProviders(properties, credentialProviders);
+  }
+
+  /**
+   * Returns whether hidden credentials should be backfilled into catalog 
properties for backward
+   * compatibility with connectors that do not support credential vending. 
Controlled by
+   * server-level config {@code 
gravitino.catalog.credential.backfillToProperties}.
+   *
+   * @return true if backfill is enabled
+   */
+  protected boolean shouldBackfillCredential() {
+    Config serverConfig = GravitinoEnv.getInstance().config();
+    return serverConfig != null
+        && 
Boolean.TRUE.equals(serverConfig.get(Configs.CATALOG_CREDENTIAL_BACKFILL_TO_PROPERTIES));
+  }
+
   @Evolving
   protected void addStorageCredentialProviders(
       Map<String, String> properties, List<String> credentialProviders) {
@@ -495,6 +534,11 @@ public abstract class BaseCatalog<T extends BaseCatalog>
     if (StringUtils.isNotBlank(azureAccountName) && 
StringUtils.isNotBlank(azureAccountKey)) {
       
credentialProviders.add(AzureAccountKeyCredential.AZURE_ACCOUNT_KEY_CREDENTIAL_TYPE);
     }
+
+    String gcsServiceAccountFile = 
properties.get(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE);
+    if (StringUtils.isNotBlank(gcsServiceAccountFile)) {
+      credentialProviders.add(GCSTokenCredential.GCS_TOKEN_CREDENTIAL_TYPE);
+    }
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/gravitino/credential/JdbcCredentialProvider.java
 
b/core/src/main/java/org/apache/gravitino/credential/JdbcCredentialProvider.java
index 834a968e37..0eeee58cfb 100644
--- 
a/core/src/main/java/org/apache/gravitino/credential/JdbcCredentialProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/credential/JdbcCredentialProvider.java
@@ -53,7 +53,7 @@ public class JdbcCredentialProvider implements 
CredentialProvider {
   @Nullable
   @Override
   public Credential getCredential(CredentialContext context) {
-    if (StringUtils.isBlank(jdbcUser) || StringUtils.isBlank(jdbcPassword)) {
+    if (StringUtils.isBlank(jdbcUser) || jdbcPassword == null) {
       return null;
     }
     return new JdbcCredential(jdbcUser, jdbcPassword);
diff --git 
a/core/src/test/java/org/apache/gravitino/credential/TestJdbcCredentialProvider.java
 
b/core/src/test/java/org/apache/gravitino/credential/TestJdbcCredentialProvider.java
index 4f076c3bf1..cdac88fff1 100644
--- 
a/core/src/test/java/org/apache/gravitino/credential/TestJdbcCredentialProvider.java
+++ 
b/core/src/test/java/org/apache/gravitino/credential/TestJdbcCredentialProvider.java
@@ -143,7 +143,7 @@ public class TestJdbcCredentialProvider {
   }
 
   @Test
-  void testEmptyPasswordReturnsEmptyOptional() {
+  void testEmptyPasswordReturnsCredential() {
     Map<String, String> catalogProperties =
         ImmutableMap.of(
             JdbcCredential.GRAVITINO_JDBC_USER,
@@ -157,7 +157,8 @@ public class TestJdbcCredentialProvider {
     CatalogCredentialContext context = new 
CatalogCredentialContext("test-user");
     Optional<Credential> credential = 
credentialProvider.getCredentialOptional(context);
 
-    Assertions.assertFalse(credential.isPresent());
+    Assertions.assertTrue(credential.isPresent());
+    Assertions.assertEquals("", ((JdbcCredential) 
credential.get()).jdbcPassword());
   }
 
   @Test
@@ -167,8 +168,8 @@ public class TestJdbcCredentialProvider {
   }
 
   @Test
-  void testJdbcCredentialConstructorBlankPasswordThrows() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new 
JdbcCredential("user", ""));
+  void testJdbcCredentialConstructorNullPasswordThrows() {
+    Assertions.assertThrows(IllegalArgumentException.class, () -> new 
JdbcCredential("user", null));
   }
 
   @Test
diff --git 
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
 
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
index 83d2acb11b..feb6362fae 100644
--- 
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
+++ 
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
@@ -40,8 +40,15 @@ import 
org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.factories.Factory;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
+import org.apache.gravitino.credential.OSSSecretKeyCredential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
@@ -55,6 +62,7 @@ import 
org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,6 +97,43 @@ public class GravitinoHiveCatalog extends BaseCatalog {
     return hiveCatalog.getHiveConf();
   }
 
+  @Override
+  public void open() throws CatalogException {
+    try {
+      applyS3Credential(catalog(), hiveCatalog.getHiveConf());
+    } catch (NoSuchCatalogException e) {
+      LOG.warn(
+          "Catalog '{}' not found in Gravitino during open(); credential 
injection skipped."
+              + " This is expected during CREATE CATALOG.",
+          catalogName(),
+          e);
+    }
+    super.open();
+  }
+
+  static void applyS3Credential(Catalog catalog, Configuration conf) {
+    for (Credential credential : 
CredentialPropertyUtils.getCredentials(catalog)) {
+      if (credential instanceof S3SecretKeyCredential) {
+        S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+        conf.set("fs.s3a.access.key", s3.accessKeyId());
+        conf.set("fs.s3a.secret.key", s3.secretAccessKey());
+      } else if (credential instanceof OSSSecretKeyCredential) {
+        OSSSecretKeyCredential oss = (OSSSecretKeyCredential) credential;
+        conf.set("fs.oss.accessKeyId", oss.accessKeyId());
+        conf.set("fs.oss.accessKeySecret", oss.secretAccessKey());
+      } else if (credential instanceof AzureAccountKeyCredential) {
+        AzureAccountKeyCredential azure = (AzureAccountKeyCredential) 
credential;
+        conf.set(
+            String.format("fs.azure.account.key.%s.dfs.core.windows.net", 
azure.accountName()),
+            azure.accountKey());
+      } else {
+        LOG.warn(
+            "Received unrecognized credential type '{}' for Hive catalog, 
skipping",
+            credential.getClass().getName());
+      }
+    }
+  }
+
   @Override
   public Optional<Factory> getFactory() {
     return hiveCatalog.getFactory();
diff --git 
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
 
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
index 1befaddd64..7bf33b5e1c 100644
--- 
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
+++ 
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
@@ -18,21 +18,31 @@
  */
 package org.apache.gravitino.flink.connector.iceberg;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.flink.table.catalog.AbstractCatalog;
-import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.util.Preconditions;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.flink.connector.PartitionConverter;
 import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
 import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
 import org.apache.iceberg.flink.FlinkCatalogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Gravitino Iceberg Catalog. */
 public class GravitinoIcebergCatalog extends BaseCatalog {
 
-  private final AbstractCatalog icebergCatalog;
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoIcebergCatalog.class);
+
+  private final String icebergCatalogName;
+  // Mutable copy so credential injection in open() propagates to the inner 
Iceberg catalog.
+  private final Map<String, String> mutableIcebergProperties;
+  private AbstractCatalog icebergCatalog;
 
   protected GravitinoIcebergCatalog(
       String catalogName,
@@ -47,9 +57,8 @@ public class GravitinoIcebergCatalog extends BaseCatalog {
         defaultDatabase,
         schemaAndTablePropertiesConverter,
         partitionConverter);
-    this.icebergCatalog =
-        asAbstractCatalog(
-            new FlinkCatalogFactory().createCatalog(catalogName, 
icebergCatalogProperties));
+    this.icebergCatalogName = catalogName;
+    this.mutableIcebergProperties = new HashMap<>(icebergCatalogProperties);
   }
 
   protected GravitinoIcebergCatalog(
@@ -65,20 +74,45 @@ public class GravitinoIcebergCatalog extends BaseCatalog {
         defaultDatabase,
         schemaAndTablePropertiesConverter,
         partitionConverter);
+    this.icebergCatalogName = catalogName;
+    this.mutableIcebergProperties = null;
     this.icebergCatalog = icebergCatalog;
   }
 
+  @Override
+  public void open() throws CatalogException {
+    if (icebergCatalog == null) {
+      try {
+        CredentialPropertyUtils.applyIcebergCredentials(
+            CredentialPropertyUtils.getCredentials(catalog()), 
mutableIcebergProperties);
+      } catch (NoSuchCatalogException e) {
+        LOG.warn(
+            "Catalog '{}' not found in Gravitino during open(); credential 
injection skipped."
+                + " This is expected during CREATE CATALOG.",
+            catalogName(),
+            e);
+      }
+      this.icebergCatalog =
+          asAbstractCatalog(
+              new FlinkCatalogFactory()
+                  .createCatalog(icebergCatalogName, 
mutableIcebergProperties));
+    }
+    super.open();
+  }
+
   @Override
   public Optional<Factory> getFactory() {
+    Preconditions.checkState(icebergCatalog != null, "Catalog '%s' has not 
been opened", getName());
     return icebergCatalog.getFactory();
   }
 
   @Override
   protected AbstractCatalog realCatalog() {
+    Preconditions.checkState(icebergCatalog != null, "Catalog '%s' has not 
been opened", getName());
     return icebergCatalog;
   }
 
-  protected static AbstractCatalog asAbstractCatalog(Catalog catalog) {
+  protected static AbstractCatalog asAbstractCatalog(Object catalog) {
     Preconditions.checkState(
         catalog instanceof AbstractCatalog,
         "Expected AbstractCatalog from FlinkCatalogFactory but got %s.",
diff --git 
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
 
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
index aa9edabd84..5290490f04 100644
--- 
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
+++ 
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -39,6 +40,8 @@ import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.Factory;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.flink.connector.PartitionConverter;
 import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
 import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
@@ -53,6 +56,8 @@ import 
org.apache.gravitino.rel.expressions.distributions.Strategy;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The GravitinoPaimonCatalog class is an implementation of the BaseCatalog 
class that is used to
@@ -68,32 +73,103 @@ import org.apache.paimon.flink.FlinkCatalogFactory;
  */
 public class GravitinoPaimonCatalog extends BaseCatalog {
 
-  private final AbstractCatalog paimonCatalog;
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoPaimonCatalog.class);
+
+  private final CatalogFactory.Context context;
+  // Mutable copy shared with BaseCatalog.catalogOptions so credential 
injection in open() is
+  // visible to the inner Paimon catalog context.
+  private final Map<String, String> mutableOptions;
+  private AbstractCatalog paimonCatalog;
 
   protected GravitinoPaimonCatalog(
       CatalogFactory.Context context,
       String defaultDatabase,
       SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
       PartitionConverter partitionConverter) {
+    this(context, defaultDatabase, schemaAndTablePropertiesConverter, 
partitionConverter, null);
+  }
+
+  @VisibleForTesting
+  protected GravitinoPaimonCatalog(
+      CatalogFactory.Context context,
+      String defaultDatabase,
+      SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
+      PartitionConverter partitionConverter,
+      AbstractCatalog paimonCatalog) {
+    this(
+        context,
+        defaultDatabase,
+        schemaAndTablePropertiesConverter,
+        partitionConverter,
+        paimonCatalog,
+        new HashMap<>(context.getOptions()));
+  }
+
+  private GravitinoPaimonCatalog(
+      CatalogFactory.Context context,
+      String defaultDatabase,
+      SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
+      PartitionConverter partitionConverter,
+      AbstractCatalog paimonCatalog,
+      Map<String, String> mutableOptions) {
     super(
         context.getName(),
-        context.getOptions(),
+        mutableOptions,
         defaultDatabase,
         schemaAndTablePropertiesConverter,
         partitionConverter);
-    FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
-    this.paimonCatalog = flinkCatalogFactory.createCatalog(context);
+    this.context = context;
+    this.mutableOptions = mutableOptions;
+    this.paimonCatalog = paimonCatalog;
   }
 
-  // 
---------------------------------------------------------------------------
-  // Lifecycle — keep paimonCatalog in sync with the outer catalog
-  // 
---------------------------------------------------------------------------
-
   @Override
   public void open() throws CatalogException {
-    super.open(); // opens realCatalog() == paimonCatalog, so 
paimonCatalog.open() is called here
+    if (paimonCatalog != null) {
+      super.open();
+      return;
+    }
+    try {
+      CredentialPropertyUtils.applyPaimonCredentials(
+          CredentialPropertyUtils.getCredentials(catalog()), mutableOptions);
+    } catch (NoSuchCatalogException e) {
+      LOG.warn(
+          "Catalog '{}' not found in Gravitino during open(); credential 
injection skipped."
+              + " This is expected during CREATE CATALOG.",
+          getName(),
+          e);
+    }
+    CatalogFactory.Context contextWithCredentials =
+        new CatalogFactory.Context() {
+          @Override
+          public String getName() {
+            return context.getName();
+          }
+
+          @Override
+          public Map<String, String> getOptions() {
+            return mutableOptions;
+          }
+
+          @Override
+          public ReadableConfig getConfiguration() {
+            return context.getConfiguration();
+          }
+
+          @Override
+          public ClassLoader getClassLoader() {
+            return context.getClassLoader();
+          }
+        };
+    this.paimonCatalog =
+        (AbstractCatalog) new 
FlinkCatalogFactory().createCatalog(contextWithCredentials);
+    super.open();
   }
 
+  // 
---------------------------------------------------------------------------
+  // Lifecycle — keep paimonCatalog in sync with the outer catalog
+  // 
---------------------------------------------------------------------------
+
   @Override
   public void close() throws CatalogException {
     super.close(); // closes realCatalog() == paimonCatalog
diff --git 
a/flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink118.java
 
b/flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink118.java
index dd42dc79b9..b691f79cf2 100644
--- 
a/flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink118.java
+++ 
b/flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink118.java
@@ -20,13 +20,12 @@
 package org.apache.gravitino.flink.connector.iceberg;
 
 import java.util.Map;
-import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.gravitino.flink.connector.PartitionConverter;
 import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
 import org.apache.gravitino.flink.connector.utils.CatalogCompat;
 import org.apache.gravitino.flink.connector.utils.CatalogCompatFlink118;
-import org.apache.iceberg.flink.FlinkCatalogFactory;
 
+/** {@link GravitinoIcebergCatalog} implementation for Flink 1.18. */
 public class GravitinoIcebergCatalogFlink118 extends GravitinoIcebergCatalog {
 
   protected GravitinoIcebergCatalogFlink118(
@@ -42,17 +41,11 @@ public class GravitinoIcebergCatalogFlink118 extends 
GravitinoIcebergCatalog {
         schemaAndTablePropertiesConverter,
         partitionConverter,
         catalogOptions,
-        createIcebergCatalog(catalogName, icebergCatalogProperties));
+        icebergCatalogProperties);
   }
 
   @Override
   protected CatalogCompat catalogCompat() {
     return CatalogCompatFlink118.INSTANCE;
   }
-
-  private static AbstractCatalog createIcebergCatalog(
-      String catalogName, Map<String, String> icebergCatalogProperties) {
-    return asAbstractCatalog(
-        new FlinkCatalogFactory().createCatalog(catalogName, 
icebergCatalogProperties));
-  }
 }
diff --git 
a/flink-connector/v1.19/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink119.java
 
b/flink-connector/v1.19/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink119.java
index e4f56e3a43..2320695f18 100644
--- 
a/flink-connector/v1.19/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink119.java
+++ 
b/flink-connector/v1.19/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink119.java
@@ -20,12 +20,10 @@
 package org.apache.gravitino.flink.connector.iceberg;
 
 import java.util.Map;
-import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.gravitino.flink.connector.PartitionConverter;
 import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
 import org.apache.gravitino.flink.connector.utils.CatalogCompat;
 import org.apache.gravitino.flink.connector.utils.DefaultCatalogCompat;
-import org.apache.iceberg.flink.FlinkCatalogFactory;
 
 /** {@link GravitinoIcebergCatalog} implementation for Flink 1.19. */
 public class GravitinoIcebergCatalogFlink119 extends GravitinoIcebergCatalog {
@@ -43,17 +41,11 @@ public class GravitinoIcebergCatalogFlink119 extends 
GravitinoIcebergCatalog {
         schemaAndTablePropertiesConverter,
         partitionConverter,
         catalogOptions,
-        createIcebergCatalog(catalogName, icebergCatalogProperties));
+        icebergCatalogProperties);
   }
 
   @Override
   protected CatalogCompat catalogCompat() {
     return DefaultCatalogCompat.INSTANCE;
   }
-
-  private static AbstractCatalog createIcebergCatalog(
-      String catalogName, Map<String, String> icebergCatalogProperties) {
-    return asAbstractCatalog(
-        new FlinkCatalogFactory().createCatalog(catalogName, 
icebergCatalogProperties));
-  }
 }
diff --git 
a/flink-connector/v1.20/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink120.java
 
b/flink-connector/v1.20/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink120.java
index f105b8b545..446c976f52 100644
--- 
a/flink-connector/v1.20/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink120.java
+++ 
b/flink-connector/v1.20/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink120.java
@@ -20,12 +20,10 @@
 package org.apache.gravitino.flink.connector.iceberg;
 
 import java.util.Map;
-import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.gravitino.flink.connector.PartitionConverter;
 import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
 import org.apache.gravitino.flink.connector.utils.CatalogCompat;
 import org.apache.gravitino.flink.connector.utils.CatalogCompatFlink120;
-import org.apache.iceberg.flink.FlinkCatalogFactory;
 
 /** {@link GravitinoIcebergCatalog} implementation for Flink 1.20. */
 public class GravitinoIcebergCatalogFlink120 extends GravitinoIcebergCatalog {
@@ -43,17 +41,11 @@ public class GravitinoIcebergCatalogFlink120 extends 
GravitinoIcebergCatalog {
         schemaAndTablePropertiesConverter,
         partitionConverter,
         catalogOptions,
-        createIcebergCatalog(catalogName, icebergCatalogProperties));
+        icebergCatalogProperties);
   }
 
   @Override
   protected CatalogCompat catalogCompat() {
     return CatalogCompatFlink120.INSTANCE;
   }
-
-  private static AbstractCatalog createIcebergCatalog(
-      String catalogName, Map<String, String> icebergCatalogProperties) {
-    return asAbstractCatalog(
-        new FlinkCatalogFactory().createCatalog(catalogName, 
icebergCatalogProperties));
-  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java
index 2bbefbade0..43ec66812e 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java
@@ -23,6 +23,7 @@ import static 
org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.Closeable;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -36,6 +37,9 @@ import 
org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
 import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
 import org.apache.gravitino.client.GravitinoClient;
 import org.apache.gravitino.client.GravitinoClient.ClientBuilder;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.credential.JdbcCredential;
+import org.apache.gravitino.credential.SupportsCredentials;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
 import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
@@ -100,7 +104,36 @@ public class DynamicIcebergConfigProvider implements 
IcebergConfigProvider {
         "lakehouse-iceberg".equals(catalog.provider()),
         String.format("%s.%s is not iceberg catalog", gravitinoMetalake, 
catalogName));
 
-    return 
Optional.of(getIcebergConfigFromCatalogProperties(catalog.properties()));
+    // Sensitive credentials (e.g. jdbc-password) are marked hidden in 
PropertiesMetadata and
+    // filtered out of catalog.properties(). We need two different strategies 
to recover them:
+    //
+    // Auxiliary mode: the catalog is a BaseCatalog running in the same JVM as 
the Gravitino
+    // server. Call propertiesWithCredentialProviders() which returns the raw 
entity properties
+    // including all hidden fields.
+    //
+    // Standalone mode: the catalog is a client-side object obtained via the 
Gravitino REST API.
+    // Call getCredentials() to retrieve vended credentials, then inject any 
JdbcCredential
+    // fields into the properties map so the JDBC backend can connect.
+    Map<String, String> catalogProperties;
+    if (catalog instanceof BaseCatalog) {
+      catalogProperties = ((BaseCatalog<?>) 
catalog).propertiesWithCredentialProviders();
+    } else {
+      catalogProperties = new HashMap<>(catalog.properties());
+      if (catalog instanceof SupportsCredentials) {
+        Arrays.stream(((SupportsCredentials) catalog).getCredentials())
+            .filter(c -> c instanceof JdbcCredential)
+            .map(c -> (JdbcCredential) c)
+            .findFirst()
+            .ifPresent(
+                jdbc -> {
+                  catalogProperties.putIfAbsent(
+                      IcebergConstants.GRAVITINO_JDBC_USER, jdbc.jdbcUser());
+                  catalogProperties.putIfAbsent(
+                      IcebergConstants.GRAVITINO_JDBC_PASSWORD, 
jdbc.jdbcPassword());
+                });
+      }
+    }
+    return 
Optional.of(getIcebergConfigFromCatalogProperties(catalogProperties));
   }
 
   /**
diff --git a/spark-connector/spark-common/build.gradle.kts 
b/spark-connector/spark-common/build.gradle.kts
index 7211d6e3bb..c7aceb9e16 100644
--- a/spark-connector/spark-common/build.gradle.kts
+++ b/spark-connector/spark-common/build.gradle.kts
@@ -176,6 +176,10 @@ val glueLibsApiUrl =
 val downloadGlueHiveJars by
 tasks.registering {
   glueHiveJarsDir?.let { outputs.dir(it) }
+  onlyIf {
+    val outputDir = file(glueHiveJarsDir ?: return@onlyIf false)
+    outputDir.listFiles()?.none { it.name.endsWith(".jar") } ?: true
+  }
   doLast {
     val outputDir = file(glueHiveJarsDir ?: return@doLast)
     outputDir.mkdirs()
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
index fc7dbce568..47100103c0 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
@@ -20,8 +20,12 @@
 package org.apache.gravitino.spark.connector.glue;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.gravitino.catalog.glue.GlueConstants;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.spark.connector.PropertiesConverter;
 import org.apache.gravitino.spark.connector.SparkTransformConverter;
 import org.apache.gravitino.spark.connector.SparkTypeConverter;
@@ -38,6 +42,8 @@ import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Gravitino Glue catalog implementation for Apache Spark.
@@ -60,6 +66,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
  */
 public class GravitinoGlueCatalog extends BaseCatalog {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoGlueCatalog.class);
+
   // Lazily initialized Iceberg GlueCatalog for Iceberg tables
   private volatile SparkCatalog icebergGlueCatalog;
 
@@ -67,6 +75,14 @@ public class GravitinoGlueCatalog extends BaseCatalog {
   private String catalogName;
   private Map<String, String> catalogProperties;
 
+  /**
+   * AWS credentials obtained via Gravitino credential vending at init time. 
Keyed by Gravitino
+   * catalog property names (e.g. "aws-access-key-id") so they can be merged 
directly into catalog
+   * properties before being passed to {@link 
GluePropertiesConverter#toIcebergCatalogProperties}.
+   * Null when credential vending returns no S3 credentials.
+   */
+  private Map<String, String> vendedAwsCredentials;
+
   /** Creates a new GravitinoGlueCatalog. */
   public GravitinoGlueCatalog() {}
 
@@ -88,10 +104,46 @@ public class GravitinoGlueCatalog extends BaseCatalog {
     TableCatalog hiveCatalog = createHiveTableCatalog();
     Map<String, String> all =
         getPropertiesConverter().toSparkCatalogProperties(options, properties);
+    this.vendedAwsCredentials = applyS3Credential(gravitinoCatalogClient, all);
     hiveCatalog.initialize(name, new CaseInsensitiveStringMap(all));
     return hiveCatalog;
   }
 
+  /**
+   * Obtains S3 credentials via Gravitino credential vending and injects them 
into {@code props} as
+   * {@code hadoop.fs.s3a.*} for the non-Iceberg (Hive) path's S3 data access. 
Returns the vended
+   * credentials keyed by Gravitino catalog property names for reuse in the 
Iceberg path. Returns an
+   * empty map if credential vending is unavailable.
+   *
+   * @return map of vended AWS credentials (Gravitino key names), or empty map 
if none vended
+   */
+  static Map<String, String> applyS3Credential(
+      org.apache.gravitino.Catalog catalog, Map<String, String> props) {
+    Map<String, String> vended = new HashMap<>();
+    Credential[] credentials;
+    try {
+      credentials = CredentialPropertyUtils.getCredentials(catalog);
+    } catch (RuntimeException e) {
+      LOG.debug(
+          "Failed to obtain credentials from Glue catalog, S3 credential 
injection skipped", e);
+      return vended;
+    }
+    for (Credential credential : credentials) {
+      if (credential instanceof S3SecretKeyCredential) {
+        S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+        props.put("hadoop.fs.s3a.access.key", s3.accessKeyId());
+        props.put("hadoop.fs.s3a.secret.key", s3.secretAccessKey());
+        vended.put(GluePropertiesConverter.AWS_ACCESS_KEY_ID, 
s3.accessKeyId());
+        vended.put(GluePropertiesConverter.AWS_SECRET_ACCESS_KEY, 
s3.secretAccessKey());
+      } else {
+        LOG.warn(
+            "Received unrecognized credential type '{}' for Glue catalog, 
skipping",
+            credential.getClass().getName());
+      }
+    }
+    return vended;
+  }
+
   /**
    * Routes Spark table loading to the correct backend.
    *
@@ -226,7 +278,14 @@ public class GravitinoGlueCatalog extends BaseCatalog {
    */
   private SparkCatalog createIcebergGlueCatalog() {
     GluePropertiesConverter converter = GluePropertiesConverter.getInstance();
-    Map<String, String> icebergProperties = 
converter.toIcebergCatalogProperties(catalogProperties);
+    Map<String, String> effectiveProperties = new HashMap<>(catalogProperties);
+    // Vended credentials take precedence over static catalog properties so 
that
+    // GluePropertiesConverter sets up GravitinoGlueCredentialsProvider with 
the vended AK/SK.
+    if (vendedAwsCredentials != null && !vendedAwsCredentials.isEmpty()) {
+      effectiveProperties.putAll(vendedAwsCredentials);
+    }
+    Map<String, String> icebergProperties =
+        converter.toIcebergCatalogProperties(effectiveProperties);
     SparkCatalog catalog = new SparkCatalog();
     catalog.initialize(catalogName + "_iceberg", new 
CaseInsensitiveStringMap(icebergProperties));
     return catalog;
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
index dd7851828c..4179a79da2 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
@@ -20,6 +20,12 @@
 package org.apache.gravitino.spark.connector.hive;
 
 import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
+import org.apache.gravitino.credential.OSSSecretKeyCredential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.View;
 import org.apache.gravitino.spark.connector.PropertiesConverter;
@@ -31,20 +37,48 @@ import 
org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class GravitinoHiveCatalog extends BaseCatalog {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoHiveCatalog.class);
+
   @Override
   protected TableCatalog createAndInitSparkCatalog(
       String name, CaseInsensitiveStringMap options, Map<String, String> 
properties) {
     TableCatalog hiveCatalog = new HiveTableCatalog();
     Map<String, String> all =
         getPropertiesConverter().toSparkCatalogProperties(options, properties);
+    applyS3Credential(gravitinoCatalogClient, all);
     hiveCatalog.initialize(name, new CaseInsensitiveStringMap(all));
-
     return hiveCatalog;
   }
 
+  static void applyS3Credential(Catalog catalog, Map<String, String> props) {
+    for (Credential credential : 
CredentialPropertyUtils.getCredentials(catalog)) {
+      if (credential instanceof S3SecretKeyCredential) {
+        S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+        props.put("hadoop.fs.s3a.access.key", s3.accessKeyId());
+        props.put("hadoop.fs.s3a.secret.key", s3.secretAccessKey());
+      } else if (credential instanceof OSSSecretKeyCredential) {
+        OSSSecretKeyCredential oss = (OSSSecretKeyCredential) credential;
+        props.put("hadoop.fs.oss.accessKeyId", oss.accessKeyId());
+        props.put("hadoop.fs.oss.accessKeySecret", oss.secretAccessKey());
+      } else if (credential instanceof AzureAccountKeyCredential) {
+        AzureAccountKeyCredential azure = (AzureAccountKeyCredential) 
credential;
+        props.put(
+            String.format(
+                "hadoop.fs.azure.account.key.%s.dfs.core.windows.net", 
azure.accountName()),
+            azure.accountKey());
+      } else {
+        LOG.warn(
+            "Received unrecognized credential type '{}' for Hive catalog, 
skipping",
+            credential.getClass().getName());
+      }
+    }
+  }
+
   @Override
   protected org.apache.spark.sql.connector.catalog.Table createSparkTable(
       Identifier identifier,
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
index 4ccacdec1d..0fa99d3ab7 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.spark.connector.PropertiesConverter;
 import org.apache.gravitino.spark.connector.SparkTransformConverter;
@@ -73,6 +74,8 @@ public class GravitinoIcebergCatalog extends BaseCatalog
     String catalogBackendName = 
IcebergPropertiesUtils.getCatalogBackendName(properties);
     Map<String, String> all =
         getPropertiesConverter().toSparkCatalogProperties(options, properties);
+    CredentialPropertyUtils.applyIcebergCredentials(
+        CredentialPropertyUtils.getCredentials(gravitinoCatalogClient), all);
     TableCatalog icebergCatalog = new SparkCatalog();
     icebergCatalog.initialize(catalogBackendName, new 
CaseInsensitiveStringMap(all));
     return icebergCatalog;
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
index 86ca680c45..76ed6ed899 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.spark.connector.paimon;
 import java.util.Map;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
 import org.apache.gravitino.spark.connector.PropertiesConverter;
 import org.apache.gravitino.spark.connector.SparkTransformConverter;
 import org.apache.gravitino.spark.connector.SparkTypeConverter;
@@ -42,6 +43,8 @@ public class GravitinoPaimonCatalog extends BaseCatalog {
     TableCatalog paimonCatalog = new SparkCatalog();
     Map<String, String> all =
         getPropertiesConverter().toSparkCatalogProperties(options, properties);
+    CredentialPropertyUtils.applyPaimonCredentials(
+        CredentialPropertyUtils.getCredentials(gravitinoCatalogClient), all);
     paimonCatalog.initialize(catalogBackendName, new 
CaseInsensitiveStringMap(all));
     return paimonCatalog;
   }
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
index 130ddee519..757bb6510b 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
@@ -25,8 +25,13 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.catalog.glue.GlueConstants;
 import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
+import org.apache.gravitino.credential.SupportsCredentials;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.types.Types;
@@ -176,6 +181,43 @@ public class TestGravitinoGlueCatalog {
     Assertions.assertSame(mockSparkTable, result);
   }
 
+  // -------------------------------------------------------------------------
+  // Test applyS3Credential (credential vending)
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testApplyS3CredentialInjectsHadoopProperties() {
+    Catalog mockCatalog = mock(Catalog.class);
+    S3SecretKeyCredential s3Cred = new 
S3SecretKeyCredential("test-access-key", "test-secret-key");
+    SupportsCredentials supportsCredentials = mock(SupportsCredentials.class);
+    when(mockCatalog.supportsCredentials()).thenReturn(supportsCredentials);
+    when(supportsCredentials.getCredentials()).thenReturn(new Credential[] 
{s3Cred});
+
+    Map<String, String> props = new HashMap<>();
+    Map<String, String> vended = 
GravitinoGlueCatalog.applyS3Credential(mockCatalog, props);
+
+    Assertions.assertEquals("test-access-key", 
props.get("hadoop.fs.s3a.access.key"));
+    Assertions.assertEquals("test-secret-key", 
props.get("hadoop.fs.s3a.secret.key"));
+    Assertions.assertEquals(
+        "test-access-key", 
vended.get(GluePropertiesConverter.AWS_ACCESS_KEY_ID));
+    Assertions.assertEquals(
+        "test-secret-key", 
vended.get(GluePropertiesConverter.AWS_SECRET_ACCESS_KEY));
+  }
+
+  @Test
+  void testApplyS3CredentialReturnsEmptyWhenNoCredentials() {
+    Catalog mockCatalog = mock(Catalog.class);
+    SupportsCredentials supportsCredentials = mock(SupportsCredentials.class);
+    when(mockCatalog.supportsCredentials()).thenReturn(supportsCredentials);
+    when(supportsCredentials.getCredentials()).thenReturn(new Credential[] {});
+
+    Map<String, String> props = new HashMap<>();
+    Map<String, String> vended = 
GravitinoGlueCatalog.applyS3Credential(mockCatalog, props);
+
+    Assertions.assertTrue(props.isEmpty());
+    Assertions.assertTrue(vended.isEmpty());
+  }
+
   // -------------------------------------------------------------------------
   // Test converter methods
   // -------------------------------------------------------------------------
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/glue/GlueConnectorAdapter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/glue/GlueConnectorAdapter.java
index 24b812c115..ac6a56ad01 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/glue/GlueConnectorAdapter.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/glue/GlueConnectorAdapter.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorAdapter;
 import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
 import org.apache.gravitino.trino.connector.catalog.HasPropertyMeta;
@@ -42,8 +43,6 @@ public class GlueConnectorAdapter implements 
CatalogConnectorAdapter {
   // Gravitino catalog property keys for AWS Glue
   private static final String PROP_AWS_REGION = "aws-region";
   private static final String PROP_AWS_GLUE_CATALOG_ID = "aws-glue-catalog-id";
-  private static final String PROP_AWS_ACCESS_KEY_ID = "aws-access-key-id";
-  private static final String PROP_AWS_SECRET_ACCESS_KEY = 
"aws-secret-access-key";
   private static final String PROP_AWS_GLUE_ENDPOINT = "aws-glue-endpoint";
 
   // Trino Hive connector configuration keys for Glue
@@ -82,16 +81,7 @@ public class GlueConnectorAdapter implements 
CatalogConnectorAdapter {
       config.put(HIVE_METASTORE_GLUE_CATALOG_ID, catalogId);
     }
 
-    String accessKey = catalog.getProperty(PROP_AWS_ACCESS_KEY_ID, null);
-    String secretKey = catalog.getProperty(PROP_AWS_SECRET_ACCESS_KEY, null);
-    if (accessKey != null && secretKey != null) {
-      // Glue metastore credentials
-      config.put(HIVE_METASTORE_GLUE_ACCESS_KEY, accessKey);
-      config.put(HIVE_METASTORE_GLUE_SECRET_KEY, secretKey);
-      // S3 credentials for data access
-      config.put(HIVE_S3_ACCESS_KEY, accessKey);
-      config.put(HIVE_S3_SECRET_KEY, secretKey);
-    }
+    applyS3Credential(credentials, config);
 
     String endpoint = catalog.getProperty(PROP_AWS_GLUE_ENDPOINT, null);
     if (StringUtils.isNotBlank(endpoint)) {
@@ -101,6 +91,19 @@ public class GlueConnectorAdapter implements 
CatalogConnectorAdapter {
     return config;
   }
 
+  static void applyS3Credential(Credential[] credentials, Map<String, String> 
config) {
+    for (Credential credential : credentials) {
+      if (credential instanceof S3SecretKeyCredential) {
+        S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+        config.put(HIVE_METASTORE_GLUE_ACCESS_KEY, s3.accessKeyId());
+        config.put(HIVE_METASTORE_GLUE_SECRET_KEY, s3.secretAccessKey());
+        config.put(HIVE_S3_ACCESS_KEY, s3.accessKeyId());
+        config.put(HIVE_S3_SECRET_KEY, s3.secretAccessKey());
+        return;
+      }
+    }
+  }
+
   @Override
   public String internalConnectorName() {
     return CONNECTOR_LAKEHOUSE;
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
index ce52e7de24..a888b9012e 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
@@ -24,7 +24,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.catalog.property.PropertyConverter;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
 import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorAdapter;
 import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
 import org.apache.gravitino.trino.connector.catalog.CatalogPropertyConverter;
@@ -58,9 +60,24 @@ public class HiveConnectorAdapter implements 
CatalogConnectorAdapter {
     config.put("hive.metastore.uri", metastoreUri);
     config.put("hive.security", "allow-all");
     config.put("fs.hadoop.enabled", "true");
+    applyS3Credential(credentials, config);
     return config;
   }
 
+  static void applyS3Credential(Credential[] credentials, Map<String, String> 
config) {
+    for (Credential credential : credentials) {
+      if (credential instanceof S3SecretKeyCredential) {
+        S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+        config.put("hive.s3.aws-access-key", s3.accessKeyId());
+        config.put("hive.s3.aws-secret-key", s3.secretAccessKey());
+      } else if (credential instanceof AzureAccountKeyCredential) {
+        AzureAccountKeyCredential azure = (AzureAccountKeyCredential) 
credential;
+        config.put("hive.azure.abfs-storage-account", azure.accountName());
+        config.put("hive.azure.abfs-access-key", azure.accountKey());
+      }
+    }
+  }
+
   @Override
   public String internalConnectorName() {
     return CONNECTOR_HIVE;
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
index b0b220c934..5150435ecf 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
@@ -26,6 +26,10 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.JdbcCredential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.trino.connector.GravitinoErrorCode;
 import org.apache.gravitino.trino.connector.catalog.CatalogPropertyConverter;
 
@@ -43,6 +47,32 @@ public class IcebergCatalogPropertyConverter extends 
CatalogPropertyConverter {
 
   private static final Set<String> REST_BACKEND_REQUIRED_PROPERTIES = 
Set.of("uri");
 
+  /**
+   * Injects credentials from credential vending into the Iceberg catalog 
config. Applies JDBC
+   * user/password for the JDBC backend and S3 credentials for S3-backed 
storage.
+   *
+   * @param credentials the credentials returned by the server
+   * @param config the mutable Trino Iceberg connector config map to update
+   */
+  public static void applyCredentials(Credential[] credentials, Map<String, 
String> config) {
+    for (Credential credential : credentials) {
+      if (credential instanceof JdbcCredential) {
+        JdbcCredential jdbc = (JdbcCredential) credential;
+        config.put("iceberg.jdbc-catalog.connection-user", jdbc.jdbcUser());
+        config.put("iceberg.jdbc-catalog.connection-password", 
jdbc.jdbcPassword());
+      } else if (credential instanceof S3SecretKeyCredential) {
+        S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+        config.put("hive.s3.aws-access-key", s3.accessKeyId());
+        config.put("hive.s3.aws-secret-key", s3.secretAccessKey());
+      } else if (credential instanceof AzureAccountKeyCredential) {
+        AzureAccountKeyCredential azure = (AzureAccountKeyCredential) 
credential;
+        config.put(
+            String.format("fs.azure.account.key.%s.dfs.core.windows.net", 
azure.accountName()),
+            azure.accountKey());
+      }
+    }
+  }
+
   @Override
   public Map<String, String> gravitinoToEngineProperties(Map<String, String> 
properties) {
     Map<String, String> stringStringMap;
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java
index a9fec3206b..8900775fb7 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.trino.connector.catalog.iceberg;
 import static java.util.Collections.emptyList;
 
 import io.trino.spi.session.PropertyMetadata;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.catalog.property.PropertyConverter;
@@ -51,7 +52,10 @@ public class IcebergConnectorAdapter implements 
CatalogConnectorAdapter {
   @Override
   public Map<String, String> buildInternalConnectorConfig(
       GravitinoCatalog catalog, Credential[] credentials) throws Exception {
-    return 
catalogConverter.gravitinoToEngineProperties(catalog.getProperties());
+    Map<String, String> config =
+        new 
HashMap<>(catalogConverter.gravitinoToEngineProperties(catalog.getProperties()));
+    IcebergCatalogPropertyConverter.applyCredentials(credentials, config);
+    return config;
   }
 
   @Override
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/glue/TestGlueConnectorAdapter.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/glue/TestGlueConnectorAdapter.java
index 77697c2a4e..81313f305d 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/glue/TestGlueConnectorAdapter.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/glue/TestGlueConnectorAdapter.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
 import org.apache.gravitino.trino.connector.metadata.TestGravitinoCatalog;
 import org.junit.jupiter.api.Assertions;
@@ -57,16 +58,15 @@ class TestGlueConnectorAdapter {
             "aws-region", "us-east-1",
             "warehouse", "s3://my-bucket/warehouse",
             "aws-glue-catalog-id", "123456789",
-            "aws-access-key-id", "test-access-key",
-            "aws-secret-access-key", "test-secret-key",
             "aws-glue-endpoint", "https://glue.custom.endpoint";);
     Catalog mockCatalog =
         TestGravitinoCatalog.mockCatalog(
             "test_glue", "glue", "test catalog", Catalog.Type.RELATIONAL, 
properties);
     GlueConnectorAdapter adapter = new GlueConnectorAdapter();
+    Credential[] credentials = {new S3SecretKeyCredential("test-access-key", 
"test-secret-key")};
     Map<String, String> config =
         adapter.buildInternalConnectorConfig(
-            new GravitinoCatalog("test", mockCatalog), new Credential[0]);
+            new GravitinoCatalog("test", mockCatalog), credentials);
 
     Assertions.assertEquals("123456789", 
config.get("hive.metastore.glue.catalogid"));
     Assertions.assertEquals("test-access-key", 
config.get("hive.metastore.glue.aws-access-key"));

Reply via email to