This is an automated email from the ASF dual-hosted git repository.
dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 5ca3fdc5d Always propagate non-credential properties from AccessConfig
to clients (#2615)
5ca3fdc5d is described below
commit 5ca3fdc5d240a40770f33237d1dbcca8c163b465
Author: Dmitri Bourlatchkov <[email protected]>
AuthorDate: Mon Sep 22 13:54:28 2025 -0400
Always propagate non-credential properties from AccessConfig to clients
(#2615)
* Always propagate non-credential properties from AccessConfig to clients
This change builds on top of #2589 and further prepares Polaris code to
support non-STS S3 implementations for #2589.
For S3 implementations that do have STS, this change enables clients to
run with local credentials (no credential vending) and still receive
endpoint configuration from the catalog.
* Call `SupportsCredentialDelegation.getAccessConfig()` on all relevant
create/load requests (previously it was called only when
`vended-credentials` was requested
* Always sent `AccessConfig.extraProperties()` to clients
* Expose credentials to clients only when the `vended-credentials` access
delegation mode is requested.
* There is not client-visible behaviour change for implementations of
`PolarisStorageIntegration` that do not produce "extra" `AccessConfig`
properties.
---
.../service/it/RestCatalogMinIOSpecialIT.java | 97 ++++++++++++++++------
.../catalog/iceberg/IcebergCatalogHandler.java | 10 +--
.../org/apache/polaris/service/TestServices.java | 23 ++++-
3 files changed, 96 insertions(+), 34 deletions(-)
diff --git
a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java
b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java
index 80845608d..7fd263e44 100644
---
a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java
+++
b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java
@@ -19,8 +19,13 @@
package org.apache.polaris.service.it;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.iceberg.aws.AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static
org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;
import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
import static org.assertj.core.api.Assertions.assertThat;
@@ -42,7 +47,6 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
@@ -57,6 +61,7 @@ import org.apache.polaris.core.admin.model.CatalogProperties;
import org.apache.polaris.core.admin.model.PolarisCatalog;
import org.apache.polaris.core.admin.model.PrincipalWithCredentials;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.service.catalog.AccessDelegationMode;
import org.apache.polaris.service.it.env.CatalogApi;
import org.apache.polaris.service.it.env.ClientCredentials;
import org.apache.polaris.service.it.env.ManagementApi;
@@ -74,6 +79,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -112,7 +118,6 @@ public class RestCatalogMinIOSpecialIT {
required(1, "id", Types.IntegerType.get(), "doc"),
optional(2, "data", Types.StringType.get()));
- private static ClientCredentials adminCredentials;
private static PolarisApiEndpoints endpoints;
private static PolarisClient client;
private static ManagementApi managementApi;
@@ -131,7 +136,6 @@ public class RestCatalogMinIOSpecialIT {
@Minio(accessKey = MINIO_ACCESS_KEY, secretKey = MINIO_SECRET_KEY)
MinioAccess minioAccess,
ClientCredentials credentials) {
s3Client = minioAccess.s3Client();
- adminCredentials = credentials;
endpoints = apiEndpoints;
client = polarisClient(endpoints);
adminToken = client.obtainToken(credentials);
@@ -158,15 +162,19 @@ public class RestCatalogMinIOSpecialIT {
}
private RESTCatalog createCatalog(
- Optional<String> endpoint, Optional<String> stsEndpoint, boolean
pathStyleAccess) {
- return createCatalog(endpoint, stsEndpoint, pathStyleAccess,
Optional.empty());
+ Optional<String> endpoint,
+ Optional<String> stsEndpoint,
+ boolean pathStyleAccess,
+ Optional<AccessDelegationMode> delegationMode) {
+ return createCatalog(endpoint, stsEndpoint, pathStyleAccess,
Optional.empty(), delegationMode);
}
private RESTCatalog createCatalog(
Optional<String> endpoint,
Optional<String> stsEndpoint,
boolean pathStyleAccess,
- Optional<String> endpointInternal) {
+ Optional<String> endpointInternal,
+ Optional<AccessDelegationMode> delegationMode) {
AwsStorageConfigInfo.Builder storageConfig =
AwsStorageConfigInfo.builder()
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
@@ -198,8 +206,16 @@ public class RestCatalogMinIOSpecialIT {
org.apache.iceberg.CatalogProperties.URI,
endpoints.catalogApiEndpoint().toString())
.put(OAuth2Properties.TOKEN, authToken)
.put("warehouse", catalogName)
- .putAll(endpoints.extraHeaders("header."))
- .put("header.X-Iceberg-Access-Delegation", "vended-credentials");
+ .putAll(endpoints.extraHeaders("header."));
+
+ delegationMode.ifPresent(
+ dm -> propertiesBuilder.put("header.X-Iceberg-Access-Delegation",
dm.protocolValue()));
+
+ if (delegationMode.isEmpty()) {
+ // Use local credentials on the client side
+ propertiesBuilder.put("s3.access-key-id", MINIO_ACCESS_KEY);
+ propertiesBuilder.put("s3.secret-access-key", MINIO_SECRET_KEY);
+ }
restCatalog.initialize("polaris", propertiesBuilder.buildKeepingLast());
return restCatalog;
@@ -213,13 +229,34 @@ public class RestCatalogMinIOSpecialIT {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCreateTable(boolean pathStyle) throws IOException {
+ LoadTableResponse response = doTestCreateTable(pathStyle,
Optional.empty());
+ assertThat(response.config()).doesNotContainKey(SECRET_ACCESS_KEY);
+ assertThat(response.config()).doesNotContainKey(ACCESS_KEY_ID);
+
assertThat(response.config()).doesNotContainKey(REFRESH_CREDENTIALS_ENDPOINT);
+ assertThat(response.credentials()).isEmpty();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testCreateTableVendedCredentials(boolean pathStyle) throws
IOException {
+ LoadTableResponse response = doTestCreateTable(pathStyle,
Optional.of(VENDED_CREDENTIALS));
+ assertThat(response.config())
+ .containsEntry(
+ REFRESH_CREDENTIALS_ENDPOINT,
+ "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials");
+ assertThat(response.credentials()).hasSize(1);
+ }
+
+ private LoadTableResponse doTestCreateTable(boolean pathStyle,
Optional<AccessDelegationMode> dm)
+ throws IOException {
try (RESTCatalog restCatalog =
- createCatalog(Optional.of(endpoint), Optional.empty(), pathStyle)) {
- LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog);
+ createCatalog(Optional.of(endpoint), Optional.empty(), pathStyle, dm))
{
+ LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, dm);
if (pathStyle) {
assertThat(loadTableResponse.config())
.containsEntry("s3.path-style-access", Boolean.TRUE.toString());
}
+ return loadTableResponse;
}
}
@@ -230,7 +267,8 @@ public class RestCatalogMinIOSpecialIT {
Optional.of("http://s3.example.com"),
Optional.of(endpoint),
false,
- Optional.of(endpoint))) {
+ Optional.of(endpoint),
+ Optional.empty())) {
StorageConfigInfo storageConfig =
managementApi.getCatalog(catalogName).getStorageConfigInfo();
assertThat((AwsStorageConfigInfo) storageConfig)
@@ -240,12 +278,13 @@ public class RestCatalogMinIOSpecialIT {
AwsStorageConfigInfo::getEndpointInternal,
AwsStorageConfigInfo::getPathStyleAccess)
.containsExactly("http://s3.example.com", endpoint, endpoint, false);
- LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog);
- assertThat(loadTableResponse.config()).containsEntry("s3.endpoint",
"http://s3.example.com");
+ LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog,
Optional.empty());
+ assertThat(loadTableResponse.config()).containsEntry(ENDPOINT,
"http://s3.example.com");
}
}
- public LoadTableResponse doTestCreateTable(RESTCatalog restCatalog) throws
IOException {
+ public LoadTableResponse doTestCreateTable(
+ RESTCatalog restCatalog, Optional<AccessDelegationMode> dm) {
catalogApi.createNamespace(catalogName, "test-ns");
TableIdentifier id = TableIdentifier.of("test-ns", "t1");
Table table = restCatalog.createTable(id, SCHEMA);
@@ -266,12 +305,13 @@ public class RestCatalogMinIOSpecialIT {
assertThat(response.contentLength()).isGreaterThan(0);
LoadTableResponse loadTableResponse =
- catalogApi.loadTableWithAccessDelegation(catalogName, id, "ALL");
- assertThat(loadTableResponse.config())
- .containsKey("s3.endpoint")
- .containsEntry(
- AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
- "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials");
+ catalogApi.loadTable(
+ catalogName,
+ id,
+ "ALL",
+ dm.map(v -> Map.of("X-Iceberg-Access-Delegation",
v.protocolValue())).orElse(Map.of()));
+
+ assertThat(loadTableResponse.config()).containsKey(ENDPOINT);
restCatalog.dropTable(id);
assertThat(restCatalog.tableExists(id)).isFalse();
@@ -279,10 +319,18 @@ public class RestCatalogMinIOSpecialIT {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testAppendFiles(boolean pathStyle) throws IOException {
+ @CsvSource("true,")
+ @CsvSource("false,")
+ @CsvSource("true,VENDED_CREDENTIALS")
+ @CsvSource("false,VENDED_CREDENTIALS")
+ public void testAppendFiles(boolean pathStyle, AccessDelegationMode
delegationMode)
+ throws IOException {
try (RESTCatalog restCatalog =
- createCatalog(Optional.of(endpoint), Optional.of(endpoint),
pathStyle)) {
+ createCatalog(
+ Optional.of(endpoint),
+ Optional.of(endpoint),
+ pathStyle,
+ Optional.ofNullable(delegationMode))) {
catalogApi.createNamespace(catalogName, "test-ns");
TableIdentifier id = TableIdentifier.of("test-ns", "t1");
Table table = restCatalog.createTable(id, SCHEMA);
@@ -295,7 +343,8 @@ public class RestCatalogMinIOSpecialIT {
URI.create(
table
.locationProvider()
- .newDataLocation(String.format("test-file-%s.txt",
pathStyle)));
+ .newDataLocation(
+ String.format("test-file-%s-%s.txt", pathStyle,
delegationMode)));
OutputFile f1 = io.newOutputFile(loc.toString());
try (PositionOutputStream os = f1.create()) {
os.write("Hello World".getBytes(UTF_8));
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index 2b7c85384..d1daf8fa5 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -794,10 +794,6 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
LoadTableResponse.Builder responseBuilder =
LoadTableResponse.builder().withTableMetadata(tableMetadata);
- if (!delegationModes.contains(VENDED_CREDENTIALS)) {
- return responseBuilder;
- }
-
if (baseCatalog instanceof SupportsCredentialDelegation
credentialDelegation) {
LOGGER
.atDebug()
@@ -808,15 +804,15 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
credentialDelegation.getAccessConfig(
tableIdentifier, tableMetadata, actions,
refreshCredentialsEndpoint);
Map<String, String> credentialConfig = accessConfig.credentials();
- responseBuilder.addAllConfig(credentialConfig);
- responseBuilder.addAllConfig(accessConfig.extraProperties());
- if (!credentialConfig.isEmpty()) {
+ if (!credentialConfig.isEmpty() &&
delegationModes.contains(VENDED_CREDENTIALS)) {
+ responseBuilder.addAllConfig(credentialConfig);
responseBuilder.addCredential(
ImmutableCredential.builder()
.prefix(tableMetadata.location())
.config(credentialConfig)
.build());
}
+ responseBuilder.addAllConfig(accessConfig.extraProperties());
}
return responseBuilder;
}
diff --git
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 89a27307d..c6c03e799 100644
---
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -18,6 +18,9 @@
*/
package org.apache.polaris.service;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.GoogleCredentials;
import jakarta.annotation.Nonnull;
@@ -77,6 +80,9 @@ import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.TaskExecutor;
import org.mockito.Mockito;
import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
public record TestServices(
Clock clock,
@@ -129,10 +135,21 @@ public record TestServices(
private PolarisDiagnostics diagnostics = new
PolarisDefaultDiagServiceImpl();
private RealmContext realmContext = TEST_REALM;
private Map<String, Object> config = Map.of();
- private StsClient stsClient = Mockito.mock(StsClient.class);
+ private StsClient stsClient;
private FileIOFactorySupplier fileIOFactorySupplier =
MeasuredFileIOFactory::new;
- private Builder() {}
+ private Builder() {
+ stsClient = Mockito.mock(StsClient.class, RETURNS_DEEP_STUBS);
+ AssumeRoleResponse arr = Mockito.mock(AssumeRoleResponse.class,
RETURNS_DEEP_STUBS);
+
Mockito.when(stsClient.assumeRole(any(AssumeRoleRequest.class))).thenReturn(arr);
+ Mockito.when(arr.credentials())
+ .thenReturn(
+ Credentials.builder()
+ .accessKeyId("test-access-key-id-111")
+ .secretAccessKey("test-secret-access-key-222")
+ .sessionToken("test-session-token-333")
+ .build());
+ }
public Builder realmContext(RealmContext realmContext) {
this.realmContext = realmContext;
@@ -222,7 +239,7 @@ public record TestServices(
@SuppressWarnings("unchecked")
Instance<ExternalCatalogFactory> externalCatalogFactory =
Mockito.mock(Instance.class);
-
Mockito.when(externalCatalogFactory.select(Mockito.any())).thenReturn(externalCatalogFactory);
+
Mockito.when(externalCatalogFactory.select(any())).thenReturn(externalCatalogFactory);
Mockito.when(externalCatalogFactory.isUnsatisfied()).thenReturn(true);
IcebergCatalogAdapter catalogService =