This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 1009ef41b2 AWS: Add scheduled refresh for the S3FileIO held storage
credentials (#15678)
1009ef41b2 is described below
commit 1009ef41b25b4602baa949c15c31db34b7a2ad96
Author: Daniel Weeks <[email protected]>
AuthorDate: Thu Mar 19 16:50:42 2026 -0700
AWS: Add scheduled refresh for the S3FileIO held storage credentials
(#15678)
* AWS: Add scheduled refresh for the S3FileIO held storage credentials
Co-authored-by: Eduard Tudenhoefner <[email protected]>
---
.../org/apache/iceberg/aws/s3/TestS3FileIO.java | 56 ++++++++
.../java/org/apache/iceberg/aws/s3/S3FileIO.java | 81 ++++++++++-
.../iceberg/aws/s3/VendedCredentialsProvider.java | 2 +-
.../aws/s3/TestS3FileIOCredentialRefresh.java | 159 +++++++++++++++++++++
4 files changed, 290 insertions(+), 8 deletions(-)
diff --git
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index ebee07e53e..e2fe6db0a4 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -986,4 +986,60 @@ public class TestS3FileIO {
// do nothing
}
}
+
+  /**
+   * Verifies that replacing the storage credentials of an initialized FileIO closes the existing
+   * prefixed client and builds a new one that resolves the replacement credentials.
+   */
+  @Test
+  public void setCredentialsRefreshesClients() {
+    StorageCredential initialCredential =
+        StorageCredential.create(
+            "s3://custom-uri",
+            ImmutableMap.of(
+                "s3.access-key-id",
+                "initialKeyId",
+                "s3.secret-access-key",
+                "initialSecretKey",
+                "s3.session-token",
+                "initialSessionToken"));
+
+    // close the FileIO so the S3 clients it builds are released when the test finishes
+    try (S3FileIO fileIO = new S3FileIO()) {
+      fileIO.setCredentials(ImmutableList.of(initialCredential));
+      fileIO.initialize(ImmutableMap.of(AwsClientProperties.CLIENT_REGION, "us-east-1"));
+
+      S3Client initialClient = fileIO.client("s3://custom-uri/table1");
+      assertSessionCredentials(
+          initialClient, "initialKeyId", "initialSecretKey", "initialSessionToken");
+
+      StorageCredential refreshedCredential =
+          StorageCredential.create(
+              "s3://custom-uri",
+              ImmutableMap.of(
+                  "s3.access-key-id",
+                  "refreshedKeyId",
+                  "s3.secret-access-key",
+                  "refreshedSecretKey",
+                  "s3.session-token",
+                  "refreshedSessionToken"));
+
+      fileIO.setCredentials(ImmutableList.of(refreshedCredential));
+
+      // the client map was dropped by setCredentials, so this must be a freshly built client
+      S3Client refreshedClient = fileIO.client("s3://custom-uri/table1");
+      assertThat(refreshedClient).isNotSameAs(initialClient);
+      assertSessionCredentials(
+          refreshedClient, "refreshedKeyId", "refreshedSecretKey", "refreshedSessionToken");
+    }
+  }
+
+  /**
+   * Asserts that the credentials provider configured on {@code client} resolves to session
+   * credentials with exactly the given access key id, secret access key, and session token.
+   */
+  private static void assertSessionCredentials(
+      S3Client client, String accessKeyId, String secretAccessKey, String sessionToken) {
+    assertThat(client.serviceClientConfiguration())
+        .extracting(AwsServiceClientConfiguration::credentialsProvider)
+        .extracting(IdentityProvider::resolveIdentity)
+        .satisfies(
+            future -> {
+              AwsSessionCredentialsIdentity identity = (AwsSessionCredentialsIdentity) future.get();
+              assertThat(identity.accessKeyId()).isEqualTo(accessKeyId);
+              assertThat(identity.secretAccessKey()).isEqualTo(secretAccessKey);
+              assertThat(identity.sessionToken()).isEqualTo(sessionToken);
+            });
+  }
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index e5f7a7ba1e..f045fcfde6 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -18,16 +18,22 @@
*/
package org.apache.iceberg.aws.s3;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.iceberg.aws.S3FileIOAwsClientFactories;
@@ -94,7 +100,7 @@ public class S3FileIO
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
private static final String ROOT_PREFIX = "s3";
- private static volatile ExecutorService executorService;
+ private static volatile ScheduledExecutorService executorService;
private String credential = null;
private SerializableSupplier<S3Client> s3;
@@ -104,8 +110,9 @@ public class S3FileIO
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
private transient StackTraceElement[] createStack;
// use modifiable collection for Kryo serde
- private List<StorageCredential> storageCredentials = Lists.newArrayList();
+ private volatile List<StorageCredential> storageCredentials =
Lists.newArrayList();
private transient volatile Map<String, PrefixedS3Client> clientByPrefix;
+ private transient volatile ScheduledFuture<?> refreshFuture;
/**
* No-arg constructor to load the FileIO dynamically.
@@ -419,7 +426,11 @@ public class S3FileIO
new PrefixedS3Client(
storageCredential.prefix(),
propertiesWithCredentials, s3, s3Async));
});
+
this.clientByPrefix = localClientByPrefix;
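+ // Note: the S3 clients refresh their own credentials via the VendedCredentialsProvider, but
+ // they are not directly referenceable from the FileIO, so the storage credentials held by the
+ // FileIO are refreshed separately by this scheduled task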
+ scheduleCredentialRefresh();
}
}
}
@@ -427,14 +438,53 @@ public class S3FileIO
return clientByPrefix;
}
- private ExecutorService executorService() {
+  /**
+   * Schedules a refresh of the held storage credentials 5 minutes before the earliest {@link
+   * S3FileIOProperties#SESSION_TOKEN_EXPIRES_AT_MS} among them. Credentials that are already inside
+   * that window are refreshed immediately. Does nothing when no credential carries an expiry.
+   */
+  private void scheduleCredentialRefresh() {
+    scheduleCredentialRefresh(0L);
+  }
+
+  /**
+   * Schedules a refresh of the held storage credentials 5 minutes before the earliest expiry.
+   *
+   * @param minDelayMillis lower bound on the scheduling delay; a prefetch time that is already
+   *     past is clamped to this value instead of running immediately
+   */
+  private void scheduleCredentialRefresh(long minDelayMillis) {
+    if (isResourceClosed.get()) {
+      return;
+    }
+
+    storageCredentials.stream()
+        .map(S3FileIO::sessionTokenExpiresAt)
+        .filter(Objects::nonNull)
+        .min(Comparator.naturalOrder())
+        .ifPresent(
+            expiresAt -> {
+              Instant prefetchAt = expiresAt.minus(5, ChronoUnit.MINUTES);
+              long delay =
+                  Math.max(minDelayMillis, Duration.between(Instant.now(), prefetchAt).toMillis());
+              this.refreshFuture =
+                  executorService()
+                      .schedule(this::refreshStorageCredentials, delay, TimeUnit.MILLISECONDS);
+            });
+  }
+
+  /**
+   * Returns the session token expiry of {@code storageCredential}, or null when it has none or the
+   * value is not a valid epoch-millis number. An invalid value is logged and ignored so that it
+   * cannot fail client construction, which is where the refresh gets scheduled.
+   */
+  private static Instant sessionTokenExpiresAt(StorageCredential storageCredential) {
+    String expiresAtMs =
+        storageCredential.config().get(S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS);
+    if (expiresAtMs == null) {
+      return null;
+    }
+
+    try {
+      return Instant.ofEpochMilli(Long.parseLong(expiresAtMs));
+    } catch (NumberFormatException e) {
+      LOG.warn(
+          "Ignoring invalid {} for storage credential prefix {}: {}",
+          S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS,
+          storageCredential.prefix(),
+          expiresAtMs);
+      return null;
+    }
+  }
+
+  /**
+   * Fetches fresh storage credentials from the vended-credentials endpoint. On success, the held
+   * credentials are replaced and the next refresh is scheduled, so they stay fresh for the
+   * lifetime of this FileIO. Failures are logged and end the refresh cycle.
+   */
+  private void refreshStorageCredentials() {
+    if (isResourceClosed.get()) {
+      return;
+    }
+
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(properties)) {
+      List<StorageCredential> refreshed =
+          provider.fetchCredentials().credentials().stream()
+              .filter(c -> c.prefix().startsWith(ROOT_PREFIX))
+              .map(c -> StorageCredential.create(c.prefix(), c.config()))
+              .collect(Collectors.toList());
+
+      // setCredentials() and close() cancel this task with interruption; if the fetch completed
+      // anyway, do not overwrite whatever they installed
+      boolean cancelled = Thread.currentThread().isInterrupted();
+      if (!refreshed.isEmpty() && !cancelled && !isResourceClosed.get()) {
+        this.storageCredentials = Lists.newArrayList(refreshed);
+        // the 30s floor avoids a tight refresh loop if the endpoint keeps returning credentials
+        // that already expire within the 5 minute prefetch window
+        scheduleCredentialRefresh(TimeUnit.SECONDS.toMillis(30));
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to refresh storage credentials", e);
+    }
+  }
+
+ private ScheduledExecutorService executorService() {
if (executorService == null) {
synchronized (S3FileIO.class) {
if (executorService == null) {
executorService =
- ThreadPools.newExitingWorkerPool(
- "iceberg-s3fileio-delete",
-
clientForStoragePath(ROOT_PREFIX).s3FileIOProperties().deleteThreads());
+ ThreadPools.newExitingScheduledPool(
+ "iceberg-s3fileio-tasks",
+
clientForStoragePath(ROOT_PREFIX).s3FileIOProperties().deleteThreads(),
+ Duration.ofSeconds(10));
}
}
}
@@ -491,6 +541,10 @@ public class S3FileIO
clientByPrefix.values().forEach(PrefixedS3Client::close);
this.clientByPrefix = null;
}
+ if (refreshFuture != null) {
+ refreshFuture.cancel(true);
+ refreshFuture = null;
+ }
}
}
@@ -559,8 +613,21 @@ public class S3FileIO
@Override
public void setCredentials(List<StorageCredential> credentials) {
Preconditions.checkArgument(credentials != null, "Invalid storage credentials: null");
// copy credentials into a modifiable collection for Kryo serde
this.storageCredentials = Lists.newArrayList(credentials);
+
+    // Stop any refresh scheduled for the previous credentials, then close initialized clients so
+    // they are recreated from the new ones on next use.
+    //
+    // The cancel happens under the lock, after the new credentials are stored. Otherwise a client
+    // build that is concurrently holding this monitor could schedule a refresh for the previous
+    // credentials after the cancel, and that refresh would survive and later overwrite them.
+    // NOTE(review): relies on clientByPrefix() building clients while synchronized on this; confirm.
+    synchronized (this) {
+      if (refreshFuture != null) {
+        refreshFuture.cancel(true);
+        this.refreshFuture = null;
+      }
+
+      if (clientByPrefix != null) {
+        clientByPrefix.values().forEach(PrefixedS3Client::close);
+        this.clientByPrefix = null;
+      }
+    }
}
@Override
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
index 8683ce89bf..75d114d4ef 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
@@ -107,7 +107,7 @@ public class VendedCredentialsProvider implements
AwsCredentialsProvider, SdkAut
return client;
}
- private LoadCredentialsResponse fetchCredentials() {
+ LoadCredentialsResponse fetchCredentials() {
return httpClient()
.get(
credentialsEndpoint,
diff --git
a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOCredentialRefresh.java
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOCredentialRefresh.java
new file mode 100644
index 0000000000..686a6b1277
--- /dev/null
+++
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOCredentialRefresh.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.s3;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.io.StorageCredential;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.HttpMethod;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
+import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponseParser;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+
+public class TestS3FileIOCredentialRefresh {
+
+  private static final int PORT = 3233;
+  private static final String CATALOG_URI = String.format("http://127.0.0.1:%d/v1", PORT);
+  private static final String CREDENTIALS_URI = CATALOG_URI + "/credentials";
+  private static final String CREDENTIAL_PREFIX = "s3://bucket/path";
+
+  private static ClientAndServer mockServer;
+
+  @BeforeAll
+  public static void beforeAll() {
+    mockServer = startClientAndServer(PORT);
+  }
+
+  @AfterAll
+  public static void stopServer() {
+    mockServer.stop();
+  }
+
+  @BeforeEach
+  public void before() {
+    mockServer.reset();
+  }
+
+  /**
+   * Credentials whose session token expires inside the 5 minute prefetch window must be refreshed
+   * right away from the vended-credentials endpoint and replace the credentials held by the FileIO.
+   */
+  @Test
+  public void credentialRefreshWithinFiveMinuteWindow() {
+    // 3 minutes out is already inside the prefetch window, so the scheduled delay is <= 0
+    String nearExpiryMs = epochMillisFromNow(3, ChronoUnit.MINUTES);
+    StorageCredential initialCredential =
+        StorageCredential.create(
+            CREDENTIAL_PREFIX,
+            s3CredentialConfig("initialAccessKey", "initialSecretKey", "initialToken", nearExpiryMs));
+
+    // the endpoint hands out credentials that are valid for another hour
+    String refreshedExpiryMs = epochMillisFromNow(1, ChronoUnit.HOURS);
+    HttpRequest credentialsRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name());
+    HttpResponse credentialsResponse =
+        response(
+                LoadCredentialsResponseParser.toJson(
+                    refreshedCredentialsResponse(refreshedExpiryMs)))
+            .withStatusCode(200);
+    mockServer.when(credentialsRequest).respond(credentialsResponse);
+
+    Map<String, String> fileIOProperties =
+        ImmutableMap.of(
+            AwsProperties.CLIENT_FACTORY,
+            StaticClientFactory.class.getName(),
+            VendedCredentialsProvider.URI,
+            CREDENTIALS_URI,
+            CatalogProperties.URI,
+            CATALOG_URI,
+            "init-creation-stacktrace",
+            "false");
+
+    StaticClientFactory.client = null;
+    try (S3FileIO fileIO = new S3FileIO()) {
+      fileIO.initialize(fileIOProperties);
+      fileIO.setCredentials(List.of(initialCredential));
+
+      // building the prefixed clients schedules the refresh, which fires immediately here
+      fileIO.client();
+
+      // first the scheduled refresh has to reach the credentials endpoint...
+      Awaitility.await()
+          .atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(() -> mockServer.verify(credentialsRequest, VerificationTimes.atLeast(1)));
+
+      // ...then the FileIO has to hold exactly what the endpoint returned
+      Awaitility.await()
+          .atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> assertHoldsRefreshedCredentials(fileIO.credentials(), refreshedExpiryMs));
+    }
+  }
+
+  /** Returns the epoch millis of now plus {@code amount} {@code unit}s, as a decimal string. */
+  private static String epochMillisFromNow(long amount, ChronoUnit unit) {
+    return Long.toString(Instant.now().plus(amount, unit).toEpochMilli());
+  }
+
+  /** Builds an S3 storage-credential config with a session token and its expiry. */
+  private static Map<String, String> s3CredentialConfig(
+      String accessKeyId, String secretAccessKey, String sessionToken, String expiresAtMs) {
+    return ImmutableMap.of(
+        S3FileIOProperties.ACCESS_KEY_ID,
+        accessKeyId,
+        S3FileIOProperties.SECRET_ACCESS_KEY,
+        secretAccessKey,
+        S3FileIOProperties.SESSION_TOKEN,
+        sessionToken,
+        S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS,
+        expiresAtMs);
+  }
+
+  /** Builds the response the mocked credentials endpoint returns for the refresh. */
+  private static LoadCredentialsResponse refreshedCredentialsResponse(String expiresAtMs) {
+    return ImmutableLoadCredentialsResponse.builder()
+        .addCredentials(
+            ImmutableCredential.builder()
+                .prefix(CREDENTIAL_PREFIX)
+                .config(
+                    s3CredentialConfig(
+                        "refreshedAccessKey", "refreshedSecretKey", "refreshedToken", expiresAtMs))
+                .build())
+        .build();
+  }
+
+  /** Asserts that {@code credentials} is the single refreshed credential served by the mock. */
+  private static void assertHoldsRefreshedCredentials(
+      List<StorageCredential> credentials, String expiresAtMs) {
+    assertThat(credentials).hasSize(1);
+    assertThat(credentials.get(0).config())
+        .containsEntry(S3FileIOProperties.ACCESS_KEY_ID, "refreshedAccessKey")
+        .containsEntry(S3FileIOProperties.SECRET_ACCESS_KEY, "refreshedSecretKey")
+        .containsEntry(S3FileIOProperties.SESSION_TOKEN, "refreshedToken")
+        .containsEntry(S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, expiresAtMs);
+  }
+}