This is an automated email from the ASF dual-hosted git repository. dimas pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push: new 9e6d92937 add refresh credentials property to loadTableResult (#2341) 9e6d92937 is described below commit 9e6d92937f5f11e9879b1cc9a7a80b75cf341e31 Author: Jason <jason...@gmail.com> AuthorDate: Thu Aug 28 05:03:39 2025 +0300 add refresh credentials property to loadTableResult (#2341) * add refresh credentials property to loadTableResult * IcebergCatalogAdapterTest: Added test to ensure refresh credentials endpoint is included * delegate refresh credential endpoint configuration to storage integration * GCP: Add refresh credential properties --- CHANGELOG.md | 5 ++ .../AtomicOperationMetaStoreManager.java | 6 +- .../TransactionWorkspaceMetaStoreManager.java | 6 +- .../TransactionalMetaStoreManagerImpl.java | 6 +- .../polaris/core/rest/PolarisResourcePaths.java | 11 ++++ .../core/storage/PolarisCredentialVendor.java | 8 ++- .../core/storage/PolarisStorageIntegration.java | 8 ++- .../core/storage/StorageAccessProperty.java | 21 +++++++ .../aws/AwsCredentialsStorageIntegration.java | 8 ++- .../azure/AzureCredentialsStorageIntegration.java | 17 ++++- .../core/storage/cache/StorageCredentialCache.java | 9 ++- .../storage/cache/StorageCredentialCacheKey.java | 10 ++- .../gcp/GcpCredentialsStorageIntegration.java | 10 ++- .../storage/InMemoryStorageIntegrationTest.java | 4 +- .../AzureCredentialsStorageIntegrationTest.java | 22 ++++++- .../storage/cache/StorageCredentialCacheTest.java | 73 +++++++++++++++------- .../aws/AwsCredentialsStorageIntegrationTest.java | 51 ++++++++++++--- .../AzureCredentialStorageIntegrationTest.java | 4 +- .../gcp/GcpCredentialsStorageIntegrationTest.java | 20 +++++- .../service/it/RestCatalogMinIOSpecialIT.java | 7 ++- .../service/catalog/iceberg/IcebergCatalog.java | 6 +- .../catalog/iceberg/IcebergCatalogAdapter.java | 31 +++++++-- .../catalog/iceberg/IcebergCatalogHandler.java | 42 ++++++++++--- .../iceberg/SupportsCredentialDelegation.java | 4 +- .../service/catalog/io/DefaultFileIOFactory.java | 3 +- .../polaris/service/catalog/io/FileIOUtil.java | 6 +- .../PolarisStorageIntegrationProviderImpl.java | 3 +- .../catalog/AbstractIcebergCatalogTest.java | 4 +- .../catalog/IcebergCatalogHandlerAuthzTest.java | 29 +++++---- 29 files changed, 341 insertions(+), 93 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb4333995..f0ffb54e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -83,6 +83,11 @@ at locations that better optimize for object storage. - Introduced bootstrap command options to specify custom schema files for database initialization. +- Added refresh credentials endpoint configuration to LoadTableResponse for AWS, Azure, and GCP. Enabling +automatic storage credential refresh per table on the client side. Java client version >= 1.8.0 is required. +The endpoint path is always returned when using vended credentials, but clients must enable the +refresh-credentials flag for the desired storage provider. + ### Changes - Polaris Management API clients must be prepared to deal with new attributes in `AwsStorageConfigInfo` objects. diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index 947d41511..f6f04143f 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -1582,7 +1582,8 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, - @Nonnull Set<String> allowedWriteLocations) { + @Nonnull Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint) { // get meta store session we should be using BasePersistence ms = callCtx.getMetaStore(); @@ -1622,7 +1623,8 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { callCtx.getRealmConfig(), allowListOperation, allowedReadLocations, - allowedWriteLocations); + allowedWriteLocations, + refreshCredentialsEndpoint); return new ScopedCredentialsResult(accessConfig); } catch (Exception ex) { return new ScopedCredentialsResult( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index 2671ad98b..3d9f3c052 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -327,7 +327,8 @@ public class TransactionWorkspaceMetaStoreManager implements PolarisMetaStoreMan PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, - @Nonnull Set<String> allowedWriteLocations) { + @Nonnull Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint) { return delegate.getSubscopedCredsForEntity( callCtx, catalogId, @@ -335,7 +336,8 @@ public class TransactionWorkspaceMetaStoreManager implements PolarisMetaStoreMan entityType, allowListOperation, allowedReadLocations, - allowedWriteLocations); + allowedWriteLocations, + refreshCredentialsEndpoint); } @Override diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index c3e1a9fac..97af650b0 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -2040,7 +2040,8 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, - @Nonnull Set<String> allowedWriteLocations) { + @Nonnull Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint) { // get meta store session we should be using TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); @@ -2075,7 +2076,8 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { callCtx.getRealmConfig(), allowListOperation, allowedReadLocations, - allowedWriteLocations); + allowedWriteLocations, + refreshCredentialsEndpoint); return new ScopedCredentialsResult(accessConfig); } catch (Exception ex) { return new ScopedCredentialsResult( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java b/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java index 8a30d7962..16eea08da 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java @@ -57,6 +57,17 @@ public class PolarisResourcePaths { "polaris", "v1", prefix, "namespaces", RESTUtil.encodeNamespace(ns), "generic-tables"); } + public String credentialsPath(TableIdentifier ident) { + return SLASH.join( + "v1", + prefix, + "namespaces", + RESTUtil.encodeNamespace(ident.namespace()), + "tables", + RESTUtil.encodeString(ident.name()), + "credentials"); + } + public String genericTable(TableIdentifier ident) { return SLASH.join( "polaris", diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java index 04022d233..d64e9ad88 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java @@ -19,6 +19,7 @@ package org.apache.polaris.core.storage; import jakarta.annotation.Nonnull; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.PolarisEntityType; @@ -37,6 +38,10 @@ public interface PolarisCredentialVendor { * allowedWriteLocations * @param allowedReadLocations a set of allowed to read locations * @param allowedWriteLocations a set of allowed to write locations + * @param refreshCredentialsEndpoint an optional endpoint to use for refreshing credentials. If + * supported by the storage type it will be returned to the client in the appropriate + * properties. The endpoint may be relative to the base URI and the client is responsible for + * handling the relative path * @return an enum map containing the scoped credentials */ @Nonnull @@ -47,5 +52,6 @@ public interface PolarisCredentialVendor { PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, - @Nonnull Set<String> allowedWriteLocations); + @Nonnull Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java index c98982091..1828d01c8 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java @@ -20,6 +20,7 @@ package org.apache.polaris.core.storage; import jakarta.annotation.Nonnull; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.config.RealmConfig; @@ -55,13 +56,18 @@ public abstract class PolarisStorageIntegration<T extends PolarisStorageConfigur * locations * @param allowedReadLocations a set of allowed to read locations * @param allowedWriteLocations a set of allowed to write locations + * @param refreshCredentialsEndpoint an optional endpoint to use for refreshing credentials. If + * supported by the storage type it will be returned to the client in the appropriate + * properties. The endpoint may be relative to the base URI and the client is responsible for + * handling the relative path * @return An enum map including the scoped credentials */ public abstract AccessConfig getSubscopedCreds( @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, - @Nonnull Set<String> allowedWriteLocations); + @Nonnull Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint); /** * Validate access for the provided operation actions and locations. diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java index 33526d2e2..faa29c31e 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java @@ -18,6 +18,9 @@ */ package org.apache.polaris.core.storage; +import org.apache.iceberg.aws.AwsClientProperties; +import org.apache.iceberg.gcp.GCPProperties; + /** * A subset of Iceberg catalog properties recognized by Polaris. * @@ -39,6 +42,12 @@ public enum StorageAccessProperty { Boolean.class, "s3.path-style-access", "whether to use S3 path style access", false), CLIENT_REGION( String.class, "client.region", "region to configure client for making requests to AWS"), + AWS_REFRESH_CREDENTIALS_ENDPOINT( + String.class, + AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT, + "the endpoint to load vended credentials for a table from the catalog", + false, + false), GCS_ACCESS_TOKEN(String.class, "gcs.oauth2.token", "the gcs scoped access token"), GCS_ACCESS_TOKEN_EXPIRES_AT( @@ -47,11 +56,23 @@ public enum StorageAccessProperty { "the time the gcs access token expires, in milliseconds", true, true), + GCS_REFRESH_CREDENTIALS_ENDPOINT( + String.class, + GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, + "the endpoint to load vended credentials for a table from the catalog", + false, + false), // Currently not using ACCESS TOKEN as the ResolvingFileIO is using ADLSFileIO for azure case and // it expects for SAS AZURE_ACCESS_TOKEN(String.class, "", "the azure scoped access token"), AZURE_SAS_TOKEN(String.class, "adls.sas-token.", "an azure shared access signature token"), + AZURE_REFRESH_CREDENTIALS_ENDPOINT( + String.class, + "adls.refresh-credentials-endpoint", + "the endpoint to load vended credentials for a table from the catalog", + false, + false), EXPIRATION_TIME( Long.class, "expiration-time", diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java index 616fb1f4d..3e93ba7b4 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java @@ -74,7 +74,8 @@ public class AwsCredentialsStorageIntegration @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, - @Nonnull Set<String> allowedWriteLocations) { + @Nonnull Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint) { int storageCredentialDurationSeconds = realmConfig.getConfig(STORAGE_CREDENTIAL_DURATION_SECONDS); AwsStorageConfigurationInfo storageConfig = config(); @@ -120,6 +121,11 @@ public class AwsCredentialsStorageIntegration accessConfig.put(StorageAccessProperty.CLIENT_REGION, region); } + refreshCredentialsEndpoint.ifPresent( + endpoint -> { + accessConfig.put(StorageAccessProperty.AWS_REFRESH_CREDENTIALS_ENDPOINT, endpoint); + }); + URI endpointUri = storageConfig.getEndpointUri(); if (endpointUri != null) { accessConfig.put(StorageAccessProperty.AWS_ENDPOINT, endpointUri.toString()); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java index 50dd8c414..5b466b0c3 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java @@ -46,6 +46,7 @@ import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.HashSet; import java.util.Objects; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.storage.AccessConfig; @@ -76,7 +77,8 @@ public class AzureCredentialsStorageIntegration @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, - @Nonnull Set<String> allowedWriteLocations) { + @Nonnull Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint) { String loc = !allowedWriteLocations.isEmpty() ? allowedWriteLocations.stream().findAny().orElse(null) @@ -169,15 +171,24 @@ public class AzureCredentialsStorageIntegration String.format("Endpoint %s not supported", location.getEndpoint())); } - return toAccessConfig(sasToken, storageDnsName, sanitizedEndTime.toInstant()); + return toAccessConfig( + sasToken, storageDnsName, sanitizedEndTime.toInstant(), refreshCredentialsEndpoint); } @VisibleForTesting - static AccessConfig toAccessConfig(String sasToken, String storageDnsName, Instant expiresAt) { + static AccessConfig toAccessConfig( + String sasToken, + String storageDnsName, + Instant expiresAt, + Optional<String> refreshCredentialsEndpoint) { AccessConfig.Builder accessConfig = AccessConfig.builder(); handleAzureCredential(accessConfig, sasToken, storageDnsName); accessConfig.put( StorageAccessProperty.EXPIRATION_TIME, String.valueOf(expiresAt.toEpochMilli())); + refreshCredentialsEndpoint.ifPresent( + endpoint -> { + accessConfig.put(StorageAccessProperty.AZURE_REFRESH_CREDENTIALS_ENDPOINT, endpoint); + }); return accessConfig.build(); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java index d8d88edc6..d166ee4b1 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java @@ -105,7 +105,8 @@ public class StorageCredentialCache { @Nonnull PolarisEntity polarisEntity, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, - @Nonnull Set<String> allowedWriteLocations) { + @Nonnull Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint) { if (!isTypeSupported(polarisEntity.getType())) { callCtx .getDiagServices() @@ -117,7 +118,8 @@ public class StorageCredentialCache { polarisEntity, allowListOperation, allowedReadLocations, - allowedWriteLocations); + allowedWriteLocations, + refreshCredentialsEndpoint); LOGGER.atDebug().addKeyValue("key", key).log("subscopedCredsCache"); Function<StorageCredentialCacheKey, StorageCredentialCacheEntry> loader = k -> { @@ -130,7 +132,8 @@ public class StorageCredentialCache { polarisEntity.getType(), k.allowedListAction(), k.allowedReadLocations(), - k.allowedWriteLocations()); + k.allowedWriteLocations(), + k.refreshCredentialsEndpoint()); if (scopedCredentialsResult.isSuccess()) { long maxCacheDurationMs = maxCacheDurationMs(callCtx.getRealmConfig()); return new StorageCredentialCacheEntry( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java index 79eba7d1d..8b9d0542d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java @@ -19,6 +19,7 @@ package org.apache.polaris.core.storage.cache; import jakarta.annotation.Nullable; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityConstants; @@ -47,12 +48,16 @@ public interface StorageCredentialCacheKey { @Value.Parameter(order = 6) Set<String> allowedWriteLocations(); + @Value.Parameter(order = 7) + Optional<String> refreshCredentialsEndpoint(); + static StorageCredentialCacheKey of( String realmId, PolarisEntity entity, boolean allowedListAction, Set<String> allowedReadLocations, - Set<String> allowedWriteLocations) { + Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint) { String storageConfigSerializedStr = entity .getInternalPropertiesAsMap() @@ -63,6 +68,7 @@ public interface StorageCredentialCacheKey { storageConfigSerializedStr, allowedListAction, allowedReadLocations, - allowedWriteLocations); + allowedWriteLocations, + refreshCredentialsEndpoint); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java index 0120df2b1..c0568cc9b 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java @@ -35,6 +35,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Stream; import org.apache.polaris.core.config.RealmConfig; @@ -75,7 +76,8 @@ public class GcpCredentialsStorageIntegration @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, - @Nonnull Set<String> allowedWriteLocations) { + @Nonnull Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint) { try { sourceCredentials.refresh(); } catch (IOException e) { @@ -112,6 +114,12 @@ public class GcpCredentialsStorageIntegration accessConfig.put( StorageAccessProperty.GCS_ACCESS_TOKEN_EXPIRES_AT, String.valueOf(token.getExpirationTime().getTime())); + + refreshCredentialsEndpoint.ifPresent( + endpoint -> { + accessConfig.put(StorageAccessProperty.GCS_REFRESH_CREDENTIALS_ENDPOINT, endpoint); + }); + return accessConfig.build(); } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java index fa7777814..9ba5271ab 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java @@ -21,6 +21,7 @@ package org.apache.polaris.core.storage; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.config.PolarisConfigurationStore; import org.apache.polaris.core.config.RealmConfig; @@ -197,7 +198,8 @@ class InMemoryStorageIntegrationTest { @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, - @Nonnull Set<String> allowedWriteLocations) { + @Nonnull Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint) { return null; } } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java index 89b60dba5..d613e5154 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java @@ -22,7 +22,9 @@ package org.apache.polaris.core.storage.azure; import static org.apache.polaris.core.storage.azure.AzureCredentialsStorageIntegration.toAccessConfig; import java.time.Instant; +import java.util.Optional; import org.apache.polaris.core.storage.AccessConfig; +import org.apache.polaris.core.storage.StorageAccessProperty; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -32,20 +34,34 @@ public class AzureCredentialsStorageIntegrationTest { public void testAzureCredentialFormatting() { Instant expiresAt = Instant.ofEpochMilli(Long.MAX_VALUE); - AccessConfig noSuffixResult = toAccessConfig("sasToken", "some_account", expiresAt); + AccessConfig noSuffixResult = + toAccessConfig("sasToken", "some_account", expiresAt, Optional.empty()); Assertions.assertThat(noSuffixResult.credentials()).hasSize(2); Assertions.assertThat(noSuffixResult.credentials()).containsKey("adls.sas-token.some_account"); + Assertions.assertThat(noSuffixResult.credentials()) + .doesNotContainKey( + StorageAccessProperty.AZURE_REFRESH_CREDENTIALS_ENDPOINT.getPropertyName()); AccessConfig adlsSuffixResult = - toAccessConfig("sasToken", "some_account." + AzureLocation.ADLS_ENDPOINT, expiresAt); + toAccessConfig( + "sasToken", + "some_account." + AzureLocation.ADLS_ENDPOINT, + expiresAt, + Optional.of("endpoint/credentials")); Assertions.assertThat(adlsSuffixResult.credentials()).hasSize(3); Assertions.assertThat(adlsSuffixResult.credentials()) .containsKey("adls.sas-token.some_account"); Assertions.assertThat(adlsSuffixResult.credentials()) .containsKey("adls.sas-token.some_account." + AzureLocation.ADLS_ENDPOINT); + Assertions.assertThat(adlsSuffixResult.extraProperties()) + .containsEntry( + StorageAccessProperty.AZURE_REFRESH_CREDENTIALS_ENDPOINT.getPropertyName(), + "endpoint/credentials"); + AccessConfig blobSuffixResult = - toAccessConfig("sasToken", "some_account." + AzureLocation.BLOB_ENDPOINT, expiresAt); + toAccessConfig( + "sasToken", "some_account." + AzureLocation.BLOB_ENDPOINT, expiresAt, Optional.empty()); Assertions.assertThat(blobSuffixResult.credentials()).hasSize(3); Assertions.assertThat(blobSuffixResult.credentials()) .containsKey("adls.sas-token.some_account"); diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java index a8e97133b..becc220a6 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.iceberg.exceptions.UnprocessableEntityException; import org.apache.polaris.core.PolarisCallContext; @@ -89,7 +90,8 @@ public class StorageCredentialCacheTest { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(badResult); PolarisEntity polarisEntity = new PolarisEntity( @@ -103,7 +105,8 @@ public class StorageCredentialCacheTest { polarisEntity, true, Set.of("s3://bucket1/path"), - Set.of("s3://bucket3/path"))) + Set.of("s3://bucket3/path"), + Optional.empty())) .isInstanceOf(UnprocessableEntityException.class) .hasMessage("Failed to get subscoped credentials: extra_error_info"); } @@ -121,7 +124,8 @@ public class StorageCredentialCacheTest { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) .thenReturn(mockedScopedCreds.get(1)); @@ -137,7 +141,8 @@ public class StorageCredentialCacheTest { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); // subscope for the same entity and same allowed locations, will hit the cache @@ -147,7 +152,8 @@ public class StorageCredentialCacheTest { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); } @@ -164,7 +170,8 @@ public class StorageCredentialCacheTest { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) .thenReturn(mockedScopedCreds.get(2)); @@ -178,7 +185,8 @@ public class StorageCredentialCacheTest { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); // the entry will be evicted immediately because the token is expired storageCredentialCache.getOrGenerateSubScopeCreds( @@ -187,7 +195,8 @@ public class StorageCredentialCacheTest { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); storageCredentialCache.getOrGenerateSubScopeCreds( @@ -196,7 +205,8 @@ public class StorageCredentialCacheTest { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); storageCredentialCache.getOrGenerateSubScopeCreds( @@ -205,7 +215,8 @@ public class StorageCredentialCacheTest { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); } @@ -222,7 +233,8 @@ public class StorageCredentialCacheTest { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) .thenReturn(mockedScopedCreds.get(2)); @@ -236,7 +248,8 @@ public class StorageCredentialCacheTest { entity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // update the entity's storage config, since StorageConfig changed, cache will generate new @@ -253,7 +266,8 @@ public class StorageCredentialCacheTest { PolarisEntity.of(updateEntity), /* allowedListAction= */ true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // allowedListAction changed to different value FALSE, will generate new entry @@ -264,7 +278,8 @@ public class StorageCredentialCacheTest { entity, /* allowedListAction= */ false, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // different allowedWriteLocations, will generate new entry @@ -275,7 +290,8 @@ public class StorageCredentialCacheTest { entity, /* allowedListAction= */ false, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://differentbucket/path")); + Set.of("s3://differentbucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // different allowedReadLocations, will generate new try @@ -291,7 +307,8 @@ public class StorageCredentialCacheTest { PolarisEntity.of(updateEntity), /* allowedListAction= */ false, Set.of("s3://differentbucket/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } } @@ -310,7 +327,8 @@ public class StorageCredentialCacheTest { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) .thenReturn(mockedScopedCreds.get(2)); @@ -322,7 +340,8 @@ public class StorageCredentialCacheTest { entity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); } Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); @@ -334,7 +353,8 @@ public class StorageCredentialCacheTest { new PolarisEntity(new PolarisBaseEntity.Builder(entity).id(1234).build()), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } @@ -346,7 +366,8 @@ public class StorageCredentialCacheTest { new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } // order of the allowedReadLocations does not affect the cache @@ -357,7 +378,8 @@ public class StorageCredentialCacheTest { new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), true, Set.of("s3://bucket2/path", "s3://bucket1/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } @@ -369,7 +391,8 @@ public class StorageCredentialCacheTest { new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), true, Set.of("s3://bucket2/path", "s3://bucket1/path"), - Set.of("s3://bucket4/path", "s3://bucket3/path")); + Set.of("s3://bucket4/path", "s3://bucket3/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } } @@ -451,7 +474,8 @@ public class StorageCredentialCacheTest { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(properties); List<PolarisEntity> entityList = getPolarisEntities(); @@ -462,7 +486,8 @@ public class StorageCredentialCacheTest { entityList.get(0), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(config.credentials()) .containsExactly(Map.entry("s3.secret-access-key", "super-secret-123")); Assertions.assertThat(config.extraProperties()) diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java index 10ba4b908..7b4b50dec 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; import jakarta.annotation.Nonnull; import java.time.Instant; import java.util.List; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; @@ -95,7 +96,8 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { EMPTY_REALM_CONFIG, true, Set.of(warehouseDir + "/namespace/table"), - Set.of(warehouseDir + "/namespace/table")); + Set.of(warehouseDir + "/namespace/table"), + Optional.of("/namespace/table/credentials")); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -104,6 +106,10 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { .containsEntry( StorageAccessProperty.AWS_SESSION_TOKEN_EXPIRES_AT_MS.getPropertyName(), String.valueOf(EXPIRE_TIME.toEpochMilli())); + assertThat(accessConfig.extraProperties()) + .containsEntry( + StorageAccessProperty.AWS_REFRESH_CREDENTIALS_ENDPOINT.getPropertyName(), + "/namespace/table/credentials"); } @ParameterizedTest @@ -242,7 +248,8 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { EMPTY_REALM_CONFIG, true, Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), - Set.of(s3Path(bucket, firstPath)))) + Set.of(s3Path(bucket, firstPath)), + null)) .isInstanceOf(IllegalArgumentException.class); break; case AWS_PARTITION: @@ -260,7 +267,8 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { EMPTY_REALM_CONFIG, true, Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), - Set.of(s3Path(bucket, firstPath))); + Set.of(s3Path(bucket, firstPath)), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -360,7 +368,8 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { EMPTY_REALM_CONFIG, false, /* allowList = false*/ Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), - Set.of(s3Path(bucket, firstPath))); + Set.of(s3Path(bucket, firstPath)), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -454,7 +463,8 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), - Set.of()); + Set.of(), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -516,7 +526,12 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { .region("us-east-2") .build(), stsClient) - .getSubscopedCreds(EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), Set.of()); + .getSubscopedCreds( + EMPTY_REALM_CONFIG, + true, /* allowList = true */ + Set.of(), + Set.of(), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -554,7 +569,11 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), Set.of())) + EMPTY_REALM_CONFIG, + true, /* allowList = true */ + Set.of(), + Set.of(), + Optional.empty())) .isInstanceOf(IllegalArgumentException.class); break; case AWS_PARTITION: @@ -569,7 +588,11 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), Set.of()); + EMPTY_REALM_CONFIG, + true, /* allowList = true */ + Set.of(), + Set.of(), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.CLIENT_REGION.getPropertyName(), clientRegion); @@ -604,7 +627,11 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), Set.of()); + EMPTY_REALM_CONFIG, + true, /* allowList = true */ + Set.of(), + Set.of(), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .doesNotContainKey(StorageAccessProperty.CLIENT_REGION.getPropertyName()); @@ -621,7 +648,11 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), Set.of())) + EMPTY_REALM_CONFIG, + true, /* allowList = true */ + Set.of(), + Set.of(), + Optional.empty())) .isInstanceOf(IllegalArgumentException.class); break; default: diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java index d1783baed..96e441000 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java @@ -45,6 +45,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; @@ -352,7 +353,8 @@ public class AzureCredentialStorageIntegrationTest extends BaseStorageIntegratio EMPTY_REALM_CONFIG, allowListAction, new HashSet<>(allowedReadLoc), - new HashSet<>(allowedWriteLoc)); + new HashSet<>(allowedWriteLoc), + Optional.empty()); } private BlobContainerClient createContainerClient( diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java index da890627e..f1a7afc63 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java @@ -41,6 +41,7 @@ import java.util.Arrays; import java.util.Date; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; @@ -59,6 +60,8 @@ class GcpCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { private final String gcsServiceKeyJsonFileLocation = System.getenv("GOOGLE_APPLICATION_CREDENTIALS"); + private static final String REFRESH_ENDPOINT = "get/credentials"; + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testSubscope(boolean allowedListAction) throws Exception { @@ -170,7 +173,8 @@ class GcpCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { EMPTY_REALM_CONFIG, allowListAction, new HashSet<>(allowedReadLoc), - new HashSet<>(allowedWriteLoc)); + new HashSet<>(allowedWriteLoc), + Optional.of(REFRESH_ENDPOINT)); } @Test @@ -295,6 +299,20 @@ class GcpCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { return true; } + @Test + public void testRefreshCredentialsEndpointIsReturned() throws IOException { + Assumptions.assumeThat(gcsServiceKeyJsonFileLocation) + .describedAs("Environment variable GOOGLE_APPLICATION_CREDENTIALS not exits") + .isNotNull() + .isNotEmpty(); + + AccessConfig accessConfig = + subscopedCredsForOperations( + List.of("gs://bucket1/path/to/data"), List.of("gs://bucket1/path/to/data"), true); + assertThat(accessConfig.get(StorageAccessProperty.GCS_REFRESH_CREDENTIALS_ENDPOINT)) + .isEqualTo(REFRESH_ENDPOINT); + } + private boolean isNotNull(JsonNode node) { return node != null && !node.isNull(); } diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java index 6a93da886..83ddd0c57 100644 --- a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java +++ b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java @@ -42,6 +42,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; @@ -266,7 +267,11 @@ public class RestCatalogMinIOSpecialIT { LoadTableResponse loadTableResponse = catalogApi.loadTableWithAccessDelegation(catalogName, id, "ALL"); - assertThat(loadTableResponse.config()).containsKey("s3.endpoint"); + assertThat(loadTableResponse.config()) + .containsKey("s3.endpoint") + .containsEntry( + AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT, + "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials"); restCatalog.dropTable(id); assertThat(restCatalog.tableExists(id)).isFalse(); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 438cf9922..5da1eeb12 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -830,7 +830,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog public AccessConfig getAccessConfig( TableIdentifier tableIdentifier, TableMetadata tableMetadata, - Set<PolarisStorageActions> storageActions) { + Set<PolarisStorageActions> storageActions, + Optional<String> refreshCredentialsEndpoint) { Optional<PolarisEntity> storageInfo = findStorageInfo(tableIdentifier); if (storageInfo.isEmpty()) { LOGGER @@ -846,7 +847,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog tableIdentifier, StorageUtil.getLocationsAllowedToBeAccessed(tableMetadata), storageActions, - storageInfo.get()); + storageInfo.get(), + refreshCredentialsEndpoint); } private String buildPrefixedLocation(TableIdentifier tableIdentifier) { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 76401582a..860476cf8 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -75,6 +75,7 @@ import org.apache.polaris.core.persistence.resolver.Resolver; import org.apache.polaris.core.persistence.resolver.ResolverFactory; import org.apache.polaris.core.persistence.resolver.ResolverStatus; import org.apache.polaris.core.rest.PolarisEndpoints; +import org.apache.polaris.core.rest.PolarisResourcePaths; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.service.catalog.AccessDelegationMode; import org.apache.polaris.service.catalog.CatalogPrefixParser; @@ -359,12 +360,18 @@ public class IcebergCatalogAdapter securityContext, prefix, catalog -> { + Optional<String> refreshCredentialsEndpoint = + getRefreshCredentialsEndpoint( + delegationModes, + prefix, + TableIdentifier.of(namespace, createTableRequest.name())); if (createTableRequest.stageCreate()) { if (delegationModes.isEmpty()) { return Response.ok(catalog.createTableStaged(ns, createTableRequest)).build(); } else { return Response.ok( - catalog.createTableStagedWithWriteDelegation(ns, createTableRequest)) + catalog.createTableStagedWithWriteDelegation( + ns, createTableRequest, refreshCredentialsEndpoint)) .build(); } } else if (delegationModes.isEmpty()) { @@ -374,7 +381,8 @@ public class IcebergCatalogAdapter .build(); } else { LoadTableResponse response = - catalog.createTableDirectWithWriteDelegation(ns, createTableRequest); + catalog.createTableDirectWithWriteDelegation( + ns, createTableRequest, refreshCredentialsEndpoint); return tryInsertETagHeader( Response.ok(response), response, namespace, createTableRequest.name()) .build(); @@ -430,9 +438,12 @@ public class IcebergCatalogAdapter .loadTableIfStale(tableIdentifier, ifNoneMatch, snapshots) .orElseThrow(() -> new WebApplicationException(Response.Status.NOT_MODIFIED)); } else { + Optional<String> refreshCredentialsEndpoint = + getRefreshCredentialsEndpoint(delegationModes, prefix, tableIdentifier); response = catalog - .loadTableWithAccessDelegationIfStale(tableIdentifier, ifNoneMatch, snapshots) + .loadTableWithAccessDelegationIfStale( + tableIdentifier, ifNoneMatch, snapshots, refreshCredentialsEndpoint) .orElseThrow(() -> new WebApplicationException(Response.Status.NOT_MODIFIED)); } @@ -440,6 +451,15 @@ public class IcebergCatalogAdapter }); } + private static Optional<String> getRefreshCredentialsEndpoint( + EnumSet<AccessDelegationMode> delegationModes, + String prefix, + TableIdentifier tableIdentifier) { + return delegationModes.contains(AccessDelegationMode.VENDED_CREDENTIALS) + ? Optional.of(new PolarisResourcePaths(prefix).credentialsPath(tableIdentifier)) + : Optional.empty(); + } + @Override public Response tableExists( String prefix, @@ -599,7 +619,10 @@ public class IcebergCatalogAdapter prefix, catalog -> { LoadTableResponse loadTableResponse = - catalog.loadTableWithAccessDelegation(tableIdentifier, "all"); + catalog.loadTableWithAccessDelegation( + tableIdentifier, + "all", + Optional.of(new PolarisResourcePaths(prefix).credentialsPath(tableIdentifier))); return Response.ok( ImmutableLoadCredentialsResponse.builder() .credentials(loadTableResponse.credentials()) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 17cdd7af3..57b8a990a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -393,7 +393,9 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab * @return ETagged {@link LoadTableResponse} to uniquely identify the table metadata */ public LoadTableResponse createTableDirectWithWriteDelegation( - Namespace namespace, CreateTableRequest request) { + Namespace namespace, + CreateTableRequest request, + Optional<String> refreshCredentialsEndpoint) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION; authorizeCreateTableLikeUnderNamespaceOperationOrThrow( @@ -432,7 +434,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab PolarisStorageActions.READ, PolarisStorageActions.WRITE, PolarisStorageActions.LIST), - SNAPSHOTS_ALL) + SNAPSHOTS_ALL, + refreshCredentialsEndpoint) .build(); } else if (table instanceof BaseMetadataTable) { // metadata tables are loaded on the client side, return NoSuchTableException for now @@ -500,7 +503,9 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab } public LoadTableResponse createTableStagedWithWriteDelegation( - Namespace namespace, CreateTableRequest request) { + Namespace namespace, + CreateTableRequest request, + Optional<String> refreshCredentialsEndpoint) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION; authorizeCreateTableLikeUnderNamespaceOperationOrThrow( @@ -514,7 +519,11 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab TableMetadata metadata = stageTableCreateHelper(namespace, request); return buildLoadTableResponseWithDelegationCredentials( - ident, metadata, Set.of(PolarisStorageActions.ALL), SNAPSHOTS_ALL) + ident, + metadata, + Set.of(PolarisStorageActions.ALL), + SNAPSHOTS_ALL, + refreshCredentialsEndpoint) .build(); } @@ -623,8 +632,12 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab } public LoadTableResponse loadTableWithAccessDelegation( - TableIdentifier tableIdentifier, String snapshots) { - return loadTableWithAccessDelegationIfStale(tableIdentifier, null, snapshots).get(); + TableIdentifier tableIdentifier, + String snapshots, + Optional<String> refreshCredentialsEndpoint) { + return loadTableWithAccessDelegationIfStale( + tableIdentifier, null, snapshots, refreshCredentialsEndpoint) + .get(); } /** @@ -638,7 +651,10 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab * load table response, otherwise */ public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale( - TableIdentifier tableIdentifier, IfNoneMatch ifNoneMatch, String snapshots) { + TableIdentifier tableIdentifier, + IfNoneMatch ifNoneMatch, + String snapshots, + Optional<String> refreshCredentialsEndpoint) { // Here we have a single method that falls through multiple candidate // PolarisAuthorizableOperations because instead of identifying the desired operation up-front // and @@ -708,7 +724,11 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab TableMetadata tableMetadata = baseTable.operations().current(); return Optional.of( buildLoadTableResponseWithDelegationCredentials( - tableIdentifier, tableMetadata, actionsRequested, snapshots) + tableIdentifier, + tableMetadata, + actionsRequested, + snapshots, + refreshCredentialsEndpoint) .build()); } else if (table instanceof BaseMetadataTable) { // metadata tables are loaded on the client side, return NoSuchTableException for now @@ -722,7 +742,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab TableIdentifier tableIdentifier, TableMetadata tableMetadata, Set<PolarisStorageActions> actions, - String snapshots) { + String snapshots, + Optional<String> refreshCredentialsEndpoint) { LoadTableResponse.Builder responseBuilder = LoadTableResponse.builder().withTableMetadata(tableMetadata); if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { @@ -732,7 +753,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab .addKeyValue("tableLocation", tableMetadata.location()) .log("Fetching client credentials for table"); AccessConfig accessConfig = - credentialDelegation.getAccessConfig(tableIdentifier, tableMetadata, actions); + credentialDelegation.getAccessConfig( + tableIdentifier, tableMetadata, actions, refreshCredentialsEndpoint); Map<String, String> credentialConfig = accessConfig.credentials(); responseBuilder.addAllConfig(credentialConfig); responseBuilder.addAllConfig(accessConfig.extraProperties()); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java index 21ec380eb..b85973ed8 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.service.catalog.iceberg; +import java.util.Optional; import java.util.Set; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.catalog.TableIdentifier; @@ -35,5 +36,6 @@ public interface SupportsCredentialDelegation { AccessConfig getAccessConfig( TableIdentifier tableIdentifier, TableMetadata tableMetadata, - Set<PolarisStorageActions> storageActions); + Set<PolarisStorageActions> storageActions, + Optional<String> refreshCredentialsEndpoint); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java index d2c73e268..81f6e7c8f 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java @@ -91,7 +91,8 @@ public class DefaultFileIOFactory implements FileIOFactory { identifier, tableLocations, storageActions, - storageInfo)); + storageInfo, + Optional.empty())); // Update the FileIO with the subscoped credentials // Update with properties in case there are table-level overrides the credentials should diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java index c5ef12d78..f4a6320d6 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java @@ -81,7 +81,8 @@ public class FileIOUtil { TableIdentifier tableIdentifier, Set<String> tableLocations, Set<PolarisStorageActions> storageActions, - PolarisEntity entity) { + PolarisEntity entity, + Optional<String> refreshCredentialsEndpoint) { boolean skipCredentialSubscopingIndirection = callContext @@ -111,7 +112,8 @@ public class FileIOUtil { entity, allowList, tableLocations, - writeLocations); + writeLocations, + refreshCredentialsEndpoint); LOGGER .atDebug() .addKeyValue("tableIdentifier", tableIdentifier) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java index e07bdd082..e04a9525b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java @@ -113,7 +113,8 @@ public class PolarisStorageIntegrationProviderImpl implements PolarisStorageInte @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, - @Nonnull Set<String> allowedWriteLocations) { + @Nonnull Set<String> allowedWriteLocations, + Optional<String> refreshCredentialsEndpoint) { return AccessConfig.builder().build(); } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java index 696bca432..d66d25cd5 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java @@ -46,6 +46,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.function.Function; @@ -1828,7 +1829,8 @@ public abstract class AbstractIcebergCatalogTest extends CatalogTests<IcebergCat taskEntity.getType(), true, Set.of(tableMetadata.location()), - Set.of(tableMetadata.location())) + Set.of(tableMetadata.location()), + Optional.empty()) .getAccessConfig() .credentials(); Assertions.assertThat(credentials) diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java index a3ab18e3f..a8090f038 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java @@ -26,6 +26,7 @@ import jakarta.ws.rs.core.SecurityContext; import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import org.apache.hadoop.conf.Configuration; @@ -616,7 +617,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)), () -> { newWrapper(Set.of(PRINCIPAL_ROLE1)) - .createTableDirectWithWriteDelegation(NS2, createDirectWithWriteDelegationRequest); + .createTableDirectWithWriteDelegation( + NS2, createDirectWithWriteDelegationRequest, Optional.empty()); }, () -> { newWrapper(Set.of(PRINCIPAL_ROLE2)).dropTableWithPurge(newtable); @@ -646,7 +648,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { PolarisPrivilege.TABLE_LIST), () -> { newWrapper(Set.of(PRINCIPAL_ROLE1)) - .createTableDirectWithWriteDelegation(NS2, createDirectWithWriteDelegationRequest); + .createTableDirectWithWriteDelegation( + NS2, createDirectWithWriteDelegationRequest, Optional.empty()); }); } @@ -719,7 +722,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)), () -> { newWrapper(Set.of(PRINCIPAL_ROLE1)) - .createTableStagedWithWriteDelegation(NS2, createStagedWithWriteDelegationRequest); + .createTableStagedWithWriteDelegation( + NS2, createStagedWithWriteDelegationRequest, Optional.empty()); }, // createTableStagedWithWriteDelegation doesn't actually commit any metadata null, @@ -748,7 +752,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { PolarisPrivilege.TABLE_LIST), () -> { newWrapper(Set.of(PRINCIPAL_ROLE1)) - .createTableStagedWithWriteDelegation(NS2, createStagedWithWriteDelegationRequest); + .createTableStagedWithWriteDelegation( + NS2, createStagedWithWriteDelegationRequest, Optional.empty()); }); } @@ -892,7 +897,7 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { PolarisPrivilege.TABLE_READ_DATA, PolarisPrivilege.TABLE_WRITE_DATA, PolarisPrivilege.CATALOG_MANAGE_CONTENT), - () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"), + () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", Optional.empty()), null /* cleanupAction */); } @@ -908,7 +913,7 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { PolarisPrivilege.TABLE_CREATE, PolarisPrivilege.TABLE_LIST, PolarisPrivilege.TABLE_DROP), - () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all")); + () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", Optional.empty())); } @Test @@ -921,7 +926,7 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { PolarisPrivilege.TABLE_READ_DATA, PolarisPrivilege.TABLE_WRITE_DATA, PolarisPrivilege.CATALOG_MANAGE_CONTENT), - () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"), + () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", Optional.empty()), null /* cleanupAction */); } @@ -937,7 +942,7 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { PolarisPrivilege.TABLE_CREATE, PolarisPrivilege.TABLE_LIST, PolarisPrivilege.TABLE_DROP), - () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all")); + () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", Optional.empty())); } @Test @@ -950,7 +955,7 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { () -> newWrapper() .loadTableWithAccessDelegationIfStale( - TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"), + TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", Optional.empty()), null /* cleanupAction */); } @@ -969,7 +974,7 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { () -> newWrapper() .loadTableWithAccessDelegationIfStale( - TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all")); + TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", Optional.empty())); } @Test @@ -985,7 +990,7 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { () -> newWrapper() .loadTableWithAccessDelegationIfStale( - TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"), + TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", Optional.empty()), null /* cleanupAction */); } @@ -1004,7 +1009,7 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { () -> newWrapper() .loadTableWithAccessDelegationIfStale( - TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all")); + TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", Optional.empty())); } @Test