tokoko commented on code in PR #3699:
URL: https://github.com/apache/polaris/pull/3699#discussion_r3242525062


##########
runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java:
##########
@@ -45,120 +49,136 @@
 import org.apache.polaris.core.storage.aws.StsClientProvider;
 import 
org.apache.polaris.core.storage.azure.AzureCredentialsStorageIntegration;
 import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo;
+import org.apache.polaris.core.storage.cache.StorageCredentialCache;
 import org.apache.polaris.core.storage.gcp.GcpCredentialsStorageIntegration;
 import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 
+/**
+ * Provider that returns a {@link PolarisStorageIntegration} for a resolved 
entity path.
+ *
+ * <p>Integration instances are bound to a specific {@link 
PolarisStorageConfigurationInfo} and
+ * cached per-config in an internal map: every unique config gets exactly one 
integration instance
+ * for the lifetime of the provider. Construction is driven by a 
per-storage-type factory so we can
+ * still share expensive request-scope state (STS client provider, GCP 
credentials supplier, etc.)
+ * across all instances of the same backend.
+ */
 @ApplicationScoped
 public class PolarisStorageIntegrationProviderImpl implements 
PolarisStorageIntegrationProvider {
 
-  private final StsClientProvider stsClientProvider;
-  private final Optional<AwsCredentialsProvider> stsCredentials;
-  private final Supplier<GoogleCredentials> gcpCredsProvider;
-  private final StorageConfiguration storageConfiguration;
-  private final RealmConfig realmConfig;
+  private final PolarisDiagnostics diagnostics;
+  private final Function<AwsStorageConfigurationInfo, 
AwsCredentialsStorageIntegration> awsFactory;
+  private final Function<GcpStorageConfigurationInfo, 
GcpCredentialsStorageIntegration> gcpFactory;
+  private final Function<AzureStorageConfigurationInfo, 
AzureCredentialsStorageIntegration>
+      azureFactory;
+
+  private final ConcurrentMap<PolarisStorageConfigurationInfo, 
PolarisStorageIntegration>
+      integrationCache = new ConcurrentHashMap<>();
 
   @SuppressWarnings("CdiInjectionPointsInspection")
   @Inject
   public PolarisStorageIntegrationProviderImpl(
       StorageConfiguration storageConfiguration,
       StsClientProvider stsClientProvider,
       RealmConfig realmConfig,
-      Clock clock) {
-    this.storageConfiguration = storageConfiguration;
-    this.stsClientProvider = stsClientProvider;
-    this.stsCredentials = Optional.empty();
-    this.gcpCredsProvider = storageConfiguration.gcpCredentialsSupplier(clock);
-    this.realmConfig = realmConfig;
+      Clock clock,
+      StorageCredentialCache cache,
+      PolarisDiagnostics diagnostics) {
+    this.diagnostics = diagnostics;
+    this.awsFactory =
+        storageConfig ->
+            new AwsCredentialsStorageIntegration(
+                stsClientProvider,
+                config -> {
+                  if (realmConfig.getConfig(
+                      
FeatureConfiguration.RESOLVE_CREDENTIALS_BY_STORAGE_NAME)) {
+                    return Optional.of(
+                        
storageConfiguration.stsCredentials(config.getStorageName()));
+                  }
+                  return Optional.of(storageConfiguration.stsCredentials());
+                },
+                cache,
+                storageConfig,
+                realmConfig);
+    Supplier<GoogleCredentials> gcpCredsProvider =
+        storageConfiguration.gcpCredentialsSupplier(clock);
+    this.gcpFactory =
+        storageConfig ->
+            new GcpCredentialsStorageIntegration(
+                gcpCredsProvider.get(),
+                ServiceOptions.getFromServiceLoader(
+                    HttpTransportFactory.class, NetHttpTransport::new),
+                cache,
+                storageConfig,
+                realmConfig);
+    this.azureFactory =
+        storageConfig -> new AzureCredentialsStorageIntegration(cache, 
storageConfig, realmConfig);
   }
 
   public PolarisStorageIntegrationProviderImpl(
       StsClientProvider stsClientProvider,
       Optional<AwsCredentialsProvider> stsCredentials,
-      Supplier<GoogleCredentials> gcpCredsProvider) {
-    this.stsClientProvider = stsClientProvider;
-    this.stsCredentials = stsCredentials;
-    this.gcpCredsProvider = gcpCredsProvider;
-    this.storageConfiguration = null;
-    this.realmConfig = null;
+      Supplier<GoogleCredentials> gcpCredsProvider,
+      StorageCredentialCache cache,
+      RealmConfig realmConfig,
+      PolarisDiagnostics diagnostics) {
+    this.diagnostics = diagnostics;
+    this.awsFactory =
+        storageConfig ->
+            new AwsCredentialsStorageIntegration(
+                stsClientProvider, config -> stsCredentials, cache, 
storageConfig, realmConfig);
+    this.gcpFactory =
+        storageConfig ->
+            new GcpCredentialsStorageIntegration(
+                gcpCredsProvider.get(),
+                ServiceOptions.getFromServiceLoader(
+                    HttpTransportFactory.class, NetHttpTransport::new),
+                cache,
+                storageConfig,
+                realmConfig);
+    this.azureFactory =
+        storageConfig -> new AzureCredentialsStorageIntegration(cache, 
storageConfig, realmConfig);
   }
 
   @Override
-  @SuppressWarnings("unchecked")
-  public <T extends PolarisStorageConfigurationInfo>
-      @Nullable PolarisStorageIntegration<T> getStorageIntegrationForConfig(
-          PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
-    if (polarisStorageConfigurationInfo == null) {
-      return null;
-    }
-    PolarisStorageIntegration<T> storageIntegration;
-    switch (polarisStorageConfigurationInfo.getStorageType()) {
-      case S3:
-        Optional<AwsCredentialsProvider> awsCreds = stsCredentials;
-        if (awsCreds.isEmpty() && storageConfiguration != null) {
-          if (realmConfig != null
-              && 
realmConfig.getConfig(FeatureConfiguration.RESOLVE_CREDENTIALS_BY_STORAGE_NAME))
 {
-            awsCreds =
-                Optional.of(
-                    storageConfiguration.stsCredentials(
-                        polarisStorageConfigurationInfo.getStorageName()));
-          } else {
-            awsCreds = Optional.of(storageConfiguration.stsCredentials());
-          }
-        }
-        storageIntegration =
-            (PolarisStorageIntegration<T>)
-                new AwsCredentialsStorageIntegration(
-                    (AwsStorageConfigurationInfo) 
polarisStorageConfigurationInfo,
-                    stsClientProvider,
-                    awsCreds);
-        break;
-      case GCS:
-        storageIntegration =
-            (PolarisStorageIntegration<T>)
-                new GcpCredentialsStorageIntegration(
-                    (GcpStorageConfigurationInfo) 
polarisStorageConfigurationInfo,
-                    gcpCredsProvider.get(),
-                    ServiceOptions.getFromServiceLoader(
-                        HttpTransportFactory.class, NetHttpTransport::new));
-        break;
-      case AZURE:
-        storageIntegration =
-            (PolarisStorageIntegration<T>)
-                new AzureCredentialsStorageIntegration(
-                    (AzureStorageConfigurationInfo) 
polarisStorageConfigurationInfo);
-        break;
-      case FILE:
-        storageIntegration =
-            new PolarisStorageIntegration<>((T) 
polarisStorageConfigurationInfo, "file") {
-              @Override
-              public StorageAccessConfig getSubscopedCreds(
-                  @Nonnull RealmConfig realmConfig,
-                  boolean allowListOperation,
-                  @Nonnull Set<String> allowedReadLocations,
-                  @Nonnull Set<String> allowedWriteLocations,
-                  @Nonnull PolarisPrincipal polarisPrincipal,
-                  Optional<String> refreshCredentialsEndpoint,
-                  @Nonnull CredentialVendingContext credentialVendingContext) {
-                // FILE storage does not support credential vending
-                return 
StorageAccessConfig.builder().supportsCredentialVending(false).build();
-              }
+  public @Nullable PolarisStorageIntegration getStorageIntegration(
+      @Nonnull List<PolarisEntity> resolvedEntityPath) {
+    return 
PolarisStorageConfigurationInfo.findStorageInfoFromHierarchy(resolvedEntityPath)
+        .map(entity -> 
BaseMetaStoreManager.extractStorageConfiguration(diagnostics, entity))
+        .map(this::getOrCreateIntegration)
+        .orElse(null);
+  }
+
+  private PolarisStorageIntegration getOrCreateIntegration(
+      PolarisStorageConfigurationInfo storageConfig) {
+    return integrationCache.computeIfAbsent(storageConfig, 
this::createIntegration);

Review Comment:
   You're probably right. I'm not sure about the 
`ServiceOptions.getFromServiceLoader(HttpTransportFactory.class, 
NetHttpTransport::new)` call in the GCP branch, though. That said, I can move 
it out of the constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to