This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 1009ef41b2 AWS: Add scheduled refresh for the S3FileIO held storage credentials (#15678)
1009ef41b2 is described below

commit 1009ef41b25b4602baa949c15c31db34b7a2ad96
Author: Daniel Weeks <[email protected]>
AuthorDate: Thu Mar 19 16:50:42 2026 -0700

    AWS: Add scheduled refresh for the S3FileIO held storage credentials (#15678)
    
    * AWS: Add scheduled refresh for the S3FileIO held storage credentials
    
    
    Co-authored-by: Eduard Tudenhoefner <[email protected]>
---
 .../org/apache/iceberg/aws/s3/TestS3FileIO.java    |  56 ++++++++
 .../java/org/apache/iceberg/aws/s3/S3FileIO.java   |  81 ++++++++++-
 .../iceberg/aws/s3/VendedCredentialsProvider.java  |   2 +-
 .../aws/s3/TestS3FileIOCredentialRefresh.java      | 159 +++++++++++++++++++++
 4 files changed, 290 insertions(+), 8 deletions(-)

diff --git 
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java 
b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index ebee07e53e..e2fe6db0a4 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -986,4 +986,60 @@ public class TestS3FileIO {
       // do nothing
     }
   }
+
+  @Test
+  public void setCredentialsRefreshesClients() {
+    StorageCredential initialCredential =
+        StorageCredential.create(
+            "s3://custom-uri",
+            ImmutableMap.of(
+                "s3.access-key-id",
+                "initialKeyId",
+                "s3.secret-access-key",
+                "initialSecretKey",
+                "s3.session-token",
+                "initialSessionToken"));
+
+    // try-with-resources so the FileIO's clients (and any scheduled credential refresh) are
+    // released when the test finishes instead of leaking
+    try (S3FileIO fileIO = new S3FileIO()) {
+      fileIO.setCredentials(ImmutableList.of(initialCredential));
+      fileIO.initialize(ImmutableMap.of(AwsClientProperties.CLIENT_REGION, "us-east-1"));
+
+      S3Client initialClient = fileIO.client("s3://custom-uri/table1");
+      assertThat(initialClient.serviceClientConfiguration())
+          .extracting(AwsServiceClientConfiguration::credentialsProvider)
+          .extracting(IdentityProvider::resolveIdentity)
+          .satisfies(
+              future -> {
+                AwsSessionCredentialsIdentity identity =
+                    (AwsSessionCredentialsIdentity) future.get();
+                assertThat(identity.accessKeyId()).isEqualTo("initialKeyId");
+                assertThat(identity.secretAccessKey()).isEqualTo("initialSecretKey");
+                assertThat(identity.sessionToken()).isEqualTo("initialSessionToken");
+              });
+
+      StorageCredential refreshedCredential =
+          StorageCredential.create(
+              "s3://custom-uri",
+              ImmutableMap.of(
+                  "s3.access-key-id",
+                  "refreshedKeyId",
+                  "s3.secret-access-key",
+                  "refreshedSecretKey",
+                  "s3.session-token",
+                  "refreshedSessionToken"));
+
+      // replacing the credentials must drop the existing clients so new ones are built from the
+      // refreshed credentials on the next access
+      fileIO.setCredentials(ImmutableList.of(refreshedCredential));
+
+      S3Client refreshedClient = fileIO.client("s3://custom-uri/table1");
+      assertThat(refreshedClient).isNotSameAs(initialClient);
+      assertThat(refreshedClient.serviceClientConfiguration())
+          .extracting(AwsServiceClientConfiguration::credentialsProvider)
+          .extracting(IdentityProvider::resolveIdentity)
+          .satisfies(
+              future -> {
+                AwsSessionCredentialsIdentity identity =
+                    (AwsSessionCredentialsIdentity) future.get();
+                assertThat(identity.accessKeyId()).isEqualTo("refreshedKeyId");
+                assertThat(identity.secretAccessKey()).isEqualTo("refreshedSecretKey");
+                assertThat(identity.sessionToken()).isEqualTo("refreshedSessionToken");
+              });
+    }
+  }
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index e5f7a7ba1e..f045fcfde6 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -18,16 +18,22 @@
  */
 package org.apache.iceberg.aws.s3;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.iceberg.aws.S3FileIOAwsClientFactories;
@@ -94,7 +100,7 @@ public class S3FileIO
   private static final String DEFAULT_METRICS_IMPL =
       "org.apache.iceberg.hadoop.HadoopMetricsContext";
   private static final String ROOT_PREFIX = "s3";
-  private static volatile ExecutorService executorService;
+  private static volatile ScheduledExecutorService executorService;
 
   private String credential = null;
   private SerializableSupplier<S3Client> s3;
@@ -104,8 +110,9 @@ public class S3FileIO
   private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
   private transient StackTraceElement[] createStack;
   // use modifiable collection for Kryo serde
-  private List<StorageCredential> storageCredentials = Lists.newArrayList();
+  private volatile List<StorageCredential> storageCredentials = 
Lists.newArrayList();
   private transient volatile Map<String, PrefixedS3Client> clientByPrefix;
+  private transient volatile ScheduledFuture<?> refreshFuture;
 
   /**
    * No-arg constructor to load the FileIO dynamically.
@@ -419,7 +426,11 @@ public class S3FileIO
                         new PrefixedS3Client(
                             storageCredential.prefix(), 
propertiesWithCredentials, s3, s3Async));
                   });
+
           this.clientByPrefix = localClientByPrefix;
+          // Note: the s3 clients separately refresh via the 
VendedCredentialsProvider but are
+          // not directly referencable from the FileIO
+          scheduleCredentialRefresh();
         }
       }
     }
@@ -427,14 +438,53 @@ public class S3FileIO
     return clientByPrefix;
   }
 
-  private ExecutorService executorService() {
+  /**
+   * Schedules a one-shot refresh of the held storage credentials five minutes before the earliest
+   * {@link S3FileIOProperties#SESSION_TOKEN_EXPIRES_AT_MS} among them. Does nothing when no held
+   * credential carries a valid expiration.
+   */
+  private void scheduleCredentialRefresh() {
+    scheduleCredentialRefresh(Duration.ZERO);
+  }
+
+  /**
+   * Same as {@link #scheduleCredentialRefresh()}, but never schedules the refresh sooner than
+   * {@code minDelay} from now.
+   *
+   * @param minDelay lower bound for the scheduling delay; must not be negative
+   */
+  private void scheduleCredentialRefresh(Duration minDelay) {
+    storageCredentials.stream()
+        .map(
+            storageCredential ->
+                storageCredential.config().get(S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS))
+        .filter(Objects::nonNull)
+        .map(this::parseExpiresAt)
+        // malformed values were logged and mapped to null by parseExpiresAt
+        .filter(Objects::nonNull)
+        .min(Comparator.naturalOrder())
+        .ifPresent(
+            expiresAt -> {
+              Instant prefetchAt = expiresAt.minus(5, ChronoUnit.MINUTES);
+              // when already inside the five-minute window the computed delay is <= 0, so the
+              // refresh runs as soon as minDelay allows
+              long delay =
+                  Math.max(
+                      Duration.between(Instant.now(), prefetchAt).toMillis(), minDelay.toMillis());
+              this.refreshFuture =
+                  executorService()
+                      .schedule(this::refreshStorageCredentials, delay, TimeUnit.MILLISECONDS);
+            });
+  }
+
+  /**
+   * Parses an epoch-millisecond expiration value.
+   *
+   * @param expiresAtMs the raw configuration value
+   * @return the expiration instant, or {@code null} when the value is not a valid long, so that a
+   *     malformed value is ignored instead of failing client initialization
+   */
+  private Instant parseExpiresAt(String expiresAtMs) {
+    try {
+      return Instant.ofEpochMilli(Long.parseLong(expiresAtMs));
+    } catch (NumberFormatException e) {
+      LOG.warn(
+          "Ignoring invalid {} value: {}",
+          S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS,
+          expiresAtMs,
+          e);
+      return null;
+    }
+  }
+
+  /**
+   * Fetches fresh storage credentials from the vended-credentials endpoint and replaces the held
+   * ones, then re-arms the refresh schedule for the new expiration. Failures are logged and leave
+   * the current credentials in place; nothing happens once this FileIO is closed.
+   */
+  private void refreshStorageCredentials() {
+    if (isResourceClosed.get()) {
+      return;
+    }
+
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(properties)) {
+      List<StorageCredential> refreshed =
+          provider.fetchCredentials().credentials().stream()
+              .filter(c -> c.prefix().startsWith(ROOT_PREFIX))
+              .map(c -> StorageCredential.create(c.prefix(), c.config()))
+              .collect(Collectors.toList());
+
+      if (!refreshed.isEmpty() && !isResourceClosed.get()) {
+        this.storageCredentials = Lists.newArrayList(refreshed);
+        // Re-arm for the newly fetched credentials; without this only the first expiration would
+        // ever be handled. The 30s floor keeps an endpoint that keeps returning nearly-expired
+        // credentials from driving a tight refresh loop.
+        scheduleCredentialRefresh(Duration.ofSeconds(30));
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to refresh storage credentials", e);
+    }
+  }
+
+  private ScheduledExecutorService executorService() {
     if (executorService == null) {
       synchronized (S3FileIO.class) {
         if (executorService == null) {
           executorService =
-              ThreadPools.newExitingWorkerPool(
-                  "iceberg-s3fileio-delete",
-                  
clientForStoragePath(ROOT_PREFIX).s3FileIOProperties().deleteThreads());
+              ThreadPools.newExitingScheduledPool(
+                  "iceberg-s3fileio-tasks",
+                  
clientForStoragePath(ROOT_PREFIX).s3FileIOProperties().deleteThreads(),
+                  Duration.ofSeconds(10));
         }
       }
     }
@@ -491,6 +541,10 @@ public class S3FileIO
         clientByPrefix.values().forEach(PrefixedS3Client::close);
         this.clientByPrefix = null;
       }
+      if (refreshFuture != null) {
+        refreshFuture.cancel(true);
+        refreshFuture = null;
+      }
     }
   }
 
@@ -559,8 +613,21 @@ public class S3FileIO
   @Override
   public void setCredentials(List<StorageCredential> credentials) {
+    // Replaces the held storage credentials. Already-built clients are closed and dropped so the
+    // next client access rebuilds them from the new credentials; that rebuild also schedules a new
+    // refresh based on the new expiration times.
     Preconditions.checkArgument(credentials != null, "Invalid storage 
 credentials: null");
+    // stop any refresh that might be scheduled
+    // NOTE(review): cancel(true) only interrupts a refresh that is already running; if that
+    // refresh has already fetched credentials it may still overwrite the ones assigned below,
+    // because refreshStorageCredentials only checks isResourceClosed. Confirm this is acceptable.
+    if (refreshFuture != null) {
+      refreshFuture.cancel(true);
+    }
+
     // copy credentials into a modifiable collection for Kryo serde
     this.storageCredentials = Lists.newArrayList(credentials);
+
+    // if the clients are already initialized, close them and drop the map so they are recreated
+    // lazily from the new credentials
+    synchronized (this) {
+      if (clientByPrefix != null) {
+        clientByPrefix.values().forEach(PrefixedS3Client::close);
+        this.clientByPrefix = null;
+      }
+    }
   }
 
   @Override
diff --git 
a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
index 8683ce89bf..75d114d4ef 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
@@ -107,7 +107,7 @@ public class VendedCredentialsProvider implements 
AwsCredentialsProvider, SdkAut
     return client;
   }
 
-  private LoadCredentialsResponse fetchCredentials() {
+  LoadCredentialsResponse fetchCredentials() {
     return httpClient()
         .get(
             credentialsEndpoint,
diff --git 
a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOCredentialRefresh.java
 
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOCredentialRefresh.java
new file mode 100644
index 0000000000..686a6b1277
--- /dev/null
+++ 
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOCredentialRefresh.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.s3;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.io.StorageCredential;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.HttpMethod;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
+import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponseParser;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+
+/**
+ * Verifies that {@link S3FileIO} refreshes its held storage credentials from the vended-credentials
+ * endpoint before they expire.
+ */
+public class TestS3FileIOCredentialRefresh {
+
+  private static final int PORT = 3233;
+  private static final String CREDENTIALS_URI =
+      String.format("http://127.0.0.1:%d/v1/credentials", PORT);
+  private static final String CATALOG_URI = String.format("http://127.0.0.1:%d/v1", PORT);
+  private static ClientAndServer mockServer;
+
+  @BeforeAll
+  public static void beforeAll() {
+    mockServer = startClientAndServer(PORT);
+  }
+
+  @AfterAll
+  public static void stopServer() {
+    mockServer.stop();
+  }
+
+  @BeforeEach
+  public void before() {
+    mockServer.reset();
+  }
+
+  @Test
+  public void credentialRefreshWithinFiveMinuteWindow() {
+    // Set up credentials expiring within the next 5 minutes so the refresh triggers immediately
+    String nearExpiryMs = Long.toString(Instant.now().plus(3, ChronoUnit.MINUTES).toEpochMilli());
+
+    StorageCredential initialCredential =
+        StorageCredential.create(
+            "s3://bucket/path",
+            ImmutableMap.of(
+                S3FileIOProperties.ACCESS_KEY_ID,
+                "initialAccessKey",
+                S3FileIOProperties.SECRET_ACCESS_KEY,
+                "initialSecretKey",
+                S3FileIOProperties.SESSION_TOKEN,
+                "initialToken",
+                S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS,
+                nearExpiryMs));
+
+    // Mock the credentials endpoint to return refreshed credentials
+    String refreshedExpiryMs =
+        Long.toString(Instant.now().plus(1, ChronoUnit.HOURS).toEpochMilli());
+    LoadCredentialsResponse refreshResponse =
+        ImmutableLoadCredentialsResponse.builder()
+            .addCredentials(
+                ImmutableCredential.builder()
+                    .prefix("s3://bucket/path")
+                    .config(
+                        ImmutableMap.of(
+                            S3FileIOProperties.ACCESS_KEY_ID,
+                            "refreshedAccessKey",
+                            S3FileIOProperties.SECRET_ACCESS_KEY,
+                            "refreshedSecretKey",
+                            S3FileIOProperties.SESSION_TOKEN,
+                            "refreshedToken",
+                            S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS,
+                            refreshedExpiryMs))
+                    .build())
+            .build();
+
+    HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name());
+    HttpResponse mockResponse =
+        response(LoadCredentialsResponseParser.toJson(refreshResponse)).withStatusCode(200);
+    mockServer.when(mockRequest).respond(mockResponse);
+
+    Map<String, String> properties =
+        ImmutableMap.of(
+            AwsProperties.CLIENT_FACTORY,
+            StaticClientFactory.class.getName(),
+            VendedCredentialsProvider.URI,
+            CREDENTIALS_URI,
+            CatalogProperties.URI,
+            CATALOG_URI,
+            "init-creation-stacktrace",
+            "false");
+
+    StaticClientFactory.client = null;
+    try (S3FileIO fileIO = new S3FileIO()) {
+      fileIO.initialize(properties);
+      fileIO.setCredentials(List.of(initialCredential));
+
+      // Trigger clientByPrefix() to build the client map and schedule the refresh.
+      // Since the credential expires within 5 minutes, the computed delay is negative/zero
+      // and the refresh fires immediately.
+      fileIO.client();
+
+      // Wait for the scheduled refresh to call the credentials endpoint
+      Awaitility.await()
+          .atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(() -> mockServer.verify(mockRequest, VerificationTimes.atLeast(1)));
+
+      // Verify the credentials were updated with the refreshed values
+      Awaitility.await()
+          .atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                List<StorageCredential> credentials = fileIO.credentials();
+                assertThat(credentials).hasSize(1);
+                assertThat(credentials.get(0).config())
+                    .containsEntry(S3FileIOProperties.ACCESS_KEY_ID, "refreshedAccessKey")
+                    .containsEntry(S3FileIOProperties.SECRET_ACCESS_KEY, "refreshedSecretKey")
+                    .containsEntry(S3FileIOProperties.SESSION_TOKEN, "refreshedToken")
+                    .containsEntry(
+                        S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, refreshedExpiryMs);
+              });
+    }
+  }
+}

Reply via email to