This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 19b4189060 GCP: Add scheduled refresh for storage credentials held by
GCSFileIO (#15696)
19b4189060 is described below
commit 19b418906043dbfc2aba7d8947404f778414b71a
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Mar 23 19:41:12 2026 +0100
GCP: Add scheduled refresh for storage credentials held by GCSFileIO
(#15696)
GCP: Add scheduled refresh for storage credentials held by GCSFileIO
---
build.gradle | 1 +
.../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java | 86 +++++++-
.../gcp/gcs/OAuth2RefreshCredentialsHandler.java | 19 +-
.../org/apache/iceberg/gcp/gcs/TestGCSFileIO.java | 39 ++++
.../gcp/gcs/TestGCSFileIOCredentialRefresh.java | 229 +++++++++++++++++++++
5 files changed, 365 insertions(+), 9 deletions(-)
diff --git a/build.gradle b/build.gradle
index 4cf6cf2ad9..2f5f1fa545 100644
--- a/build.gradle
+++ b/build.gradle
@@ -749,6 +749,7 @@ project(':iceberg-gcp') {
exclude group: 'javax.servlet', module: 'servlet-api'
exclude group: 'com.google.code.gson', module: 'gson'
}
+ testImplementation libs.awaitility
testImplementation libs.esotericsoftware.kryo
testImplementation libs.mockserver.netty
testImplementation libs.mockserver.client.java
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 188eddbfbb..5e040943f7 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -23,12 +23,21 @@ import com.google.api.client.util.Maps;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileInfo;
@@ -44,6 +53,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,14 +74,16 @@ public class GCSFileIO implements DelegateFileIO,
SupportsStorageCredentials {
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
private static final String ROOT_STORAGE_PREFIX = "gs";
+ private static volatile ScheduledExecutorService executorService;
private SerializableSupplier<Storage> storageSupplier;
private MetricsContext metrics = MetricsContext.nullMetrics();
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
private SerializableMap<String, String> properties = null;
// use modifiable collection for Kryo serde
- private List<StorageCredential> storageCredentials = Lists.newArrayList();
+ private volatile List<StorageCredential> storageCredentials =
Lists.newArrayList();
private transient volatile Map<String, PrefixedStorage> storageByPrefix;
+ private transient volatile ScheduledFuture<?> refreshFuture;
/**
* No-arg constructor to load the FileIO dynamically.
@@ -199,6 +211,7 @@ public class GCSFileIO implements DelegateFileIO,
SupportsStorageCredentials {
storageSupplier));
});
this.storageByPrefix = localStorageByPrefix;
+ scheduleCredentialRefresh();
}
}
}
@@ -206,6 +219,60 @@ public class GCSFileIO implements DelegateFileIO,
SupportsStorageCredentials {
return storageByPrefix;
}
+  /**
+   * Schedules a one-shot refresh of the storage credentials, timed from the earliest expiration
+   * found among them.
+   *
+   * <p>Credentials whose config has no {@link GCPProperties#GCS_OAUTH2_TOKEN_EXPIRES_AT} entry are
+   * ignored, and nothing is scheduled when none of them carries one. The refresh is requested five
+   * minutes ahead of the earliest expiration and the resulting future is stored in {@code
+   * refreshFuture}.
+   */
+  private void scheduleCredentialRefresh() {
+    Instant earliestExpiry = null;
+    for (StorageCredential credential : storageCredentials) {
+      String expiresAtMillis = credential.config().get(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT);
+      if (expiresAtMillis == null) {
+        continue;
+      }
+
+      Instant candidate = Instant.ofEpochMilli(Long.parseLong(expiresAtMillis));
+      if (earliestExpiry == null || candidate.compareTo(earliestExpiry) < 0) {
+        earliestExpiry = candidate;
+      }
+    }
+
+    if (earliestExpiry == null) {
+      return;
+    }
+
+    // ask for new credentials five minutes before the earliest one expires
+    Instant refreshAt = earliestExpiry.minus(5, ChronoUnit.MINUTES);
+    long delayMillis = Duration.between(Instant.now(), refreshAt).toMillis();
+    this.refreshFuture =
+        executorService()
+            .schedule(this::refreshStorageCredentials, delayMillis, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Task run by the scheduler: loads credentials through an {@link
+   * OAuth2RefreshCredentialsHandler} and, if any of them has a prefix starting with the GCS root
+   * prefix, replaces the held storage credentials and schedules the following refresh.
+   *
+   * <p>Does nothing once this FileIO has been closed. Exceptions are logged at WARN level and not
+   * rethrown.
+   */
+  private void refreshStorageCredentials() {
+    if (isResourceClosed.get()) {
+      return;
+    }
+
+    try (OAuth2RefreshCredentialsHandler refreshHandler =
+        OAuth2RefreshCredentialsHandler.create(properties)) {
+      // keep only the credentials whose prefix starts with the GCS root prefix
+      List<StorageCredential> gcsCredentials =
+          refreshHandler.fetchCredentials().credentials().stream()
+              .filter(cred -> cred.prefix().startsWith(ROOT_STORAGE_PREFIX))
+              .map(cred -> StorageCredential.create(cred.prefix(), cred.config()))
+              .toList();
+
+      // an empty result or a close() in the meantime leaves the current credentials untouched
+      if (gcsCredentials.isEmpty() || isResourceClosed.get()) {
+        return;
+      }
+
+      this.storageCredentials = Lists.newArrayList(gcsCredentials);
+      scheduleCredentialRefresh();
+    } catch (Exception e) {
+      LOG.warn("Failed to refresh storage credentials", e);
+    }
+  }
+
+  /**
+   * Returns the scheduler held in the static {@code executorService} field, creating it on first
+   * use.
+   *
+   * <p>Creation happens while holding the {@code GCSFileIO.class} monitor and only if the field is
+   * still unset at that point.
+   */
+  private ScheduledExecutorService executorService() {
+    ScheduledExecutorService pool = executorService;
+    if (pool != null) {
+      return pool;
+    }
+
+    synchronized (GCSFileIO.class) {
+      if (executorService == null) {
+        executorService =
+            ThreadPools.newExitingScheduledPool(
+                "iceberg-gcsfileio-tasks", 1, Duration.ofSeconds(10));
+      }
+
+      return executorService;
+    }
+  }
+
@Override
public void close() {
// handles concurrent calls to close()
@@ -214,6 +281,10 @@ public class GCSFileIO implements DelegateFileIO,
SupportsStorageCredentials {
storageByPrefix.values().forEach(PrefixedStorage::close);
this.storageByPrefix = null;
}
+ if (refreshFuture != null) {
+ refreshFuture.cancel(true);
+ refreshFuture = null;
+ }
}
}
@@ -271,8 +342,21 @@ public class GCSFileIO implements DelegateFileIO,
SupportsStorageCredentials {
@Override
public void setCredentials(List<StorageCredential> credentials) {
Preconditions.checkArgument(credentials != null, "Invalid storage credentials: null");
+ // Replaces the held credentials: cancels a scheduled refresh, stores a modifiable copy of the
+ // given list and closes already-created clients so that they are rebuilt on next use.
+ //
+ // stop any refresh that might be scheduled; the volatile field is read once because close()
+ // resets it to null, and a second read after the null check could otherwise hit that null
+ ScheduledFuture<?> scheduledRefresh = refreshFuture;
+ if (scheduledRefresh != null) {
+ scheduledRefresh.cancel(true);
+ }
+
// copy credentials into a modifiable collection for Kryo serde
this.storageCredentials = Lists.newArrayList(credentials);
+
+ // if the clients are already initialized, we need to close and allow them to be recreated
+ synchronized (this) {
+ if (storageByPrefix != null) {
+ storageByPrefix.values().forEach(PrefixedStorage::close);
+ this.storageByPrefix = null;
+ }
+ }
}
@Override
diff --git
a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
index d42c131b9e..c0ff1194d9 100644
---
a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
+++
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
@@ -68,14 +68,7 @@ public class OAuth2RefreshCredentialsHandler
@SuppressWarnings("JavaUtilDate") // GCP API uses java.util.Date
@Override
public AccessToken refreshAccessToken() {
- LoadCredentialsResponse response =
- httpClient()
- .get(
- credentialsEndpoint,
- null != planId ? Map.of("planId", planId) : null,
- LoadCredentialsResponse.class,
- Map.of(),
- ErrorHandlers.defaultErrorHandler());
+ LoadCredentialsResponse response = fetchCredentials();
List<Credential> gcsCredentials =
response.credentials().stream()
@@ -96,6 +89,16 @@ public class OAuth2RefreshCredentialsHandler
return new AccessToken(token, new Date(Long.parseLong(expiresAt)));
}
+  /**
+   * Issues a GET against {@code credentialsEndpoint} and returns the response parsed as a {@link
+   * LoadCredentialsResponse}.
+   *
+   * <p>A {@code planId} query parameter is sent only when a plan ID is set; otherwise no query
+   * parameters are passed.
+   */
+  LoadCredentialsResponse fetchCredentials() {
+    Map<String, String> queryParams = null;
+    if (null != planId) {
+      queryParams = Map.of("planId", planId);
+    }
+
+    return httpClient()
+        .get(
+            credentialsEndpoint,
+            queryParams,
+            LoadCredentialsResponse.class,
+            Map.of(),
+            ErrorHandlers.defaultErrorHandler());
+  }
+
private void checkCredential(Credential gcsCredential, String
gcsOauth2Token) {
Preconditions.checkState(
gcsCredential.config().containsKey(gcsOauth2Token),
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIO.java
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIO.java
index fde0038b27..f6841664e0 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIO.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIO.java
@@ -488,6 +488,45 @@ public class TestGCSFileIO {
.isEqualTo(new AccessToken("gcsTokenFromCredential", new Date(2000L)));
}
+  @Test
+  public void setCredentialsRefreshesClients() {
+    String credentialPrefix = "gs://custom-uri";
+    String tableLocation = "gs://custom-uri/table1";
+
+    try (GCSFileIO io = new GCSFileIO()) {
+      io.setCredentials(
+          ImmutableList.of(
+              StorageCredential.create(
+                  credentialPrefix,
+                  ImmutableMap.of(
+                      "gcs.oauth2.token", "initialToken", "gcs.oauth2.token-expires-at", "1000"))));
+      io.initialize(
+          ImmutableMap.of(
+              GCS_OAUTH2_TOKEN, "gcsTokenFromProperties", GCS_OAUTH2_TOKEN_EXPIRES_AT, "500"));
+
+      // the client for the credential's prefix must carry the credential's token, not the one
+      // configured through the properties
+      Storage clientBefore = io.client(tableLocation);
+      assertThat(clientBefore.getOptions().getCredentials())
+          .isInstanceOf(OAuth2Credentials.class)
+          .extracting("value")
+          .extracting("temporaryAccess")
+          .isEqualTo(new AccessToken("initialToken", new Date(1000L)));
+
+      io.setCredentials(
+          ImmutableList.of(
+              StorageCredential.create(
+                  credentialPrefix,
+                  ImmutableMap.of(
+                      "gcs.oauth2.token", "refreshedToken", "gcs.oauth2.token-expires-at", "2000"))));
+
+      // replacing the credentials must yield a different client that carries the new token
+      Storage clientAfter = io.client(tableLocation);
+      assertThat(clientAfter).isNotSameAs(clientBefore);
+      assertThat(clientAfter.getOptions().getCredentials())
+          .isInstanceOf(OAuth2Credentials.class)
+          .extracting("value")
+          .extracting("temporaryAccess")
+          .isEqualTo(new AccessToken("refreshedToken", new Date(2000L)));
+    }
+  }
+
@ParameterizedTest
@MethodSource("org.apache.iceberg.TestHelpers#serializers")
public void resolvingFileIOLoadWithoutStorageCredentials(
diff --git
a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIOCredentialRefresh.java
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIOCredentialRefresh.java
new file mode 100644
index 0000000000..d0c05483ad
--- /dev/null
+++
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSFileIOCredentialRefresh.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.StorageCredential;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.HttpMethod;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
+import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponseParser;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+
+/** Tests the scheduled refresh of storage credentials held by {@link GCSFileIO}. */
+class TestGCSFileIOCredentialRefresh {
+
+  private static final String CREDENTIAL_PREFIX = "gs://bucket/path";
+
+  private static ClientAndServer mockServer;
+  private static String credentialsUri;
+  private static String catalogUri;
+
+  @BeforeAll
+  static void beforeAll() {
+    mockServer = startClientAndServer(0);
+    String baseUri = String.format("http://127.0.0.1:%d/v1", mockServer.getPort());
+    catalogUri = baseUri;
+    credentialsUri = baseUri + "/credentials";
+  }
+
+  @AfterAll
+  static void stopServer() {
+    mockServer.stop();
+  }
+
+  @BeforeEach
+  void before() {
+    mockServer.reset();
+  }
+
+  @Test
+  void credentialRefreshSchedulesNextRefresh() {
+    // expires in three minutes, i.e. already inside the five-minute refresh lead time
+    StorageCredential initialCredential =
+        StorageCredential.create(
+            CREDENTIAL_PREFIX, tokenConfig("initialToken", millisFromNow(3, ChronoUnit.MINUTES)));
+
+    // return credentials that also expire within 5 minutes so the next refresh fires immediately
+    String firstRefreshExpiryMs = millisFromNow(2, ChronoUnit.MINUTES);
+    String secondRefreshExpiryMs = millisFromNow(1, ChronoUnit.HOURS);
+
+    HttpRequest mockRequest = credentialsRequest();
+    mockServer
+        .when(mockRequest, org.mockserver.matchers.Times.once())
+        .respond(okResponse(credentialsResponse("firstRefreshedToken", firstRefreshExpiryMs)));
+    mockServer
+        .when(mockRequest, org.mockserver.matchers.Times.unlimited())
+        .respond(okResponse(credentialsResponse("secondRefreshedToken", secondRefreshExpiryMs)));
+
+    try (GCSFileIO fileIO = new GCSFileIO()) {
+      fileIO.initialize(refreshProperties());
+      fileIO.setCredentials(List.of(initialCredential));
+
+      fileIO.client();
+
+      // the first refresh returns near-expiry credentials, which should schedule a second refresh
+      Awaitility.await()
+          .atMost(30, TimeUnit.SECONDS)
+          .untilAsserted(() -> mockServer.verify(mockRequest, VerificationTimes.atLeast(2)));
+
+      awaitHeldCredential(fileIO, "secondRefreshedToken", secondRefreshExpiryMs);
+    }
+  }
+
+  @Test
+  void credentialRefreshWithinFiveMinuteWindow() {
+    StorageCredential initialCredential =
+        StorageCredential.create(
+            CREDENTIAL_PREFIX, tokenConfig("initialToken", millisFromNow(3, ChronoUnit.MINUTES)));
+
+    String refreshedExpiryMs = millisFromNow(1, ChronoUnit.HOURS);
+
+    HttpRequest mockRequest = credentialsRequest();
+    HttpResponse mockResponse = okResponse(credentialsResponse("refreshedToken", refreshedExpiryMs));
+    mockServer.when(mockRequest).respond(mockResponse);
+
+    try (GCSFileIO fileIO = new GCSFileIO()) {
+      fileIO.initialize(refreshProperties());
+      fileIO.setCredentials(List.of(initialCredential));
+
+      // trigger storageByPrefix() to build the client map and schedule the refresh
+      fileIO.client();
+
+      Awaitility.await()
+          .atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(() -> mockServer.verify(mockRequest, VerificationTimes.atLeast(1)));
+
+      awaitHeldCredential(fileIO, "refreshedToken", refreshedExpiryMs);
+    }
+  }
+
+  /** Returns the epoch-millisecond timestamp of {@code now + amount} as a decimal string. */
+  private static String millisFromNow(long amount, ChronoUnit unit) {
+    return Long.toString(Instant.now().plus(amount, unit).toEpochMilli());
+  }
+
+  /** Builds a credential config holding the given OAuth2 token and its expiration. */
+  private static Map<String, String> tokenConfig(String token, String expiresAtMs) {
+    return ImmutableMap.of(
+        GCPProperties.GCS_OAUTH2_TOKEN,
+        token,
+        GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+        expiresAtMs);
+  }
+
+  /** Builds an endpoint response carrying a single credential for {@link #CREDENTIAL_PREFIX}. */
+  private static LoadCredentialsResponse credentialsResponse(String token, String expiresAtMs) {
+    return ImmutableLoadCredentialsResponse.builder()
+        .addCredentials(
+            ImmutableCredential.builder()
+                .prefix(CREDENTIAL_PREFIX)
+                .config(tokenConfig(token, expiresAtMs))
+                .build())
+        .build();
+  }
+
+  /** Matches GET requests against the credentials path of the mock server. */
+  private static HttpRequest credentialsRequest() {
+    return request("/v1/credentials").withMethod(HttpMethod.GET.name());
+  }
+
+  /** Wraps the JSON form of the given response in an HTTP 200 reply. */
+  private static HttpResponse okResponse(LoadCredentialsResponse body) {
+    return response(LoadCredentialsResponseParser.toJson(body)).withStatusCode(200);
+  }
+
+  /** FileIO properties pointing the credential refresh and the catalog at the mock server. */
+  private static Map<String, String> refreshProperties() {
+    return ImmutableMap.of(
+        GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
+        credentialsUri,
+        CatalogProperties.URI,
+        catalogUri);
+  }
+
+  /** Waits up to ten seconds for the FileIO to hold exactly one credential with the given values. */
+  private static void awaitHeldCredential(GCSFileIO fileIO, String token, String expiresAtMs) {
+    Awaitility.await()
+        .atMost(10, TimeUnit.SECONDS)
+        .untilAsserted(
+            () -> {
+              List<StorageCredential> credentials = fileIO.credentials();
+              assertThat(credentials).hasSize(1);
+              assertThat(credentials.get(0).config())
+                  .containsEntry(GCPProperties.GCS_OAUTH2_TOKEN, token)
+                  .containsEntry(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT, expiresAtMs);
+            });
+  }
+}