This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new fb9397f1a adding support to use a kms key for s3 buckets data 
encryption (AWS only) (#2802)
fb9397f1a is described below

commit fb9397f1a2c4ad156afec424e47cd3447cee8284
Author: fabio-rizzo-01 <[email protected]>
AuthorDate: Tue Nov 25 19:07:59 2025 +0000

    adding support to use a kms key for s3 buckets data encryption (AWS only) 
(#2802)
    
    Add catalog-level support for KMS with s3 buckets
---
 CHANGELOG.md                                       |   1 +
 .../core/admin/model/CatalogSerializationTest.java |  31 +++
 .../apache/polaris/core/entity/CatalogEntity.java  |   4 +
 .../aws/AwsCredentialsStorageIntegration.java      | 107 +++++++++-
 .../storage/aws/AwsStorageConfigurationInfo.java   |   9 +
 .../aws/AwsCredentialsStorageIntegrationTest.java  | 221 ++++++++++++++++++++-
 .../polaris/service/entity/CatalogEntityTest.java  |   6 +
 spec/polaris-management-service.yml                |  10 +
 8 files changed, 378 insertions(+), 11 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c544dcb03..e5bbc8868 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -84,6 +84,7 @@ request adding CHANGELOG notes for breaking (!) changes and 
possibly other secti
 
 ### New Features
 
+- Added KMS properties (optional) to catalog storage config to enable S3 data 
encryption.
 - Added a finer grained authorization model for UpdateTable requests. Existing 
privileges continue to work for granting UpdateTable, such as 
`TABLE_WRITE_PROPERTIES`.
   However, you can now instead grant privileges just for specific operations, 
such as `TABLE_ADD_SNAPSHOT`
 - Added a Management API endpoint to reset principal credentials, controlled 
by the `ENABLE_CREDENTIAL_RESET` (default: true) feature flag.
diff --git 
a/api/management-model/src/test/java/org/apache/polaris/core/admin/model/CatalogSerializationTest.java
 
b/api/management-model/src/test/java/org/apache/polaris/core/admin/model/CatalogSerializationTest.java
index c4210486b..3244f1473 100644
--- 
a/api/management-model/src/test/java/org/apache/polaris/core/admin/model/CatalogSerializationTest.java
+++ 
b/api/management-model/src/test/java/org/apache/polaris/core/admin/model/CatalogSerializationTest.java
@@ -35,6 +35,7 @@ public class CatalogSerializationTest {
   private static final String TEST_LOCATION = "s3://test/";
   private static final String TEST_CATALOG_NAME = "test-catalog";
   private static final String TEST_ROLE_ARN = 
"arn:aws:iam::123456789012:role/test-role";
+  private static final String KMS_KEY = 
"arn:aws:kms:us-east-1:012345678901:key/allowed-key-1";
 
   @BeforeEach
   public void setUp() {
@@ -70,6 +71,36 @@ public class CatalogSerializationTest {
                 + "\"properties\":{\"default-base-location\":\"s3://test/\"},"
                 + "\"storageConfigInfo\":{"
                 + "\"roleArn\":\"arn:aws:iam::123456789012:role/test-role\","
+                + "\"allowedKmsKeys\":[],"
+                + "\"pathStyleAccess\":false,"
+                + "\"storageType\":\"S3\","
+                + "\"allowedLocations\":[]"
+                + "}}");
+  }
+
+  @Test
+  public void testJsonFormatWithKmsProperties() throws JsonProcessingException 
{
+    Catalog catalog =
+        new Catalog(
+            Catalog.TypeEnum.INTERNAL,
+            TEST_CATALOG_NAME,
+            new CatalogProperties(TEST_LOCATION),
+            AwsStorageConfigInfo.builder(StorageConfigInfo.StorageTypeEnum.S3)
+                .setRoleArn(TEST_ROLE_ARN)
+                .setCurrentKmsKey(KMS_KEY)
+                .build());
+
+    String json = mapper.writeValueAsString(catalog);
+
+    assertThat(json)
+        .isEqualTo(
+            "{\"type\":\"INTERNAL\","
+                + "\"name\":\"test-catalog\","
+                + "\"properties\":{\"default-base-location\":\"s3://test/\"},"
+                + "\"storageConfigInfo\":{"
+                + "\"roleArn\":\"arn:aws:iam::123456789012:role/test-role\","
+                + 
"\"currentKmsKey\":\"arn:aws:kms:us-east-1:012345678901:key/allowed-key-1\","
+                + "\"allowedKmsKeys\":[],"
                 + "\"pathStyleAccess\":false,"
                 + "\"storageType\":\"S3\","
                 + "\"allowedLocations\":[]"
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java
index 40cf1969d..4607728b3 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java
@@ -161,6 +161,8 @@ public class CatalogEntity extends PolarisEntity implements 
LocationBasedEntity
             .setRoleArn(awsConfig.getRoleARN())
             .setExternalId(awsConfig.getExternalId())
             .setUserArn(awsConfig.getUserARN())
+            .setCurrentKmsKey(awsConfig.getCurrentKmsKey())
+            .setAllowedKmsKeys(awsConfig.getAllowedKmsKeys())
             .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
             .setAllowedLocations(awsConfig.getAllowedLocations())
             .setRegion(awsConfig.getRegion())
@@ -308,6 +310,8 @@ public class CatalogEntity extends PolarisEntity implements 
LocationBasedEntity
                 AwsStorageConfigurationInfo.builder()
                     .allowedLocations(allowedLocations)
                     .roleARN(awsConfigModel.getRoleArn())
+                    .currentKmsKey(awsConfigModel.getCurrentKmsKey())
+                    .allowedKmsKeys(awsConfigModel.getAllowedKmsKeys())
                     .externalId(awsConfigModel.getExternalId())
                     .region(awsConfigModel.getRegion())
                     .endpoint(awsConfigModel.getEndpoint())
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
index e393911f7..299600695 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
@@ -23,6 +23,7 @@ import static 
org.apache.polaris.core.config.FeatureConfiguration.STORAGE_CREDEN
 import jakarta.annotation.Nonnull;
 import java.net.URI;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -33,6 +34,8 @@ import org.apache.polaris.core.storage.StorageAccessConfig;
 import org.apache.polaris.core.storage.StorageAccessProperty;
 import org.apache.polaris.core.storage.StorageUtil;
 import org.apache.polaris.core.storage.aws.StsClientProvider.StsDestination;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
 import software.amazon.awssdk.policybuilder.iam.IamEffect;
@@ -49,6 +52,9 @@ public class AwsCredentialsStorageIntegration
   private final StsClientProvider stsClientProvider;
   private final Optional<AwsCredentialsProvider> credentialsProvider;
 
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(AwsCredentialsStorageIntegration.class);
+
   public AwsCredentialsStorageIntegration(
       AwsStorageConfigurationInfo config, StsClient fixedClient) {
     this(config, (destination) -> fixedClient);
@@ -80,6 +86,7 @@ public class AwsCredentialsStorageIntegration
         realmConfig.getConfig(STORAGE_CREDENTIAL_DURATION_SECONDS);
     AwsStorageConfigurationInfo storageConfig = config();
     String region = storageConfig.getRegion();
+    String accountId = storageConfig.getAwsAccountId();
     StorageAccessConfig.Builder accessConfig = StorageAccessConfig.builder();
 
     if (shouldUseSts(storageConfig)) {
@@ -90,10 +97,12 @@ public class AwsCredentialsStorageIntegration
               .roleSessionName("PolarisAwsCredentialsStorageIntegration")
               .policy(
                   policyString(
-                          storageConfig.getAwsPartition(),
+                          storageConfig,
                           allowListOperation,
                           allowedReadLocations,
-                          allowedWriteLocations)
+                          allowedWriteLocations,
+                          region,
+                          accountId)
                       .toJson())
               .durationSeconds(storageCredentialDurationSeconds);
       credentialsProvider.ifPresent(
@@ -163,12 +172,13 @@ public class AwsCredentialsStorageIntegration
    * ListBucket privileges with no resources. This prevents us from sending an 
empty policy to AWS
    * and just assuming the role with full privileges.
    */
-  // TODO - add KMS key access
   private IamPolicy policyString(
-      String awsPartition,
+      AwsStorageConfigurationInfo storageConfigurationInfo,
       boolean allowList,
       Set<String> readLocations,
-      Set<String> writeLocations) {
+      Set<String> writeLocations,
+      String region,
+      String accountId) {
     IamPolicy.Builder policyBuilder = IamPolicy.builder();
     IamStatement.Builder allowGetObjectStatementBuilder =
         IamStatement.builder()
@@ -178,7 +188,9 @@ public class AwsCredentialsStorageIntegration
     Map<String, IamStatement.Builder> bucketListStatementBuilder = new 
HashMap<>();
     Map<String, IamStatement.Builder> bucketGetLocationStatementBuilder = new 
HashMap<>();
 
-    String arnPrefix = arnPrefixForPartition(awsPartition);
+    String arnPrefix = 
arnPrefixForPartition(storageConfigurationInfo.getAwsPartition());
+    String currentKmsKey = storageConfigurationInfo.getCurrentKmsKey();
+    List<String> allowedKmsKeys = storageConfigurationInfo.getAllowedKmsKeys();
     Stream.concat(readLocations.stream(), writeLocations.stream())
         .distinct()
         .forEach(
@@ -225,6 +237,9 @@ public class AwsCredentialsStorageIntegration
                     arnPrefix + 
StorageUtil.concatFilePrefixes(parseS3Path(uri), "*", "/")));
           });
       policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
+      addKmsKeyPolicy(currentKmsKey, allowedKmsKeys, policyBuilder, true, 
region, accountId);
+    } else {
+      addKmsKeyPolicy(currentKmsKey, allowedKmsKeys, policyBuilder, false, 
region, accountId);
     }
     if (!bucketListStatementBuilder.isEmpty()) {
       bucketListStatementBuilder
@@ -242,6 +257,86 @@ public class AwsCredentialsStorageIntegration
     return 
policyBuilder.addStatement(allowGetObjectStatementBuilder.build()).build();
   }
 
+  private static void addKmsKeyPolicy(
+      String kmsKeyArn,
+      List<String> allowedKmsKeys,
+      IamPolicy.Builder policyBuilder,
+      boolean canWrite,
+      String region,
+      String accountId) {
+
+    IamStatement.Builder allowKms = buildBaseKmsStatement(canWrite);
+    boolean hasCurrentKey = kmsKeyArn != null;
+    boolean hasAllowedKeys = hasAllowedKmsKeys(allowedKmsKeys);
+
+    if (hasCurrentKey) {
+      addKmsKeyResource(kmsKeyArn, allowKms);
+    }
+
+    if (hasAllowedKeys) {
+      addAllowedKmsKeyResources(allowedKmsKeys, allowKms);
+    }
+
+    // Add KMS statement if we have any KMS key configuration
+    if (hasCurrentKey || hasAllowedKeys) {
+      policyBuilder.addStatement(allowKms.build());
+    } else if (!canWrite) {
+      // Only add wildcard KMS access for read-only operations when no 
specific keys are configured
+      // This null check is for MinIO, which doesn't have a region or account id
+      if (region != null && accountId != null) {
+        addAllKeysResource(region, accountId, allowKms);
+        policyBuilder.addStatement(allowKms.build());
+      }
+    }
+  }
+
+  private static IamStatement.Builder buildBaseKmsStatement(boolean 
canEncrypt) {
+    IamStatement.Builder allowKms =
+        IamStatement.builder()
+            .effect(IamEffect.ALLOW)
+            .addAction("kms:GenerateDataKeyWithoutPlaintext")
+            .addAction("kms:DescribeKey")
+            .addAction("kms:Decrypt")
+            .addAction("kms:GenerateDataKey");
+
+    if (canEncrypt) {
+      allowKms.addAction("kms:Encrypt");
+    }
+
+    return allowKms;
+  }
+
+  private static void addKmsKeyResource(String kmsKeyArn, IamStatement.Builder 
allowKms) {
+    if (kmsKeyArn != null) {
+      LOGGER.debug("Adding KMS key policy for key {}", kmsKeyArn);
+      allowKms.addResource(IamResource.create(kmsKeyArn));
+    }
+  }
+
+  private static boolean hasAllowedKmsKeys(List<String> allowedKmsKeys) {
+    return allowedKmsKeys != null && !allowedKmsKeys.isEmpty();
+  }
+
+  private static void addAllowedKmsKeyResources(
+      List<String> allowedKmsKeys, IamStatement.Builder allowKms) {
+    allowedKmsKeys.forEach(
+        keyArn -> {
+          LOGGER.debug("Adding allowed KMS key policy for key {}", keyArn);
+          allowKms.addResource(IamResource.create(keyArn));
+        });
+  }
+
+  private static void addAllKeysResource(
+      String region, String accountId, IamStatement.Builder allowKms) {
+    String allKeysArn = arnKeyAll(region, accountId);
+    allowKms.addResource(IamResource.create(allKeysArn));
+    LOGGER.debug("Adding KMS key policy for all keys in account {}", 
accountId);
+  }
+
+  private static String arnKeyAll(String region, String accountId) {
+    return String.format("arn:aws:kms:%s:%s:key/*", region, accountId);
+  }
+
   private static String arnPrefixForPartition(String awsPartition) {
     return String.format("arn:%s:s3:::", awsPartition != null ? awsPartition : 
"aws");
   }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java
index 69c669222..b62265f92 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java
@@ -27,6 +27,7 @@ import 
com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import jakarta.annotation.Nullable;
 import java.net.URI;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
@@ -63,6 +64,14 @@ public abstract class AwsStorageConfigurationInfo extends 
PolarisStorageConfigur
   @Nullable
   public abstract String getRoleARN();
 
+  /** KMS Key ARN for server-side encryption, used for writes, optional */
+  @Nullable
+  public abstract String getCurrentKmsKey();
+
+  /** List of allowed KMS Key ARNs, optional */
+  @Nullable
+  public abstract List<String> getAllowedKmsKeys();
+
   /** AWS external ID, optional */
   @Nullable
   public abstract String getExternalId();
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
index fb0c63c40..273257748 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
@@ -389,6 +389,8 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
     String warehouseKeyPrefix = "path/to/warehouse";
     String firstPath = warehouseKeyPrefix + "/namespace/table";
     String secondPath = warehouseKeyPrefix + "/oldnamespace/table";
+    String region = "us-east-2";
+    String accountId = "012345678901";
     Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class)))
         .thenAnswer(
             invocation -> {
@@ -402,8 +404,26 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                         assertThat(policy)
                             .extracting(IamPolicy::statements)
                             
.asInstanceOf(InstanceOfAssertFactories.list(IamStatement.class))
-                            .hasSize(3)
+                            .hasSize(4)
                             .satisfiesExactly(
+                                statement ->
+                                    assertThat(statement)
+                                        .returns(IamEffect.ALLOW, 
IamStatement::effect)
+                                        .returns(
+                                            List.of(
+                                                IamAction.create(
+                                                    
"kms:GenerateDataKeyWithoutPlaintext"),
+                                                
IamAction.create("kms:DescribeKey"),
+                                                
IamAction.create("kms:Decrypt"),
+                                                
IamAction.create("kms:GenerateDataKey")),
+                                            IamStatement::actions)
+                                        .returns(
+                                            List.of(
+                                                IamResource.create(
+                                                    String.format(
+                                                        
"arn:aws:kms:%s:%s:key/*",
+                                                        region, accountId))),
+                                            IamStatement::resources),
                                 statement ->
                                     assertThat(statement)
                                         .returns(IamEffect.ALLOW, 
IamStatement::effect)
@@ -456,7 +476,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                     .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix))
                     .roleARN(roleARN)
                     .externalId(externalId)
-                    .region("us-east-2")
+                    .region(region)
                     .build(),
                 stsClient)
             .getSubscopedCreds(
@@ -482,6 +502,8 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
     String externalId = "externalId";
     String bucket = "bucket";
     String warehouseKeyPrefix = "path/to/warehouse";
+    String region = "us-east-2";
+    String accountId = "012345678901";
     Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class)))
         .thenAnswer(
             invocation -> {
@@ -495,8 +517,26 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                         assertThat(policy)
                             .extracting(IamPolicy::statements)
                             
.asInstanceOf(InstanceOfAssertFactories.list(IamStatement.class))
-                            .hasSize(2)
-                            .satisfiesExactly(
+                            .hasSize(3)
+                            .satisfiesExactlyInAnyOrder(
+                                statement ->
+                                    assertThat(statement)
+                                        .returns(IamEffect.ALLOW, 
IamStatement::effect)
+                                        .returns(
+                                            List.of(
+                                                IamAction.create(
+                                                    
"kms:GenerateDataKeyWithoutPlaintext"),
+                                                
IamAction.create("kms:DescribeKey"),
+                                                
IamAction.create("kms:Decrypt"),
+                                                
IamAction.create("kms:GenerateDataKey")),
+                                            IamStatement::actions)
+                                        .returns(
+                                            List.of(
+                                                IamResource.create(
+                                                    String.format(
+                                                        
"arn:aws:kms:%s:%s:key/*",
+                                                        region, accountId))),
+                                            IamStatement::resources),
                                 statement ->
                                     assertThat(statement)
                                         .returns(IamEffect.ALLOW, 
IamStatement::effect)
@@ -523,7 +563,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                     .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix))
                     .roleARN(roleARN)
                     .externalId(externalId)
-                    .region("us-east-2")
+                    .region(region)
                     .build(),
                 stsClient)
             .getSubscopedCreds(
@@ -662,6 +702,177 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
     ;
   }
 
+  @Test
+  public void testKmsKeyPolicyLogic() {
+    StsClient stsClient = Mockito.mock(StsClient.class);
+    String roleARN = "arn:aws:iam::012345678901:role/jdoe";
+    String externalId = "externalId";
+    String bucket = "bucket";
+    String warehouseKeyPrefix = "path/to/warehouse";
+    String region = "us-east-1";
+    String accountId = "012345678901";
+    String currentKmsKey = 
"arn:aws:kms:us-east-1:012345678901:key/current-key";
+    List<String> allowedKmsKeys =
+        List.of(
+            "arn:aws:kms:us-east-1:012345678901:key/allowed-key-1",
+            "arn:aws:kms:us-east-1:012345678901:key/allowed-key-2");
+
+    // Test with current KMS key and write permissions
+    Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class)))
+        .thenAnswer(
+            invocation -> {
+              AssumeRoleRequest request = invocation.getArgument(0);
+              IamPolicy policy = IamPolicy.fromJson(request.policy());
+
+              // Verify KMS statement exists with write permissions
+              assertThat(policy.statements())
+                  .anySatisfy(
+                      stmt -> {
+                        assertThat(stmt.actions())
+                            .containsAll(
+                                List.of(
+                                    
IamAction.create("kms:GenerateDataKeyWithoutPlaintext"),
+                                    IamAction.create("kms:DescribeKey"),
+                                    IamAction.create("kms:Decrypt"),
+                                    IamAction.create("kms:GenerateDataKey"),
+                                    IamAction.create("kms:Encrypt")));
+                        
assertThat(stmt.resources()).contains(IamResource.create(currentKmsKey));
+                      });
+
+              return ASSUME_ROLE_RESPONSE;
+            });
+
+    new AwsCredentialsStorageIntegration(
+            AwsStorageConfigurationInfo.builder()
+                .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix))
+                .roleARN(roleARN)
+                .externalId(externalId)
+                .region(region)
+                .currentKmsKey(currentKmsKey)
+                .build(),
+            stsClient)
+        .getSubscopedCreds(
+            EMPTY_REALM_CONFIG,
+            true,
+            Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
+            Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
+            Optional.empty());
+
+    // Test with allowed KMS keys and read-only permissions
+    Mockito.reset(stsClient);
+    Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class)))
+        .thenAnswer(
+            invocation -> {
+              AssumeRoleRequest request = invocation.getArgument(0);
+              IamPolicy policy = IamPolicy.fromJson(request.policy());
+
+              // Verify KMS statement exists with read-only permissions
+              assertThat(policy.statements())
+                  .anySatisfy(
+                      stmt -> {
+                        assertThat(stmt.actions())
+                            .containsAll(
+                                List.of(
+                                    
IamAction.create("kms:GenerateDataKeyWithoutPlaintext"),
+                                    IamAction.create("kms:DescribeKey"),
+                                    IamAction.create("kms:Decrypt"),
+                                    IamAction.create("kms:GenerateDataKey")));
+                        
assertThat(stmt.actions()).doesNotContain(IamAction.create("kms:Encrypt"));
+                        assertThat(stmt.resources())
+                            .containsExactlyInAnyOrder(
+                                IamResource.create(allowedKmsKeys.get(0)),
+                                IamResource.create(allowedKmsKeys.get(1)));
+                      });
+
+              return ASSUME_ROLE_RESPONSE;
+            });
+
+    new AwsCredentialsStorageIntegration(
+            AwsStorageConfigurationInfo.builder()
+                .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix))
+                .roleARN(roleARN)
+                .externalId(externalId)
+                .region(region)
+                .allowedKmsKeys(allowedKmsKeys)
+                .build(),
+            stsClient)
+        .getSubscopedCreds(
+            EMPTY_REALM_CONFIG,
+            true,
+            Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
+            Set.of(),
+            Optional.empty());
+
+    // Test with no KMS keys and read-only (should add wildcard KMS access)
+    Mockito.reset(stsClient);
+    Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class)))
+        .thenAnswer(
+            invocation -> {
+              AssumeRoleRequest request = invocation.getArgument(0);
+              IamPolicy policy = IamPolicy.fromJson(request.policy());
+
+              // Verify wildcard KMS statement exists
+              assertThat(policy.statements())
+                  .anySatisfy(
+                      stmt -> {
+                        assertThat(stmt.resources())
+                            .contains(
+                                IamResource.create(
+                                    String.format("arn:aws:kms:%s:%s:key/*", 
region, accountId)));
+                      });
+
+              return ASSUME_ROLE_RESPONSE;
+            });
+
+    new AwsCredentialsStorageIntegration(
+            AwsStorageConfigurationInfo.builder()
+                .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix))
+                .roleARN(roleARN)
+                .externalId(externalId)
+                .region(region)
+                .build(),
+            stsClient)
+        .getSubscopedCreds(
+            EMPTY_REALM_CONFIG,
+            true,
+            Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
+            Set.of(),
+            Optional.empty());
+
+    // Test with no KMS keys and write permissions (should not add KMS 
statement)
+    Mockito.reset(stsClient);
+    Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class)))
+        .thenAnswer(
+            invocation -> {
+              AssumeRoleRequest request = invocation.getArgument(0);
+              IamPolicy policy = IamPolicy.fromJson(request.policy());
+
+              // Verify no KMS statement exists
+              assertThat(policy.statements())
+                  .noneMatch(
+                      stmt ->
+                          stmt.actions().stream()
+                              .anyMatch(action -> 
action.value().startsWith("kms:")));
+
+              return ASSUME_ROLE_RESPONSE;
+            });
+
+    new AwsCredentialsStorageIntegration(
+            AwsStorageConfigurationInfo.builder()
+                .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix))
+                .roleARN(roleARN)
+                .externalId(externalId)
+                .region(region)
+                .build(),
+            stsClient)
+        .getSubscopedCreds(
+            EMPTY_REALM_CONFIG,
+            true,
+            Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
+            Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
+            Optional.empty());
+  }
+
   private static @Nonnull String s3Arn(String partition, String bucket, String 
keyPrefix) {
     String bucketArn = "arn:" + partition + ":s3:::" + bucket;
     if (keyPrefix == null) {
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java
index 5c3c220c8..24c7814ce 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java
@@ -321,6 +321,7 @@ public class CatalogEntityTest {
         AwsStorageConfigInfo.builder()
             .setRoleArn("arn:aws:iam::012345678901:role/test-role")
             .setExternalId("externalId")
+            
.setCurrentKmsKey("arn:aws:kms:us-east-1:012345678901:key/444343245")
             .setUserArn("aws::a:user:arn")
             .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
             .setAllowedLocations(List.of(baseLocation))
@@ -334,6 +335,8 @@ public class CatalogEntityTest {
 
     Catalog catalog = catalogEntity.asCatalog(serviceIdentityProvider);
     assertThat(catalog.getType()).isEqualTo(Catalog.TypeEnum.INTERNAL);
+    assertThat(((AwsStorageConfigInfo) 
catalog.getStorageConfigInfo()).getCurrentKmsKey())
+        .isEqualTo("arn:aws:kms:us-east-1:012345678901:key/444343245");
   }
 
   @Test
@@ -342,6 +345,7 @@ public class CatalogEntityTest {
     AwsStorageConfigInfo storageConfigModel =
         AwsStorageConfigInfo.builder()
             .setRoleArn("arn:aws:iam::012345678901:role/test-role")
+            
.setCurrentKmsKey("arn:aws:kms:us-east-1:012345678901:key/444343245")
             .setExternalId("externalId")
             .setUserArn("aws::a:user:arn")
             .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
@@ -357,6 +361,8 @@ public class CatalogEntityTest {
 
     Catalog catalog = catalogEntity.asCatalog(serviceIdentityProvider);
     assertThat(catalog.getType()).isEqualTo(Catalog.TypeEnum.EXTERNAL);
+    assertThat(((AwsStorageConfigInfo) 
catalog.getStorageConfigInfo()).getCurrentKmsKey())
+        .isEqualTo("arn:aws:kms:us-east-1:012345678901:key/444343245");
   }
 
   @Test
diff --git a/spec/polaris-management-service.yml 
b/spec/polaris-management-service.yml
index d1775b759..03c894b6e 100644
--- a/spec/polaris-management-service.yml
+++ b/spec/polaris-management-service.yml
@@ -1103,6 +1103,16 @@ components:
               type: string
               description: the aws user arn used to assume the aws role
               example: "arn:aws:iam::123456789001:user/abc1-b-self1234"
+            currentKmsKey:
+              type: string
+              description: the aws kms key arn used to encrypt s3 data
+              example: "arn:aws:kms::123456789001:key/01234578"
+            allowedKmsKeys:
+              type: array
+              description: The list of kms keys that this catalog and its 
clients are allowed to use for reading s3 data
+              items:
+                type: string
+              example: ["arn:aws:kms::123456789001:key/01234578"]
             region:
               type: string
               description: the aws region where data is stored

Reply via email to