This is an automated email from the ASF dual-hosted git repository.
honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 519e127c5 SigV4 Auth Support for Catalog Federation - Part 4:
Connection Credential Manager (#2759)
519e127c5 is described below
commit 519e127c5f01352d3efa305535a594c734b941b0
Author: Rulin Xing <[email protected]>
AuthorDate: Wed Oct 8 15:25:53 2025 -0700
SigV4 Auth Support for Catalog Federation - Part 4: Connection Credential
Manager (#2759)
This PR introduces a flexible credential management system for Polaris.
Building on Part 3's service identity management, this system combines Polaris
service identities with user-provided authentication parameters to generate
credentials for remote catalog access.
The core of this PR is the new ConnectionCredentialVendor interface, which:
Generates connection credentials by combining service identity with user
auth parameters
Supports different authentication types (AWS SIGV4, AZURE Entra, GCP IAM)
through CDI, currently only supports SigV4.
Provides on-demand credential generation
Enables easy extension for new authentication types
In the long term, we should move the storage credential management logic
out of PolarisMetastoreManager, PolarisMetastoreManager should only provide
persistence interfaces.
---
.../hadoop/HadoopFederatedCatalogFactory.java | 9 +-
.../hive/HiveFederatedCatalogFactory.java | 9 +-
.../core/catalog/ExternalCatalogFactory.java | 9 +-
.../BearerAuthenticationParametersDpo.java | 3 +-
.../ImplicitAuthenticationParametersDpo.java | 7 +-
.../OAuthClientCredentialsParametersDpo.java | 3 +-
.../SigV4AuthenticationParametersDpo.java | 7 +-
.../hadoop/HadoopConnectionConfigInfoDpo.java | 12 +-
.../hive/HiveConnectionConfigInfoDpo.java | 13 +-
.../iceberg/IcebergCatalogPropertiesProvider.java | 4 +-
.../IcebergRestConnectionConfigInfoDpo.java | 12 +-
.../core/credentials/PolarisCredentialManager.java | 52 ++++++
.../connection/CatalogAccessProperty.java | 66 ++++++++
.../connection/ConnectionCredentialVendor.java | 66 ++++++++
.../connection/ConnectionCredentials.java | 82 +++++++++
.../src/main/resources/application.properties | 3 +
.../service/catalog/common/CatalogHandler.java | 8 +
.../generic/GenericTableCatalogAdapter.java | 5 +
.../generic/GenericTableCatalogHandler.java | 3 +
.../catalog/iceberg/IcebergCatalogAdapter.java | 5 +
.../catalog/iceberg/IcebergCatalogHandler.java | 8 +-
.../iceberg/IcebergRESTExternalCatalogFactory.java | 7 +-
.../catalog/policy/PolicyCatalogAdapter.java | 5 +
.../catalog/policy/PolicyCatalogHandler.java | 3 +
.../polaris/service/config/ServiceProducers.java | 10 ++
.../DefaultPolarisCredentialManager.java | 104 ++++++++++++
.../PolarisCredentialManagerConfiguration.java | 29 ++--
.../service/credentials/connection/AuthType.java | 73 ++++++++
.../SigV4ConnectionCredentialVendor.java | 150 ++++++++++++++++
.../AwsIamServiceIdentityConfiguration.java | 44 ++++-
.../RealmServiceIdentityConfiguration.java | 19 +++
.../ResolvableServiceIdentityConfiguration.java | 16 ++
.../identity/ServiceIdentityConfiguration.java | 15 +-
.../service/admin/PolarisAuthzTestBase.java | 2 +
...PolarisGenericTableCatalogHandlerAuthzTest.java | 1 +
.../iceberg/IcebergCatalogHandlerAuthzTest.java | 4 +
...ebergCatalogHandlerFineGrainedDisabledTest.java | 1 +
.../policy/PolicyCatalogHandlerAuthzTest.java | 1 +
.../DefaultPolarisCredentialManagerTest.java | 188 +++++++++++++++++++++
.../SigV4ConnectionCredentialVendorTest.java | 174 +++++++++++++++++++
.../org/apache/polaris/service/TestServices.java | 20 +++
41 files changed, 1211 insertions(+), 41 deletions(-)
diff --git
a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java
b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java
index 8da714072..95bd26f9d 100644
---
a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java
+++
b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java
@@ -30,6 +30,7 @@ import org.apache.polaris.core.connection.AuthenticationType;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +43,9 @@ public class HadoopFederatedCatalogFactory implements
ExternalCatalogFactory {
@Override
public Catalog createCatalog(
- ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager
userSecretsManager) {
+ ConnectionConfigInfoDpo connectionConfigInfoDpo,
+ UserSecretsManager userSecretsManager,
+ PolarisCredentialManager polarisCredentialManager) {
// Currently, Polaris supports Hadoop federation only via IMPLICIT
authentication.
// Hence, prior to initializing the configuration, ensure that the catalog
uses
// IMPLICIT authentication.
@@ -56,7 +59,9 @@ public class HadoopFederatedCatalogFactory implements
ExternalCatalogFactory {
String warehouse = ((HadoopConnectionConfigInfoDpo)
connectionConfigInfoDpo).getWarehouse();
HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, warehouse);
hadoopCatalog.initialize(
- warehouse,
connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager));
+ warehouse,
+ connectionConfigInfoDpo.asIcebergCatalogProperties(
+ userSecretsManager, polarisCredentialManager));
return hadoopCatalog;
}
diff --git
a/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java
b/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java
index 12c8d80f6..0f88acf09 100644
---
a/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java
+++
b/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java
@@ -29,6 +29,7 @@ import org.apache.polaris.core.connection.AuthenticationType;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.connection.hive.HiveConnectionConfigInfoDpo;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +42,9 @@ public class HiveFederatedCatalogFactory implements
ExternalCatalogFactory {
@Override
public Catalog createCatalog(
- ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager
userSecretsManager) {
+ ConnectionConfigInfoDpo connectionConfigInfoDpo,
+ UserSecretsManager userSecretsManager,
+ PolarisCredentialManager polarisCredentialManager) {
// Currently, Polaris supports Hive federation only via IMPLICIT
authentication.
// Hence, prior to initializing the configuration, ensure that the catalog
uses
// IMPLICIT authentication.
@@ -69,7 +72,9 @@ public class HiveFederatedCatalogFactory implements
ExternalCatalogFactory {
// Kerberos instances are not suitable because Kerberos ties a single
identity to the server.
HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.initialize(
- warehouse,
connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager));
+ warehouse,
+ connectionConfigInfoDpo.asIcebergCatalogProperties(
+ userSecretsManager, polarisCredentialManager));
return hiveCatalog;
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java
b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java
index 039a64ccd..035563b52 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java
@@ -20,6 +20,7 @@ package org.apache.polaris.core.catalog;
import org.apache.iceberg.catalog.Catalog;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;
/**
@@ -34,12 +35,16 @@ public interface ExternalCatalogFactory {
* Creates a catalog handle for the given connection configuration.
*
* @param connectionConfig the connection configuration
- * @param userSecretsManager the user secrets manager for handling
credentials
+ * @param userSecretsManager the user secrets manager for handling
user-provided credentials
+ * @param polarisCredentialManager the credential manager for generating
temporary credentials
+ * that Polaris uses to access external systems
* @return the initialized catalog
* @throws IllegalStateException if the connection configuration is invalid
*/
Catalog createCatalog(
- ConnectionConfigInfoDpo connectionConfig, UserSecretsManager
userSecretsManager);
+ ConnectionConfigInfoDpo connectionConfig,
+ UserSecretsManager userSecretsManager,
+ PolarisCredentialManager polarisCredentialManager);
/**
* Creates a generic table catalog for the given connection configuration.
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/BearerAuthenticationParametersDpo.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/BearerAuthenticationParametersDpo.java
index 2da854a59..c11502015 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/BearerAuthenticationParametersDpo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/BearerAuthenticationParametersDpo.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.BearerAuthenticationParameters;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.SecretReference;
import org.apache.polaris.core.secrets.UserSecretsManager;
@@ -50,7 +51,7 @@ public class BearerAuthenticationParametersDpo extends
AuthenticationParametersD
@Override
public @Nonnull Map<String, String> asIcebergCatalogProperties(
- UserSecretsManager secretsManager) {
+ UserSecretsManager secretsManager, PolarisCredentialManager
credentialManager) {
String bearerToken = secretsManager.readSecret(getBearerTokenReference());
return Map.of(OAuth2Properties.TOKEN, bearerToken);
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ImplicitAuthenticationParametersDpo.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ImplicitAuthenticationParametersDpo.java
index dc19a789a..113872429 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ImplicitAuthenticationParametersDpo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ImplicitAuthenticationParametersDpo.java
@@ -19,9 +19,11 @@
package org.apache.polaris.core.connection;
import com.google.common.base.MoreObjects;
+import jakarta.annotation.Nonnull;
import java.util.Map;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.ImplicitAuthenticationParameters;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;
/**
@@ -35,12 +37,13 @@ public class ImplicitAuthenticationParametersDpo extends
AuthenticationParameter
}
@Override
- public Map<String, String> asIcebergCatalogProperties(UserSecretsManager
secretsManager) {
+ public @Nonnull Map<String, String> asIcebergCatalogProperties(
+ UserSecretsManager secretsManager, PolarisCredentialManager
credentialManager) {
return Map.of();
}
@Override
- public AuthenticationParameters asAuthenticationParametersModel() {
+ public @Nonnull AuthenticationParameters asAuthenticationParametersModel() {
return ImplicitAuthenticationParameters.builder()
.setAuthenticationType(AuthenticationParameters.AuthenticationTypeEnum.IMPLICIT)
.build();
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/OAuthClientCredentialsParametersDpo.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/OAuthClientCredentialsParametersDpo.java
index 270560b50..9c1293624 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/OAuthClientCredentialsParametersDpo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/OAuthClientCredentialsParametersDpo.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.OAuthClientCredentialsParameters;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.SecretReference;
import org.apache.polaris.core.secrets.UserSecretsManager;
@@ -104,7 +105,7 @@ public class OAuthClientCredentialsParametersDpo extends
AuthenticationParameter
@Override
public @Nonnull Map<String, String> asIcebergCatalogProperties(
- UserSecretsManager secretsManager) {
+ UserSecretsManager secretsManager, PolarisCredentialManager
credentialManager) {
HashMap<String, String> properties = new HashMap<>();
if (getTokenUri() != null) {
properties.put(OAuth2Properties.OAUTH2_SERVER_URI, getTokenUri());
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/SigV4AuthenticationParametersDpo.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/SigV4AuthenticationParametersDpo.java
index 1d5ca8561..061245110 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/SigV4AuthenticationParametersDpo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/SigV4AuthenticationParametersDpo.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.rest.auth.AuthProperties;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.SigV4AuthenticationParameters;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;
/**
@@ -93,15 +94,15 @@ public class SigV4AuthenticationParametersDpo extends
AuthenticationParametersDp
@Nonnull
@Override
- public Map<String, String> asIcebergCatalogProperties(UserSecretsManager
secretsManager) {
+ public Map<String, String> asIcebergCatalogProperties(
+ UserSecretsManager secretsManager, PolarisCredentialManager
credentialManager) {
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
builder.put(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_SIGV4);
builder.put(AwsProperties.REST_SIGNER_REGION, getSigningRegion());
if (getSigningName() != null) {
builder.put(AwsProperties.REST_SIGNING_NAME, getSigningName());
}
-
- // TODO: Add a credential manager to assume the role and get the aws
session credentials
+ // Connection credentials are handled by ConnectionConfigInfoDpo
return builder.build();
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/hadoop/HadoopConnectionConfigInfoDpo.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/hadoop/HadoopConnectionConfigInfoDpo.java
index 66fe8bb70..26be6ad83 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/hadoop/HadoopConnectionConfigInfoDpo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/hadoop/HadoopConnectionConfigInfoDpo.java
@@ -31,6 +31,8 @@ import
org.apache.polaris.core.admin.model.HadoopConnectionConfigInfo;
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
+import org.apache.polaris.core.credentials.connection.ConnectionCredentials;
import org.apache.polaris.core.identity.dpo.ServiceIdentityInfoDpo;
import org.apache.polaris.core.identity.provider.ServiceIdentityProvider;
import org.apache.polaris.core.secrets.UserSecretsManager;
@@ -71,13 +73,19 @@ public class HadoopConnectionConfigInfoDpo extends
ConnectionConfigInfoDpo {
@Override
public @Nonnull Map<String, String> asIcebergCatalogProperties(
- UserSecretsManager secretsManager) {
+ UserSecretsManager secretsManager, PolarisCredentialManager
credentialManager) {
HashMap<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, getUri());
if (getWarehouse() != null) {
properties.put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouse());
}
-
properties.putAll(getAuthenticationParameters().asIcebergCatalogProperties(secretsManager));
+ // Add authentication-specific properties
+ properties.putAll(
+ getAuthenticationParameters()
+ .asIcebergCatalogProperties(secretsManager, credentialManager));
+ // Add connection credentials from Polaris credential manager
+ ConnectionCredentials connectionCredentials =
credentialManager.getConnectionCredentials(this);
+ properties.putAll(connectionCredentials.credentials());
return properties;
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/hive/HiveConnectionConfigInfoDpo.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/hive/HiveConnectionConfigInfoDpo.java
index 22b067a6c..d4a7e2b3d 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/hive/HiveConnectionConfigInfoDpo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/hive/HiveConnectionConfigInfoDpo.java
@@ -31,6 +31,8 @@ import
org.apache.polaris.core.admin.model.HiveConnectionConfigInfo;
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
+import org.apache.polaris.core.credentials.connection.ConnectionCredentials;
import org.apache.polaris.core.identity.dpo.ServiceIdentityInfoDpo;
import org.apache.polaris.core.identity.provider.ServiceIdentityProvider;
import org.apache.polaris.core.secrets.UserSecretsManager;
@@ -70,14 +72,21 @@ public class HiveConnectionConfigInfoDpo extends
ConnectionConfigInfoDpo {
@Override
public @Nonnull Map<String, String> asIcebergCatalogProperties(
- UserSecretsManager secretsManager) {
+ UserSecretsManager secretsManager, PolarisCredentialManager
polarisCredentialManager) {
HashMap<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, getUri());
if (getWarehouse() != null) {
properties.put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouse());
}
if (getAuthenticationParameters() != null) {
-
properties.putAll(getAuthenticationParameters().asIcebergCatalogProperties(secretsManager));
+ // Add authentication-specific properties
+ properties.putAll(
+ getAuthenticationParameters()
+ .asIcebergCatalogProperties(secretsManager,
polarisCredentialManager));
+ // Add connection credentials from Polaris credential manager
+ ConnectionCredentials connectionCredentials =
+ polarisCredentialManager.getConnectionCredentials(this);
+ properties.putAll(connectionCredentials.credentials());
}
return properties;
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
index 75af01100..e17218f25 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
@@ -20,6 +20,7 @@ package org.apache.polaris.core.connection.iceberg;
import jakarta.annotation.Nonnull;
import java.util.Map;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;
/**
@@ -30,5 +31,6 @@ import org.apache.polaris.core.secrets.UserSecretsManager;
*/
public interface IcebergCatalogPropertiesProvider {
@Nonnull
- Map<String, String> asIcebergCatalogProperties(UserSecretsManager
secretsManager);
+ Map<String, String> asIcebergCatalogProperties(
+ UserSecretsManager secretsManager, PolarisCredentialManager
credentialManager);
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergRestConnectionConfigInfoDpo.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergRestConnectionConfigInfoDpo.java
index ee01691b9..43f5c8a92 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergRestConnectionConfigInfoDpo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergRestConnectionConfigInfoDpo.java
@@ -31,6 +31,8 @@ import
org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
+import org.apache.polaris.core.credentials.connection.ConnectionCredentials;
import org.apache.polaris.core.identity.dpo.ServiceIdentityInfoDpo;
import org.apache.polaris.core.identity.provider.ServiceIdentityProvider;
import org.apache.polaris.core.secrets.UserSecretsManager;
@@ -63,13 +65,19 @@ public class IcebergRestConnectionConfigInfoDpo extends
ConnectionConfigInfoDpo
@Override
public @Nonnull Map<String, String> asIcebergCatalogProperties(
- UserSecretsManager secretsManager) {
+ UserSecretsManager secretsManager, PolarisCredentialManager
credentialManager) {
HashMap<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, getUri());
if (getRemoteCatalogName() != null) {
properties.put(CatalogProperties.WAREHOUSE_LOCATION,
getRemoteCatalogName());
}
-
properties.putAll(getAuthenticationParameters().asIcebergCatalogProperties(secretsManager));
+ // Add authentication-specific properties
+ properties.putAll(
+ getAuthenticationParameters()
+ .asIcebergCatalogProperties(secretsManager, credentialManager));
+ // Add connection credentials from Polaris credential manager
+ ConnectionCredentials connectionCredentials =
credentialManager.getConnectionCredentials(this);
+ properties.putAll(connectionCredentials.credentials());
return properties;
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/credentials/PolarisCredentialManager.java
b/polaris-core/src/main/java/org/apache/polaris/core/credentials/PolarisCredentialManager.java
new file mode 100644
index 000000000..e8163c16b
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/credentials/PolarisCredentialManager.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.credentials;
+
+import org.apache.polaris.core.connection.AuthenticationParametersDpo;
+import
org.apache.polaris.core.credentials.connection.ConnectionCredentialVendor;
+import org.apache.polaris.core.identity.credential.ServiceIdentityCredential;
+import org.apache.polaris.core.identity.provider.ServiceIdentityProvider;
+
+/**
+ * PolarisCredentialManager is responsible for retrieving the credentials
Polaris needs to access
+ * remote services such as federated catalogs and cloud storage.
+ *
+ * <p>It delegates to {@link ConnectionCredentialVendor} implementations that
combine
+ * service-managed identity information (e.g., an IAM user Polaris uses) with
user-defined
+ * authentication parameters (e.g., roleArn) to generate the credentials
required for authentication
+ * with external systems.
+ *
+ * <p>Typical flow for connection credentials:
+ *
+ * <ol>
+ * <li>The manager selects the appropriate {@link
ConnectionCredentialVendor} based on the
+ * authentication type from the {@link AuthenticationParametersDpo}.
+ * <li>The vendor resolves the service identity using {@link
ServiceIdentityProvider} to obtain a
+ * {@link ServiceIdentityCredential}.
+ * <li>The vendor uses the service identity credential together with
user-provided authentication
+ * parameters to obtain temporary access credentials (e.g., via AWS STS
AssumeRole).
+ * </ol>
+ *
+ * <p>This design supports both SaaS and self-managed deployments, ensuring a
clear separation
+ * between user-provided configuration and Polaris-managed identity. In the
future, this interface
+ * will be extended to also manage storage credentials, providing a unified
interface for all
+ * credential management needs.
+ */
+public interface PolarisCredentialManager extends ConnectionCredentialVendor {}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/credentials/connection/CatalogAccessProperty.java
b/polaris-core/src/main/java/org/apache/polaris/core/credentials/connection/CatalogAccessProperty.java
new file mode 100644
index 000000000..cb42f7e32
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/credentials/connection/CatalogAccessProperty.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.credentials.connection;
+
+import org.apache.iceberg.aws.AwsProperties;
+
+/**
+ * A subset of Iceberg catalog properties recognized by Polaris.
+ *
+ * <p>Most of these properties are meant to initialize Catalog objects for
accessing the remote
+ * Catalog service.
+ */
+public enum CatalogAccessProperty {
+ AWS_ACCESS_KEY_ID(String.class, AwsProperties.REST_ACCESS_KEY_ID, "the aws
access key id", true),
+ AWS_SECRET_ACCESS_KEY(
+ String.class, AwsProperties.REST_SECRET_ACCESS_KEY, "the aws access key
secret", true),
+ AWS_SESSION_TOKEN(
+ String.class, AwsProperties.REST_SESSION_TOKEN, "the aws scoped access
token", true),
+ EXPIRATION_TIME(
+ Long.class,
+ "expiration-time",
+ "the expiration time for the access token, in milliseconds",
+ false);
+
+ private final Class valueType;
+ private final String propertyName;
+ private final String description;
+ private final boolean isCredential;
+
+ CatalogAccessProperty(
+ Class valueType, String propertyName, String description, boolean
isCredential) {
+ this.valueType = valueType;
+ this.propertyName = propertyName;
+ this.description = description;
+ this.isCredential = isCredential;
+ }
+
+ public String getPropertyName() {
+ return propertyName;
+ }
+
+ public boolean isCredential() {
+ return isCredential;
+ }
+
+ public boolean isExpirationTimestamp() {
+ return this == EXPIRATION_TIME;
+ }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/credentials/connection/ConnectionCredentialVendor.java
b/polaris-core/src/main/java/org/apache/polaris/core/credentials/connection/ConnectionCredentialVendor.java
new file mode 100644
index 000000000..e8bd840d6
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/credentials/connection/ConnectionCredentialVendor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.credentials.connection;
+
+import jakarta.annotation.Nonnull;
+import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+
+/**
+ * Vendor for generating connection credentials for remote catalog or storage
access.
+ *
+ * <p>Implementations combine Polaris-managed service identity credentials
with user-provided
+ * authentication parameters to produce the final credentials needed to
connect to external systems.
+ *
+ * <p>For CDI-based implementations (e.g., auth-type-specific vendors), use
the {@code AuthType}
+ * annotation to indicate which authentication type(s) they support. The
credential manager uses CDI
+ * to automatically select the appropriate vendor at runtime.
+ *
+ * <p><b>Multiple Implementations:</b> If multiple vendors support the same
authentication type, use
+ * {@code @Priority} to specify precedence. Higher priority values take
precedence. Without
+ * {@code @Priority}, multiple vendors will cause an {@code
IllegalStateException} at runtime.
+ *
+ * <p>Example:
+ *
+ * <pre>
+ * @ApplicationScoped
+ * @AuthType(AuthenticationType.SIGV4)
+ * @Priority(200) // Overrides default implementation
+ * public class CustomSigV4Vendor implements ConnectionCredentialVendor { ... }
+ * </pre>
+ */
+public interface ConnectionCredentialVendor {
+
+ /**
+ * Generate connection credentials by combining service identity with
authentication parameters.
+ *
+ * <p>The connection configuration contains both the Polaris-managed service
identity (e.g., an
+ * IAM user) and user-configured authentication settings (e.g., which role
to assume, signing
+ * region).
+ *
+ * <p>Implementations should validate that the service identity and
authentication parameters are
+ * of the expected types using preconditions.
+ *
+ * @param connectionConfig The connection configuration containing service
identity and
+ * authentication parameters
+ * @return Connection credentials object containing credentials, properties,
and optional
+ * expiration
+ */
+ @Nonnull
+ ConnectionCredentials getConnectionCredentials(@Nonnull
ConnectionConfigInfoDpo connectionConfig);
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/credentials/connection/ConnectionCredentials.java
b/polaris-core/src/main/java/org/apache/polaris/core/credentials/connection/ConnectionCredentials.java
new file mode 100644
index 000000000..d9e5e549c
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/credentials/connection/ConnectionCredentials.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.credentials.connection;
+
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.polaris.core.storage.AccessConfig;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/**
+ * Encapsulates credentials and configuration needed to connect to external
federated catalogs.
+ *
+ * <p>Similar to {@link AccessConfig} for storage, this class holds the
credentials and properties
+ * required for Polaris to authenticate with remote catalog services (e.g.,
AWS Glue, other Iceberg
+ * REST catalogs).
+ *
+ * <p>Credentials may be temporary and include an expiration time.
+ *
+ * <p><b>Note:</b> This interface currently includes only {@code credentials}
and {@code expiresAt}.
+ * Additional fields like {@code extraProperties} and {@code
internalProperties} (similar to {@link
+ * AccessConfig}) are not included for now but can be added later if needed
for more complex
+ * credential scenarios.
+ */
+@PolarisImmutable
+public interface ConnectionCredentials {
+ /** Sensitive credential properties (e.g., access keys, tokens). */
+ Map<String, String> credentials();
+
+ /** Optional expiration time for the credentials. */
+ Optional<Instant> expiresAt();
+
+ /**
+ * Get a credential value by property key.
+ *
+ * @param key the credential property to retrieve
+ * @return the credential value, or null if not present
+ */
+ default String get(CatalogAccessProperty key) {
+ return credentials().get(key.getPropertyName());
+ }
+
+ static ConnectionCredentials.Builder builder() {
+ return ImmutableConnectionCredentials.builder();
+ }
+
+ interface Builder {
+ @CanIgnoreReturnValue
+ Builder putCredential(String key, String value);
+
+ @CanIgnoreReturnValue
+ Builder expiresAt(Instant expiresAt);
+
+ default Builder put(CatalogAccessProperty key, String value) {
+ if (key.isExpirationTimestamp()) {
+ expiresAt(Instant.ofEpochMilli(Long.parseLong(value)));
+ } else if (key.isCredential()) {
+ putCredential(key.getPropertyName(), value);
+ }
+ return this;
+ }
+
+ ConnectionCredentials build();
+ }
+}
diff --git a/runtime/defaults/src/main/resources/application.properties
b/runtime/defaults/src/main/resources/application.properties
index def1694c7..e0e34fd5b 100644
--- a/runtime/defaults/src/main/resources/application.properties
+++ b/runtime/defaults/src/main/resources/application.properties
@@ -197,6 +197,9 @@ polaris.oidc.principal-roles-mapper.type=default
# polaris.storage.gcp.token=token
# polaris.storage.gcp.lifespan=PT1H
+# Polaris Credential Manager Config
+polaris.credential-manager.type=default
+
quarkus.arc.ignored-split-packages=\
org.apache.polaris.service.catalog.api,\
org.apache.polaris.service.catalog.api.impl,\
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogHandler.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogHandler.java
index 8919aeb2a..6cfd16321 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogHandler.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogHandler.java
@@ -40,6 +40,7 @@ import org.apache.polaris.core.catalog.ExternalCatalogFactory;
import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
@@ -64,6 +65,7 @@ public abstract class CatalogHandler {
protected final String catalogName;
protected final PolarisAuthorizer authorizer;
protected final UserSecretsManager userSecretsManager;
+ protected final PolarisCredentialManager credentialManager;
protected final Instance<ExternalCatalogFactory> externalCatalogFactories;
protected final PolarisDiagnostics diagnostics;
@@ -80,6 +82,7 @@ public abstract class CatalogHandler {
String catalogName,
PolarisAuthorizer authorizer,
UserSecretsManager userSecretsManager,
+ PolarisCredentialManager credentialManager,
Instance<ExternalCatalogFactory> externalCatalogFactories) {
this.diagnostics = diagnostics;
this.callContext = callContext;
@@ -96,6 +99,7 @@ public abstract class CatalogHandler {
this.polarisPrincipal = (PolarisPrincipal)
securityContext.getUserPrincipal();
this.authorizer = authorizer;
this.userSecretsManager = userSecretsManager;
+ this.credentialManager = credentialManager;
this.externalCatalogFactories = externalCatalogFactories;
}
@@ -103,6 +107,10 @@ public abstract class CatalogHandler {
return userSecretsManager;
}
+ protected PolarisCredentialManager getPolarisCredentialManager() {
+ return credentialManager;
+ }
+
protected PolarisResolutionManifest newResolutionManifest() {
return resolutionManifestFactory.createResolutionManifest(securityContext,
catalogName);
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogAdapter.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogAdapter.java
index 650c747dc..ae16db5ba 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogAdapter.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogAdapter.java
@@ -32,6 +32,7 @@ import org.apache.polaris.core.config.FeatureConfiguration;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
import org.apache.polaris.core.secrets.UserSecretsManager;
@@ -61,6 +62,7 @@ public class GenericTableCatalogAdapter
private final ReservedProperties reservedProperties;
private final CatalogPrefixParser prefixParser;
private final UserSecretsManager userSecretsManager;
+ private final PolarisCredentialManager polarisCredentialManager;
private final Instance<ExternalCatalogFactory> externalCatalogFactories;
@Inject
@@ -74,6 +76,7 @@ public class GenericTableCatalogAdapter
CatalogPrefixParser prefixParser,
ReservedProperties reservedProperties,
UserSecretsManager userSecretsManager,
+ PolarisCredentialManager polarisCredentialManager,
@Any Instance<ExternalCatalogFactory> externalCatalogFactories) {
this.diagnostics = diagnostics;
this.realmContext = realmContext;
@@ -85,6 +88,7 @@ public class GenericTableCatalogAdapter
this.prefixParser = prefixParser;
this.reservedProperties = reservedProperties;
this.userSecretsManager = userSecretsManager;
+ this.polarisCredentialManager = polarisCredentialManager;
this.externalCatalogFactories = externalCatalogFactories;
}
@@ -103,6 +107,7 @@ public class GenericTableCatalogAdapter
prefixParser.prefixToCatalogName(realmContext, prefix),
polarisAuthorizer,
userSecretsManager,
+ polarisCredentialManager,
externalCatalogFactories);
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java
index b3fa9362a..4564e117f 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java
@@ -34,6 +34,7 @@ import org.apache.polaris.core.config.FeatureConfiguration;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.entity.CatalogEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.table.GenericTableEntity;
@@ -63,6 +64,7 @@ public class GenericTableCatalogHandler extends
CatalogHandler {
String catalogName,
PolarisAuthorizer authorizer,
UserSecretsManager userSecretsManager,
+ PolarisCredentialManager polarisCredentialManager,
Instance<ExternalCatalogFactory> externalCatalogFactories) {
super(
diagnostics,
@@ -72,6 +74,7 @@ public class GenericTableCatalogHandler extends
CatalogHandler {
catalogName,
authorizer,
userSecretsManager,
+ polarisCredentialManager,
externalCatalogFactories);
this.metaStoreManager = metaStoreManager;
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index 38f1fbb4b..a21d2883e 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -67,6 +67,7 @@ import org.apache.polaris.core.catalog.ExternalCatalogFactory;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
@@ -148,6 +149,7 @@ public class IcebergCatalogAdapter
private final ResolverFactory resolverFactory;
private final PolarisMetaStoreManager metaStoreManager;
private final UserSecretsManager userSecretsManager;
+ private final PolarisCredentialManager credentialManager;
private final PolarisAuthorizer polarisAuthorizer;
private final CatalogPrefixParser prefixParser;
private final ReservedProperties reservedProperties;
@@ -165,6 +167,7 @@ public class IcebergCatalogAdapter
ResolutionManifestFactory resolutionManifestFactory,
PolarisMetaStoreManager metaStoreManager,
UserSecretsManager userSecretsManager,
+ PolarisCredentialManager credentialManager,
PolarisAuthorizer polarisAuthorizer,
CatalogPrefixParser prefixParser,
ReservedProperties reservedProperties,
@@ -180,6 +183,7 @@ public class IcebergCatalogAdapter
this.resolverFactory = resolverFactory;
this.metaStoreManager = metaStoreManager;
this.userSecretsManager = userSecretsManager;
+ this.credentialManager = credentialManager;
this.polarisAuthorizer = polarisAuthorizer;
this.prefixParser = prefixParser;
this.reservedProperties = reservedProperties;
@@ -218,6 +222,7 @@ public class IcebergCatalogAdapter
resolutionManifestFactory,
metaStoreManager,
userSecretsManager,
+ credentialManager,
securityContext,
catalogFactory,
catalogName,
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index 03a5881c8..07e265c01 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -84,6 +84,7 @@ import org.apache.polaris.core.config.FeatureConfiguration;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.entity.CatalogEntity;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
@@ -151,6 +152,7 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
ResolutionManifestFactory resolutionManifestFactory,
PolarisMetaStoreManager metaStoreManager,
UserSecretsManager userSecretsManager,
+ PolarisCredentialManager credentialManager,
SecurityContext securityContext,
CallContextCatalogFactory catalogFactory,
String catalogName,
@@ -167,6 +169,7 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
catalogName,
authorizer,
userSecretsManager,
+ credentialManager,
externalCatalogFactories);
this.metaStoreManager = metaStoreManager;
this.catalogFactory = catalogFactory;
@@ -248,7 +251,10 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
federatedCatalog =
externalCatalogFactory
.get()
- .createCatalog(connectionConfigInfoDpo,
getUserSecretsManager());
+ .createCatalog(
+ connectionConfigInfoDpo,
+ getUserSecretsManager(),
+ getPolarisCredentialManager());
} else {
throw new UnsupportedOperationException(
"External catalog factory for type '" + connectionType + "' is
unavailable.");
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java
index c1ce3b276..7167e382e 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java
@@ -29,6 +29,7 @@ import org.apache.polaris.core.catalog.GenericTableCatalog;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import
org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;
/** Factory class for creating an Iceberg REST catalog handle based on
connection configuration. */
@@ -38,7 +39,9 @@ public class IcebergRESTExternalCatalogFactory implements
ExternalCatalogFactory
@Override
public Catalog createCatalog(
- ConnectionConfigInfoDpo connectionConfig, UserSecretsManager
userSecretsManager) {
+ ConnectionConfigInfoDpo connectionConfig,
+ UserSecretsManager userSecretsManager,
+ PolarisCredentialManager polarisCredentialManager) {
if (!(connectionConfig instanceof IcebergRestConnectionConfigInfoDpo
icebergConfig)) {
throw new IllegalArgumentException(
"Expected IcebergRestConnectionConfigInfoDpo but got: "
@@ -56,7 +59,7 @@ public class IcebergRESTExternalCatalogFactory implements
ExternalCatalogFactory
federatedCatalog.initialize(
icebergConfig.getRemoteCatalogName(),
- connectionConfig.asIcebergCatalogProperties(userSecretsManager));
+ connectionConfig.asIcebergCatalogProperties(userSecretsManager,
polarisCredentialManager));
return federatedCatalog;
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogAdapter.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogAdapter.java
index 98bb3d9f1..545848065 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogAdapter.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogAdapter.java
@@ -33,6 +33,7 @@ import org.apache.polaris.core.config.FeatureConfiguration;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
import org.apache.polaris.core.policy.PolicyType;
@@ -64,6 +65,7 @@ public class PolicyCatalogAdapter implements
PolarisCatalogPolicyApiService, Cat
private final PolarisAuthorizer polarisAuthorizer;
private final CatalogPrefixParser prefixParser;
private final UserSecretsManager userSecretsManager;
+ private final PolarisCredentialManager polarisCredentialManager;
private final Instance<ExternalCatalogFactory> externalCatalogFactories;
@Inject
@@ -76,6 +78,7 @@ public class PolicyCatalogAdapter implements
PolarisCatalogPolicyApiService, Cat
PolarisAuthorizer polarisAuthorizer,
CatalogPrefixParser prefixParser,
UserSecretsManager userSecretsManager,
+ PolarisCredentialManager polarisCredentialManager,
@Any Instance<ExternalCatalogFactory> externalCatalogFactories) {
this.diagnostics = diagnostics;
this.realmContext = realmContext;
@@ -86,6 +89,7 @@ public class PolicyCatalogAdapter implements
PolarisCatalogPolicyApiService, Cat
this.polarisAuthorizer = polarisAuthorizer;
this.prefixParser = prefixParser;
this.userSecretsManager = userSecretsManager;
+ this.polarisCredentialManager = polarisCredentialManager;
this.externalCatalogFactories = externalCatalogFactories;
}
@@ -103,6 +107,7 @@ public class PolicyCatalogAdapter implements
PolarisCatalogPolicyApiService, Cat
prefixParser.prefixToCatalogName(realmContext, prefix),
polarisAuthorizer,
userSecretsManager,
+ polarisCredentialManager,
externalCatalogFactories);
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
index e2cecc99b..0c92c816f 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
@@ -36,6 +36,7 @@ import org.apache.polaris.core.auth.PolarisAuthorizer;
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
@@ -72,6 +73,7 @@ public class PolicyCatalogHandler extends CatalogHandler {
String catalogName,
PolarisAuthorizer authorizer,
UserSecretsManager userSecretsManager,
+ PolarisCredentialManager polarisCredentialManager,
Instance<ExternalCatalogFactory> externalCatalogFactories) {
super(
diagnostics,
@@ -81,6 +83,7 @@ public class PolicyCatalogHandler extends CatalogHandler {
catalogName,
authorizer,
userSecretsManager,
+ polarisCredentialManager,
externalCatalogFactories);
this.metaStoreManager = metaStoreManager;
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
index 214ba9ad8..6e25dbfc1 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
@@ -43,6 +43,7 @@ import
org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.persistence.BasePersistence;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
@@ -70,6 +71,7 @@ import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.context.RealmContextConfiguration;
import org.apache.polaris.service.context.RealmContextFilter;
import org.apache.polaris.service.context.RealmContextResolver;
+import
org.apache.polaris.service.credentials.PolarisCredentialManagerConfiguration;
import org.apache.polaris.service.events.PolarisEventListenerConfiguration;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.persistence.PersistenceConfiguration;
@@ -392,6 +394,14 @@ public class ServiceProducers {
return
resolvers.select(Identifier.Literal.of(config.tenantResolver())).get();
}
+ @Produces
+ @RequestScoped
+ public PolarisCredentialManager polarisCredentialManager(
+ PolarisCredentialManagerConfiguration config,
+ @Any Instance<PolarisCredentialManager> credentialManagers) {
+ return
credentialManagers.select(Identifier.Literal.of(config.type())).get();
+ }
+
public void closeTaskExecutor(@Disposes @Identifier("task-executor")
ManagedExecutor executor) {
executor.close();
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/credentials/DefaultPolarisCredentialManager.java
b/runtime/service/src/main/java/org/apache/polaris/service/credentials/DefaultPolarisCredentialManager.java
new file mode 100644
index 000000000..489e8e043
--- /dev/null
+++
b/runtime/service/src/main/java/org/apache/polaris/service/credentials/DefaultPolarisCredentialManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.credentials;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.enterprise.inject.Any;
+import jakarta.enterprise.inject.Instance;
+import jakarta.enterprise.inject.ResolutionException;
+import jakarta.enterprise.inject.UnsatisfiedResolutionException;
+import jakarta.inject.Inject;
+import org.apache.polaris.core.connection.AuthenticationType;
+import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
+import
org.apache.polaris.core.credentials.connection.ConnectionCredentialVendor;
+import org.apache.polaris.core.credentials.connection.ConnectionCredentials;
+import org.apache.polaris.service.credentials.connection.AuthType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link PolarisCredentialManager} responsible for
retrieving credentials
+ * used by Polaris to access external systems such as remote catalogs or cloud
storage.
+ *
+ * <p>This implementation delegates to {@link ConnectionCredentialVendor}
implementations selected
+ * via CDI based on the authentication type. Each vendor handles the
credential transformation logic
+ * for a specific authentication mechanism (e.g., SigV4, OAuth).
+ *
+ * <p>This bean is request-scoped and realm-aware, delegating all credential
generation to
+ * CDI-managed vendors.
+ *
+ * <p>Flow:
+ *
+ * <ol>
+ * <li>Selects the appropriate {@link ConnectionCredentialVendor} based on
the authentication type
+ * <li>Delegates to the vendor to generate the final connection credentials
(the vendor will
+ * resolve the service identity internally)
+ * </ol>
+ */
+@RequestScoped
+@Identifier("default")
+public class DefaultPolarisCredentialManager implements
PolarisCredentialManager {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(DefaultPolarisCredentialManager.class);
+
+ private final RealmContext realmContext;
+ private final Instance<ConnectionCredentialVendor> credentialVendors;
+
+ @Inject
+ public DefaultPolarisCredentialManager(
+ RealmContext realmContext, @Any Instance<ConnectionCredentialVendor>
credentialVendors) {
+ this.realmContext = realmContext;
+ this.credentialVendors = credentialVendors;
+ }
+
+ public RealmContext getRealmContext() {
+ return realmContext;
+ }
+
+ @Override
+ public @Nonnull ConnectionCredentials getConnectionCredentials(
+ @Nonnull ConnectionConfigInfoDpo connectionConfig) {
+
+ AuthenticationType authType =
+ connectionConfig.getAuthenticationParameters().getAuthenticationType();
+
+ // Use CDI to select the appropriate vendor based on the authentication
type
+ ConnectionCredentialVendor selectedVendor;
+ try {
+ selectedVendor =
credentialVendors.select(AuthType.Literal.of(authType)).get();
+ } catch (UnsatisfiedResolutionException e) {
+ // Silently ignore if no vendor found for this auth type for now to pass
tests
+ // TODO: add connection credential vendor for other auth types
+ return ConnectionCredentials.builder().build();
+ } catch (ResolutionException e) {
+ // No vendor found or ambiguous vendors
+ // Multiple vendors found - need @Priority to disambiguate
+ throw new IllegalStateException(
+ "Unable to obtain connection credentials required for executing this
request", e);
+ }
+
+ // Delegate credential generation to the selected vendor
+ return selectedVendor.getConnectionCredentials(connectionConfig);
+ }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
b/runtime/service/src/main/java/org/apache/polaris/service/credentials/PolarisCredentialManagerConfiguration.java
similarity index 50%
copy from
polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
copy to
runtime/service/src/main/java/org/apache/polaris/service/credentials/PolarisCredentialManagerConfiguration.java
index 75af01100..6c7d592e7 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/credentials/PolarisCredentialManagerConfiguration.java
@@ -16,19 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.core.connection.iceberg;
-import jakarta.annotation.Nonnull;
-import java.util.Map;
-import org.apache.polaris.core.secrets.UserSecretsManager;
+package org.apache.polaris.service.credentials;
+
+import io.quarkus.runtime.annotations.StaticInitSafe;
+import io.smallrye.config.ConfigMapping;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
/**
- * Configuration wrappers which ultimately translate their contents into
Iceberg properties and
- * which may hold other nested configuration wrapper objects implement this
interface to allow
- * delegating type-specific configuration translation logic to subclasses
instead of needing to
- * expose the internals of deeply nested configuration objects to a visitor
class.
+ * Quarkus configuration mapping for Polaris Credential Manager.
+ *
+ * <p>Defines which {@link PolarisCredentialManager} implementation should be
used at runtime. This
+ * allows switching between different credential management strategies via
configuration.
*/
-public interface IcebergCatalogPropertiesProvider {
- @Nonnull
- Map<String, String> asIcebergCatalogProperties(UserSecretsManager
secretsManager);
+@StaticInitSafe
+@ConfigMapping(prefix = "polaris.credential-manager")
+public interface PolarisCredentialManagerConfiguration {
+
+ /**
+ * The type identifier of the PolarisCredentialManager implementation to
use. This corresponds to
+ * the {@code @Identifier} annotation value on the implementation (e.g.,
"default").
+ */
+ String type();
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/credentials/connection/AuthType.java
b/runtime/service/src/main/java/org/apache/polaris/service/credentials/connection/AuthType.java
new file mode 100644
index 000000000..ada9825bf
--- /dev/null
+++
b/runtime/service/src/main/java/org/apache/polaris/service/credentials/connection/AuthType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.credentials.connection;
+
+import jakarta.enterprise.util.AnnotationLiteral;
+import jakarta.inject.Qualifier;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.apache.polaris.core.connection.AuthenticationType;
+import
org.apache.polaris.core.credentials.connection.ConnectionCredentialVendor;
+
+/**
+ * CDI qualifier to indicate which authentication type a {@link
ConnectionCredentialVendor}
+ * supports.
+ *
+ * <p>This annotation allows the credential manager to automatically select
the appropriate vendor
+ * based on the authentication type specified in the connection configuration.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * @ApplicationScoped
+ * @AuthType(AuthenticationType.SIGV4)
+ * @Priority(100)
+ * public class SigV4ConnectionCredentialVendor implements
ConnectionCredentialVendor {
+ * // AWS STS AssumeRole logic for SigV4 authentication
+ * }
+ * }</pre>
+ */
+@Qualifier
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface AuthType {
+
+ /** The authentication type this vendor supports. */
+ AuthenticationType value();
+
+ /** Helper for creating {@link AuthType} qualifiers programmatically. */
+ final class Literal extends AnnotationLiteral<AuthType> implements AuthType {
+ private final AuthenticationType value;
+
+ public static Literal of(AuthenticationType value) {
+ return new Literal(value);
+ }
+
+ private Literal(AuthenticationType value) {
+ this.value = value;
+ }
+
+ @Override
+ public AuthenticationType value() {
+ return value;
+ }
+ }
+}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/credentials/connection/SigV4ConnectionCredentialVendor.java
b/runtime/service/src/main/java/org/apache/polaris/service/credentials/connection/SigV4ConnectionCredentialVendor.java
new file mode 100644
index 000000000..d85d318cf
--- /dev/null
+++
b/runtime/service/src/main/java/org/apache/polaris/service/credentials/connection/SigV4ConnectionCredentialVendor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.credentials.connection;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.Optional;
+import org.apache.polaris.core.connection.AuthenticationType;
+import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+import org.apache.polaris.core.connection.SigV4AuthenticationParametersDpo;
+import org.apache.polaris.core.credentials.connection.CatalogAccessProperty;
+import
org.apache.polaris.core.credentials.connection.ConnectionCredentialVendor;
+import org.apache.polaris.core.credentials.connection.ConnectionCredentials;
+import
org.apache.polaris.core.identity.credential.AwsIamServiceIdentityCredential;
+import org.apache.polaris.core.identity.credential.ServiceIdentityCredential;
+import org.apache.polaris.core.identity.provider.ServiceIdentityProvider;
+import org.apache.polaris.core.storage.aws.StsClientProvider;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+
+/**
+ * Connection credential vendor for AWS SigV4 authentication.
+ *
+ * <p>This vendor uses Polaris's AWS IAM service identity to assume a
customer-provided IAM role via
+ * AWS STS, generating temporary credentials that Polaris uses to access
external AWS services
+ * (e.g., AWS Glue catalog) with SigV4 request signing.
+ *
+ * <p>Flow:
+ *
+ * <ol>
+ * <li>Receives Polaris's {@link AwsIamServiceIdentityCredential} (the IAM
user/role Polaris owns)
+ * <li>Extracts customer's role ARN from {@link
SigV4AuthenticationParametersDpo}
+ * <li>Calls AWS STS AssumeRole to get temporary credentials
+ * <li>Returns temporary access key, secret key, and session token
+ * </ol>
+ *
+ * <p>This is the default implementation with {@code @Priority(100)}. Custom
implementations can
+ * override this by providing a higher priority value.
+ */
+@ApplicationScoped
+@AuthType(AuthenticationType.SIGV4)
+@Priority(100)
+public class SigV4ConnectionCredentialVendor implements
ConnectionCredentialVendor {
+
+ private static final String DEFAULT_ROLE_SESSION_NAME = "polaris";
+
+ private final StsClientProvider stsClientProvider;
+ private final ServiceIdentityProvider serviceIdentityProvider;
+
+ @Inject
+ public SigV4ConnectionCredentialVendor(
+ StsClientProvider stsClientProvider, ServiceIdentityProvider
serviceIdentityProvider) {
+ this.stsClientProvider = stsClientProvider;
+ this.serviceIdentityProvider = serviceIdentityProvider;
+ }
+
+ @Override
+ public @Nonnull ConnectionCredentials getConnectionCredentials(
+ @Nonnull ConnectionConfigInfoDpo connectionConfig) {
+
+ // Validate and extract authentication parameters
+ Preconditions.checkArgument(
+ connectionConfig.getAuthenticationParameters() instanceof
SigV4AuthenticationParametersDpo,
+ "Expected SigV4AuthenticationParametersDpo, got: %s",
+ connectionConfig.getAuthenticationParameters().getClass().getName());
+ SigV4AuthenticationParametersDpo sigv4Params =
+ (SigV4AuthenticationParametersDpo)
connectionConfig.getAuthenticationParameters();
+
+ // Resolve the service identity credential
+ Optional<ServiceIdentityCredential> serviceCredentialOpt =
+
serviceIdentityProvider.getServiceIdentityCredential(connectionConfig.getServiceIdentity());
+ if (serviceCredentialOpt.isEmpty()) {
+ return ConnectionCredentials.builder().build();
+ }
+
+ // Validate and cast service identity credential
+ ServiceIdentityCredential serviceCredential = serviceCredentialOpt.get();
+ Preconditions.checkArgument(
+ serviceCredential instanceof AwsIamServiceIdentityCredential,
+ "Expected AwsIamServiceIdentityCredential, got: %s",
+ serviceCredential.getClass().getName());
+ AwsIamServiceIdentityCredential awsCredential =
+ (AwsIamServiceIdentityCredential) serviceCredential;
+
+ // Use Polaris's IAM identity to assume the customer's role
+ StsClient stsClient = getStsClient(sigv4Params);
+
+ // Build the AssumeRole request with Polaris's credentials
+ // TODO: Generate service-level scoping policy to restrict permissions
+ AssumeRoleRequest.Builder requestBuilder =
+ AssumeRoleRequest.builder()
+ .roleArn(sigv4Params.getRoleArn())
+ .roleSessionName(
+ Optional.ofNullable(sigv4Params.getRoleSessionName())
+ .orElse(DEFAULT_ROLE_SESSION_NAME))
+ .externalId(sigv4Params.getExternalId());
+
+ // Configure the request to use Polaris's service identity credentials
+ requestBuilder.overrideConfiguration(
+ config ->
config.credentialsProvider(awsCredential.getAwsCredentialsProvider()));
+
+ AssumeRoleResponse response = stsClient.assumeRole(requestBuilder.build());
+
+ // Build connection credentials from AWS temporary credentials
+ ConnectionCredentials.Builder builder = ConnectionCredentials.builder();
+ builder.putCredential(
+ CatalogAccessProperty.AWS_ACCESS_KEY_ID.getPropertyName(),
+ response.credentials().accessKeyId());
+ builder.putCredential(
+ CatalogAccessProperty.AWS_SECRET_ACCESS_KEY.getPropertyName(),
+ response.credentials().secretAccessKey());
+ builder.putCredential(
+ CatalogAccessProperty.AWS_SESSION_TOKEN.getPropertyName(),
+ response.credentials().sessionToken());
+ Optional.ofNullable(response.credentials().expiration())
+ .ifPresent(expiration -> builder.expiresAt(expiration));
+
+ return builder.build();
+ }
+
+ @VisibleForTesting
+ StsClient getStsClient(@Nonnull SigV4AuthenticationParametersDpo
sigv4Params) {
+ // Get STS client from the provider (potentially pooled)
+ // The Polaris service identity credentials are set on the AssumeRole
request via
+ // overrideConfiguration, not on the STS client itself
+ // TODO: Configure proper StsDestination with region/endpoint from
sigv4Params
+ return
stsClientProvider.stsClient(StsClientProvider.StsDestination.of(null, null));
+ }
+}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/identity/AwsIamServiceIdentityConfiguration.java
b/runtime/service/src/main/java/org/apache/polaris/service/identity/AwsIamServiceIdentityConfiguration.java
index e92b28d88..50074a7d0 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/identity/AwsIamServiceIdentityConfiguration.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/identity/AwsIamServiceIdentityConfiguration.java
@@ -41,7 +41,10 @@ import
software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
*/
public interface AwsIamServiceIdentityConfiguration extends
ResolvableServiceIdentityConfiguration {
- /** The IAM role or user ARN representing the service identity. */
+ /**
+ * The IAM role or user ARN representing the service identity. If not
provided, Polaris won't
+ * surface it in the catalog identity.
+ */
String iamArn();
/**
@@ -104,14 +107,11 @@ public interface AwsIamServiceIdentityConfiguration
extends ResolvableServiceIde
* <p>This method should only be called when credentials are actually needed
for authentication.
*
* @param secretReference the secret reference to associate with this
credential
- * @return the service identity credential, or empty if the IAM ARN is not
configured
+ * @return the service identity credential
*/
@Override
default Optional<AwsIamServiceIdentityCredential>
asServiceIdentityCredential(
@Nonnull SecretReference secretReference) {
- if (iamArn() == null) {
- return Optional.empty();
- }
return Optional.of(
new AwsIamServiceIdentityCredential(secretReference, iamArn(),
awsCredentialsProvider()));
}
@@ -138,4 +138,38 @@ public interface AwsIamServiceIdentityConfiguration
extends ResolvableServiceIde
return DefaultCredentialsProvider.builder().build();
}
}
+
+ /**
+ * Returns the default AWS IAM service identity configuration.
+ *
+ * <p>This configuration is used only when the default realm ({@code
DEFAULT_REALM_KEY}) has no
+ * explicit service identity configuration. It uses the AWS default
credential provider chain to
+ * obtain credentials from the environment (e.g., environment variables, EC2
instance metadata,
+ * ECS task metadata, etc.) without requiring an explicit IAM ARN.
+ *
+ * @return the default AWS IAM service identity configuration
+ */
+ static AwsIamServiceIdentityConfiguration defaultConfiguration() {
+ return new AwsIamServiceIdentityConfiguration() {
+ @Override
+ public String iamArn() {
+ return null; // No ARN - will use environment credentials
+ }
+
+ @Override
+ public Optional<String> accessKeyId() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> secretAccessKey() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> sessionToken() {
+ return Optional.empty();
+ }
+ };
+ }
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/identity/RealmServiceIdentityConfiguration.java
b/runtime/service/src/main/java/org/apache/polaris/service/identity/RealmServiceIdentityConfiguration.java
index a58fa25a9..7f6154804 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/identity/RealmServiceIdentityConfiguration.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/identity/RealmServiceIdentityConfiguration.java
@@ -48,4 +48,23 @@ public interface RealmServiceIdentityConfiguration {
default List<? extends ResolvableServiceIdentityConfiguration>
serviceIdentityConfigurations() {
return
Stream.of(awsIamServiceIdentity()).flatMap(Optional::stream).toList();
}
+
+ /**
+ * Returns the default realm service identity configuration.
+ *
+ * <p>This configuration is used only when the default realm ({@code
DEFAULT_REALM_KEY}) has no
+ * explicit configuration. It serves as a fallback for development scenarios
where credentials are
+ * obtained from the environment without requiring explicit configuration.
+ *
+ * @return the default realm service identity configuration
+ */
+ static RealmServiceIdentityConfiguration defaultConfiguration() {
+ return new RealmServiceIdentityConfiguration() {
+ @Override
+ public Optional<AwsIamServiceIdentityConfiguration>
awsIamServiceIdentity() {
+ // Return the AWS-specific default configuration that uses environment
credentials
+ return
Optional.of(AwsIamServiceIdentityConfiguration.defaultConfiguration());
+ }
+ };
+ }
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/identity/ResolvableServiceIdentityConfiguration.java
b/runtime/service/src/main/java/org/apache/polaris/service/identity/ResolvableServiceIdentityConfiguration.java
index dc03aa0c5..407574f84 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/identity/ResolvableServiceIdentityConfiguration.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/identity/ResolvableServiceIdentityConfiguration.java
@@ -72,4 +72,20 @@ public interface ResolvableServiceIdentityConfiguration {
@Nonnull SecretReference secretReference) {
return Optional.empty();
}
+
+ /**
+ * Returns the default resolvable service identity configuration.
+ *
+ * <p>This configuration is used only when the default realm ({@code
DEFAULT_REALM_KEY}) has no
+ * explicit configuration. It serves as a fallback for development scenarios
where credentials are
+ * obtained from the environment without requiring explicit configuration.
+ *
+ * @return the default resolvable service identity configuration
+ */
+ static ResolvableServiceIdentityConfiguration defaultConfiguration() {
+ return new ResolvableServiceIdentityConfiguration() {
+ // Returns empty for all methods - no explicit configuration available
+ // Subclasses like AwsIamServiceIdentityConfiguration handle environment
credentials
+ };
+ }
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/identity/ServiceIdentityConfiguration.java
b/runtime/service/src/main/java/org/apache/polaris/service/identity/ServiceIdentityConfiguration.java
index fc6a2ce42..189c22675 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/identity/ServiceIdentityConfiguration.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/identity/ServiceIdentityConfiguration.java
@@ -92,7 +92,9 @@ public interface ServiceIdentityConfiguration {
/**
* Retrieves the configuration entry for the given realm identifier.
*
- * <p>If the realm has no specific configuration, falls back to the default
realm configuration.
+ * <p>If the realm has no specific configuration, falls back to the default
realm configuration
+ * ({@code DEFAULT_REALM_KEY}). If the default realm configuration is also
not set, returns a
+ * default configuration that uses environment-based credentials.
*
* @param realmIdentifier the realm identifier
* @return the configuration entry containing the realm identifier and its
configuration
@@ -100,7 +102,16 @@ public interface ServiceIdentityConfiguration {
default RealmConfigEntry forRealm(String realmIdentifier) {
String resolvedRealmIdentifier =
realms().containsKey(realmIdentifier) ? realmIdentifier :
DEFAULT_REALM_KEY;
- return new RealmConfigEntry(resolvedRealmIdentifier,
realms().get(resolvedRealmIdentifier));
+ RealmServiceIdentityConfiguration config;
+ if (realms().containsKey(resolvedRealmIdentifier)) {
+ config = realms().get(resolvedRealmIdentifier);
+ } else {
+ // If no configuration exists for the DEFAULT_REALM_KEY, use the default
configuration
+ // This allows using environment-based AWS credentials without explicit
configuration
+ config = RealmServiceIdentityConfiguration.defaultConfiguration();
+ }
+
+ return new RealmConfigEntry(resolvedRealmIdentifier, config);
}
/**
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java
b/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java
index 4eb9b0f46..875c2262a 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java
@@ -65,6 +65,7 @@ import
org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.entity.CatalogEntity;
import org.apache.polaris.core.entity.CatalogRoleEntity;
import org.apache.polaris.core.entity.PolarisBaseEntity;
@@ -197,6 +198,7 @@ public abstract class PolarisAuthzTestBase {
@Inject protected CallContextCatalogFactory callContextCatalogFactory;
@Inject protected UserSecretsManagerFactory userSecretsManagerFactory;
@Inject protected ServiceIdentityProvider serviceIdentityProvider;
+ @Inject protected PolarisCredentialManager credentialManager;
@Inject protected PolarisDiagnostics diagServices;
@Inject protected FileIOFactory fileIOFactory;
@Inject protected PolarisEventListener polarisEventListener;
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/PolarisGenericTableCatalogHandlerAuthzTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/PolarisGenericTableCatalogHandlerAuthzTest.java
index b1296177b..00c87f899 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/PolarisGenericTableCatalogHandlerAuthzTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/PolarisGenericTableCatalogHandlerAuthzTest.java
@@ -54,6 +54,7 @@ public class PolarisGenericTableCatalogHandlerAuthzTest
extends PolarisAuthzTest
catalogName,
polarisAuthorizer,
null,
+ null,
null);
}
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java
index 0a5f063e2..d930bf1ba 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java
@@ -127,6 +127,7 @@ public class IcebergCatalogHandlerAuthzTest extends
PolarisAuthzTestBase {
resolutionManifestFactory,
metaStoreManager,
userSecretsManager,
+ credentialManager,
securityContext(authenticatedPrincipal),
factory,
catalogName,
@@ -266,6 +267,7 @@ public class IcebergCatalogHandlerAuthzTest extends
PolarisAuthzTestBase {
resolutionManifestFactory,
metaStoreManager,
userSecretsManager,
+ credentialManager,
securityContext(authenticatedPrincipal),
callContextCatalogFactory,
CATALOG_NAME,
@@ -303,6 +305,7 @@ public class IcebergCatalogHandlerAuthzTest extends
PolarisAuthzTestBase {
resolutionManifestFactory,
metaStoreManager,
userSecretsManager,
+ credentialManager,
securityContext(authenticatedPrincipal1),
callContextCatalogFactory,
CATALOG_NAME,
@@ -1182,6 +1185,7 @@ public class IcebergCatalogHandlerAuthzTest extends
PolarisAuthzTestBase {
resolutionManifestFactory,
metaStoreManager,
userSecretsManager,
+ credentialManager,
securityContext(authenticatedPrincipal),
factory,
catalogName,
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerFineGrainedDisabledTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerFineGrainedDisabledTest.java
index 0e9719794..55ab5ad88 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerFineGrainedDisabledTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerFineGrainedDisabledTest.java
@@ -63,6 +63,7 @@ public class IcebergCatalogHandlerFineGrainedDisabledTest
extends PolarisAuthzTe
resolutionManifestFactory,
metaStoreManager,
userSecretsManager,
+ credentialManager,
securityContext(authenticatedPrincipal),
callContextCatalogFactory,
CATALOG_NAME,
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandlerAuthzTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandlerAuthzTest.java
index 990a9cff2..9a85496db 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandlerAuthzTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandlerAuthzTest.java
@@ -59,6 +59,7 @@ public class PolicyCatalogHandlerAuthzTest extends
PolarisAuthzTestBase {
catalogName,
polarisAuthorizer,
null,
+ null,
null);
}
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/credentials/DefaultPolarisCredentialManagerTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/credentials/DefaultPolarisCredentialManagerTest.java
new file mode 100644
index 000000000..c98738a2c
--- /dev/null
+++
b/runtime/service/src/test/java/org/apache/polaris/service/credentials/DefaultPolarisCredentialManagerTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.credentials;
+
+import static org.mockito.Mockito.when;
+
+import io.quarkus.test.InjectMock;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Alternative;
+import jakarta.inject.Inject;
+import java.util.Map;
+import java.util.Set;
+import org.apache.polaris.core.connection.AuthenticationParametersDpo;
+import org.apache.polaris.core.connection.AuthenticationType;
+import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+import org.apache.polaris.core.connection.OAuthClientCredentialsParametersDpo;
+import org.apache.polaris.core.connection.SigV4AuthenticationParametersDpo;
+import
org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
+import org.apache.polaris.core.credentials.connection.CatalogAccessProperty;
+import
org.apache.polaris.core.credentials.connection.ConnectionCredentialVendor;
+import org.apache.polaris.core.credentials.connection.ConnectionCredentials;
+import org.apache.polaris.core.identity.dpo.AwsIamServiceIdentityInfoDpo;
+import org.apache.polaris.core.identity.dpo.ServiceIdentityInfoDpo;
+import org.apache.polaris.core.secrets.SecretReference;
+import org.apache.polaris.service.credentials.connection.AuthType;
+import org.assertj.core.api.Assertions;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/** Tests that {@link DefaultPolarisCredentialManager} correctly delegates to
CDI providers. */
+@QuarkusTest
+@TestProfile(DefaultPolarisCredentialManagerTest.Profile.class)
+public class DefaultPolarisCredentialManagerTest {
+
+ @InjectMock RealmContext realmContext;
+
+ @Inject PolarisCredentialManager credentialManager;
+
+ private ServiceIdentityInfoDpo testServiceIdentity;
+
+ /** Test vendor for SIGV4 authentication */
+ @Alternative
+ @ApplicationScoped
+ @AuthType(AuthenticationType.SIGV4)
+ public static class TestSigV4Vendor implements ConnectionCredentialVendor {
+ @Override
+ public @NotNull ConnectionCredentials getConnectionCredentials(
+ @NotNull ConnectionConfigInfoDpo connectionConfig) {
+
+ // Return test credentials
+ return ConnectionCredentials.builder()
+ .putCredential(
+ CatalogAccessProperty.AWS_ACCESS_KEY_ID.getPropertyName(),
"sigv4-access-key")
+ .putCredential(
+ CatalogAccessProperty.AWS_SECRET_ACCESS_KEY.getPropertyName(),
"sigv4-secret-key")
+ .putCredential(
+ CatalogAccessProperty.AWS_SESSION_TOKEN.getPropertyName(),
"sigv4-session-token")
+ .build();
+ }
+ }
+
+ /** Test vendor for OAuth authentication */
+ @Alternative
+ @ApplicationScoped
+ @AuthType(AuthenticationType.OAUTH)
+ public static class TestOAuthVendor implements ConnectionCredentialVendor {
+ @Override
+ public @NotNull ConnectionCredentials getConnectionCredentials(
+ @NotNull ConnectionConfigInfoDpo connectionConfig) {
+
+ return ConnectionCredentials.builder()
+ .putCredential(
+ CatalogAccessProperty.AWS_ACCESS_KEY_ID.getPropertyName(),
"oauth-access-key")
+ .build();
+ }
+ }
+
+ public static class Profile implements QuarkusTestProfile {
+ @Override
+ public Set<Class<?>> getEnabledAlternatives() {
+ return Set.of(TestSigV4Vendor.class, TestOAuthVendor.class);
+ }
+
+ @Override
+ public Map<String, String> getConfigOverrides() {
+ return Map.of("polaris.credential-manager.type", "default");
+ }
+ }
+
+ @BeforeEach
+ void setup() {
+ when(realmContext.getRealmIdentifier()).thenReturn("test-realm");
+
+ // Create a test service identity
+ testServiceIdentity =
+ new AwsIamServiceIdentityInfoDpo(
+ new SecretReference("urn:polaris-secret:test:my-realm:AWS_IAM",
Map.of()));
+ }
+
+ @Test
+ public void testDelegatesToSigV4Vendor() {
+ // Create SIGV4 auth parameters
+ SigV4AuthenticationParametersDpo authParams =
+ new SigV4AuthenticationParametersDpo(
+ "arn:aws:iam::123456789012:role/test-role", null, null,
"us-west-2", "glue");
+
+ // Create connection config
+ IcebergRestConnectionConfigInfoDpo connectionConfig =
+ new IcebergRestConnectionConfigInfoDpo(
+ "https://test-catalog.example.com", authParams,
testServiceIdentity, "test-catalog");
+
+ // Should delegate to TestSigV4Vendor
+ ConnectionCredentials credentials =
+ credentialManager.getConnectionCredentials(connectionConfig);
+
+ Assertions.assertThat(credentials.credentials())
+ .containsEntry(
+ CatalogAccessProperty.AWS_ACCESS_KEY_ID.getPropertyName(),
"sigv4-access-key")
+ .containsEntry(
+ CatalogAccessProperty.AWS_SECRET_ACCESS_KEY.getPropertyName(),
"sigv4-secret-key")
+ .containsEntry(
+ CatalogAccessProperty.AWS_SESSION_TOKEN.getPropertyName(),
"sigv4-session-token");
+ }
+
+ @Test
+ public void testDelegatesToOAuthVendor() {
+ // Create OAuth auth parameters
+ OAuthClientCredentialsParametersDpo authParams =
+ new OAuthClientCredentialsParametersDpo(
+ "https://auth.example.com/token",
+ "client-id",
+ new
SecretReference("urn:polaris-secret:test-manager:client-secret", Map.of()),
+ null);
+
+ // Create connection config
+ IcebergRestConnectionConfigInfoDpo connectionConfig =
+ new IcebergRestConnectionConfigInfoDpo(
+ "https://test-catalog.example.com", authParams,
testServiceIdentity, "test-catalog");
+
+ // Should delegate to TestOAuthVendor
+ ConnectionCredentials credentials =
+ credentialManager.getConnectionCredentials(connectionConfig);
+
+ Assertions.assertThat(credentials.credentials())
+ .containsEntry(
+ CatalogAccessProperty.AWS_ACCESS_KEY_ID.getPropertyName(),
"oauth-access-key");
+ }
+
+ @Test
+ public void testUnsupportedAuthTypeReturnsEmpty() {
+ // Use a mock connection config with an unsupported authentication type
+ ConnectionConfigInfoDpo mockConfig =
Mockito.mock(ConnectionConfigInfoDpo.class);
+ AuthenticationParametersDpo mockAuthParams =
Mockito.mock(AuthenticationParametersDpo.class);
+
+ when(mockConfig.getAuthenticationParameters()).thenReturn(mockAuthParams);
+
when(mockAuthParams.getAuthenticationType()).thenReturn(AuthenticationType.NULL_TYPE);
+
+ // Should return empty credentials since no vendor supports NULL_TYPE
+ ConnectionCredentials credentials =
credentialManager.getConnectionCredentials(mockConfig);
+
+ Assertions.assertThat(credentials).isNotNull();
+ Assertions.assertThat(credentials.credentials()).isEmpty();
+ Assertions.assertThat(credentials.expiresAt()).isEmpty();
+ }
+}
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/credentials/connection/SigV4ConnectionCredentialVendorTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/credentials/connection/SigV4ConnectionCredentialVendorTest.java
new file mode 100644
index 000000000..4c4f9c135
--- /dev/null
+++
b/runtime/service/src/test/java/org/apache/polaris/service/credentials/connection/SigV4ConnectionCredentialVendorTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.credentials.connection;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.polaris.core.connection.SigV4AuthenticationParametersDpo;
+import
org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
+import org.apache.polaris.core.credentials.connection.CatalogAccessProperty;
+import org.apache.polaris.core.credentials.connection.ConnectionCredentials;
+import
org.apache.polaris.core.identity.credential.AwsIamServiceIdentityCredential;
+import org.apache.polaris.core.identity.dpo.AwsIamServiceIdentityInfoDpo;
+import org.apache.polaris.core.identity.dpo.ServiceIdentityInfoDpo;
+import org.apache.polaris.core.identity.provider.ServiceIdentityProvider;
+import org.apache.polaris.core.secrets.SecretReference;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+/** Tests for {@link SigV4ConnectionCredentialVendor}. */
+public class SigV4ConnectionCredentialVendorTest {
+
+ private SigV4ConnectionCredentialVendor vendor;
+ private StsClient mockStsClient;
+ private ServiceIdentityProvider mockServiceIdentityProvider;
+
+ @BeforeEach
+ void setup() {
+ mockStsClient = mock(StsClient.class);
+ mockServiceIdentityProvider = Mockito.mock(ServiceIdentityProvider.class);
+
+ // Mock STS AssumeRole response
+ Credentials stsCredentials =
+ Credentials.builder()
+ .accessKeyId("assumed-access-key-id")
+ .secretAccessKey("assumed-secret-access-key")
+ .sessionToken("assumed-session-token")
+ .expiration(Instant.now().plusSeconds(3600))
+ .build();
+
+ AssumeRoleResponse assumeRoleResponse =
+ AssumeRoleResponse.builder().credentials(stsCredentials).build();
+
+
when(mockStsClient.assumeRole(any(AssumeRoleRequest.class))).thenReturn(assumeRoleResponse);
+
+ // Mock service identity credential resolution
+ AwsIamServiceIdentityCredential mockCredential =
+ new AwsIamServiceIdentityCredential(
+ "arn:aws:iam::123456789012:user/polaris-service-user",
+ StaticCredentialsProvider.create(
+ AwsBasicCredentials.create("polaris-access-key",
"polaris-secret-key")));
+ when(mockServiceIdentityProvider.getServiceIdentityCredential(any()))
+ .thenReturn(Optional.of(mockCredential));
+
+ // Create vendor with mocked dependencies
+ vendor =
+ new SigV4ConnectionCredentialVendor(
+ (destination) -> mockStsClient, mockServiceIdentityProvider);
+ }
+
+ @Test
+ public void testGetCredentialsWithSigV4Auth() {
+ // Create a service identity reference
+ ServiceIdentityInfoDpo serviceIdentity =
+ new AwsIamServiceIdentityInfoDpo(
+ new SecretReference("urn:polaris-secret:test:my-realm:AWS_IAM",
Map.of()));
+
+ // Create SigV4 auth parameters (customer's role to assume)
+ SigV4AuthenticationParametersDpo authParams =
+ new SigV4AuthenticationParametersDpo(
+ "arn:aws:iam::123456789012:role/customer-role",
+ "my-session",
+ "external-id-123",
+ "us-west-2",
+ "glue");
+
+ // Create connection config with service identity and auth params
+ IcebergRestConnectionConfigInfoDpo connectionConfig =
+ new IcebergRestConnectionConfigInfoDpo(
+ "https://test-catalog.example.com", authParams, serviceIdentity,
"test-catalog");
+
+ // Get credentials
+ ConnectionCredentials credentials =
vendor.getConnectionCredentials(connectionConfig);
+
+ // Verify the returned credentials are from STS AssumeRole
+ Assertions.assertThat(credentials.credentials())
+ .containsEntry(
+ CatalogAccessProperty.AWS_ACCESS_KEY_ID.getPropertyName(),
"assumed-access-key-id")
+ .containsEntry(
+ CatalogAccessProperty.AWS_SECRET_ACCESS_KEY.getPropertyName(),
+ "assumed-secret-access-key")
+ .containsEntry(
+ CatalogAccessProperty.AWS_SESSION_TOKEN.getPropertyName(),
"assumed-session-token")
+ .hasSize(3);
+ Assertions.assertThat(credentials.expiresAt()).isPresent();
+
+ // Verify STS was called with correct role, session name, and external ID
+ ArgumentCaptor<AssumeRoleRequest> requestCaptor =
+ ArgumentCaptor.forClass(AssumeRoleRequest.class);
+ Mockito.verify(mockStsClient).assumeRole(requestCaptor.capture());
+
+ AssumeRoleRequest capturedRequest = requestCaptor.getValue();
+ Assertions.assertThat(capturedRequest.roleArn())
+ .isEqualTo("arn:aws:iam::123456789012:role/customer-role");
+
Assertions.assertThat(capturedRequest.roleSessionName()).isEqualTo("my-session");
+
Assertions.assertThat(capturedRequest.externalId()).isEqualTo("external-id-123");
+ }
+
+ @Test
+ public void testGetCredentialsWithDefaultSessionName() {
+ // Create a service identity reference
+ ServiceIdentityInfoDpo serviceIdentity =
+ new AwsIamServiceIdentityInfoDpo(
+ new SecretReference("urn:polaris-secret:test:my-realm:AWS_IAM",
Map.of()));
+
+ // SigV4 auth without explicit session name
+ SigV4AuthenticationParametersDpo authParams =
+ new SigV4AuthenticationParametersDpo(
+ "arn:aws:iam::123456789012:role/customer-role", null, null,
"us-west-2", "glue");
+
+ // Create connection config with service identity and auth params
+ IcebergRestConnectionConfigInfoDpo connectionConfig =
+ new IcebergRestConnectionConfigInfoDpo(
+ "https://test-catalog.example.com", authParams, serviceIdentity,
"test-catalog");
+
+ ConnectionCredentials credentials =
vendor.getConnectionCredentials(connectionConfig);
+
+ // Should still get credentials
+ Assertions.assertThat(credentials.credentials())
+ .containsEntry(
+ CatalogAccessProperty.AWS_ACCESS_KEY_ID.getPropertyName(),
"assumed-access-key-id")
+ .hasSize(3);
+
+ // Verify default session name "polaris" was used
+ ArgumentCaptor<AssumeRoleRequest> requestCaptor =
+ ArgumentCaptor.forClass(AssumeRoleRequest.class);
+ Mockito.verify(mockStsClient).assumeRole(requestCaptor.capture());
+
+ AssumeRoleRequest capturedRequest = requestCaptor.getValue();
+ Assertions.assertThat(capturedRequest.roleArn())
+ .isEqualTo("arn:aws:iam::123456789012:role/customer-role");
+
Assertions.assertThat(capturedRequest.roleSessionName()).isEqualTo("polaris");
+ Assertions.assertThat(capturedRequest.externalId()).isNull();
+ }
+}
diff --git
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index d118e52e9..ed3c138e0 100644
---
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -45,6 +45,8 @@ import
org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.credentials.PolarisCredentialManager;
+import
org.apache.polaris.core.credentials.connection.ConnectionCredentialVendor;
import org.apache.polaris.core.entity.PrincipalEntity;
import org.apache.polaris.core.identity.provider.ServiceIdentityProvider;
import org.apache.polaris.core.persistence.BasePersistence;
@@ -73,6 +75,8 @@ import
org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
import org.apache.polaris.service.config.ReservedProperties;
import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
import
org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory;
+import org.apache.polaris.service.credentials.DefaultPolarisCredentialManager;
+import
org.apache.polaris.service.credentials.connection.SigV4ConnectionCredentialVendor;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.events.listeners.TestPolarisEventListener;
import
org.apache.polaris.service.identity.provider.DefaultServiceIdentityProvider;
@@ -221,6 +225,21 @@ public record TestServices(
userSecretsManagerFactory.getOrCreateUserSecretsManager(realmContext);
ServiceIdentityProvider serviceIdentityProvider = new
DefaultServiceIdentityProvider();
+ // Create credential vendors for testing
+ @SuppressWarnings("unchecked")
+ Instance<ConnectionCredentialVendor> mockCredentialVendors =
Mockito.mock(Instance.class);
+ SigV4ConnectionCredentialVendor sigV4Vendor =
+ new SigV4ConnectionCredentialVendor((destination) -> stsClient,
serviceIdentityProvider);
+ Mockito.when(
+ mockCredentialVendors.select(
+
any(org.apache.polaris.service.credentials.connection.AuthType.Literal.class)))
+ .thenReturn(mockCredentialVendors);
+ Mockito.when(mockCredentialVendors.isUnsatisfied()).thenReturn(false);
+ Mockito.when(mockCredentialVendors.get()).thenReturn(sigV4Vendor);
+
+ PolarisCredentialManager credentialManager =
+ new DefaultPolarisCredentialManager(realmContext,
mockCredentialVendors);
+
FileIOFactory fileIOFactory =
fileIOFactorySupplier.apply(storageCredentialCache,
metaStoreManagerFactory);
@@ -256,6 +275,7 @@ public record TestServices(
resolutionManifestFactory,
metaStoreManager,
userSecretsManager,
+ credentialManager,
authorizer,
new DefaultCatalogPrefixParser(),
reservedProperties,