This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new b03bc2274 feat: storage-scoped aws credentials (#3409)
b03bc2274 is described below

commit b03bc2274ff96aca3cc184cf407aab1385cae185
Author: Tornike Gurgenidze <[email protected]>
AuthorDate: Tue Feb 24 03:06:29 2026 +0400

    feat: storage-scoped aws credentials (#3409)
    
    * feat: storage-scoped aws credentials
    
    * feat: add storageName config to storageConfiguration, add feature flag
    
    * fallback to default sts credentials
---
 CHANGELOG.md                                       |   1 +
 .../test/PolarisPolicyServiceIntegrationTest.java  |   2 +-
 .../polaris/core/config/FeatureConfiguration.java  |  10 ++
 .../apache/polaris/core/entity/CatalogEntity.java  |  18 ++-
 .../storage/PolarisStorageConfigurationInfo.java   |   4 +
 .../PolarisStorageConfigurationInfoTest.java       |   8 ++
 .../PolarisStorageIntegrationProviderImpl.java     |  33 ++++-
 .../service/storage/StorageConfiguration.java      |  67 +++++++--
 .../iceberg/AbstractIcebergCatalogViewTest.java    |   2 +-
 .../polaris/service/entity/CatalogEntityTest.java  |   1 +
 .../service/storage/StorageConfigurationTest.java  | 158 +++++++++++++++++++--
 spec/polaris-management-service.yml                |   3 +
 12 files changed, 277 insertions(+), 30 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3b790849a..7fad73b9f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -57,6 +57,7 @@ request adding CHANGELOG notes for breaking (!) changes and 
possibly other secti
   feature is enabled in Azure.
 - Relaxed `client_id`, `client_secret` regex/pattern validation on reset 
endpoint call
 - Added support for S3-compatible storage that does not have KMS (use 
`kmsUnavailable: true` in catalog storage configuration)
+- Added support for storage-scoped AWS credentials, allowing different AWS 
credentials to be configured per named storage. Enable with the 
`RESOLVE_CREDENTIALS_BY_STORAGE_NAME` feature flag (default: false). Storage 
names can be set explicitly via the `storageName` field on storage 
configuration, or inferred from the first allowed location's host.
 
 ### Changes
 
diff --git 
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisPolicyServiceIntegrationTest.java
 
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisPolicyServiceIntegrationTest.java
index b0392967e..9163dff98 100644
--- 
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisPolicyServiceIntegrationTest.java
+++ 
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisPolicyServiceIntegrationTest.java
@@ -200,7 +200,7 @@ public class PolarisPolicyServiceIntegrationTest {
             .setStorageConfigInfo(
                 s3BucketBase.getScheme().equals("file")
                     ? new FileStorageConfigInfo(
-                        StorageConfigInfo.StorageTypeEnum.FILE, 
List.of("file://"))
+                        StorageConfigInfo.StorageTypeEnum.FILE, 
List.of("file://"), null)
                     : awsConfigModel)
             .build();
 
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 583361e4e..c3083e310 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -532,4 +532,14 @@ public class FeatureConfiguration<T> extends 
PolarisConfiguration<T> {
           .description("Metadata batch size for tasks that clean up dropped 
tables' files.")
           .defaultValue(10)
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Boolean> 
RESOLVE_CREDENTIALS_BY_STORAGE_NAME =
+      PolarisConfiguration.<Boolean>builder()
+          .key("RESOLVE_CREDENTIALS_BY_STORAGE_NAME")
+          .description(
+              "If set to true, resolve AWS credentials based on the 
storageName field "
+                  + "of the storage configuration. "
+                  + "When disabled, the default AWS credentials are used for 
all storages.")
+          .defaultValue(false)
+          .buildFeatureConfiguration();
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java
index afbe226de..51b4be903 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java
@@ -164,6 +164,7 @@ public class CatalogEntity extends PolarisEntity implements 
LocationBasedEntity
             .setAllowedKmsKeys(awsConfig.getAllowedKmsKeys())
             .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
             .setAllowedLocations(awsConfig.getAllowedLocations())
+            .setStorageName(awsConfig.getStorageName())
             .setRegion(awsConfig.getRegion())
             .setEndpoint(awsConfig.getEndpoint())
             .setStsEndpoint(awsConfig.getStsEndpoint())
@@ -180,6 +181,7 @@ public class CatalogEntity extends PolarisEntity implements 
LocationBasedEntity
             .setConsentUrl(azureConfig.getConsentUrl())
             .setStorageType(AZURE)
             .setAllowedLocations(azureConfig.getAllowedLocations())
+            .setStorageName(azureConfig.getStorageName())
             .setHierarchical(azureConfig.isHierarchical())
             .build();
       }
@@ -188,11 +190,15 @@ public class CatalogEntity extends PolarisEntity 
implements LocationBasedEntity
             .setGcsServiceAccount(gcpConfigModel.getGcpServiceAccount())
             .setStorageType(StorageConfigInfo.StorageTypeEnum.GCS)
             .setAllowedLocations(gcpConfigModel.getAllowedLocations())
+            .setStorageName(gcpConfigModel.getStorageName())
             .build();
       }
       if (configInfo instanceof FileStorageConfigurationInfo fileConfigModel) {
-        return new FileStorageConfigInfo(
-            StorageConfigInfo.StorageTypeEnum.FILE, 
fileConfigModel.getAllowedLocations());
+        return FileStorageConfigInfo.builder()
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE)
+            .setAllowedLocations(fileConfigModel.getAllowedLocations())
+            .setStorageName(fileConfigModel.getStorageName())
+            .build();
       }
       return null;
     }
@@ -307,6 +313,7 @@ public class CatalogEntity extends PolarisEntity implements 
LocationBasedEntity
             AwsStorageConfigurationInfo awsConfig =
                 AwsStorageConfigurationInfo.builder()
                     .allowedLocations(allowedLocations)
+                    .storageName(storageConfigModel.getStorageName())
                     .roleARN(awsConfigModel.getRoleArn())
                     .currentKmsKey(awsConfigModel.getCurrentKmsKey())
                     .allowedKmsKeys(awsConfigModel.getAllowedKmsKeys())
@@ -326,6 +333,7 @@ public class CatalogEntity extends PolarisEntity implements 
LocationBasedEntity
             config =
                 AzureStorageConfigurationInfo.builder()
                     .allowedLocations(allowedLocations)
+                    .storageName(storageConfigModel.getStorageName())
                     .tenantId(azureConfigModel.getTenantId())
                     
.multiTenantAppName(azureConfigModel.getMultiTenantAppName())
                     .consentUrl(azureConfigModel.getConsentUrl())
@@ -336,13 +344,17 @@ public class CatalogEntity extends PolarisEntity 
implements LocationBasedEntity
             config =
                 GcpStorageConfigurationInfo.builder()
                     .allowedLocations(allowedLocations)
+                    .storageName(storageConfigModel.getStorageName())
                     .gcpServiceAccount(
                         ((GcpStorageConfigInfo) 
storageConfigModel).getGcsServiceAccount())
                     .build();
             break;
           case FILE:
             config =
-                
FileStorageConfigurationInfo.builder().allowedLocations(allowedLocations).build();
+                FileStorageConfigurationInfo.builder()
+                    .allowedLocations(allowedLocations)
+                    .storageName(storageConfigModel.getStorageName())
+                    .build();
             break;
           default:
             throw new IllegalStateException(
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
index 930da1fb6..63e8da2fd 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.json.JsonMapper;
 import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -83,6 +84,9 @@ public abstract class PolarisStorageConfigurationInfo {
 
   public abstract List<String> getAllowedLocations();
 
+  @Nullable
+  public abstract String getStorageName();
+
   public abstract StorageType getStorageType();
 
   private static final ObjectMapper DEFAULT_MAPPER;
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/PolarisStorageConfigurationInfoTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/PolarisStorageConfigurationInfoTest.java
index 35d4ed809..9c9628990 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/PolarisStorageConfigurationInfoTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/PolarisStorageConfigurationInfoTest.java
@@ -96,6 +96,14 @@ public class PolarisStorageConfigurationInfoTest {
                 .pathStyleAccess(true)
                 .build(),
             
"{\"@type\":\"AwsStorageConfigurationInfo\",\"storageType\":\"S3\",\"allowedLocations\":[\"s3://foo/bar\",\"s3://no/where\"],\"roleARN\":\"arn:aws:iam::123456789012:role/polaris-test\",\"externalId\":\"external-id\",\"region\":\"no-where-1\",\"endpoint\":\"http://127.9.9.9/\",\"stsEndpoint\":\"http://127.9.9.9/sts/\",\"endpointInternal\":\"http://127.8.8.8/internal/\",\"pathStyleAccess\":true,\"fileIoImplClassName\":\"org.apache.iceberg.aws.s3.S3FileIO\"}"),
+        arguments(
+            AwsStorageConfigurationInfo.builder()
+                .addAllowedLocations("s3://foo/bar", "s3://no/where")
+                .roleARN("arn:aws:iam::123456789012:role/polaris-test")
+                .region("no-where-1")
+                .storageName("my-storage")
+                .build(),
+            
"{\"@type\":\"AwsStorageConfigurationInfo\",\"storageType\":\"S3\",\"allowedLocations\":[\"s3://foo/bar\",\"s3://no/where\"],\"storageName\":\"my-storage\",\"roleARN\":\"arn:aws:iam::123456789012:role/polaris-test\",\"region\":\"no-where-1\",\"fileIoImplClassName\":\"org.apache.iceberg.aws.s3.S3FileIO\"}"),
         //
         arguments(
             GcpStorageConfigurationInfo.builder()
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
index 7dff1cc83..8249eda1c 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
@@ -32,6 +32,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.storage.CredentialVendingContext;
 import org.apache.polaris.core.storage.PolarisStorageActions;
@@ -54,15 +55,21 @@ public class PolarisStorageIntegrationProviderImpl 
implements PolarisStorageInte
   private final StsClientProvider stsClientProvider;
   private final Optional<AwsCredentialsProvider> stsCredentials;
   private final Supplier<GoogleCredentials> gcpCredsProvider;
+  private final StorageConfiguration storageConfiguration;
+  private final RealmConfig realmConfig;
 
   @SuppressWarnings("CdiInjectionPointsInspection")
   @Inject
   public PolarisStorageIntegrationProviderImpl(
-      StorageConfiguration storageConfiguration, StsClientProvider 
stsClientProvider, Clock clock) {
-    this(
-        stsClientProvider,
-        Optional.ofNullable(storageConfiguration.stsCredentials()),
-        storageConfiguration.gcpCredentialsSupplier(clock));
+      StorageConfiguration storageConfiguration,
+      StsClientProvider stsClientProvider,
+      RealmConfig realmConfig,
+      Clock clock) {
+    this.storageConfiguration = storageConfiguration;
+    this.stsClientProvider = stsClientProvider;
+    this.stsCredentials = Optional.empty();
+    this.gcpCredsProvider = storageConfiguration.gcpCredentialsSupplier(clock);
+    this.realmConfig = realmConfig;
   }
 
   public PolarisStorageIntegrationProviderImpl(
@@ -72,6 +79,8 @@ public class PolarisStorageIntegrationProviderImpl implements 
PolarisStorageInte
     this.stsClientProvider = stsClientProvider;
     this.stsCredentials = stsCredentials;
     this.gcpCredsProvider = gcpCredsProvider;
+    this.storageConfiguration = null;
+    this.realmConfig = null;
   }
 
   @Override
@@ -85,12 +94,24 @@ public class PolarisStorageIntegrationProviderImpl 
implements PolarisStorageInte
     PolarisStorageIntegration<T> storageIntegration;
     switch (polarisStorageConfigurationInfo.getStorageType()) {
       case S3:
+        Optional<AwsCredentialsProvider> awsCreds = stsCredentials;
+        if (awsCreds.isEmpty() && storageConfiguration != null) {
+          if (realmConfig != null
+              && 
realmConfig.getConfig(FeatureConfiguration.RESOLVE_CREDENTIALS_BY_STORAGE_NAME))
 {
+            awsCreds =
+                Optional.of(
+                    storageConfiguration.stsCredentials(
+                        polarisStorageConfigurationInfo.getStorageName()));
+          } else {
+            awsCreds = Optional.of(storageConfiguration.stsCredentials());
+          }
+        }
         storageIntegration =
             (PolarisStorageIntegration<T>)
                 new AwsCredentialsStorageIntegration(
                     (AwsStorageConfigurationInfo) 
polarisStorageConfigurationInfo,
                     stsClientProvider,
-                    stsCredentials);
+                    awsCreds);
         break;
       case GCS:
         storageIntegration =
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/storage/StorageConfiguration.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/storage/StorageConfiguration.java
index f92c45416..0f1916ec7 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/storage/StorageConfiguration.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/storage/StorageConfiguration.java
@@ -23,10 +23,12 @@ import com.google.auth.oauth2.GoogleCredentials;
 import com.google.common.base.Suppliers;
 import io.smallrye.config.ConfigMapping;
 import io.smallrye.config.WithName;
+import io.smallrye.config.WithParentName;
 import java.io.IOException;
 import java.time.Clock;
 import java.time.Duration;
 import java.util.Date;
+import java.util.Map;
 import java.util.Optional;
 import java.util.function.Supplier;
 import org.apache.polaris.service.storage.aws.S3AccessConfig;
@@ -43,19 +45,51 @@ public interface StorageConfiguration extends 
S3AccessConfig {
 
   Duration DEFAULT_TOKEN_LIFESPAN = Duration.ofHours(1);
 
+  @WithName("aws")
+  AwsStorageConfig aws();
+
   /**
-   * The AWS access key to use for authentication. If not present, the default 
credentials provider
-   * chain will be used.
+   * @deprecated Use {@link #aws()}.{@link AwsStorageConfig#accessKey() 
accessKey()} instead.
    */
-  @WithName("aws.access-key")
-  Optional<String> awsAccessKey();
+  @Deprecated
+  default Optional<String> awsAccessKey() {
+    return aws().accessKey();
+  }
 
   /**
-   * The AWS secret key to use for authentication. If not present, the default 
credentials provider
-   * chain will be used.
+   * @deprecated Use {@link #aws()}.{@link AwsStorageConfig#secretKey() 
secretKey()} instead.
    */
-  @WithName("aws.secret-key")
-  Optional<String> awsSecretKey();
+  @Deprecated
+  default Optional<String> awsSecretKey() {
+    return aws().secretKey();
+  }
+
+  interface AwsStorageConfig {
+    /**
+     * The AWS access key to use for authentication. If not present, the 
default credentials
+     * provider chain will be used.
+     */
+    @WithName("access-key")
+    Optional<String> accessKey();
+
+    /**
+     * The AWS secret key to use for authentication. If not present, the 
default credentials
+     * provider chain will be used.
+     */
+    @WithName("secret-key")
+    Optional<String> secretKey();
+
+    @WithParentName
+    Map<String, StorageConfig> storages();
+  }
+
+  interface StorageConfig {
+    @WithName("access-key")
+    String accessKey();
+
+    @WithName("secret-key")
+    String secretKey();
+  }
 
   /**
    * The GCP access token to use for authentication. If not present, the 
default credentials
@@ -87,16 +121,29 @@ public interface StorageConfiguration extends 
S3AccessConfig {
   }
 
   default AwsCredentialsProvider stsCredentials() {
-    if (awsAccessKey().isPresent() && awsSecretKey().isPresent()) {
+    if (aws().accessKey().isPresent() && aws().secretKey().isPresent()) {
       LoggerFactory.getLogger(StorageConfiguration.class)
           .warn("Using hard-coded AWS credentials - this is not recommended 
for production");
       return StaticCredentialsProvider.create(
-          AwsBasicCredentials.create(awsAccessKey().get(), 
awsSecretKey().get()));
+          AwsBasicCredentials.create(aws().accessKey().get(), 
aws().secretKey().get()));
     } else {
       return DefaultCredentialsProvider.builder().build();
     }
   }
 
+  default AwsCredentialsProvider stsCredentials(String storageName) {
+    if (storageName != null) {
+      if (!aws().storages().containsKey(storageName)) {
+        throw new IllegalArgumentException(
+            "Storage name '" + storageName + "' is not configured on the 
server");
+      }
+      StorageConfig storageConfig = aws().storages().get(storageName);
+      return StaticCredentialsProvider.create(
+          AwsBasicCredentials.create(storageConfig.accessKey(), 
storageConfig.secretKey()));
+    }
+    return stsCredentials();
+  }
+
   default Supplier<GoogleCredentials> gcpCredentialsSupplier(Clock clock) {
     return Suppliers.memoize(
         () -> {
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
index 0648ed343..a6fc777b0 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
@@ -185,7 +185,7 @@ public abstract class AbstractIcebergCatalogViewTest 
extends ViewCatalogTests<Ic
                 .setStorageConfigurationInfo(
                     realmConfig,
                     new FileStorageConfigInfo(
-                        StorageConfigInfo.StorageTypeEnum.FILE, 
List.of("file://", "/", "*")),
+                        StorageConfigInfo.StorageTypeEnum.FILE, 
List.of("file://", "/", "*"), null),
                     "file://tmp")
                 .build()
                 .asCatalog(serviceIdentityProvider)));
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java
index ec90da786..d8d499012 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java
@@ -500,6 +500,7 @@ public class CatalogEntityTest {
         Arguments.of(b.setEndpoint("http://s3.example.com:1234").build()),
         Arguments.of(b.setStsEndpoint("http://sts.example.com:1234").build()),
         Arguments.of(b.setPathStyleAccess(true).build()),
+        Arguments.of(b.setStorageName("my-storage").build()),
         Arguments.of(a.build()),
         Arguments.of(a.setHierarchical(true).build()));
   }
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/storage/StorageConfigurationTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/storage/StorageConfigurationTest.java
index 916b1912a..b74cd4192 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/storage/StorageConfigurationTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/storage/StorageConfigurationTest.java
@@ -19,6 +19,7 @@
 package org.apache.polaris.service.storage;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -30,6 +31,7 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneOffset;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.function.Supplier;
@@ -47,18 +49,30 @@ public class StorageConfigurationTest {
   private static final String TEST_ACCESS_KEY = "test-access-key";
   private static final String TEST_GCP_TOKEN = "ya29.test-token";
   private static final String TEST_SECRET_KEY = "test-secret-key";
+  private static final String STORAGE_ACCESS_KEY = "storage-access-key";
+  private static final String STORAGE_SECRET_KEY = "storage-secret-key";
   private static final Duration TEST_TOKEN_LIFESPAN = Duration.ofMinutes(20);
 
   private StorageConfiguration configWithAwsCredentialsAndGcpToken() {
     return new StorageConfiguration() {
       @Override
-      public Optional<String> awsAccessKey() {
-        return Optional.of(TEST_ACCESS_KEY);
-      }
-
-      @Override
-      public Optional<String> awsSecretKey() {
-        return Optional.of(TEST_SECRET_KEY);
+      public AwsStorageConfig aws() {
+        return new AwsStorageConfig() {
+          @Override
+          public Optional<String> accessKey() {
+            return Optional.of(TEST_ACCESS_KEY);
+          }
+
+          @Override
+          public Optional<String> secretKey() {
+            return Optional.of(TEST_SECRET_KEY);
+          }
+
+          @Override
+          public Map<String, StorageConfig> storages() {
+            return Map.of();
+          }
+        };
       }
 
       @Override
@@ -116,15 +130,111 @@ public class StorageConfigurationTest {
   private StorageConfiguration configWithoutGcpToken() {
     return new StorageConfiguration() {
       @Override
-      public Optional<String> awsAccessKey() {
+      public AwsStorageConfig aws() {
+        return new AwsStorageConfig() {
+          @Override
+          public Optional<String> accessKey() {
+            return Optional.empty();
+          }
+
+          @Override
+          public Optional<String> secretKey() {
+            return Optional.empty();
+          }
+
+          @Override
+          public Map<String, StorageConfig> storages() {
+            return Map.of();
+          }
+        };
+      }
+
+      @Override
+      public Optional<String> gcpAccessToken() {
         return Optional.empty();
       }
 
       @Override
-      public Optional<String> awsSecretKey() {
+      public Optional<Duration> gcpAccessTokenLifespan() {
         return Optional.empty();
       }
 
+      @Override
+      public OptionalInt clientsCacheMaxSize() {
+        return OptionalInt.empty();
+      }
+
+      @Override
+      public OptionalInt maxHttpConnections() {
+        return OptionalInt.empty();
+      }
+
+      @Override
+      public Optional<Duration> readTimeout() {
+        return Optional.empty();
+      }
+
+      @Override
+      public Optional<Duration> connectTimeout() {
+        return Optional.empty();
+      }
+
+      @Override
+      public Optional<Duration> connectionAcquisitionTimeout() {
+        return Optional.empty();
+      }
+
+      @Override
+      public Optional<Duration> connectionMaxIdleTime() {
+        return Optional.empty();
+      }
+
+      @Override
+      public Optional<Duration> connectionTimeToLive() {
+        return Optional.empty();
+      }
+
+      @Override
+      public Optional<Boolean> expectContinueEnabled() {
+        return Optional.empty();
+      }
+    };
+  }
+
+  private StorageConfiguration configWithNamedStorage() {
+    return new StorageConfiguration() {
+      @Override
+      public AwsStorageConfig aws() {
+        return new AwsStorageConfig() {
+          @Override
+          public Optional<String> accessKey() {
+            return Optional.of(TEST_ACCESS_KEY);
+          }
+
+          @Override
+          public Optional<String> secretKey() {
+            return Optional.of(TEST_SECRET_KEY);
+          }
+
+          @Override
+          public Map<String, StorageConfig> storages() {
+            return Map.of(
+                "myStorage",
+                new StorageConfig() {
+                  @Override
+                  public String accessKey() {
+                    return STORAGE_ACCESS_KEY;
+                  }
+
+                  @Override
+                  public String secretKey() {
+                    return STORAGE_SECRET_KEY;
+                  }
+                });
+          }
+        };
+      }
+
       @Override
       public Optional<String> gcpAccessToken() {
         return Optional.empty();
@@ -249,4 +359,34 @@ public class StorageConfigurationTest {
       mockedStatic.verify(GoogleCredentials::getApplicationDefault, times(1));
     }
   }
+
+  @Test
+  public void testStsCredentialsWithNamedStorage() {
+    StorageConfiguration config = configWithNamedStorage();
+    AwsCredentialsProvider credentialsProvider = 
config.stsCredentials("myStorage");
+    
assertThat(credentialsProvider).isInstanceOf(StaticCredentialsProvider.class);
+    assertThat(credentialsProvider.resolveCredentials().accessKeyId())
+        .isEqualTo(STORAGE_ACCESS_KEY);
+    assertThat(credentialsProvider.resolveCredentials().secretAccessKey())
+        .isEqualTo(STORAGE_SECRET_KEY);
+  }
+
+  @Test
+  public void testStsCredentialsWithUnknownStorageThrows() {
+    StorageConfiguration config = configWithNamedStorage();
+    assertThatThrownBy(() -> config.stsCredentials("unknownStorage"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("unknownStorage")
+        .hasMessageContaining("not configured");
+  }
+
+  @Test
+  public void testStsCredentialsWithNullStorageNameFallsBackToDefault() {
+    StorageConfiguration config = configWithNamedStorage();
+    AwsCredentialsProvider credentialsProvider = config.stsCredentials(null);
+    
assertThat(credentialsProvider).isInstanceOf(StaticCredentialsProvider.class);
+    
assertThat(credentialsProvider.resolveCredentials().accessKeyId()).isEqualTo(TEST_ACCESS_KEY);
+    assertThat(credentialsProvider.resolveCredentials().secretAccessKey())
+        .isEqualTo(TEST_SECRET_KEY);
+  }
 }
diff --git a/spec/polaris-management-service.yml 
b/spec/polaris-management-service.yml
index 2ba80aedd..de5317872 100644
--- a/spec/polaris-management-service.yml
+++ b/spec/polaris-management-service.yml
@@ -1075,6 +1075,9 @@ components:
           items:
             type: string
           example: "For AWS [s3://bucketname/prefix/], for AZURE 
[abfss://container@storageaccount.blob.core.windows.net/prefix/], for GCP 
[gs://bucketname/prefix/]"
+        storageName:
+          type: string
+          description: An optional name referencing a server-side storage 
configuration
       required:
         - storageType
       discriminator:

Reply via email to