This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 19b4189060 GCP: Add scheduled refresh for storage credentials held by GCSFileIO (#15696)
19b4189060 is described below

commit 19b418906043dbfc2aba7d8947404f778414b71a
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Mar 23 19:41:12 2026 +0100

    GCP: Add scheduled refresh for storage credentials held by GCSFileIO (#15696)
    
    GCP: Add scheduled refresh for storage credentials held by GCSFileIO
---
 build.gradle                                       |   1 +
 .../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java |  86 +++++++-
 .../gcp/gcs/OAuth2RefreshCredentialsHandler.java   |  19 +-
 .../org/apache/iceberg/gcp/gcs/TestGCSFileIO.java  |  39 ++++
 .../gcp/gcs/TestGCSFileIOCredentialRefresh.java    | 229 +++++++++++++++++++++
 5 files changed, 365 insertions(+), 9 deletions(-)

diff --git a/build.gradle b/build.gradle
index 4cf6cf2ad9..2f5f1fa545 100644
--- a/build.gradle
+++ b/build.gradle
@@ -749,6 +749,7 @@ project(':iceberg-gcp') {
       exclude group: 'javax.servlet', module: 'servlet-api'
       exclude group: 'com.google.code.gson', module: 'gson'
     }
+    testImplementation libs.awaitility
     testImplementation libs.esotericsoftware.kryo
     testImplementation libs.mockserver.netty
     testImplementation libs.mockserver.client.java
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 188eddbfbb..5e040943f7 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -23,12 +23,21 @@ import com.google.api.client.util.Maps;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.gcp.GCPProperties;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.DelegateFileIO;
 import org.apache.iceberg.io.FileInfo;
@@ -44,6 +53,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,14 +74,16 @@ public class GCSFileIO implements DelegateFileIO, SupportsStorageCredentials {
   private static final String DEFAULT_METRICS_IMPL =
       "org.apache.iceberg.hadoop.HadoopMetricsContext";
   private static final String ROOT_STORAGE_PREFIX = "gs";
+  private static volatile ScheduledExecutorService executorService;
 
   private SerializableSupplier<Storage> storageSupplier;
   private MetricsContext metrics = MetricsContext.nullMetrics();
   private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
   private SerializableMap<String, String> properties = null;
   // use modifiable collection for Kryo serde
-  private List<StorageCredential> storageCredentials = Lists.newArrayList();
+  private volatile List<StorageCredential> storageCredentials = Lists.newArrayList();
   private transient volatile Map<String, PrefixedStorage> storageByPrefix;
+  private transient volatile ScheduledFuture<?> refreshFuture;
 
   /**
    * No-arg constructor to load the FileIO dynamically.
@@ -199,6 +211,7 @@ public class GCSFileIO implements DelegateFileIO, SupportsStorageCredentials {
                             storageSupplier));
                   });
           this.storageByPrefix = localStorageByPrefix;
+          scheduleCredentialRefresh();
         }
       }
     }
@@ -206,6 +219,60 @@ public class GCSFileIO implements DelegateFileIO, SupportsStorageCredentials {
     return storageByPrefix;
   }
 
+  private void scheduleCredentialRefresh() {
+    storageCredentials.stream()
+        .map(
+            storageCredential ->
+                storageCredential.config().get(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT))
+        .filter(Objects::nonNull)
+        .map(expiresAtString -> Instant.ofEpochMilli(Long.parseLong(expiresAtString)))
+        .min(Comparator.naturalOrder())
+        .ifPresent(
+            expiresAt -> {
+              Instant prefetchAt = expiresAt.minus(5, ChronoUnit.MINUTES);
+              long delay = Duration.between(Instant.now(), prefetchAt).toMillis();
+              this.refreshFuture =
+                  executorService()
+                      .schedule(this::refreshStorageCredentials, delay, TimeUnit.MILLISECONDS);
+            });
+  }
+
+  private void refreshStorageCredentials() {
+    if (isResourceClosed.get()) {
+      return;
+    }
+
+    try (OAuth2RefreshCredentialsHandler handler =
+        OAuth2RefreshCredentialsHandler.create(properties)) {
+      List<StorageCredential> refreshed =
+          handler.fetchCredentials().credentials().stream()
+              .filter(c -> c.prefix().startsWith(ROOT_STORAGE_PREFIX))
+              .map(c -> StorageCredential.create(c.prefix(), c.config()))
+              .toList();
+
+      if (!refreshed.isEmpty() && !isResourceClosed.get()) {
+        this.storageCredentials = Lists.newArrayList(refreshed);
+        scheduleCredentialRefresh();
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to refresh storage credentials", e);
+    }
+  }
+
+  private ScheduledExecutorService executorService() {
+    if (executorService == null) {
+      synchronized (GCSFileIO.class) {
+        if (executorService == null) {
+          executorService =
+              ThreadPools.newExitingScheduledPool(
+                  "iceberg-gcsfileio-tasks", 1, Duration.ofSeconds(10));
+        }
+      }
+    }
+
+    return executorService;
+  }
+
   @Override
   public void close() {
     // handles concurrent calls to close()
@@ -214,6 +281,10 @@ public class GCSFileIO implements DelegateFileIO, SupportsStorageCredentials {
         storageByPrefix.values().forEach(PrefixedStorage::close);
         this.storageByPrefix = null;
       }
+      if (refreshFuture != null) {
+        refreshFuture.cancel(true);
+        refreshFuture = null;
+      }
     }
   }
 
@@ -271,8 +342,21 @@ public class GCSFileIO implements DelegateFileIO, SupportsStorageCredentials {
   @Override
   public void setCredentials(List<StorageCredential> credentials) {
     Preconditions.checkArgument(credentials != null, "Invalid storage credentials: null");
+    // stop any refresh that might be scheduled
+    if (refreshFuture != null) {
+      refreshFuture.cancel(true);
+    }
+
     // copy credentials into a modifiable collection for Kryo serde
     this.storageCredentials = Lists.newArrayList(credentials);
+
+    // if the clients are already initialized, we need to close and allow them to be recreated
+    synchronized (this) {
+      if (storageByPrefix != null) {
+        storageByPrefix.values().forEach(PrefixedStorage::close);
+        this.storageByPrefix = null;
+      }
+    }
   }
 
   @Override
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
index d42c131b9e..c0ff1194d9 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
@@ -68,14 +68,7 @@ public class OAuth2RefreshCredentialsHandler
   @SuppressWarnings("JavaUtilDate") // GCP API uses java.util.Date
   @Override
   public AccessToken refreshAccessToken() {
-    LoadCredentialsResponse response =
-        httpClient()
-            .get(
-                credentialsEndpoint,
-                null != planId ? Map.of("planId", planId) : null,
-                LoadCredentialsResponse.class,
-                Map.of(),
-                ErrorHandlers.defaultErrorHandler());
+    LoadCredentialsResponse response = fetchCredentials();
 
     List<Credential> gcsCredentials =
         response.credentials().stream()
@@ -96,6 +89,16 @@ public class OAuth2RefreshCredentialsHandler
     return new AccessToken(token, new Date(Long.parseLong(expiresAt)));
   }
 
+  LoadCredentialsResponse fetchCredentials() {
+    return httpClient()
+        .get(
+            credentialsEndpoint,
+            null != planId ? Map.of("planId", planId) : null,
+            LoadCredentialsResponse.class,
+            Map.of(),
+            ErrorHandlers.defaultErrorHandler());
+  }
+
   private void checkCredential(Credential gcsCredential, String gcsOauth2Token) {
     Preconditions.checkState(
         gcsCredential.config().containsKey(gcsOauth2Token),
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIO.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIO.java
index fde0038b27..f6841664e0 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIO.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIO.java
@@ -488,6 +488,45 @@ public class TestGCSFileIO {
         .isEqualTo(new AccessToken("gcsTokenFromCredential", new Date(2000L)));
   }
 
+  @Test
+  public void setCredentialsRefreshesClients() {
+    StorageCredential initialCredential =
+        StorageCredential.create(
+            "gs://custom-uri",
+            ImmutableMap.of(
+                "gcs.oauth2.token", "initialToken", "gcs.oauth2.token-expires-at", "1000"));
+
+    try (GCSFileIO fileIO = new GCSFileIO()) {
+      fileIO.setCredentials(ImmutableList.of(initialCredential));
+      fileIO.initialize(
+          ImmutableMap.of(
+              GCS_OAUTH2_TOKEN, "gcsTokenFromProperties", GCS_OAUTH2_TOKEN_EXPIRES_AT, "500"));
+
+      Storage initialClient = fileIO.client("gs://custom-uri/table1");
+      assertThat(initialClient.getOptions().getCredentials())
+          .isInstanceOf(OAuth2Credentials.class)
+          .extracting("value")
+          .extracting("temporaryAccess")
+          .isEqualTo(new AccessToken("initialToken", new Date(1000L)));
+
+      StorageCredential refreshedCredential =
+          StorageCredential.create(
+              "gs://custom-uri",
+              ImmutableMap.of(
+                  "gcs.oauth2.token", "refreshedToken", "gcs.oauth2.token-expires-at", "2000"));
+
+      fileIO.setCredentials(ImmutableList.of(refreshedCredential));
+
+      Storage refreshedClient = fileIO.client("gs://custom-uri/table1");
+      assertThat(refreshedClient).isNotSameAs(initialClient);
+      assertThat(refreshedClient.getOptions().getCredentials())
+          .isInstanceOf(OAuth2Credentials.class)
+          .extracting("value")
+          .extracting("temporaryAccess")
+          .isEqualTo(new AccessToken("refreshedToken", new Date(2000L)));
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("org.apache.iceberg.TestHelpers#serializers")
   public void resolvingFileIOLoadWithoutStorageCredentials(
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIOCredentialRefresh.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIOCredentialRefresh.java
new file mode 100644
index 0000000000..d0c05483ad
--- /dev/null
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIOCredentialRefresh.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.StorageCredential;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.HttpMethod;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
+import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponseParser;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+
+class TestGCSFileIOCredentialRefresh {
+
+  private static ClientAndServer mockServer;
+  private static String credentialsUri;
+  private static String catalogUri;
+
+  @BeforeAll
+  static void beforeAll() {
+    mockServer = startClientAndServer(0);
+    int port = mockServer.getPort();
+    credentialsUri = String.format("http://127.0.0.1:%d/v1/credentials", port);
+    catalogUri = String.format("http://127.0.0.1:%d/v1", port);
+  }
+
+  @AfterAll
+  static void stopServer() {
+    mockServer.stop();
+  }
+
+  @BeforeEach
+  void before() {
+    mockServer.reset();
+  }
+
+  @Test
+  void credentialRefreshSchedulesNextRefresh() {
+    String nearExpiryMs = Long.toString(Instant.now().plus(3, ChronoUnit.MINUTES).toEpochMilli());
+
+    StorageCredential initialCredential =
+        StorageCredential.create(
+            "gs://bucket/path",
+            ImmutableMap.of(
+                GCPProperties.GCS_OAUTH2_TOKEN,
+                "initialToken",
+                GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+                nearExpiryMs));
+
+    // return credentials that also expire within 5 minutes so the next refresh fires immediately
+    String firstRefreshExpiryMs =
+        Long.toString(Instant.now().plus(2, ChronoUnit.MINUTES).toEpochMilli());
+    String secondRefreshExpiryMs =
+        Long.toString(Instant.now().plus(1, ChronoUnit.HOURS).toEpochMilli());
+
+    LoadCredentialsResponse firstRefreshResponse =
+        ImmutableLoadCredentialsResponse.builder()
+            .addCredentials(
+                ImmutableCredential.builder()
+                    .prefix("gs://bucket/path")
+                    .config(
+                        ImmutableMap.of(
+                            GCPProperties.GCS_OAUTH2_TOKEN,
+                            "firstRefreshedToken",
+                            GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+                            firstRefreshExpiryMs))
+                    .build())
+            .build();
+
+    LoadCredentialsResponse secondRefreshResponse =
+        ImmutableLoadCredentialsResponse.builder()
+            .addCredentials(
+                ImmutableCredential.builder()
+                    .prefix("gs://bucket/path")
+                    .config(
+                        ImmutableMap.of(
+                            GCPProperties.GCS_OAUTH2_TOKEN,
+                            "secondRefreshedToken",
+                            GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+                            secondRefreshExpiryMs))
+                    .build())
+            .build();
+
+    HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name());
+    mockServer
+        .when(mockRequest, org.mockserver.matchers.Times.once())
+        .respond(
+            response(LoadCredentialsResponseParser.toJson(firstRefreshResponse))
+                .withStatusCode(200));
+    mockServer
+        .when(mockRequest, org.mockserver.matchers.Times.unlimited())
+        .respond(
+            response(LoadCredentialsResponseParser.toJson(secondRefreshResponse))
+                .withStatusCode(200));
+
+    Map<String, String> properties =
+        ImmutableMap.of(
+            GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
+            credentialsUri,
+            CatalogProperties.URI,
+            catalogUri);
+
+    try (GCSFileIO fileIO = new GCSFileIO()) {
+      fileIO.initialize(properties);
+      fileIO.setCredentials(List.of(initialCredential));
+
+      fileIO.client();
+
+      // the first refresh returns near-expiry credentials, which should schedule a second refresh
+      Awaitility.await()
+          .atMost(30, TimeUnit.SECONDS)
+          .untilAsserted(() -> mockServer.verify(mockRequest, VerificationTimes.atLeast(2)));
+
+      Awaitility.await()
+          .atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                List<StorageCredential> credentials = fileIO.credentials();
+                assertThat(credentials).hasSize(1);
+                assertThat(credentials.get(0).config())
+                    .containsEntry(GCPProperties.GCS_OAUTH2_TOKEN, "secondRefreshedToken")
+                    .containsEntry(
+                        GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT, secondRefreshExpiryMs);
+              });
+    }
+  }
+
+  @Test
+  void credentialRefreshWithinFiveMinuteWindow() {
+    String nearExpiryMs = Long.toString(Instant.now().plus(3, ChronoUnit.MINUTES).toEpochMilli());
+
+    StorageCredential initialCredential =
+        StorageCredential.create(
+            "gs://bucket/path",
+            ImmutableMap.of(
+                GCPProperties.GCS_OAUTH2_TOKEN,
+                "initialToken",
+                GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+                nearExpiryMs));
+
+    String refreshedExpiryMs =
+        Long.toString(Instant.now().plus(1, ChronoUnit.HOURS).toEpochMilli());
+    LoadCredentialsResponse refreshResponse =
+        ImmutableLoadCredentialsResponse.builder()
+            .addCredentials(
+                ImmutableCredential.builder()
+                    .prefix("gs://bucket/path")
+                    .config(
+                        ImmutableMap.of(
+                            GCPProperties.GCS_OAUTH2_TOKEN,
+                            "refreshedToken",
+                            GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+                            refreshedExpiryMs))
+                    .build())
+            .build();
+
+    HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name());
+    HttpResponse mockResponse =
+        response(LoadCredentialsResponseParser.toJson(refreshResponse)).withStatusCode(200);
+    mockServer.when(mockRequest).respond(mockResponse);
+
+    Map<String, String> properties =
+        ImmutableMap.of(
+            GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
+            credentialsUri,
+            CatalogProperties.URI,
+            catalogUri);
+
+    try (GCSFileIO fileIO = new GCSFileIO()) {
+      fileIO.initialize(properties);
+      fileIO.setCredentials(List.of(initialCredential));
+
+      // trigger storageByPrefix() to build the client map and schedule the refresh
+      fileIO.client();
+
+      Awaitility.await()
+          .atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(() -> mockServer.verify(mockRequest, VerificationTimes.atLeast(1)));
+
+      Awaitility.await()
+          .atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                List<StorageCredential> credentials = fileIO.credentials();
+                assertThat(credentials).hasSize(1);
+                assertThat(credentials.get(0).config())
+                    .containsEntry(GCPProperties.GCS_OAUTH2_TOKEN, "refreshedToken")
+                    .containsEntry(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT, refreshedExpiryMs);
+              });
+    }
+  }
+}

Reply via email to