This is an automated email from the ASF dual-hosted git repository.
mchades pushed a commit to branch branch-1.3
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-1.3 by this push:
new c606d4f9cb [Cherry-pick to branch-1.3] [#11263] feat(authz): Extend credential vending to Hive/Iceberg/Glue/JDBC catalog types (#11264) (#11554)
c606d4f9cb is described below
commit c606d4f9cb03f51c9633038cdd97cedefdced0c0
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 10 18:01:33 2026 +0800
[Cherry-pick to branch-1.3] [#11263] feat(authz): Extend credential vending to Hive/Iceberg/Glue/JDBC catalog types (#11264) (#11554)
**Cherry-pick Information:**
- Original commit: 90c5bbc96cbc7df678f4bc5e73429111e9c741e0
- Target branch: `branch-1.3`
- Status: ✅ Clean cherry-pick (no conflicts)
Co-authored-by: Yuhui <[email protected]>
---
.../gravitino/credential/JdbcCredential.java | 3 +-
catalogs/catalog-glue/build.gradle.kts | 1 +
.../apache/gravitino/catalog/glue/GlueCatalog.java | 31 +++++++
.../glue/GlueCatalogPropertiesMetadata.java | 4 +-
.../hive/HiveCatalogPropertiesMetadata.java | 8 ++
.../catalog/hive/TestHiveCatalogOperations.java | 2 +-
.../apache/gravitino/catalog/jdbc/JdbcCatalog.java | 64 ++-------------
.../catalog/jdbc/TestJdbcCatalogCredential.java | 32 ++++++++
.../catalog/lakehouse/iceberg/IcebergCatalog.java | 35 +++-----
.../iceberg/IcebergCatalogPropertiesMetadata.java | 47 ++++-------
.../catalog/lakehouse/paimon/PaimonCatalog.java | 31 +------
.../credential/CredentialPropertyUtils.java | 85 +++++++++++++++++++
.../cloud/storage/AzurePropertiesMetadata.java | 52 ++++++++++++
.../cloud/storage/GCSPropertiesMetadata.java | 44 ++++++++++
.../cloud/storage/OSSPropertiesMetadata.java | 52 ++++++++++++
.../cloud/storage/S3PropertiesMetadata.java | 52 ++++++++++++
.../apache/gravitino/connector/BaseCatalog.java | 70 +++++++++++++---
.../credential/JdbcCredentialProvider.java | 2 +-
.../credential/TestJdbcCredentialProvider.java | 9 ++-
.../flink/connector/hive/GravitinoHiveCatalog.java | 45 +++++++++++
.../connector/iceberg/GravitinoIcebergCatalog.java | 46 +++++++++--
.../connector/paimon/GravitinoPaimonCatalog.java | 94 +++++++++++++++++++---
.../iceberg/GravitinoIcebergCatalogFlink118.java | 11 +--
.../iceberg/GravitinoIcebergCatalogFlink119.java | 10 +--
.../iceberg/GravitinoIcebergCatalogFlink120.java | 10 +--
.../provider/DynamicIcebergConfigProvider.java | 35 +++++++-
spark-connector/spark-common/build.gradle.kts | 4 +
.../spark/connector/glue/GravitinoGlueCatalog.java | 61 +++++++++++++-
.../spark/connector/hive/GravitinoHiveCatalog.java | 36 ++++++++-
.../connector/iceberg/GravitinoIcebergCatalog.java | 3 +
.../connector/paimon/GravitinoPaimonCatalog.java | 3 +
.../connector/glue/TestGravitinoGlueCatalog.java | 42 ++++++++++
.../catalog/glue/GlueConnectorAdapter.java | 27 ++++---
.../catalog/hive/HiveConnectorAdapter.java | 17 ++++
.../iceberg/IcebergCatalogPropertyConverter.java | 30 +++++++
.../catalog/iceberg/IcebergConnectorAdapter.java | 6 +-
.../catalog/glue/TestGlueConnectorAdapter.java | 6 +-
37 files changed, 880 insertions(+), 230 deletions(-)
diff --git
a/api/src/main/java/org/apache/gravitino/credential/JdbcCredential.java
b/api/src/main/java/org/apache/gravitino/credential/JdbcCredential.java
index af19110d7e..8c640bcf5b 100644
--- a/api/src/main/java/org/apache/gravitino/credential/JdbcCredential.java
+++ b/api/src/main/java/org/apache/gravitino/credential/JdbcCredential.java
@@ -107,8 +107,7 @@ public class JdbcCredential implements Credential {
private void validate(String jdbcUser, String jdbcPassword, long
expireTimeInMs) {
Preconditions.checkArgument(StringUtils.isNotBlank(jdbcUser), "JDBC user
should not be empty");
- Preconditions.checkArgument(
- StringUtils.isNotBlank(jdbcPassword), "JDBC password should not be
empty");
+ Preconditions.checkArgument(jdbcPassword != null, "JDBC password should
not be null");
// JDBC credentials are static (no server-issued expiry). expireTimeInMs
must always be 0.
Preconditions.checkArgument(
expireTimeInMs == 0, "The expiration time of JdbcCredential should be
0");
diff --git a/catalogs/catalog-glue/build.gradle.kts
b/catalogs/catalog-glue/build.gradle.kts
index da14ad95bb..477db09503 100644
--- a/catalogs/catalog-glue/build.gradle.kts
+++ b/catalogs/catalog-glue/build.gradle.kts
@@ -44,6 +44,7 @@ dependencies {
implementation(libs.iceberg.api)
implementation(libs.iceberg.aws)
implementation(libs.iceberg.core)
+ runtimeOnly(project(":bundles:aws"))
implementation(libs.slf4j.api)
annotationProcessor(libs.lombok)
diff --git
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalog.java
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalog.java
index 471bfccbc5..8222c0d71b 100644
---
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalog.java
+++
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalog.java
@@ -18,11 +18,14 @@
*/
package org.apache.gravitino.catalog.glue;
+import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.PropertiesMetadata;
import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.storage.S3Properties;
/**
* Implementation of an AWS Glue Data Catalog connector in Apache Gravitino.
@@ -81,4 +84,32 @@ public class GlueCatalog extends BaseCatalog<GlueCatalog> {
public PropertiesMetadata tablePropertiesMetadata() throws
UnsupportedOperationException {
return TABLE_PROPERTIES_METADATA;
}
+
+ @Override
+ public Map<String, String> propertiesWithCredentialProviders() {
+ Map<String, String> props = super.propertiesWithCredentialProviders();
+ // super() skips addCatalogSpecificCredentialProviders() when
credential-providers is already
+ // set, so the aws-* → s3-* key mapping never runs. Apply it
unconditionally here so that
+ // S3SecretKeyProvider.initialize() can read s3-access-key-id regardless
of how the catalog
+ // was configured.
+ String accessKeyId = props.get(GlueConstants.AWS_ACCESS_KEY_ID);
+ String secretAccessKey = props.get(GlueConstants.AWS_SECRET_ACCESS_KEY);
+ if (StringUtils.isNotBlank(accessKeyId) &&
StringUtils.isNotBlank(secretAccessKey)) {
+ props.putIfAbsent(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, accessKeyId);
+ props.putIfAbsent(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY,
secretAccessKey);
+ }
+ return props;
+ }
+
+ @Override
+ protected void addCatalogSpecificCredentialProviders(
+ Map<String, String> properties, List<String> credentialProviders) {
+ String accessKeyId = properties.get(GlueConstants.AWS_ACCESS_KEY_ID);
+ String secretAccessKey =
properties.get(GlueConstants.AWS_SECRET_ACCESS_KEY);
+ if (StringUtils.isNotBlank(accessKeyId) &&
StringUtils.isNotBlank(secretAccessKey)) {
+ properties.putIfAbsent(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID,
accessKeyId);
+ properties.putIfAbsent(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY,
secretAccessKey);
+ }
+ addStorageCredentialProviders(properties, credentialProviders);
+ }
}
diff --git
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogPropertiesMetadata.java
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogPropertiesMetadata.java
index 16c5bf5b3c..47e5bf31c6 100644
---
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogPropertiesMetadata.java
+++
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogPropertiesMetadata.java
@@ -65,7 +65,7 @@ public class GlueCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetadata
+ " When omitted the default credential chain is used.",
false /* immutable */,
null /* defaultValue */,
- false /* hidden */))
+ true /* hidden */))
.put(
AWS_SECRET_ACCESS_KEY,
stringOptionalPropertyEntry(
@@ -74,7 +74,7 @@ public class GlueCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetadata
+ " When omitted the default credential chain is used.",
false /* immutable */,
null /* defaultValue */,
- false /* hidden */))
+ true /* hidden */))
.put(
AWS_GLUE_ENDPOINT,
stringOptionalPropertyEntry(
diff --git
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogPropertiesMetadata.java
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogPropertiesMetadata.java
index 2577bdf944..754da95f78 100644
---
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogPropertiesMetadata.java
+++
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogPropertiesMetadata.java
@@ -23,6 +23,10 @@ import static
org.apache.gravitino.catalog.hive.HiveConstants.HIVE_DEFAULT_CATAL
import com.google.common.collect.ImmutableMap;
import java.util.Map;
+import org.apache.gravitino.cloud.storage.AzurePropertiesMetadata;
+import org.apache.gravitino.cloud.storage.GCSPropertiesMetadata;
+import org.apache.gravitino.cloud.storage.OSSPropertiesMetadata;
+import org.apache.gravitino.cloud.storage.S3PropertiesMetadata;
import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
import org.apache.gravitino.hive.ClientPropertiesMetadata;
@@ -123,6 +127,10 @@ public class HiveCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetadata
DEFAULT_LIST_ALL_TABLES,
false /* hidden */,
false /* reserved */))
+ .putAll(S3PropertiesMetadata.PROPERTY_ENTRIES)
+ .putAll(OSSPropertiesMetadata.PROPERTY_ENTRIES)
+ .putAll(AzurePropertiesMetadata.PROPERTY_ENTRIES)
+ .putAll(GCSPropertiesMetadata.PROPERTY_ENTRIES)
.putAll(CLIENT_PROPERTIES_METADATA.propertyEntries())
.build();
diff --git
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
index fbc1e8c54b..e0b600fb69 100644
---
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
+++
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
@@ -88,7 +88,7 @@ class TestHiveCatalogOperations {
Map<String, PropertyEntry<?>> propertyEntryMap =
HIVE_PROPERTIES_METADATA.catalogPropertiesMetadata().propertyEntries();
- Assertions.assertEquals(18, propertyEntryMap.size());
+ Assertions.assertEquals(25, propertyEntryMap.size());
Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS));
Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE));
Assertions.assertTrue(propertyEntryMap.containsKey(BaseCatalog.CATALOG_OPERATION_IMPL));
diff --git
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalog.java
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalog.java
index 1f6492d90b..55f4b957ed 100644
---
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalog.java
+++
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalog.java
@@ -18,14 +18,10 @@
*/
package org.apache.gravitino.catalog.jdbc;
-import com.google.common.collect.Maps;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.Config;
-import org.apache.gravitino.Configs;
-import org.apache.gravitino.GravitinoEnv;
-import org.apache.gravitino.annotation.Evolving;
import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
import
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
@@ -37,7 +33,6 @@ import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.PropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
import org.apache.gravitino.connector.capability.Capability;
-import org.apache.gravitino.credential.CredentialConstants;
import org.apache.gravitino.credential.JdbcCredential;
/** Implementation of an Jdbc catalog in Gravitino. */
@@ -124,61 +119,12 @@ public abstract class JdbcCatalog extends
BaseCatalog<JdbcCatalog> {
}
@Override
- @Evolving
- public Map<String, String> propertiesWithCredentialProviders() {
- // Use raw entity properties so that hidden credentials
(jdbc-user/jdbc-password) are visible
- // to the credential manager even after they are marked hidden in the
properties metadata.
- Map<String, String> properties = Maps.newHashMap(entity().getProperties());
- return applyDefaultCredentialProviders(properties);
- }
-
- /**
- * Returns catalog properties, optionally re-adding hidden JDBC credentials
for backward
- * compatibility with connectors that do not support credential vending. The
backfill behavior is
- * controlled by the server-level config {@code
- * gravitino.catalog.credential.backfillToProperties}; it is disabled by
default and should only
- * be enabled during rolling upgrades.
- *
- * @return the catalog properties map, with credentials backfilled if the
server config is set
- */
- @Override
- public Map<String, String> properties() {
- Map<String, String> props = super.properties();
- if (!shouldBackfillCredential()) {
- return props;
- }
- // Backfill hidden credentials for backward compatibility with connectors
that do not support
- // credential vending.
- Map<String, String> rawProps = entity().getProperties();
- Map<String, String> result = Maps.newHashMap(props);
- String user = rawProps.get(JdbcConfig.USERNAME.getKey());
- String password = rawProps.get(JdbcConfig.PASSWORD.getKey());
- if (user != null) {
- result.put(JdbcConfig.USERNAME.getKey(), user);
- }
- if (password != null) {
- result.put(JdbcConfig.PASSWORD.getKey(), password);
- }
- return result;
- }
-
- private boolean shouldBackfillCredential() {
- Config serverConfig = GravitinoEnv.getInstance().config();
- return serverConfig != null
- && serverConfig.get(Configs.CATALOG_CREDENTIAL_BACKFILL_TO_PROPERTIES);
- }
-
- private Map<String, String> applyDefaultCredentialProviders(Map<String,
String> properties) {
- if
(StringUtils.isNotBlank(properties.get(CredentialConstants.CREDENTIAL_PROVIDERS)))
{
- return properties;
- }
-
+ protected void addCatalogSpecificCredentialProviders(
+ Map<String, String> properties, List<String> credentialProviders) {
String jdbcUser = properties.get(JdbcConfig.USERNAME.getKey());
String jdbcPassword = properties.get(JdbcConfig.PASSWORD.getKey());
- if (StringUtils.isNotBlank(jdbcUser) &&
StringUtils.isNotBlank(jdbcPassword)) {
- properties.put(CredentialConstants.CREDENTIAL_PROVIDERS,
JdbcCredential.JDBC_CREDENTIAL_TYPE);
+ if (StringUtils.isNotBlank(jdbcUser) && jdbcPassword != null) {
+ credentialProviders.add(JdbcCredential.JDBC_CREDENTIAL_TYPE);
}
-
- return properties;
}
}
diff --git
a/catalogs/catalog-jdbc-common/src/test/java/org/apache/gravitino/catalog/jdbc/TestJdbcCatalogCredential.java
b/catalogs/catalog-jdbc-common/src/test/java/org/apache/gravitino/catalog/jdbc/TestJdbcCatalogCredential.java
index 3e81356b71..cedf2e6ee5 100644
---
a/catalogs/catalog-jdbc-common/src/test/java/org/apache/gravitino/catalog/jdbc/TestJdbcCatalogCredential.java
+++
b/catalogs/catalog-jdbc-common/src/test/java/org/apache/gravitino/catalog/jdbc/TestJdbcCatalogCredential.java
@@ -109,6 +109,38 @@ public class TestJdbcCatalogCredential {
Assertions.assertEquals(JdbcCredential.JDBC_CREDENTIAL_TYPE,
credentialProviders);
}
+ @Test
+ void testJdbcCatalogDefaultCredentialProvidersWithEmptyPassword() {
+ AuditInfo auditInfo =
+
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+ // Empty-string password (e.g. StarRocks default) should still register
the credential provider
+ Map<String, String> jdbcProps = Maps.newHashMap();
+ jdbcProps.put(JdbcConfig.JDBC_URL.getKey(),
"jdbc:mysql://localhost:9030/");
+ jdbcProps.put(JdbcConfig.JDBC_DRIVER.getKey(), "com.mysql.cj.jdbc.Driver");
+ jdbcProps.put(JdbcConfig.USERNAME.getKey(), "root");
+ jdbcProps.put(JdbcConfig.PASSWORD.getKey(), "");
+
+ CatalogEntity jdbcEntity =
+ CatalogEntity.builder()
+ .withId(7L)
+ .withName("jdbc-catalog-empty-password")
+ .withNamespace(Namespace.of("metalake"))
+ .withType(TestableJdbcCatalog.Type.RELATIONAL)
+ .withProvider("jdbc-starrocks")
+ .withAuditInfo(auditInfo)
+ .withProperties(jdbcProps)
+ .build();
+
+ TestableJdbcCatalog jdbcCatalog = new TestableJdbcCatalog();
+ jdbcCatalog.withCatalogConf(jdbcProps).withCatalogEntity(jdbcEntity);
+ Map<String, String> properties =
jdbcCatalog.propertiesWithCredentialProviders();
+
+ String credentialProviders =
properties.get(CredentialConstants.CREDENTIAL_PROVIDERS);
+ Assertions.assertNotNull(credentialProviders);
+ Assertions.assertEquals(JdbcCredential.JDBC_CREDENTIAL_TYPE,
credentialProviders);
+ }
+
@Test
void testJdbcCatalogNoCredentialProvidersWithoutPassword() {
AuditInfo auditInfo =
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java
index 6b6b375596..f7dc4acc06 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java
@@ -18,8 +18,6 @@
*/
package org.apache.gravitino.catalog.lakehouse.iceberg;
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
@@ -93,40 +91,25 @@ public class IcebergCatalog extends
BaseCatalog<IcebergCatalog> {
@Override
@Evolving
public Map<String, String> propertiesWithCredentialProviders() {
- Map<String, String> properties =
Maps.newHashMap(super.propertiesWithCredentialProviders());
- // Iceberg is security-first: the vended s3:ListBucket statement keeps the
bare location prefix
- // disabled so a credential cannot enumerate sibling keys sharing the
location prefix. This is
- // determined by the catalog type and is not meant to be configured by
users.
- properties.put(CredentialConstants.S3_CREDENTIAL_LIST_LOCATION_PREFIX,
"false");
- return applyDefaultCredentialProviders(properties);
+ Map<String, String> props = super.propertiesWithCredentialProviders();
+ // Iceberg is security-first: disable s3:ListBucket on bare location
prefix so a vended
+ // credential cannot enumerate sibling keys. This is catalog-type policy,
not user-configurable.
+ props.put(CredentialConstants.S3_CREDENTIAL_LIST_LOCATION_PREFIX, "false");
+ return props;
}
- private Map<String, String> applyDefaultCredentialProviders(Map<String,
String> properties) {
- // If credential providers already set, return as is
- if
(StringUtils.isNotBlank(properties.get(CredentialConstants.CREDENTIAL_PROVIDERS)))
{
- return properties;
- }
-
- List<String> credentialProviders = new ArrayList<>();
-
- // Add JDBC credential provider if backend is JDBC and
jdbc-user/jdbc-password are set
+ @Override
+ protected void addCatalogSpecificCredentialProviders(
+ Map<String, String> properties, List<String> credentialProviders) {
String catalogBackend = properties.get(IcebergConstants.CATALOG_BACKEND);
if (catalogBackend != null
&& IcebergCatalogBackend.JDBC.name().equalsIgnoreCase(catalogBackend))
{
String jdbcUser = properties.get(IcebergConstants.GRAVITINO_JDBC_USER);
String jdbcPassword =
properties.get(IcebergConstants.GRAVITINO_JDBC_PASSWORD);
- if (StringUtils.isNotBlank(jdbcUser) &&
StringUtils.isNotBlank(jdbcPassword)) {
+ if (StringUtils.isNotBlank(jdbcUser) && jdbcPassword != null) {
credentialProviders.add(JdbcCredential.JDBC_CREDENTIAL_TYPE);
}
}
-
addStorageCredentialProviders(properties, credentialProviders);
-
- if (!credentialProviders.isEmpty()) {
- properties.put(
- CredentialConstants.CREDENTIAL_PROVIDERS, String.join(",",
credentialProviders));
- }
-
- return properties;
}
}
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
index 2f7900ce8b..7b1466a341 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
@@ -29,14 +29,15 @@ import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.gravitino.cloud.storage.AzurePropertiesMetadata;
+import org.apache.gravitino.cloud.storage.GCSPropertiesMetadata;
+import org.apache.gravitino.cloud.storage.OSSPropertiesMetadata;
+import org.apache.gravitino.cloud.storage.S3PropertiesMetadata;
import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
import
org.apache.gravitino.iceberg.common.authentication.kerberos.KerberosConfig;
import org.apache.gravitino.iceberg.common.cache.LocalTableMetadataCache;
-import org.apache.gravitino.storage.AzureProperties;
-import org.apache.gravitino.storage.OSSProperties;
-import org.apache.gravitino.storage.S3Properties;
public class IcebergCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetadata {
public static final String CATALOG_BACKEND =
IcebergConstants.CATALOG_BACKEND;
@@ -89,41 +90,17 @@ public class IcebergCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetad
null /* defaultValue */,
false /* hidden */),
stringOptionalPropertyEntry(
- S3Properties.GRAVITINO_S3_ACCESS_KEY_ID,
- "s3 access key ID",
+ GRAVITINO_JDBC_USER,
+ "JDBC user for Iceberg JDBC backend",
false /* immutable */,
null /* defaultValue */,
- false /* hidden */),
- stringOptionalPropertyEntry(
- S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY,
- "s3 secret access key",
- false /* immutable */,
- null /* defaultValue */,
- false /* hidden */),
- stringOptionalPropertyEntry(
- OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID,
- "OSS access key ID",
- false /* immutable */,
- null /* defaultValue */,
- false /* hidden */),
+ true /* hidden */),
stringOptionalPropertyEntry(
- OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET,
- "OSS access key secret",
+ GRAVITINO_JDBC_PASSWORD,
+ "JDBC password for Iceberg JDBC backend",
false /* immutable */,
null /* defaultValue */,
- false /* hidden */),
- stringOptionalPropertyEntry(
- AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
- "Azure storage account name",
- false /* immutable */,
- null /* defaultValue */,
- false /* hidden */),
- stringOptionalPropertyEntry(
- AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
- "Azure storage account key",
- false /* immutable */,
- null /* defaultValue */,
- false /* hidden */),
+ true /* hidden */),
stringOptionalPropertyEntry(
IcebergConstants.TABLE_METADATA_CACHE_IMPL,
"Table metadata cache implementation. Set to empty
string(\"\") if "
@@ -165,6 +142,10 @@ public class IcebergCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetad
false /* hidden */));
HashMap<String, PropertyEntry<?>> result = Maps.newHashMap();
result.putAll(Maps.uniqueIndex(propertyEntries, PropertyEntry::getName));
+ result.putAll(S3PropertiesMetadata.PROPERTY_ENTRIES);
+ result.putAll(OSSPropertiesMetadata.PROPERTY_ENTRIES);
+ result.putAll(AzurePropertiesMetadata.PROPERTY_ENTRIES);
+ result.putAll(GCSPropertiesMetadata.PROPERTY_ENTRIES);
result.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
result.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES);
PROPERTIES_METADATA = ImmutableMap.copyOf(result);
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java
index 34c327bc41..3bf8d7e244 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java
@@ -18,18 +18,14 @@
*/
package org.apache.gravitino.catalog.lakehouse.paimon;
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
-import org.apache.gravitino.annotation.Evolving;
import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.PropertiesMetadata;
import org.apache.gravitino.connector.capability.Capability;
-import org.apache.gravitino.credential.CredentialConstants;
import org.apache.gravitino.credential.JdbcCredential;
import org.apache.gravitino.rel.ViewCatalog;
@@ -92,38 +88,17 @@ public class PaimonCatalog extends
BaseCatalog<PaimonCatalog> {
}
@Override
- @Evolving
- public Map<String, String> propertiesWithCredentialProviders() {
- Map<String, String> properties =
Maps.newHashMap(super.propertiesWithCredentialProviders());
- return applyDefaultCredentialProviders(properties);
- }
-
- private Map<String, String> applyDefaultCredentialProviders(Map<String,
String> properties) {
- // If credential providers already set, return as is
- if
(StringUtils.isNotBlank(properties.get(CredentialConstants.CREDENTIAL_PROVIDERS)))
{
- return properties;
- }
-
- List<String> credentialProviders = new ArrayList<>();
-
- // Add JDBC credential provider if backend is JDBC and
jdbc-user/jdbc-password are set
+ protected void addCatalogSpecificCredentialProviders(
+ Map<String, String> properties, List<String> credentialProviders) {
String catalogBackend = properties.get(PaimonConstants.CATALOG_BACKEND);
if (catalogBackend != null
&& PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(catalogBackend)) {
String jdbcUser = properties.get(PaimonConstants.GRAVITINO_JDBC_USER);
String jdbcPassword =
properties.get(PaimonConstants.GRAVITINO_JDBC_PASSWORD);
- if (StringUtils.isNotBlank(jdbcUser) &&
StringUtils.isNotBlank(jdbcPassword)) {
+ if (StringUtils.isNotBlank(jdbcUser) && jdbcPassword != null) {
credentialProviders.add(JdbcCredential.JDBC_CREDENTIAL_TYPE);
}
}
-
addStorageCredentialProviders(properties, credentialProviders);
-
- if (!credentialProviders.isEmpty()) {
- properties.put(
- CredentialConstants.CREDENTIAL_PROVIDERS, String.join(",",
credentialProviders));
- }
-
- return properties;
}
}
diff --git
a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
index d9543b93d3..734c991d28 100644
---
a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
+++
b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
@@ -27,12 +27,17 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.apache.gravitino.Catalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Helper class to generate specific credential properties for different table
format and engine.
*/
public class CredentialPropertyUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(CredentialPropertyUtils.class);
+
@VisibleForTesting static final String ICEBERG_S3_ACCESS_KEY_ID =
"s3.access-key-id";
@VisibleForTesting static final String ICEBERG_S3_SECRET_ACCESS_KEY =
"s3.secret-access-key";
@VisibleForTesting static final String ICEBERG_S3_TOKEN = "s3.session-token";
@@ -76,6 +81,14 @@ public class CredentialPropertyUtils {
static final String ICEBERG_GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT =
"gcs.oauth2.refresh-credentials-endpoint";
+ @VisibleForTesting static final String ICEBERG_JDBC_USER = "jdbc.user";
+ @VisibleForTesting static final String ICEBERG_JDBC_PASSWORD =
"jdbc.password";
+
+ private static final String PAIMON_S3_ACCESS_KEY = "s3.access-key";
+ private static final String PAIMON_S3_SECRET_KEY = "s3.secret-key";
+ private static final String PAIMON_OSS_ACCESS_KEY_ID = "fs.oss.accessKeyId";
+ private static final String PAIMON_OSS_ACCESS_KEY_SECRET =
"fs.oss.accessKeySecret";
+
private static Map<String, String> icebergCredentialPropertyMap =
ImmutableMap.<String, String>builder()
.put(GCSTokenCredential.GCS_TOKEN_NAME, ICEBERG_GCS_TOKEN)
@@ -100,6 +113,78 @@ public class CredentialPropertyUtils {
.put(AwsIrsaCredential.SESSION_TOKEN, ICEBERG_S3_TOKEN)
.build();
+ /**
+ * Returns credentials from the given catalog, or an empty array if the
catalog does not support
+ * credential vending.
+ *
+ * @param catalog the Gravitino catalog client
+ * @return credentials, never null
+ */
+ public static Credential[] getCredentials(Catalog catalog) {
+ try {
+ return catalog.supportsCredentials().getCredentials();
+ } catch (UnsupportedOperationException e) {
+ LOG.debug("Catalog does not support credential vending, skipping
injection", e);
+ return new Credential[0];
+ }
+ }
+
+ /**
+ * Injects vended credentials into Iceberg catalog properties. JDBC
credentials are applied
+ * directly; all other credential types are converted via {@link
+ * #toIcebergProperties(Credential)}.
+ *
+ * @param credentials the credentials to apply
+ * @param props the mutable properties map to update
+ */
+ public static void applyIcebergCredentials(Credential[] credentials,
Map<String, String> props) {
+ for (Credential credential : credentials) {
+ if (credential instanceof JdbcCredential) {
+ JdbcCredential jdbc = (JdbcCredential) credential;
+ props.put(ICEBERG_JDBC_USER, jdbc.jdbcUser());
+ props.put(ICEBERG_JDBC_PASSWORD, jdbc.jdbcPassword());
+ } else {
+ Map<String, String> converted = toIcebergProperties(credential);
+ if (converted.isEmpty()) {
+ LOG.warn(
+ "Received unrecognized credential type '{}' for Iceberg catalog,
skipping",
+ credential.getClass().getName());
+ } else {
+ props.putAll(converted);
+ }
+ }
+ }
+ }
+
+ /**
+ * Injects vended credentials into Paimon catalog properties. Supports JDBC,
S3, and OSS
+ * credential types.
+ *
+ * @param credentials the credentials to apply
+ * @param props the mutable properties map to update
+ */
+ public static void applyPaimonCredentials(Credential[] credentials,
Map<String, String> props) {
+ for (Credential credential : credentials) {
+ if (credential instanceof JdbcCredential) {
+ JdbcCredential jdbc = (JdbcCredential) credential;
+ props.put(ICEBERG_JDBC_USER, jdbc.jdbcUser());
+ props.put(ICEBERG_JDBC_PASSWORD, jdbc.jdbcPassword());
+ } else if (credential instanceof S3SecretKeyCredential) {
+ S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+ props.put(PAIMON_S3_ACCESS_KEY, s3.accessKeyId());
+ props.put(PAIMON_S3_SECRET_KEY, s3.secretAccessKey());
+ } else if (credential instanceof OSSSecretKeyCredential) {
+ OSSSecretKeyCredential oss = (OSSSecretKeyCredential) credential;
+ props.put(PAIMON_OSS_ACCESS_KEY_ID, oss.accessKeyId());
+ props.put(PAIMON_OSS_ACCESS_KEY_SECRET, oss.secretAccessKey());
+ } else {
+ LOG.warn(
+ "Received unrecognized credential type '{}' for Paimon catalog,
skipping",
+ credential.getClass().getName());
+ }
+ }
+ }
+
/**
* Transforms a specific credential into a map of Iceberg properties.
*
diff --git
a/core/src/main/java/org/apache/gravitino/cloud/storage/AzurePropertiesMetadata.java
b/core/src/main/java/org/apache/gravitino/cloud/storage/AzurePropertiesMetadata.java
new file mode 100644
index 0000000000..c6adb5a2d3
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/cloud/storage/AzurePropertiesMetadata.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.cloud.storage;
+
+import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.AzureProperties;
+
+/**
+ * Shared Azure credential {@link PropertyEntry} definitions for catalog properties metadata.
+ *
+ * <p>{@link #PROPERTY_ENTRIES} is an immutable map, keyed by property name, that registers the
+ * Azure storage account name and the Azure storage account key. Both entries are created with
+ * {@code stringOptionalPropertyEntry} as mutable, without a default value, and hidden.
+ *
+ * <p>The class only holds constants; its private constructor prevents instantiation.
+ */
+public class AzurePropertiesMetadata {
+
+ public static final Map<String, PropertyEntry<?>> PROPERTY_ENTRIES =
+ ImmutableMap.<String, PropertyEntry<?>>builder()
+ .put(
+ AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
+ stringOptionalPropertyEntry(
+ AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
+ "Azure storage account name",
+ false /* immutable */,
+ null /* defaultValue */,
+ true /* hidden */))
+ .put(
+ AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
+ stringOptionalPropertyEntry(
+ AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
+ "Azure storage account key",
+ false /* immutable */,
+ null /* defaultValue */,
+ true /* hidden */))
+ .build();
+
+ private AzurePropertiesMetadata() {}
+}
diff --git
a/core/src/main/java/org/apache/gravitino/cloud/storage/GCSPropertiesMetadata.java
b/core/src/main/java/org/apache/gravitino/cloud/storage/GCSPropertiesMetadata.java
new file mode 100644
index 0000000000..36a66e0972
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/cloud/storage/GCSPropertiesMetadata.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.cloud.storage;
+
+import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.GCSProperties;
+
+/**
+ * Shared GCS credential {@link PropertyEntry} definitions for catalog properties metadata.
+ *
+ * <p>{@link #PROPERTY_ENTRIES} is an immutable map, keyed by property name, with a single entry
+ * for the GCS service account file path. The entry is created with {@code
+ * stringOptionalPropertyEntry} as mutable, without a default value, and not hidden.
+ *
+ * <p>The class only holds constants; its private constructor prevents instantiation.
+ */
+public class GCSPropertiesMetadata {
+
+ public static final Map<String, PropertyEntry<?>> PROPERTY_ENTRIES =
+ ImmutableMap.<String, PropertyEntry<?>>builder()
+ .put(
+ GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE,
+ stringOptionalPropertyEntry(
+ GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE,
+ "GCS service account file path",
+ false /* immutable */,
+ null /* defaultValue */,
+ false /* hidden */))
+ .build();
+
+ private GCSPropertiesMetadata() {}
+}
diff --git
a/core/src/main/java/org/apache/gravitino/cloud/storage/OSSPropertiesMetadata.java
b/core/src/main/java/org/apache/gravitino/cloud/storage/OSSPropertiesMetadata.java
new file mode 100644
index 0000000000..43e1897786
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/cloud/storage/OSSPropertiesMetadata.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.cloud.storage;
+
+import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.OSSProperties;
+
+/**
+ * Shared OSS credential {@link PropertyEntry} definitions for catalog properties metadata.
+ *
+ * <p>{@link #PROPERTY_ENTRIES} is an immutable map, keyed by property name, that registers the
+ * OSS access key ID and the OSS access key secret. Both entries are created with {@code
+ * stringOptionalPropertyEntry} as mutable, without a default value, and hidden.
+ *
+ * <p>The class only holds constants; its private constructor prevents instantiation.
+ */
+public class OSSPropertiesMetadata {
+
+ public static final Map<String, PropertyEntry<?>> PROPERTY_ENTRIES =
+ ImmutableMap.<String, PropertyEntry<?>>builder()
+ .put(
+ OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID,
+ stringOptionalPropertyEntry(
+ OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID,
+ "OSS access key ID",
+ false /* immutable */,
+ null /* defaultValue */,
+ true /* hidden */))
+ .put(
+ OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET,
+ stringOptionalPropertyEntry(
+ OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET,
+ "OSS access key secret",
+ false /* immutable */,
+ null /* defaultValue */,
+ true /* hidden */))
+ .build();
+
+ private OSSPropertiesMetadata() {}
+}
diff --git
a/core/src/main/java/org/apache/gravitino/cloud/storage/S3PropertiesMetadata.java
b/core/src/main/java/org/apache/gravitino/cloud/storage/S3PropertiesMetadata.java
new file mode 100644
index 0000000000..8879d4b551
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/cloud/storage/S3PropertiesMetadata.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.cloud.storage;
+
+import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.S3Properties;
+
+/**
+ * Shared S3 credential {@link PropertyEntry} definitions for catalog properties metadata.
+ *
+ * <p>{@link #PROPERTY_ENTRIES} is an immutable map, keyed by property name, that registers the
+ * S3 access key ID and the S3 secret access key. Both entries are created with {@code
+ * stringOptionalPropertyEntry} as mutable, without a default value, and hidden.
+ *
+ * <p>The class only holds constants; its private constructor prevents instantiation.
+ */
+public class S3PropertiesMetadata {
+
+ public static final Map<String, PropertyEntry<?>> PROPERTY_ENTRIES =
+ ImmutableMap.<String, PropertyEntry<?>>builder()
+ .put(
+ S3Properties.GRAVITINO_S3_ACCESS_KEY_ID,
+ stringOptionalPropertyEntry(
+ S3Properties.GRAVITINO_S3_ACCESS_KEY_ID,
+ "S3 access key ID",
+ false /* immutable */,
+ null /* defaultValue */,
+ true /* hidden */))
+ .put(
+ S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY,
+ stringOptionalPropertyEntry(
+ S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY,
+ "S3 secret access key",
+ false /* immutable */,
+ null /* defaultValue */,
+ true /* hidden */))
+ .build();
+
+ private S3PropertiesMetadata() {}
+}
diff --git a/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
b/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
index 947474008f..4ee0b9c794 100644
--- a/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
+++ b/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -31,18 +32,24 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Audit;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.CatalogProvider;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.annotation.Evolving;
import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
import org.apache.gravitino.connector.authorization.BaseAuthorization;
import org.apache.gravitino.connector.capability.Capability;
import org.apache.gravitino.credential.AzureAccountKeyCredential;
import org.apache.gravitino.credential.CatalogCredentialManager;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.credential.GCSTokenCredential;
import org.apache.gravitino.credential.OSSSecretKeyCredential;
import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.exceptions.CatalogNotInUseException;
import org.apache.gravitino.exceptions.MetalakeNotInUseException;
import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.storage.AzureProperties;
+import org.apache.gravitino.storage.GCSProperties;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.gravitino.storage.S3Properties;
import org.apache.gravitino.utils.IsolatedClassLoader;
@@ -451,30 +458,62 @@ public abstract class BaseCatalog<T extends BaseCatalog>
}
}
}
- return properties;
+ if (!shouldBackfillCredential()) {
+ return properties;
+ }
+ Map<String, String> result = Maps.newHashMap(properties);
+ result.putAll(propertiesWithCredentialProviders());
+ return result;
}
/**
- * Retrieves the properties of the catalog including credential providers.
Subclasses should
- * override this method to inject auto-detected credential provider names
into the properties map
- * before the {@link CatalogCredentialManager} is initialized. The default
implementation returns
- * {@link #properties()} unchanged.
+ * Retrieves the properties of the catalog including credential providers.
Detects storage and
+ * catalog-specific credential providers from the raw entity properties
(including hidden ones)
+ * and injects them before {@link CatalogCredentialManager} is initialized.
Subclasses may
+ * override {@link #addCatalogSpecificCredentialProviders} to add additional
providers.
*
- * @return A map of properties including credential providers.
+ * @return A map of raw properties with credential providers set.
*/
public Map<String, String> propertiesWithCredentialProviders() {
- return properties();
+ Map<String, String> props = Maps.newHashMap(entity().getProperties());
+ if
(StringUtils.isNotBlank(props.get(CredentialConstants.CREDENTIAL_PROVIDERS))) {
+ return props;
+ }
+ List<String> credentialProviders = new ArrayList<>();
+ addCatalogSpecificCredentialProviders(props, credentialProviders);
+ if (!credentialProviders.isEmpty()) {
+ props.put(CredentialConstants.CREDENTIAL_PROVIDERS, String.join(",",
credentialProviders));
+ }
+ return props;
}
/**
- * Detects storage credential providers (S3, OSS, Azure) from catalog
properties and appends them
- * to the provided list. Subclasses can call this method in their {@link
- * #propertiesWithCredentialProviders()} implementation to avoid duplicating
storage credential
- * detection logic.
+ * Detects credential providers for this catalog type and appends them to
{@code
+ * credentialProviders}. The default implementation calls {@link
+ * #addStorageCredentialProviders(Map, List)} to detect S3/OSS/Azure/GCS
credentials. Subclasses
+ * override this to add catalog-specific providers (e.g., JDBC).
*
- * @param properties The catalog properties map to scan for storage
credentials.
- * @param credentialProviders The list to append detected storage credential
providers to.
+ * @param properties the raw catalog properties
+ * @param credentialProviders the list to append detected provider names to
*/
+ protected void addCatalogSpecificCredentialProviders(
+ Map<String, String> properties, List<String> credentialProviders) {
+ addStorageCredentialProviders(properties, credentialProviders);
+ }
+
+ /**
+ * Returns whether hidden credentials should be backfilled into catalog properties for
+ * backward compatibility with connectors that do not support credential vending.
+ *
+ * <p>Controlled by the server-level config {@code
+ * gravitino.catalog.credential.backfillToProperties}, read here through {@code
+ * Configs.CATALOG_CREDENTIAL_BACKFILL_TO_PROPERTIES} from the {@link GravitinoEnv} singleton.
+ * Backfill is treated as disabled when the server config is {@code null} or when the
+ * configured value is anything other than {@code Boolean.TRUE}.
+ *
+ * @return true if backfill is enabled
+ */
+ protected boolean shouldBackfillCredential() {
+ Config serverConfig = GravitinoEnv.getInstance().config();
+ return serverConfig != null
+ &&
Boolean.TRUE.equals(serverConfig.get(Configs.CATALOG_CREDENTIAL_BACKFILL_TO_PROPERTIES));
+ }
+
@Evolving
protected void addStorageCredentialProviders(
Map<String, String> properties, List<String> credentialProviders) {
@@ -495,6 +534,11 @@ public abstract class BaseCatalog<T extends BaseCatalog>
if (StringUtils.isNotBlank(azureAccountName) &&
StringUtils.isNotBlank(azureAccountKey)) {
credentialProviders.add(AzureAccountKeyCredential.AZURE_ACCOUNT_KEY_CREDENTIAL_TYPE);
}
+
+ String gcsServiceAccountFile =
properties.get(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE);
+ if (StringUtils.isNotBlank(gcsServiceAccountFile)) {
+ credentialProviders.add(GCSTokenCredential.GCS_TOKEN_CREDENTIAL_TYPE);
+ }
}
@Override
diff --git
a/core/src/main/java/org/apache/gravitino/credential/JdbcCredentialProvider.java
b/core/src/main/java/org/apache/gravitino/credential/JdbcCredentialProvider.java
index 834a968e37..0eeee58cfb 100644
---
a/core/src/main/java/org/apache/gravitino/credential/JdbcCredentialProvider.java
+++
b/core/src/main/java/org/apache/gravitino/credential/JdbcCredentialProvider.java
@@ -53,7 +53,7 @@ public class JdbcCredentialProvider implements
CredentialProvider {
@Nullable
@Override
public Credential getCredential(CredentialContext context) {
- if (StringUtils.isBlank(jdbcUser) || StringUtils.isBlank(jdbcPassword)) {
+ if (StringUtils.isBlank(jdbcUser) || jdbcPassword == null) {
return null;
}
return new JdbcCredential(jdbcUser, jdbcPassword);
diff --git
a/core/src/test/java/org/apache/gravitino/credential/TestJdbcCredentialProvider.java
b/core/src/test/java/org/apache/gravitino/credential/TestJdbcCredentialProvider.java
index 4f076c3bf1..cdac88fff1 100644
---
a/core/src/test/java/org/apache/gravitino/credential/TestJdbcCredentialProvider.java
+++
b/core/src/test/java/org/apache/gravitino/credential/TestJdbcCredentialProvider.java
@@ -143,7 +143,7 @@ public class TestJdbcCredentialProvider {
}
@Test
- void testEmptyPasswordReturnsEmptyOptional() {
+ void testEmptyPasswordReturnsCredential() {
Map<String, String> catalogProperties =
ImmutableMap.of(
JdbcCredential.GRAVITINO_JDBC_USER,
@@ -157,7 +157,8 @@ public class TestJdbcCredentialProvider {
CatalogCredentialContext context = new
CatalogCredentialContext("test-user");
Optional<Credential> credential =
credentialProvider.getCredentialOptional(context);
- Assertions.assertFalse(credential.isPresent());
+ Assertions.assertTrue(credential.isPresent());
+ Assertions.assertEquals("", ((JdbcCredential)
credential.get()).jdbcPassword());
}
@Test
@@ -167,8 +168,8 @@ public class TestJdbcCredentialProvider {
}
@Test
- void testJdbcCredentialConstructorBlankPasswordThrows() {
- Assertions.assertThrows(IllegalArgumentException.class, () -> new
JdbcCredential("user", ""));
+ void testJdbcCredentialConstructorNullPasswordThrows() {
+ Assertions.assertThrows(IllegalArgumentException.class, () -> new
JdbcCredential("user", null));
}
@Test
diff --git
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
index 83d2acb11b..feb6362fae 100644
---
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
+++
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
@@ -40,8 +40,15 @@ import
org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.factories.Factory;
+import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
+import org.apache.gravitino.credential.OSSSecretKeyCredential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
@@ -55,6 +62,7 @@ import
org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,6 +97,43 @@ public class GravitinoHiveCatalog extends BaseCatalog {
return hiveCatalog.getHiveConf();
}
+ @Override
+ public void open() throws CatalogException {
+ try {
+ applyS3Credential(catalog(), hiveCatalog.getHiveConf());
+ } catch (NoSuchCatalogException e) {
+ LOG.warn(
+ "Catalog '{}' not found in Gravitino during open(); credential
injection skipped."
+ + " This is expected during CREATE CATALOG.",
+ catalogName(),
+ e);
+ }
+ super.open();
+ }
+
+ static void applyS3Credential(Catalog catalog, Configuration conf) {
+ for (Credential credential :
CredentialPropertyUtils.getCredentials(catalog)) {
+ if (credential instanceof S3SecretKeyCredential) {
+ S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+ conf.set("fs.s3a.access.key", s3.accessKeyId());
+ conf.set("fs.s3a.secret.key", s3.secretAccessKey());
+ } else if (credential instanceof OSSSecretKeyCredential) {
+ OSSSecretKeyCredential oss = (OSSSecretKeyCredential) credential;
+ conf.set("fs.oss.accessKeyId", oss.accessKeyId());
+ conf.set("fs.oss.accessKeySecret", oss.secretAccessKey());
+ } else if (credential instanceof AzureAccountKeyCredential) {
+ AzureAccountKeyCredential azure = (AzureAccountKeyCredential)
credential;
+ conf.set(
+ String.format("fs.azure.account.key.%s.dfs.core.windows.net",
azure.accountName()),
+ azure.accountKey());
+ } else {
+ LOG.warn(
+ "Received unrecognized credential type '{}' for Hive catalog,
skipping",
+ credential.getClass().getName());
+ }
+ }
+ }
+
@Override
public Optional<Factory> getFactory() {
return hiveCatalog.getFactory();
diff --git
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
index 1befaddd64..7bf33b5e1c 100644
---
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
+++
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
@@ -18,21 +18,31 @@
*/
package org.apache.gravitino.flink.connector.iceberg;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.table.catalog.AbstractCatalog;
-import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.Preconditions;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
import org.apache.iceberg.flink.FlinkCatalogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Gravitino Iceberg Catalog. */
public class GravitinoIcebergCatalog extends BaseCatalog {
- private final AbstractCatalog icebergCatalog;
+ private static final Logger LOG =
LoggerFactory.getLogger(GravitinoIcebergCatalog.class);
+
+ private final String icebergCatalogName;
+ // Mutable copy so credential injection in open() propagates to the inner
Iceberg catalog.
+ private final Map<String, String> mutableIcebergProperties;
+ private AbstractCatalog icebergCatalog;
protected GravitinoIcebergCatalog(
String catalogName,
@@ -47,9 +57,8 @@ public class GravitinoIcebergCatalog extends BaseCatalog {
defaultDatabase,
schemaAndTablePropertiesConverter,
partitionConverter);
- this.icebergCatalog =
- asAbstractCatalog(
- new FlinkCatalogFactory().createCatalog(catalogName,
icebergCatalogProperties));
+ this.icebergCatalogName = catalogName;
+ this.mutableIcebergProperties = new HashMap<>(icebergCatalogProperties);
}
protected GravitinoIcebergCatalog(
@@ -65,20 +74,45 @@ public class GravitinoIcebergCatalog extends BaseCatalog {
defaultDatabase,
schemaAndTablePropertiesConverter,
partitionConverter);
+ this.icebergCatalogName = catalogName;
+ this.mutableIcebergProperties = null;
this.icebergCatalog = icebergCatalog;
}
+ @Override
+ public void open() throws CatalogException {
+ if (icebergCatalog == null) {
+ try {
+ CredentialPropertyUtils.applyIcebergCredentials(
+ CredentialPropertyUtils.getCredentials(catalog()),
mutableIcebergProperties);
+ } catch (NoSuchCatalogException e) {
+ LOG.warn(
+ "Catalog '{}' not found in Gravitino during open(); credential
injection skipped."
+ + " This is expected during CREATE CATALOG.",
+ catalogName(),
+ e);
+ }
+ this.icebergCatalog =
+ asAbstractCatalog(
+ new FlinkCatalogFactory()
+ .createCatalog(icebergCatalogName,
mutableIcebergProperties));
+ }
+ super.open();
+ }
+
@Override
public Optional<Factory> getFactory() {
+ Preconditions.checkState(icebergCatalog != null, "Catalog '%s' has not
been opened", getName());
return icebergCatalog.getFactory();
}
@Override
protected AbstractCatalog realCatalog() {
+ Preconditions.checkState(icebergCatalog != null, "Catalog '%s' has not
been opened", getName());
return icebergCatalog;
}
- protected static AbstractCatalog asAbstractCatalog(Catalog catalog) {
+ protected static AbstractCatalog asAbstractCatalog(Object catalog) {
Preconditions.checkState(
catalog instanceof AbstractCatalog,
"Expected AbstractCatalog from FlinkCatalogFactory but got %s.",
diff --git
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
index aa9edabd84..5290490f04 100644
---
a/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
+++
b/flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
@@ -39,6 +40,8 @@ import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
@@ -53,6 +56,8 @@ import
org.apache.gravitino.rel.expressions.distributions.Strategy;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The GravitinoPaimonCatalog class is an implementation of the BaseCatalog
class that is used to
@@ -68,32 +73,103 @@ import org.apache.paimon.flink.FlinkCatalogFactory;
*/
public class GravitinoPaimonCatalog extends BaseCatalog {
- private final AbstractCatalog paimonCatalog;
+ private static final Logger LOG =
LoggerFactory.getLogger(GravitinoPaimonCatalog.class);
+
+ private final CatalogFactory.Context context;
+ // Mutable copy shared with BaseCatalog.catalogOptions so credential
injection in open() is
+ // visible to the inner Paimon catalog context.
+ private final Map<String, String> mutableOptions;
+ private AbstractCatalog paimonCatalog;
protected GravitinoPaimonCatalog(
CatalogFactory.Context context,
String defaultDatabase,
SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
PartitionConverter partitionConverter) {
+ this(context, defaultDatabase, schemaAndTablePropertiesConverter,
partitionConverter, null);
+ }
+
+ @VisibleForTesting
+ protected GravitinoPaimonCatalog(
+ CatalogFactory.Context context,
+ String defaultDatabase,
+ SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
+ PartitionConverter partitionConverter,
+ AbstractCatalog paimonCatalog) {
+ this(
+ context,
+ defaultDatabase,
+ schemaAndTablePropertiesConverter,
+ partitionConverter,
+ paimonCatalog,
+ new HashMap<>(context.getOptions()));
+ }
+
+ private GravitinoPaimonCatalog(
+ CatalogFactory.Context context,
+ String defaultDatabase,
+ SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
+ PartitionConverter partitionConverter,
+ AbstractCatalog paimonCatalog,
+ Map<String, String> mutableOptions) {
super(
context.getName(),
- context.getOptions(),
+ mutableOptions,
defaultDatabase,
schemaAndTablePropertiesConverter,
partitionConverter);
- FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
- this.paimonCatalog = flinkCatalogFactory.createCatalog(context);
+ this.context = context;
+ this.mutableOptions = mutableOptions;
+ this.paimonCatalog = paimonCatalog;
}
- //
---------------------------------------------------------------------------
- // Lifecycle — keep paimonCatalog in sync with the outer catalog
- //
---------------------------------------------------------------------------
-
@Override
public void open() throws CatalogException {
- super.open(); // opens realCatalog() == paimonCatalog, so
paimonCatalog.open() is called here
+ if (paimonCatalog != null) {
+ super.open();
+ return;
+ }
+ try {
+ CredentialPropertyUtils.applyPaimonCredentials(
+ CredentialPropertyUtils.getCredentials(catalog()), mutableOptions);
+ } catch (NoSuchCatalogException e) {
+ LOG.warn(
+ "Catalog '{}' not found in Gravitino during open(); credential
injection skipped."
+ + " This is expected during CREATE CATALOG.",
+ getName(),
+ e);
+ }
+ CatalogFactory.Context contextWithCredentials =
+ new CatalogFactory.Context() {
+ @Override
+ public String getName() {
+ return context.getName();
+ }
+
+ @Override
+ public Map<String, String> getOptions() {
+ return mutableOptions;
+ }
+
+ @Override
+ public ReadableConfig getConfiguration() {
+ return context.getConfiguration();
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return context.getClassLoader();
+ }
+ };
+ this.paimonCatalog =
+ (AbstractCatalog) new
FlinkCatalogFactory().createCatalog(contextWithCredentials);
+ super.open();
}
+ //
---------------------------------------------------------------------------
+ // Lifecycle — keep paimonCatalog in sync with the outer catalog
+ //
---------------------------------------------------------------------------
+
@Override
public void close() throws CatalogException {
super.close(); // closes realCatalog() == paimonCatalog
diff --git
a/flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink118.java
b/flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink118.java
index dd42dc79b9..b691f79cf2 100644
---
a/flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink118.java
+++
b/flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink118.java
@@ -20,13 +20,12 @@
package org.apache.gravitino.flink.connector.iceberg;
import java.util.Map;
-import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.utils.CatalogCompat;
import org.apache.gravitino.flink.connector.utils.CatalogCompatFlink118;
-import org.apache.iceberg.flink.FlinkCatalogFactory;
+/** {@link GravitinoIcebergCatalog} implementation for Flink 1.18. */
public class GravitinoIcebergCatalogFlink118 extends GravitinoIcebergCatalog {
protected GravitinoIcebergCatalogFlink118(
@@ -42,17 +41,11 @@ public class GravitinoIcebergCatalogFlink118 extends
GravitinoIcebergCatalog {
schemaAndTablePropertiesConverter,
partitionConverter,
catalogOptions,
- createIcebergCatalog(catalogName, icebergCatalogProperties));
+ icebergCatalogProperties);
}
@Override
protected CatalogCompat catalogCompat() {
return CatalogCompatFlink118.INSTANCE;
}
-
- private static AbstractCatalog createIcebergCatalog(
- String catalogName, Map<String, String> icebergCatalogProperties) {
- return asAbstractCatalog(
- new FlinkCatalogFactory().createCatalog(catalogName,
icebergCatalogProperties));
- }
}
diff --git
a/flink-connector/v1.19/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink119.java
b/flink-connector/v1.19/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink119.java
index e4f56e3a43..2320695f18 100644
---
a/flink-connector/v1.19/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink119.java
+++
b/flink-connector/v1.19/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink119.java
@@ -20,12 +20,10 @@
package org.apache.gravitino.flink.connector.iceberg;
import java.util.Map;
-import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.utils.CatalogCompat;
import org.apache.gravitino.flink.connector.utils.DefaultCatalogCompat;
-import org.apache.iceberg.flink.FlinkCatalogFactory;
/** {@link GravitinoIcebergCatalog} implementation for Flink 1.19. */
public class GravitinoIcebergCatalogFlink119 extends GravitinoIcebergCatalog {
@@ -43,17 +41,11 @@ public class GravitinoIcebergCatalogFlink119 extends
GravitinoIcebergCatalog {
schemaAndTablePropertiesConverter,
partitionConverter,
catalogOptions,
- createIcebergCatalog(catalogName, icebergCatalogProperties));
+ icebergCatalogProperties);
}
@Override
protected CatalogCompat catalogCompat() {
return DefaultCatalogCompat.INSTANCE;
}
-
- private static AbstractCatalog createIcebergCatalog(
- String catalogName, Map<String, String> icebergCatalogProperties) {
- return asAbstractCatalog(
- new FlinkCatalogFactory().createCatalog(catalogName,
icebergCatalogProperties));
- }
}
diff --git
a/flink-connector/v1.20/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink120.java
b/flink-connector/v1.20/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink120.java
index f105b8b545..446c976f52 100644
---
a/flink-connector/v1.20/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink120.java
+++
b/flink-connector/v1.20/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink120.java
@@ -20,12 +20,10 @@
package org.apache.gravitino.flink.connector.iceberg;
import java.util.Map;
-import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.utils.CatalogCompat;
import org.apache.gravitino.flink.connector.utils.CatalogCompatFlink120;
-import org.apache.iceberg.flink.FlinkCatalogFactory;
/** {@link GravitinoIcebergCatalog} implementation for Flink 1.20. */
public class GravitinoIcebergCatalogFlink120 extends GravitinoIcebergCatalog {
@@ -43,17 +41,11 @@ public class GravitinoIcebergCatalogFlink120 extends
GravitinoIcebergCatalog {
schemaAndTablePropertiesConverter,
partitionConverter,
catalogOptions,
- createIcebergCatalog(catalogName, icebergCatalogProperties));
+ icebergCatalogProperties);
}
@Override
protected CatalogCompat catalogCompat() {
return CatalogCompatFlink120.INSTANCE;
}
-
- private static AbstractCatalog createIcebergCatalog(
- String catalogName, Map<String, String> icebergCatalogProperties) {
- return asAbstractCatalog(
- new FlinkCatalogFactory().createCatalog(catalogName,
icebergCatalogProperties));
- }
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java
index 2bbefbade0..43ec66812e 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java
@@ -23,6 +23,7 @@ import static
org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.Closeable;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -36,6 +37,9 @@ import
org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.client.GravitinoClient.ClientBuilder;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.credential.JdbcCredential;
+import org.apache.gravitino.credential.SupportsCredentials;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
@@ -100,7 +104,36 @@ public class DynamicIcebergConfigProvider implements
IcebergConfigProvider {
"lakehouse-iceberg".equals(catalog.provider()),
String.format("%s.%s is not iceberg catalog", gravitinoMetalake,
catalogName));
- return
Optional.of(getIcebergConfigFromCatalogProperties(catalog.properties()));
+ // Sensitive credentials (e.g. jdbc-password) are marked hidden in
PropertiesMetadata and
+ // filtered out of catalog.properties(). We need two different strategies
to recover them:
+ //
+ // Auxiliary mode: the catalog is a BaseCatalog running in the same JVM as
the Gravitino
+ // server. Call propertiesWithCredentialProviders() which returns the raw
entity properties
+ // including all hidden fields.
+ //
+ // Standalone mode: the catalog is a client-side object obtained via the
Gravitino REST API.
+ // Call getCredentials() to retrieve vended credentials, then copy the first JdbcCredential's
+ // user/password into the properties map (existing keys are not overridden) for the JDBC backend.
+ Map<String, String> catalogProperties;
+ if (catalog instanceof BaseCatalog) {
+ catalogProperties = ((BaseCatalog<?>)
catalog).propertiesWithCredentialProviders();
+ } else {
+ catalogProperties = new HashMap<>(catalog.properties());
+ if (catalog instanceof SupportsCredentials) {
+ Arrays.stream(((SupportsCredentials) catalog).getCredentials())
+ .filter(c -> c instanceof JdbcCredential)
+ .map(c -> (JdbcCredential) c)
+ .findFirst()
+ .ifPresent(
+ jdbc -> {
+ catalogProperties.putIfAbsent(
+ IcebergConstants.GRAVITINO_JDBC_USER, jdbc.jdbcUser());
+ catalogProperties.putIfAbsent(
+ IcebergConstants.GRAVITINO_JDBC_PASSWORD,
jdbc.jdbcPassword());
+ });
+ }
+ }
+ return
Optional.of(getIcebergConfigFromCatalogProperties(catalogProperties));
}
/**
diff --git a/spark-connector/spark-common/build.gradle.kts
b/spark-connector/spark-common/build.gradle.kts
index 7211d6e3bb..c7aceb9e16 100644
--- a/spark-connector/spark-common/build.gradle.kts
+++ b/spark-connector/spark-common/build.gradle.kts
@@ -176,6 +176,10 @@ val glueLibsApiUrl =
val downloadGlueHiveJars by
tasks.registering {
glueHiveJarsDir?.let { outputs.dir(it) }
+ onlyIf {
+ val outputDir = file(glueHiveJarsDir ?: return@onlyIf false)
+ outputDir.listFiles()?.none { it.name.endsWith(".jar") } ?: true
+ }
doLast {
val outputDir = file(glueHiveJarsDir ?: return@doLast)
outputDir.mkdirs()
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
index fc7dbce568..47100103c0 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
@@ -20,8 +20,12 @@
package org.apache.gravitino.spark.connector.glue;
import com.google.common.base.Preconditions;
+import java.util.HashMap;
import java.util.Map;
import org.apache.gravitino.catalog.glue.GlueConstants;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTransformConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
@@ -38,6 +42,8 @@ import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Gravitino Glue catalog implementation for Apache Spark.
@@ -60,6 +66,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
*/
public class GravitinoGlueCatalog extends BaseCatalog {
+ private static final Logger LOG =
LoggerFactory.getLogger(GravitinoGlueCatalog.class);
+
// Lazily initialized Iceberg GlueCatalog for Iceberg tables
private volatile SparkCatalog icebergGlueCatalog;
@@ -67,6 +75,14 @@ public class GravitinoGlueCatalog extends BaseCatalog {
private String catalogName;
private Map<String, String> catalogProperties;
+ /**
+ * AWS credentials obtained via Gravitino credential vending at init time.
Keyed by Gravitino
+ * catalog property names (e.g. "aws-access-key-id") so they can be merged
directly into catalog
+ * properties before being passed to {@link
GluePropertiesConverter#toIcebergCatalogProperties}.
+ * Empty when vending returns no S3 credentials; null only before initialization.
+ */
+ private Map<String, String> vendedAwsCredentials;
+
/** Creates a new GravitinoGlueCatalog. */
public GravitinoGlueCatalog() {}
@@ -88,10 +104,46 @@ public class GravitinoGlueCatalog extends BaseCatalog {
TableCatalog hiveCatalog = createHiveTableCatalog();
Map<String, String> all =
getPropertiesConverter().toSparkCatalogProperties(options, properties);
+ this.vendedAwsCredentials = applyS3Credential(gravitinoCatalogClient, all);
hiveCatalog.initialize(name, new CaseInsensitiveStringMap(all));
return hiveCatalog;
}
+ /**
+ * Obtains S3 credentials via Gravitino credential vending and injects them
into {@code props} as
+ * {@code hadoop.fs.s3a.*} for the non-Iceberg (Hive) path's S3 data access.
Returns the vended
+ * credentials keyed by Gravitino catalog property names for reuse in the
Iceberg path. Returns an
+ * empty map if credential vending is unavailable.
+ *
+ * @return map of vended AWS credentials (Gravitino key names), or empty map
if none vended
+ */
+ static Map<String, String> applyS3Credential(
+ org.apache.gravitino.Catalog catalog, Map<String, String> props) {
+ Map<String, String> vended = new HashMap<>();
+ Credential[] credentials;
+ try {
+ credentials = CredentialPropertyUtils.getCredentials(catalog);
+ } catch (RuntimeException e) {
+ LOG.debug(
+ "Failed to obtain credentials from Glue catalog, S3 credential
injection skipped", e);
+ return vended;
+ }
+ for (Credential credential : credentials) {
+ if (credential instanceof S3SecretKeyCredential) {
+ S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+ props.put("hadoop.fs.s3a.access.key", s3.accessKeyId());
+ props.put("hadoop.fs.s3a.secret.key", s3.secretAccessKey());
+ vended.put(GluePropertiesConverter.AWS_ACCESS_KEY_ID,
s3.accessKeyId());
+ vended.put(GluePropertiesConverter.AWS_SECRET_ACCESS_KEY,
s3.secretAccessKey());
+ } else {
+ LOG.warn(
+ "Received unrecognized credential type '{}' for Glue catalog,
skipping",
+ credential.getClass().getName());
+ }
+ }
+ return vended;
+ }
+
/**
* Routes Spark table loading to the correct backend.
*
@@ -226,7 +278,14 @@ public class GravitinoGlueCatalog extends BaseCatalog {
*/
private SparkCatalog createIcebergGlueCatalog() {
GluePropertiesConverter converter = GluePropertiesConverter.getInstance();
- Map<String, String> icebergProperties =
converter.toIcebergCatalogProperties(catalogProperties);
+ Map<String, String> effectiveProperties = new HashMap<>(catalogProperties);
+ // Vended credentials take precedence over static catalog properties so
that
+ // GluePropertiesConverter sets up GravitinoGlueCredentialsProvider with
the vended AK/SK.
+ if (vendedAwsCredentials != null && !vendedAwsCredentials.isEmpty()) {
+ effectiveProperties.putAll(vendedAwsCredentials);
+ }
+ Map<String, String> icebergProperties =
+ converter.toIcebergCatalogProperties(effectiveProperties);
SparkCatalog catalog = new SparkCatalog();
catalog.initialize(catalogName + "_iceberg", new
CaseInsensitiveStringMap(icebergProperties));
return catalog;
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
index dd7851828c..4179a79da2 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
@@ -20,6 +20,12 @@
package org.apache.gravitino.spark.connector.hive;
import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
+import org.apache.gravitino.credential.OSSSecretKeyCredential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.View;
import org.apache.gravitino.spark.connector.PropertiesConverter;
@@ -31,20 +37,48 @@ import
org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class GravitinoHiveCatalog extends BaseCatalog {
+ private static final Logger LOG =
LoggerFactory.getLogger(GravitinoHiveCatalog.class);
+
@Override
protected TableCatalog createAndInitSparkCatalog(
String name, CaseInsensitiveStringMap options, Map<String, String>
properties) {
TableCatalog hiveCatalog = new HiveTableCatalog();
Map<String, String> all =
getPropertiesConverter().toSparkCatalogProperties(options, properties);
+ applyS3Credential(gravitinoCatalogClient, all);
hiveCatalog.initialize(name, new CaseInsensitiveStringMap(all));
-
return hiveCatalog;
}
+ static void applyS3Credential(Catalog catalog, Map<String, String> props) {
+ for (Credential credential :
CredentialPropertyUtils.getCredentials(catalog)) {
+ if (credential instanceof S3SecretKeyCredential) {
+ S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+ props.put("hadoop.fs.s3a.access.key", s3.accessKeyId());
+ props.put("hadoop.fs.s3a.secret.key", s3.secretAccessKey());
+ } else if (credential instanceof OSSSecretKeyCredential) {
+ OSSSecretKeyCredential oss = (OSSSecretKeyCredential) credential;
+ props.put("hadoop.fs.oss.accessKeyId", oss.accessKeyId());
+ props.put("hadoop.fs.oss.accessKeySecret", oss.secretAccessKey());
+ } else if (credential instanceof AzureAccountKeyCredential) {
+ AzureAccountKeyCredential azure = (AzureAccountKeyCredential)
credential;
+ props.put(
+ String.format(
+ "hadoop.fs.azure.account.key.%s.dfs.core.windows.net",
azure.accountName()),
+ azure.accountKey());
+ } else {
+ LOG.warn(
+ "Received unrecognized credential type '{}' for Hive catalog,
skipping",
+ credential.getClass().getName());
+ }
+ }
+ }
+
@Override
protected org.apache.spark.sql.connector.catalog.Table createSparkTable(
Identifier identifier,
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
index 4ccacdec1d..0fa99d3ab7 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTransformConverter;
@@ -73,6 +74,8 @@ public class GravitinoIcebergCatalog extends BaseCatalog
String catalogBackendName =
IcebergPropertiesUtils.getCatalogBackendName(properties);
Map<String, String> all =
getPropertiesConverter().toSparkCatalogProperties(options, properties);
+ CredentialPropertyUtils.applyIcebergCredentials(
+ CredentialPropertyUtils.getCredentials(gravitinoCatalogClient), all);
TableCatalog icebergCatalog = new SparkCatalog();
icebergCatalog.initialize(catalogBackendName, new
CaseInsensitiveStringMap(all));
return icebergCatalog;
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
index 86ca680c45..76ed6ed899 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.spark.connector.paimon;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
+import org.apache.gravitino.credential.CredentialPropertyUtils;
import org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTransformConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
@@ -42,6 +43,8 @@ public class GravitinoPaimonCatalog extends BaseCatalog {
TableCatalog paimonCatalog = new SparkCatalog();
Map<String, String> all =
getPropertiesConverter().toSparkCatalogProperties(options, properties);
+ CredentialPropertyUtils.applyPaimonCredentials(
+ CredentialPropertyUtils.getCredentials(gravitinoCatalogClient), all);
paimonCatalog.initialize(catalogBackendName, new
CaseInsensitiveStringMap(all));
return paimonCatalog;
}
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
index 130ddee519..757bb6510b 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
@@ -25,8 +25,13 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
import org.apache.gravitino.catalog.glue.GlueConstants;
import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
+import org.apache.gravitino.credential.SupportsCredentials;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.types.Types;
@@ -176,6 +181,43 @@ public class TestGravitinoGlueCatalog {
Assertions.assertSame(mockSparkTable, result);
}
+ // -------------------------------------------------------------------------
+ // Test applyS3Credential (credential vending)
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testApplyS3CredentialInjectsHadoopProperties() {
+ Catalog mockCatalog = mock(Catalog.class);
+ S3SecretKeyCredential s3Cred = new
S3SecretKeyCredential("test-access-key", "test-secret-key");
+ SupportsCredentials supportsCredentials = mock(SupportsCredentials.class);
+ when(mockCatalog.supportsCredentials()).thenReturn(supportsCredentials);
+ when(supportsCredentials.getCredentials()).thenReturn(new Credential[]
{s3Cred});
+
+ Map<String, String> props = new HashMap<>();
+ Map<String, String> vended =
GravitinoGlueCatalog.applyS3Credential(mockCatalog, props);
+
+ Assertions.assertEquals("test-access-key",
props.get("hadoop.fs.s3a.access.key"));
+ Assertions.assertEquals("test-secret-key",
props.get("hadoop.fs.s3a.secret.key"));
+ Assertions.assertEquals(
+ "test-access-key",
vended.get(GluePropertiesConverter.AWS_ACCESS_KEY_ID));
+ Assertions.assertEquals(
+ "test-secret-key",
vended.get(GluePropertiesConverter.AWS_SECRET_ACCESS_KEY));
+ }
+
+ @Test
+ void testApplyS3CredentialReturnsEmptyWhenNoCredentials() {
+ Catalog mockCatalog = mock(Catalog.class);
+ SupportsCredentials supportsCredentials = mock(SupportsCredentials.class);
+ when(mockCatalog.supportsCredentials()).thenReturn(supportsCredentials);
+ when(supportsCredentials.getCredentials()).thenReturn(new Credential[] {});
+
+ Map<String, String> props = new HashMap<>();
+ Map<String, String> vended =
GravitinoGlueCatalog.applyS3Credential(mockCatalog, props);
+
+ Assertions.assertTrue(props.isEmpty());
+ Assertions.assertTrue(vended.isEmpty());
+ }
+
// -------------------------------------------------------------------------
// Test converter methods
// -------------------------------------------------------------------------
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/glue/GlueConnectorAdapter.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/glue/GlueConnectorAdapter.java
index 24b812c115..ac6a56ad01 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/glue/GlueConnectorAdapter.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/glue/GlueConnectorAdapter.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorAdapter;
import
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
import org.apache.gravitino.trino.connector.catalog.HasPropertyMeta;
@@ -42,8 +43,6 @@ public class GlueConnectorAdapter implements
CatalogConnectorAdapter {
// Gravitino catalog property keys for AWS Glue
private static final String PROP_AWS_REGION = "aws-region";
private static final String PROP_AWS_GLUE_CATALOG_ID = "aws-glue-catalog-id";
- private static final String PROP_AWS_ACCESS_KEY_ID = "aws-access-key-id";
- private static final String PROP_AWS_SECRET_ACCESS_KEY =
"aws-secret-access-key";
private static final String PROP_AWS_GLUE_ENDPOINT = "aws-glue-endpoint";
// Trino Hive connector configuration keys for Glue
@@ -82,16 +81,7 @@ public class GlueConnectorAdapter implements
CatalogConnectorAdapter {
config.put(HIVE_METASTORE_GLUE_CATALOG_ID, catalogId);
}
- String accessKey = catalog.getProperty(PROP_AWS_ACCESS_KEY_ID, null);
- String secretKey = catalog.getProperty(PROP_AWS_SECRET_ACCESS_KEY, null);
- if (accessKey != null && secretKey != null) {
- // Glue metastore credentials
- config.put(HIVE_METASTORE_GLUE_ACCESS_KEY, accessKey);
- config.put(HIVE_METASTORE_GLUE_SECRET_KEY, secretKey);
- // S3 credentials for data access
- config.put(HIVE_S3_ACCESS_KEY, accessKey);
- config.put(HIVE_S3_SECRET_KEY, secretKey);
- }
+ applyS3Credential(credentials, config);
String endpoint = catalog.getProperty(PROP_AWS_GLUE_ENDPOINT, null);
if (StringUtils.isNotBlank(endpoint)) {
@@ -101,6 +91,19 @@ public class GlueConnectorAdapter implements
CatalogConnectorAdapter {
return config;
}
+ static void applyS3Credential(Credential[] credentials, Map<String, String>
config) {
+ for (Credential credential : credentials) {
+ if (credential instanceof S3SecretKeyCredential) {
+ S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+ config.put(HIVE_METASTORE_GLUE_ACCESS_KEY, s3.accessKeyId());
+ config.put(HIVE_METASTORE_GLUE_SECRET_KEY, s3.secretAccessKey());
+ config.put(HIVE_S3_ACCESS_KEY, s3.accessKeyId());
+ config.put(HIVE_S3_SECRET_KEY, s3.secretAccessKey());
+ return;
+ }
+ }
+ }
+
@Override
public String internalConnectorName() {
return CONNECTOR_LAKEHOUSE;
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
index ce52e7de24..a888b9012e 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
@@ -24,7 +24,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.catalog.property.PropertyConverter;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorAdapter;
import
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
import org.apache.gravitino.trino.connector.catalog.CatalogPropertyConverter;
@@ -58,9 +60,24 @@ public class HiveConnectorAdapter implements
CatalogConnectorAdapter {
config.put("hive.metastore.uri", metastoreUri);
config.put("hive.security", "allow-all");
config.put("fs.hadoop.enabled", "true");
+ applyS3Credential(credentials, config);
return config;
}
+ static void applyS3Credential(Credential[] credentials, Map<String, String>
config) {
+ for (Credential credential : credentials) {
+ if (credential instanceof S3SecretKeyCredential) {
+ S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+ config.put("hive.s3.aws-access-key", s3.accessKeyId());
+ config.put("hive.s3.aws-secret-key", s3.secretAccessKey());
+ } else if (credential instanceof AzureAccountKeyCredential) {
+ AzureAccountKeyCredential azure = (AzureAccountKeyCredential)
credential;
+ config.put("hive.azure.abfs-storage-account", azure.accountName());
+ config.put("hive.azure.abfs-access-key", azure.accountKey());
+ }
+ }
+ }
+
@Override
public String internalConnectorName() {
return CONNECTOR_HIVE;
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
index b0b220c934..5150435ecf 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
@@ -26,6 +26,10 @@ import java.util.Map;
import java.util.Set;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.JdbcCredential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.trino.connector.GravitinoErrorCode;
import org.apache.gravitino.trino.connector.catalog.CatalogPropertyConverter;
@@ -43,6 +47,32 @@ public class IcebergCatalogPropertyConverter extends
CatalogPropertyConverter {
private static final Set<String> REST_BACKEND_REQUIRED_PROPERTIES =
Set.of("uri");
+ /**
+ * Injects vended credentials into the Iceberg catalog config: JDBC user/password for the JDBC
+ * backend, S3 access/secret keys, and Azure account keys. Other credential types are ignored.
+ *
+ * @param credentials the credentials returned by the server
+ * @param config the mutable Trino Iceberg connector config map to update
+ */
+ public static void applyCredentials(Credential[] credentials, Map<String,
String> config) {
+ for (Credential credential : credentials) {
+ if (credential instanceof JdbcCredential) {
+ JdbcCredential jdbc = (JdbcCredential) credential;
+ config.put("iceberg.jdbc-catalog.connection-user", jdbc.jdbcUser());
+ config.put("iceberg.jdbc-catalog.connection-password",
jdbc.jdbcPassword());
+ } else if (credential instanceof S3SecretKeyCredential) {
+ S3SecretKeyCredential s3 = (S3SecretKeyCredential) credential;
+ config.put("hive.s3.aws-access-key", s3.accessKeyId());
+ config.put("hive.s3.aws-secret-key", s3.secretAccessKey());
+ } else if (credential instanceof AzureAccountKeyCredential) {
+ AzureAccountKeyCredential azure = (AzureAccountKeyCredential)
credential;
+ config.put(
+ String.format("fs.azure.account.key.%s.dfs.core.windows.net",
azure.accountName()),
+ azure.accountKey());
+ }
+ }
+ }
+
@Override
public Map<String, String> gravitinoToEngineProperties(Map<String, String>
properties) {
Map<String, String> stringStringMap;
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java
index a9fec3206b..8900775fb7 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.trino.connector.catalog.iceberg;
import static java.util.Collections.emptyList;
import io.trino.spi.session.PropertyMetadata;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.catalog.property.PropertyConverter;
@@ -51,7 +52,10 @@ public class IcebergConnectorAdapter implements
CatalogConnectorAdapter {
@Override
public Map<String, String> buildInternalConnectorConfig(
GravitinoCatalog catalog, Credential[] credentials) throws Exception {
- return
catalogConverter.gravitinoToEngineProperties(catalog.getProperties());
+ Map<String, String> config =
+ new
HashMap<>(catalogConverter.gravitinoToEngineProperties(catalog.getProperties()));
+ IcebergCatalogPropertyConverter.applyCredentials(credentials, config);
+ return config;
}
@Override
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/glue/TestGlueConnectorAdapter.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/glue/TestGlueConnectorAdapter.java
index 77697c2a4e..81313f305d 100644
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/glue/TestGlueConnectorAdapter.java
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/glue/TestGlueConnectorAdapter.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
import org.apache.gravitino.trino.connector.metadata.TestGravitinoCatalog;
import org.junit.jupiter.api.Assertions;
@@ -57,16 +58,15 @@ class TestGlueConnectorAdapter {
"aws-region", "us-east-1",
"warehouse", "s3://my-bucket/warehouse",
"aws-glue-catalog-id", "123456789",
- "aws-access-key-id", "test-access-key",
- "aws-secret-access-key", "test-secret-key",
"aws-glue-endpoint", "https://glue.custom.endpoint");
Catalog mockCatalog =
TestGravitinoCatalog.mockCatalog(
"test_glue", "glue", "test catalog", Catalog.Type.RELATIONAL,
properties);
GlueConnectorAdapter adapter = new GlueConnectorAdapter();
+ Credential[] credentials = {new S3SecretKeyCredential("test-access-key",
"test-secret-key")};
Map<String, String> config =
adapter.buildInternalConnectorConfig(
- new GravitinoCatalog("test", mockCatalog), new Credential[0]);
+ new GravitinoCatalog("test", mockCatalog), credentials);
Assertions.assertEquals("123456789",
config.get("hive.metastore.glue.catalogid"));
Assertions.assertEquals("test-access-key",
config.get("hive.metastore.glue.aws-access-key"));