dimas-b commented on code in PR #3699:
URL: https://github.com/apache/polaris/pull/3699#discussion_r3236614178
##########
persistence/nosql/persistence/metastore/src/testFixtures/java/org/apache/polaris/persistence/nosql/metastore/CdiProducers.java:
##########
@@ -35,9 +34,8 @@ public class CdiProducers {
PolarisStorageIntegrationProvider producePolarisStorageIntegrationProvider()
{
return new PolarisStorageIntegrationProvider() {
@Override
- public @Nullable <T extends PolarisStorageConfigurationInfo>
- PolarisStorageIntegration<T> getStorageIntegrationForConfig(
- PolarisStorageConfigurationInfo polarisStorageConfigurationInfo)
{
+ public @Nullable PolarisStorageIntegration getStorageIntegration(
+ java.util.List<org.apache.polaris.core.entity.PolarisEntity>
resolvedEntityPath) {
Review Comment:
nit: import
##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java:
##########
@@ -1251,31 +1250,23 @@ private List<PolarisPolicyMappingRecord>
fetchPolicyMappingRecords(
@Nullable
@Override
- public <T extends PolarisStorageConfigurationInfo>
- PolarisStorageIntegration<T> createStorageIntegration(
- @Nonnull PolarisCallContext callCtx,
- long catalogId,
- long entityId,
- PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
- return storageIntegrationProvider.getStorageIntegrationForConfig(
- polarisStorageConfigurationInfo);
+ public PolarisStorageIntegration createStorageIntegration(
+ @Nonnull PolarisCallContext callCtx,
+ long catalogId,
+ long entityId,
+ PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
+ // No-op in OSS: the storage integration is resolved at credential-vending
time via
+ //
PolarisStorageIntegrationProvider.getStorageIntegration(resolvedEntityPath).
This hook
+ // remains available for custom deployments that need to allocate/lease
external state
+ // atomically with the catalog-creation transaction.
+ return null;
Review Comment:
NoSQL code throws in this case... I think JDBC should throw too for the sake
of consistency.
As far as I can tell, this method is not called in OSS anyway.
##########
persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/MetricsReportPersistenceTest.java:
##########
@@ -63,9 +62,8 @@ void setUp() throws SQLException {
PolarisStorageIntegrationProvider storageProvider =
new PolarisStorageIntegrationProvider() {
@Override
- public <T extends PolarisStorageConfigurationInfo>
- PolarisStorageIntegration<T> getStorageIntegrationForConfig(
- PolarisStorageConfigurationInfo config) {
+ public PolarisStorageIntegration getStorageIntegration(
+ java.util.List<org.apache.polaris.core.entity.PolarisEntity>
resolvedEntityPath) {
Review Comment:
nit: import
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java:
##########
@@ -57,86 +60,174 @@
/** Credential vendor that supports generating */
public class AwsCredentialsStorageIntegration
- extends InMemoryStorageIntegration<AwsStorageConfigurationInfo> {
+ extends CachingStorageIntegration<AwsStorageConfigurationInfo> {
private final StsClientProvider stsClientProvider;
- private final Optional<AwsCredentialsProvider> credentialsProvider;
+ private final Function<AwsStorageConfigurationInfo,
Optional<AwsCredentialsProvider>>
+ credentialsResolver;
private static final Logger LOGGER =
LoggerFactory.getLogger(AwsCredentialsStorageIntegration.class);
+ /** Test constructor — no cache, no request-scoped suppliers. */
public AwsCredentialsStorageIntegration(
- AwsStorageConfigurationInfo config, StsClient fixedClient) {
- this(config, (destination) -> fixedClient);
+ StsClient fixedClient, AwsStorageConfigurationInfo storageConfig,
RealmConfig realmConfig) {
+ this((destination) -> fixedClient, config -> Optional.empty(),
storageConfig, realmConfig);
}
+ /** Test constructor with credentials. */
public AwsCredentialsStorageIntegration(
- AwsStorageConfigurationInfo config, StsClientProvider stsClientProvider)
{
- this(config, stsClientProvider, Optional.empty());
+ StsClientProvider stsClientProvider,
+ Optional<AwsCredentialsProvider> credentialsProvider,
+ AwsStorageConfigurationInfo storageConfig,
+ RealmConfig realmConfig) {
+ this(stsClientProvider, config -> credentialsProvider, storageConfig,
realmConfig);
+ }
+
+ /** Constructor with credentials resolver (no cache). */
+ public AwsCredentialsStorageIntegration(
+ StsClientProvider stsClientProvider,
+ Function<AwsStorageConfigurationInfo, Optional<AwsCredentialsProvider>>
credentialsResolver,
+ AwsStorageConfigurationInfo storageConfig,
+ RealmConfig realmConfig) {
+ this(stsClientProvider, credentialsResolver, null, storageConfig,
realmConfig);
}
+ /** Production constructor with cache and realm config. */
public AwsCredentialsStorageIntegration(
- AwsStorageConfigurationInfo config,
StsClientProvider stsClientProvider,
- Optional<AwsCredentialsProvider> credentialsProvider) {
- super(config, AwsCredentialsStorageIntegration.class.getName());
+ Function<AwsStorageConfigurationInfo, Optional<AwsCredentialsProvider>>
credentialsResolver,
+ org.apache.polaris.core.storage.cache.StorageCredentialCache cache,
+ AwsStorageConfigurationInfo storageConfig,
+ RealmConfig realmConfig) {
+ super(cache, realmConfig, storageConfig);
this.stsClientProvider = stsClientProvider;
- this.credentialsProvider = credentialsProvider;
+ this.credentialsResolver = credentialsResolver;
}
- /** {@inheritDoc} */
@Override
- public StorageAccessConfig getSubscopedCreds(
- @Nonnull RealmConfig realmConfig,
- boolean allowListOperation,
- @Nonnull Set<String> allowedReadLocations,
- @Nonnull Set<String> allowedWriteLocations,
- @Nonnull PolarisPrincipal polarisPrincipal,
- Optional<String> refreshCredentialsEndpoint,
- @Nonnull CredentialVendingContext credentialVendingContext) {
- int storageCredentialDurationSeconds =
- realmConfig.getConfig(STORAGE_CREDENTIAL_DURATION_SECONDS);
- AwsStorageConfigurationInfo storageConfig = config();
- String region = storageConfig.getRegion();
- StorageAccessConfig.Builder accessConfig = StorageAccessConfig.builder();
+ protected StorageCredentialCacheKey buildCacheKey(
+ @Nonnull List<LocationGrant> grants,
+ @Nonnull Optional<String> refreshEndpoint,
+ @Nonnull CredentialVendingContext context) {
+ return buildCacheKey(
+ readLocations(grants),
+ listLocations(grants),
+ writeLocations(grants),
+ refreshEndpoint,
+ context);
+ }
+
+ @Override
+ protected StorageAccessConfig generateStorageAccessConfig(
+ @Nonnull List<LocationGrant> grants,
+ @Nonnull Optional<String> refreshEndpoint,
+ @Nonnull CredentialVendingContext context) {
+ return generateStorageAccessConfig(
+ readLocations(grants),
+ listLocations(grants),
+ writeLocations(grants),
+ refreshEndpoint,
+ context);
+ }
+
+ private static Set<String> readLocations(List<LocationGrant> grants) {
+ return locationsFor(grants, PolarisStorageActions.READ,
PolarisStorageActions.ALL);
+ }
+
+ private static Set<String> listLocations(List<LocationGrant> grants) {
+ return locationsFor(grants, PolarisStorageActions.LIST,
PolarisStorageActions.ALL);
+ }
+
+ private static Set<String> writeLocations(List<LocationGrant> grants) {
+ return locationsFor(
+ grants,
+ PolarisStorageActions.WRITE,
+ PolarisStorageActions.DELETE,
+ PolarisStorageActions.ALL);
+ }
+
+ private static Set<String> locationsFor(
+ List<LocationGrant> grants, PolarisStorageActions... wantedActions) {
+ EnumSet<PolarisStorageActions> wanted =
EnumSet.noneOf(PolarisStorageActions.class);
+ for (PolarisStorageActions a : wantedActions) {
+ wanted.add(a);
Review Comment:
`addAll()`?
##########
runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java:
##########
@@ -45,120 +49,136 @@
import org.apache.polaris.core.storage.aws.StsClientProvider;
import
org.apache.polaris.core.storage.azure.AzureCredentialsStorageIntegration;
import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo;
+import org.apache.polaris.core.storage.cache.StorageCredentialCache;
import org.apache.polaris.core.storage.gcp.GcpCredentialsStorageIntegration;
import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+/**
+ * Provider that returns a {@link PolarisStorageIntegration} for a resolved
entity path.
+ *
+ * <p>Integration instances are bound to a specific {@link
PolarisStorageConfigurationInfo} and
+ * cached per-config in an internal map: every unique config gets exactly one
integration instance
+ * for the lifetime of the provider. Construction is driven by a
per-storage-type factory so we can
+ * still share expensive request-scope state (STS client provider, GCP
credentials supplier, etc.)
+ * across all instances of the same backend.
+ */
@ApplicationScoped
public class PolarisStorageIntegrationProviderImpl implements
PolarisStorageIntegrationProvider {
- private final StsClientProvider stsClientProvider;
- private final Optional<AwsCredentialsProvider> stsCredentials;
- private final Supplier<GoogleCredentials> gcpCredsProvider;
- private final StorageConfiguration storageConfiguration;
- private final RealmConfig realmConfig;
+ private final PolarisDiagnostics diagnostics;
+ private final Function<AwsStorageConfigurationInfo,
AwsCredentialsStorageIntegration> awsFactory;
+ private final Function<GcpStorageConfigurationInfo,
GcpCredentialsStorageIntegration> gcpFactory;
+ private final Function<AzureStorageConfigurationInfo,
AzureCredentialsStorageIntegration>
+ azureFactory;
+
+ private final ConcurrentMap<PolarisStorageConfigurationInfo,
PolarisStorageIntegration>
+ integrationCache = new ConcurrentHashMap<>();
@SuppressWarnings("CdiInjectionPointsInspection")
@Inject
public PolarisStorageIntegrationProviderImpl(
StorageConfiguration storageConfiguration,
StsClientProvider stsClientProvider,
RealmConfig realmConfig,
- Clock clock) {
- this.storageConfiguration = storageConfiguration;
- this.stsClientProvider = stsClientProvider;
- this.stsCredentials = Optional.empty();
- this.gcpCredsProvider = storageConfiguration.gcpCredentialsSupplier(clock);
- this.realmConfig = realmConfig;
+ Clock clock,
+ StorageCredentialCache cache,
+ PolarisDiagnostics diagnostics) {
+ this.diagnostics = diagnostics;
+ this.awsFactory =
+ storageConfig ->
+ new AwsCredentialsStorageIntegration(
+ stsClientProvider,
+ config -> {
+ if (realmConfig.getConfig(
+
FeatureConfiguration.RESOLVE_CREDENTIALS_BY_STORAGE_NAME)) {
+ return Optional.of(
+
storageConfiguration.stsCredentials(config.getStorageName()));
+ }
+ return Optional.of(storageConfiguration.stsCredentials());
+ },
+ cache,
+ storageConfig,
+ realmConfig);
+ Supplier<GoogleCredentials> gcpCredsProvider =
+ storageConfiguration.gcpCredentialsSupplier(clock);
+ this.gcpFactory =
+ storageConfig ->
+ new GcpCredentialsStorageIntegration(
+ gcpCredsProvider.get(),
+ ServiceOptions.getFromServiceLoader(
+ HttpTransportFactory.class, NetHttpTransport::new),
+ cache,
+ storageConfig,
+ realmConfig);
+ this.azureFactory =
+ storageConfig -> new AzureCredentialsStorageIntegration(cache,
storageConfig, realmConfig);
}
public PolarisStorageIntegrationProviderImpl(
StsClientProvider stsClientProvider,
Optional<AwsCredentialsProvider> stsCredentials,
- Supplier<GoogleCredentials> gcpCredsProvider) {
- this.stsClientProvider = stsClientProvider;
- this.stsCredentials = stsCredentials;
- this.gcpCredsProvider = gcpCredsProvider;
- this.storageConfiguration = null;
- this.realmConfig = null;
+ Supplier<GoogleCredentials> gcpCredsProvider,
+ StorageCredentialCache cache,
+ RealmConfig realmConfig,
+ PolarisDiagnostics diagnostics) {
+ this.diagnostics = diagnostics;
+ this.awsFactory =
+ storageConfig ->
+ new AwsCredentialsStorageIntegration(
+ stsClientProvider, config -> stsCredentials, cache,
storageConfig, realmConfig);
+ this.gcpFactory =
+ storageConfig ->
+ new GcpCredentialsStorageIntegration(
+ gcpCredsProvider.get(),
+ ServiceOptions.getFromServiceLoader(
+ HttpTransportFactory.class, NetHttpTransport::new),
+ cache,
+ storageConfig,
+ realmConfig);
+ this.azureFactory =
+ storageConfig -> new AzureCredentialsStorageIntegration(cache,
storageConfig, realmConfig);
}
@Override
- @SuppressWarnings("unchecked")
- public <T extends PolarisStorageConfigurationInfo>
- @Nullable PolarisStorageIntegration<T> getStorageIntegrationForConfig(
- PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
- if (polarisStorageConfigurationInfo == null) {
- return null;
- }
- PolarisStorageIntegration<T> storageIntegration;
- switch (polarisStorageConfigurationInfo.getStorageType()) {
- case S3:
- Optional<AwsCredentialsProvider> awsCreds = stsCredentials;
- if (awsCreds.isEmpty() && storageConfiguration != null) {
- if (realmConfig != null
- &&
realmConfig.getConfig(FeatureConfiguration.RESOLVE_CREDENTIALS_BY_STORAGE_NAME))
{
- awsCreds =
- Optional.of(
- storageConfiguration.stsCredentials(
- polarisStorageConfigurationInfo.getStorageName()));
- } else {
- awsCreds = Optional.of(storageConfiguration.stsCredentials());
- }
- }
- storageIntegration =
- (PolarisStorageIntegration<T>)
- new AwsCredentialsStorageIntegration(
- (AwsStorageConfigurationInfo)
polarisStorageConfigurationInfo,
- stsClientProvider,
- awsCreds);
- break;
- case GCS:
- storageIntegration =
- (PolarisStorageIntegration<T>)
- new GcpCredentialsStorageIntegration(
- (GcpStorageConfigurationInfo)
polarisStorageConfigurationInfo,
- gcpCredsProvider.get(),
- ServiceOptions.getFromServiceLoader(
- HttpTransportFactory.class, NetHttpTransport::new));
- break;
- case AZURE:
- storageIntegration =
- (PolarisStorageIntegration<T>)
- new AzureCredentialsStorageIntegration(
- (AzureStorageConfigurationInfo)
polarisStorageConfigurationInfo);
- break;
- case FILE:
- storageIntegration =
- new PolarisStorageIntegration<>((T)
polarisStorageConfigurationInfo, "file") {
- @Override
- public StorageAccessConfig getSubscopedCreds(
- @Nonnull RealmConfig realmConfig,
- boolean allowListOperation,
- @Nonnull Set<String> allowedReadLocations,
- @Nonnull Set<String> allowedWriteLocations,
- @Nonnull PolarisPrincipal polarisPrincipal,
- Optional<String> refreshCredentialsEndpoint,
- @Nonnull CredentialVendingContext credentialVendingContext) {
- // FILE storage does not support credential vending
- return
StorageAccessConfig.builder().supportsCredentialVending(false).build();
- }
+ public @Nullable PolarisStorageIntegration getStorageIntegration(
+ @Nonnull List<PolarisEntity> resolvedEntityPath) {
+ return
PolarisStorageConfigurationInfo.findStorageInfoFromHierarchy(resolvedEntityPath)
+ .map(entity ->
BaseMetaStoreManager.extractStorageConfiguration(diagnostics, entity))
+ .map(this::getOrCreateIntegration)
+ .orElse(null);
+ }
+
+ private PolarisStorageIntegration getOrCreateIntegration(
+ PolarisStorageConfigurationInfo storageConfig) {
+ return integrationCache.computeIfAbsent(storageConfig,
this::createIntegration);
Review Comment:
Is the `integrationCache` Map worth having? `CachingStorageIntegration`
subclasses are lightweight objects. Creating them on the spot is probably
inconsequential.
The heavy part is the `StorageCredentialCache`, which is already shared.
##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java:
##########
@@ -1251,31 +1250,23 @@ private List<PolarisPolicyMappingRecord>
fetchPolicyMappingRecords(
@Nullable
@Override
- public <T extends PolarisStorageConfigurationInfo>
- PolarisStorageIntegration<T> createStorageIntegration(
- @Nonnull PolarisCallContext callCtx,
- long catalogId,
- long entityId,
- PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
- return storageIntegrationProvider.getStorageIntegrationForConfig(
- polarisStorageConfigurationInfo);
+ public PolarisStorageIntegration createStorageIntegration(
+ @Nonnull PolarisCallContext callCtx,
+ long catalogId,
+ long entityId,
+ PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
+ // No-op in OSS: the storage integration is resolved at credential-vending
time via
+ //
PolarisStorageIntegrationProvider.getStorageIntegration(resolvedEntityPath).
This hook
+ // remains available for custom deployments that need to allocate/lease
external state
+ // atomically with the catalog-creation transaction.
+ return null;
}
@Override
- public <T extends PolarisStorageConfigurationInfo> void
persistStorageIntegrationIfNeeded(
+ public void persistStorageIntegrationIfNeeded(
@Nonnull PolarisCallContext callContext,
@Nonnull PolarisBaseEntity entity,
- @Nullable PolarisStorageIntegration<T> storageIntegration) {}
-
- @Nullable
- @Override
- public <T extends PolarisStorageConfigurationInfo>
- PolarisStorageIntegration<T> loadPolarisStorageIntegration(
- @Nonnull PolarisCallContext callContext, @Nonnull PolarisBaseEntity
entity) {
- PolarisStorageConfigurationInfo storageConfig =
- BaseMetaStoreManager.extractStorageConfiguration(diagnostics, entity);
- return
storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig);
- }
+ @Nullable PolarisStorageIntegration storageIntegration) {}
Review Comment:
same here: let's throw an `UnsupportedOperationException` for clarity
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java:
##########
@@ -154,68 +103,88 @@ public StorageAccessConfig getStorageAccessConfig(
return accessConfig;
}
- /**
- * Builds a credential vending context from the table identifier and
resolved path. This context
- * is used to populate session tags in cloud provider credentials for
audit/correlation purposes.
- *
- * <p>The activated roles are included in this context (rather than
extracted from
- * PolarisPrincipal during session tag generation) to ensure they are part
of the cache key when
- * session tags are enabled. This prevents false positive cache hits when a
principal's roles
- * change.
- *
- * @param tableIdentifier the table identifier containing namespace and
table name
- * @param resolvedPath the resolved entity path containing the catalog entity
- * @return a credential vending context with catalog, namespace, table, and
activated roles
- */
+ private StorageAccessConfig getStorageAccessConfig(
+ @Nonnull List<PolarisEntity> resolvedEntityPath,
+ @Nonnull Set<String> locations,
+ @Nonnull Set<PolarisStorageActions> storageActions,
+ @Nonnull Optional<String> refreshCredentialsEndpoint) {
+
+ boolean skipCredentialSubscopingIndirection =
+ callContext
+ .getRealmConfig()
+
.getConfig(FeatureConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION);
+ if (skipCredentialSubscopingIndirection) {
+ return StorageAccessConfig.builder().build();
Review Comment:
Should we use `.supportsCredentialVending(false)` here too?
##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java:
##########
@@ -532,24 +530,16 @@ public void deletePrincipalSecretsInCurrentTxn(
/** {@inheritDoc} */
@Override
- public @Nullable <T extends PolarisStorageConfigurationInfo>
- PolarisStorageIntegration<T> createStorageIntegrationInCurrentTxn(
- @Nonnull PolarisCallContext callCtx,
- long catalogId,
- long entityId,
- PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
- return storageIntegrationProvider.getStorageIntegrationForConfig(
- polarisStorageConfigurationInfo);
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable <T extends PolarisStorageConfigurationInfo>
- PolarisStorageIntegration<T> loadPolarisStorageIntegrationInCurrentTxn(
- @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity
entity) {
- PolarisStorageConfigurationInfo storageConfig =
- BaseMetaStoreManager.extractStorageConfiguration(getDiagnostics(),
entity);
- return
storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig);
+ public @Nullable PolarisStorageIntegration
createStorageIntegrationInCurrentTxn(
+ @Nonnull PolarisCallContext callCtx,
+ long catalogId,
+ long entityId,
+ PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
+ // No-op in OSS: the storage integration is resolved at credential-vending
time via
+ //
PolarisStorageIntegrationProvider.getStorageIntegration(resolvedEntityPath).
This hook
+ // remains available for custom deployments that need to allocate/lease
external state
+ // atomically with the catalog-creation transaction.
+ return null;
Review Comment:
Let's throw here too... it will give a clear indication that extra work is
needed if new code changes touch this call path.
##########
runtime/admin/src/main/java/org/apache/polaris/admintool/config/AdminToolProducers.java:
##########
@@ -67,9 +66,9 @@ public PolarisStorageIntegrationProvider
storageIntegrationProvider() {
return new PolarisStorageIntegrationProvider() {
@Override
@Nullable
- public <T extends PolarisStorageConfigurationInfo>
- PolarisStorageIntegration<T> getStorageIntegrationForConfig(
- PolarisStorageConfigurationInfo polarisStorageConfigurationInfo)
{
+ public PolarisStorageIntegration getStorageIntegration(
+ @jakarta.annotation.Nonnull
+ java.util.List<org.apache.polaris.core.entity.PolarisEntity>
resolvedEntityPath) {
Review Comment:
nit: import
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java:
##########
@@ -95,94 +77,27 @@ private long maxCacheDurationMs(RealmConfig realmConfig) {
}
/**
- * Either get from the cache or generate a new entry for a scoped creds
+ * Get cached credentials or load new ones using the provided supplier.
*
- * @param storageCredentialsVendor the credential vendor used to generate a
new scoped creds if
- * needed
- * @param polarisEntity the polaris entity that is going to scoped creds
- * @param allowListOperation whether allow list action on the provided read
and write locations
- * @param allowedReadLocations a set of allowed to read locations
- * @param allowedWriteLocations a set of allowed to write locations.
- * @param polarisPrincipal the principal requesting credentials
- * @param refreshCredentialsEndpoint optional endpoint for credential refresh
- * @param credentialVendingContext context containing metadata for session
tags (catalog,
- * namespace, table, roles) for audit/correlation purposes
- * @return the a map of string containing the scoped creds information
+ * @param key the cache key
+ * @param realmConfig realm configuration for cache duration settings
+ * @param loader supplier that produces scoped credentials on cache miss;
may throw on error
+ * @return the storage access config with scoped credentials
*/
- public StorageAccessConfig getOrGenerateSubScopeCreds(
- @Nonnull StorageCredentialsVendor storageCredentialsVendor,
- @Nonnull PolarisEntity polarisEntity,
- boolean allowListOperation,
- @Nonnull Set<String> allowedReadLocations,
- @Nonnull Set<String> allowedWriteLocations,
- @Nonnull PolarisPrincipal polarisPrincipal,
- Optional<String> refreshCredentialsEndpoint,
- @Nonnull CredentialVendingContext credentialVendingContext) {
- RealmContext realmContext = storageCredentialsVendor.getRealmContext();
- RealmConfig realmConfig = storageCredentialsVendor.getRealmConfig();
- if (!isTypeSupported(polarisEntity.getType())) {
- diagnostics.fail(
- "entity_type_not_suppported_to_scope_creds", "type={}",
polarisEntity.getType());
- }
-
- boolean includePrincipalNameInSubscopedCredential =
-
realmConfig.getConfig(FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL);
- boolean includeSessionTags =
-
realmConfig.getConfig(FeatureConfiguration.INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL);
-
- // When session tags are enabled, the cache key needs to include:
- // 1. The credential vending context to avoid returning cached credentials
with different
- // session tags (catalog/namespace/table/roles/traceId)
- // 2. The principal, because the polaris:principal session tag is included
in AWS credentials
- // and we must not serve credentials tagged for principal A to
principal B
- // When session tags are disabled, we only include principal if explicitly
configured.
- //
- // Note: The trace ID is controlled at the source
(StorageAccessConfigProvider). When
- // INCLUDE_TRACE_ID_IN_SESSION_TAGS is disabled, the context's traceId is
left empty,
- // which allows efficient caching across requests with different trace IDs.
- boolean includePrincipalInCacheKey =
- includePrincipalNameInSubscopedCredential || includeSessionTags;
- // When session tags are disabled, use empty context to ensure consistent
cache key behavior
- CredentialVendingContext contextForCacheKey =
- includeSessionTags ? credentialVendingContext :
CredentialVendingContext.empty();
- StorageCredentialCacheKey key =
- StorageCredentialCacheKey.of(
- realmContext.getRealmIdentifier(),
- polarisEntity,
- allowListOperation,
- allowedReadLocations,
- allowedWriteLocations,
- refreshCredentialsEndpoint,
- includePrincipalInCacheKey ? Optional.of(polarisPrincipal) :
Optional.empty(),
- contextForCacheKey);
- Function<StorageCredentialCacheKey, StorageCredentialCacheEntry> loader =
- k -> {
- LOGGER.atDebug().log("StorageCredentialCache::load");
- // Use credentialVendingContext from the cache key for correctness.
- // This ensures we use the same context that was used for cache key
comparison.
- ScopedCredentialsResult scopedCredentialsResult =
- storageCredentialsVendor.getSubscopedCredsForEntity(
- polarisEntity,
- allowListOperation,
- allowedReadLocations,
- allowedWriteLocations,
- polarisPrincipal,
- refreshCredentialsEndpoint,
- k.credentialVendingContext());
- if (scopedCredentialsResult.isSuccess()) {
- long maxCacheDurationMs = maxCacheDurationMs(realmConfig);
- return new StorageCredentialCacheEntry(
- scopedCredentialsResult.getStorageAccessConfig(),
maxCacheDurationMs);
- }
- LOGGER
- .atDebug()
- .addKeyValue("errorMessage",
scopedCredentialsResult.getExtraInformation())
- .log("Failed to get subscoped credentials");
- throw new UnprocessableEntityException(
- "Failed to get subscoped credentials: %s",
- scopedCredentialsResult.getExtraInformation());
- };
- return cache.get(key, loader).toAccessConfig();
+ public StorageAccessConfig getOrLoad(
+ StorageCredentialCacheKey key,
+ RealmConfig realmConfig,
+ Supplier<StorageAccessConfig> loader) {
+ long maxCacheDurationMs = maxCacheDurationMs(realmConfig);
+ return cache
+ .get(
Review Comment:
I believe this comment thread still applies... WDYT?
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java:
##########
@@ -95,94 +77,27 @@ private long maxCacheDurationMs(RealmConfig realmConfig) {
}
/**
- * Either get from the cache or generate a new entry for a scoped creds
+ * Get cached credentials or load new ones using the provided supplier.
*
- * @param storageCredentialsVendor the credential vendor used to generate a
new scoped creds if
- * needed
- * @param polarisEntity the polaris entity that is going to scoped creds
- * @param allowListOperation whether allow list action on the provided read
and write locations
- * @param allowedReadLocations a set of allowed to read locations
- * @param allowedWriteLocations a set of allowed to write locations.
- * @param polarisPrincipal the principal requesting credentials
- * @param refreshCredentialsEndpoint optional endpoint for credential refresh
- * @param credentialVendingContext context containing metadata for session
tags (catalog,
- * namespace, table, roles) for audit/correlation purposes
- * @return the a map of string containing the scoped creds information
+ * @param key the cache key
+ * @param realmConfig realm configuration for cache duration settings
+ * @param loader supplier that produces scoped credentials on cache miss;
may throw on error
+ * @return the storage access config with scoped credentials
*/
- public StorageAccessConfig getOrGenerateSubScopeCreds(
- @Nonnull StorageCredentialsVendor storageCredentialsVendor,
- @Nonnull PolarisEntity polarisEntity,
- boolean allowListOperation,
- @Nonnull Set<String> allowedReadLocations,
- @Nonnull Set<String> allowedWriteLocations,
- @Nonnull PolarisPrincipal polarisPrincipal,
- Optional<String> refreshCredentialsEndpoint,
- @Nonnull CredentialVendingContext credentialVendingContext) {
- RealmContext realmContext = storageCredentialsVendor.getRealmContext();
- RealmConfig realmConfig = storageCredentialsVendor.getRealmConfig();
- if (!isTypeSupported(polarisEntity.getType())) {
- diagnostics.fail(
- "entity_type_not_suppported_to_scope_creds", "type={}",
polarisEntity.getType());
- }
-
- boolean includePrincipalNameInSubscopedCredential =
-
realmConfig.getConfig(FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL);
- boolean includeSessionTags =
-
realmConfig.getConfig(FeatureConfiguration.INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL);
-
- // When session tags are enabled, the cache key needs to include:
- // 1. The credential vending context to avoid returning cached credentials
with different
- // session tags (catalog/namespace/table/roles/traceId)
- // 2. The principal, because the polaris:principal session tag is included
in AWS credentials
- // and we must not serve credentials tagged for principal A to
principal B
- // When session tags are disabled, we only include principal if explicitly
configured.
- //
- // Note: The trace ID is controlled at the source
(StorageAccessConfigProvider). When
- // INCLUDE_TRACE_ID_IN_SESSION_TAGS is disabled, the context's traceId is
left empty,
- // which allows efficient caching across requests with different trace IDs.
- boolean includePrincipalInCacheKey =
- includePrincipalNameInSubscopedCredential || includeSessionTags;
- // When session tags are disabled, use empty context to ensure consistent
cache key behavior
- CredentialVendingContext contextForCacheKey =
- includeSessionTags ? credentialVendingContext :
CredentialVendingContext.empty();
- StorageCredentialCacheKey key =
- StorageCredentialCacheKey.of(
- realmContext.getRealmIdentifier(),
- polarisEntity,
- allowListOperation,
- allowedReadLocations,
- allowedWriteLocations,
- refreshCredentialsEndpoint,
- includePrincipalInCacheKey ? Optional.of(polarisPrincipal) :
Optional.empty(),
- contextForCacheKey);
- Function<StorageCredentialCacheKey, StorageCredentialCacheEntry> loader =
- k -> {
- LOGGER.atDebug().log("StorageCredentialCache::load");
- // Use credentialVendingContext from the cache key for correctness.
- // This ensures we use the same context that was used for cache key
comparison.
- ScopedCredentialsResult scopedCredentialsResult =
- storageCredentialsVendor.getSubscopedCredsForEntity(
- polarisEntity,
- allowListOperation,
- allowedReadLocations,
- allowedWriteLocations,
- polarisPrincipal,
- refreshCredentialsEndpoint,
- k.credentialVendingContext());
- if (scopedCredentialsResult.isSuccess()) {
- long maxCacheDurationMs = maxCacheDurationMs(realmConfig);
- return new StorageCredentialCacheEntry(
- scopedCredentialsResult.getStorageAccessConfig(),
maxCacheDurationMs);
- }
- LOGGER
- .atDebug()
- .addKeyValue("errorMessage",
scopedCredentialsResult.getExtraInformation())
- .log("Failed to get subscoped credentials");
- throw new UnprocessableEntityException(
- "Failed to get subscoped credentials: %s",
- scopedCredentialsResult.getExtraInformation());
- };
- return cache.get(key, loader).toAccessConfig();
+ public StorageAccessConfig getOrLoad(
+ StorageCredentialCacheKey key,
+ RealmConfig realmConfig,
+ Supplier<StorageAccessConfig> loader) {
+ long maxCacheDurationMs = maxCacheDurationMs(realmConfig);
+ return cache
+ .get(
+ key,
+ k -> {
+ LOGGER.atDebug().log("StorageCredentialCache::load");
+ StorageAccessConfig accessConfig = loader.get();
Review Comment:
ping :)
##########
runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogRustFSSpecialIT.java:
##########
@@ -383,7 +383,6 @@ private void
assertLoadTableWithVendedCredentialsFailsWithKmsError(TableIdentifi
id,
"ALL",
Map.of("X-Iceberg-Access-Delegation",
VENDED_CREDENTIALS.protocolValue())))
- .hasMessageContaining("Failed to get subscoped credentials")
Review Comment:
Can we assert something else to be sure the error comes from where it is
expected to come?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]