tokoko commented on code in PR #3699:
URL: https://github.com/apache/polaris/pull/3699#discussion_r3242525062
##########
runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java:
##########
@@ -45,120 +49,136 @@
import org.apache.polaris.core.storage.aws.StsClientProvider;
import
org.apache.polaris.core.storage.azure.AzureCredentialsStorageIntegration;
import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo;
+import org.apache.polaris.core.storage.cache.StorageCredentialCache;
import org.apache.polaris.core.storage.gcp.GcpCredentialsStorageIntegration;
import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+/**
+ * Provider that returns a {@link PolarisStorageIntegration} for a resolved
entity path.
+ *
+ * <p>Integration instances are bound to a specific {@link
PolarisStorageConfigurationInfo} and
+ * cached per-config in an internal map: every unique config gets exactly one
integration instance
+ * for the lifetime of the provider. Construction is driven by a
per-storage-type factory so we can
+ * still share expensive request-scope state (STS client provider, GCP
credentials supplier, etc.)
+ * across all instances of the same backend.
+ */
@ApplicationScoped
public class PolarisStorageIntegrationProviderImpl implements
PolarisStorageIntegrationProvider {
- private final StsClientProvider stsClientProvider;
- private final Optional<AwsCredentialsProvider> stsCredentials;
- private final Supplier<GoogleCredentials> gcpCredsProvider;
- private final StorageConfiguration storageConfiguration;
- private final RealmConfig realmConfig;
+ private final PolarisDiagnostics diagnostics;
+ private final Function<AwsStorageConfigurationInfo,
AwsCredentialsStorageIntegration> awsFactory;
+ private final Function<GcpStorageConfigurationInfo,
GcpCredentialsStorageIntegration> gcpFactory;
+ private final Function<AzureStorageConfigurationInfo,
AzureCredentialsStorageIntegration>
+ azureFactory;
+
+ private final ConcurrentMap<PolarisStorageConfigurationInfo,
PolarisStorageIntegration>
+ integrationCache = new ConcurrentHashMap<>();
@SuppressWarnings("CdiInjectionPointsInspection")
@Inject
public PolarisStorageIntegrationProviderImpl(
StorageConfiguration storageConfiguration,
StsClientProvider stsClientProvider,
RealmConfig realmConfig,
- Clock clock) {
- this.storageConfiguration = storageConfiguration;
- this.stsClientProvider = stsClientProvider;
- this.stsCredentials = Optional.empty();
- this.gcpCredsProvider = storageConfiguration.gcpCredentialsSupplier(clock);
- this.realmConfig = realmConfig;
+ Clock clock,
+ StorageCredentialCache cache,
+ PolarisDiagnostics diagnostics) {
+ this.diagnostics = diagnostics;
+ this.awsFactory =
+ storageConfig ->
+ new AwsCredentialsStorageIntegration(
+ stsClientProvider,
+ config -> {
+ if (realmConfig.getConfig(
+
FeatureConfiguration.RESOLVE_CREDENTIALS_BY_STORAGE_NAME)) {
+ return Optional.of(
+
storageConfiguration.stsCredentials(config.getStorageName()));
+ }
+ return Optional.of(storageConfiguration.stsCredentials());
+ },
+ cache,
+ storageConfig,
+ realmConfig);
+ Supplier<GoogleCredentials> gcpCredsProvider =
+ storageConfiguration.gcpCredentialsSupplier(clock);
+ this.gcpFactory =
+ storageConfig ->
+ new GcpCredentialsStorageIntegration(
+ gcpCredsProvider.get(),
+ ServiceOptions.getFromServiceLoader(
+ HttpTransportFactory.class, NetHttpTransport::new),
+ cache,
+ storageConfig,
+ realmConfig);
+ this.azureFactory =
+ storageConfig -> new AzureCredentialsStorageIntegration(cache,
storageConfig, realmConfig);
}
public PolarisStorageIntegrationProviderImpl(
StsClientProvider stsClientProvider,
Optional<AwsCredentialsProvider> stsCredentials,
- Supplier<GoogleCredentials> gcpCredsProvider) {
- this.stsClientProvider = stsClientProvider;
- this.stsCredentials = stsCredentials;
- this.gcpCredsProvider = gcpCredsProvider;
- this.storageConfiguration = null;
- this.realmConfig = null;
+ Supplier<GoogleCredentials> gcpCredsProvider,
+ StorageCredentialCache cache,
+ RealmConfig realmConfig,
+ PolarisDiagnostics diagnostics) {
+ this.diagnostics = diagnostics;
+ this.awsFactory =
+ storageConfig ->
+ new AwsCredentialsStorageIntegration(
+ stsClientProvider, config -> stsCredentials, cache,
storageConfig, realmConfig);
+ this.gcpFactory =
+ storageConfig ->
+ new GcpCredentialsStorageIntegration(
+ gcpCredsProvider.get(),
+ ServiceOptions.getFromServiceLoader(
+ HttpTransportFactory.class, NetHttpTransport::new),
+ cache,
+ storageConfig,
+ realmConfig);
+ this.azureFactory =
+ storageConfig -> new AzureCredentialsStorageIntegration(cache,
storageConfig, realmConfig);
}
@Override
- @SuppressWarnings("unchecked")
- public <T extends PolarisStorageConfigurationInfo>
- @Nullable PolarisStorageIntegration<T> getStorageIntegrationForConfig(
- PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
- if (polarisStorageConfigurationInfo == null) {
- return null;
- }
- PolarisStorageIntegration<T> storageIntegration;
- switch (polarisStorageConfigurationInfo.getStorageType()) {
- case S3:
- Optional<AwsCredentialsProvider> awsCreds = stsCredentials;
- if (awsCreds.isEmpty() && storageConfiguration != null) {
- if (realmConfig != null
- &&
realmConfig.getConfig(FeatureConfiguration.RESOLVE_CREDENTIALS_BY_STORAGE_NAME))
{
- awsCreds =
- Optional.of(
- storageConfiguration.stsCredentials(
- polarisStorageConfigurationInfo.getStorageName()));
- } else {
- awsCreds = Optional.of(storageConfiguration.stsCredentials());
- }
- }
- storageIntegration =
- (PolarisStorageIntegration<T>)
- new AwsCredentialsStorageIntegration(
- (AwsStorageConfigurationInfo)
polarisStorageConfigurationInfo,
- stsClientProvider,
- awsCreds);
- break;
- case GCS:
- storageIntegration =
- (PolarisStorageIntegration<T>)
- new GcpCredentialsStorageIntegration(
- (GcpStorageConfigurationInfo)
polarisStorageConfigurationInfo,
- gcpCredsProvider.get(),
- ServiceOptions.getFromServiceLoader(
- HttpTransportFactory.class, NetHttpTransport::new));
- break;
- case AZURE:
- storageIntegration =
- (PolarisStorageIntegration<T>)
- new AzureCredentialsStorageIntegration(
- (AzureStorageConfigurationInfo)
polarisStorageConfigurationInfo);
- break;
- case FILE:
- storageIntegration =
- new PolarisStorageIntegration<>((T)
polarisStorageConfigurationInfo, "file") {
- @Override
- public StorageAccessConfig getSubscopedCreds(
- @Nonnull RealmConfig realmConfig,
- boolean allowListOperation,
- @Nonnull Set<String> allowedReadLocations,
- @Nonnull Set<String> allowedWriteLocations,
- @Nonnull PolarisPrincipal polarisPrincipal,
- Optional<String> refreshCredentialsEndpoint,
- @Nonnull CredentialVendingContext credentialVendingContext) {
- // FILE storage does not support credential vending
- return
StorageAccessConfig.builder().supportsCredentialVending(false).build();
- }
+ public @Nullable PolarisStorageIntegration getStorageIntegration(
+ @Nonnull List<PolarisEntity> resolvedEntityPath) {
+ return
PolarisStorageConfigurationInfo.findStorageInfoFromHierarchy(resolvedEntityPath)
+ .map(entity ->
BaseMetaStoreManager.extractStorageConfiguration(diagnostics, entity))
+ .map(this::getOrCreateIntegration)
+ .orElse(null);
+ }
+
+ private PolarisStorageIntegration getOrCreateIntegration(
+ PolarisStorageConfigurationInfo storageConfig) {
+ return integrationCache.computeIfAbsent(storageConfig,
this::createIntegration);
Review Comment:
you're probably right. I'm not sure about
`ServiceOptions.getFromServiceLoader(HttpTransportFactory.class,
NetHttpTransport::new)` in gcp branch though. I can move it out of the
constructor though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]