This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 5359bea71b GCS: Refresh vended credentials (#11282)
5359bea71b is described below
commit 5359bea71bceaad35897ac82573d73e3c7e47f71
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Oct 30 07:09:42 2024 +0100
GCS: Refresh vended credentials (#11282)
---
build.gradle | 3 +
.../java/org/apache/iceberg/gcp/GCPProperties.java | 20 ++
.../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java | 12 +-
.../gcp/gcs/OAuth2RefreshCredentialsHandler.java | 99 ++++++++
.../org/apache/iceberg/gcp/GCPPropertiesTest.java | 30 +++
.../org/apache/iceberg/gcp/gcs/GCSFileIOTest.java | 47 ++++
.../gcs/OAuth2RefreshCredentialsHandlerTest.java | 264 +++++++++++++++++++++
7 files changed, 474 insertions(+), 1 deletion(-)
diff --git a/build.gradle b/build.gradle
index fe8392279e..1a05f83f7d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -644,6 +644,7 @@ project(':iceberg-gcp') {
testImplementation "com.google.cloud:google-cloud-nio"
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
+ testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
testImplementation(libs.hadoop2.common) {
exclude group: 'org.apache.avro', module: 'avro'
@@ -652,6 +653,8 @@ project(':iceberg-gcp') {
exclude group: 'com.google.code.gson', module: 'gson'
}
testImplementation libs.esotericsoftware.kryo
+ testImplementation libs.mockserver.netty
+ testImplementation libs.mockserver.client.java
}
}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
index 4f60e2f91f..c03906ae5d 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
@@ -43,6 +43,12 @@ public class GCPProperties implements Serializable {
public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT =
"gcs.oauth2.token-expires-at";
// Boolean to explicitly configure "no authentication" for testing purposes
using a GCS emulator
public static final String GCS_NO_AUTH = "gcs.no-auth";
+ public static final String GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT =
+ "gcs.oauth2.refresh-credentials-endpoint";
+
+ /** Controls whether vended credentials should be refreshed or not. Defaults
to true. */
+ public static final String GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED =
+ "gcs.oauth2.refresh-credentials-enabled";
/** Configure the batch size used when deleting multiple files from a given
GCS bucket */
public static final String GCS_DELETE_BATCH_SIZE = "gcs.delete.batch-size";
@@ -67,6 +73,8 @@ public class GCPProperties implements Serializable {
private boolean gcsNoAuth;
private String gcsOAuth2Token;
private Date gcsOAuth2TokenExpiresAt;
+ private String gcsOauth2RefreshCredentialsEndpoint;
+ private boolean gcsOauth2RefreshCredentialsEnabled;
private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;
@@ -95,6 +103,10 @@ public class GCPProperties implements Serializable {
gcsOAuth2TokenExpiresAt =
new
Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT)));
}
+
+ gcsOauth2RefreshCredentialsEndpoint =
properties.get(GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT);
+ gcsOauth2RefreshCredentialsEnabled =
+ PropertyUtil.propertyAsBoolean(properties,
GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED, true);
gcsNoAuth = Boolean.parseBoolean(properties.getOrDefault(GCS_NO_AUTH,
"false"));
Preconditions.checkState(
!(gcsOAuth2Token != null && gcsNoAuth),
@@ -154,4 +166,12 @@ public class GCPProperties implements Serializable {
public int deleteBatchSize() {
return gcsDeleteBatchSize;
}
+
+ public Optional<String> oauth2RefreshCredentialsEndpoint() {
+ return Optional.ofNullable(gcsOauth2RefreshCredentialsEndpoint);
+ }
+
+ public boolean oauth2RefreshCredentialsEnabled() {
+ return gcsOauth2RefreshCredentialsEnabled;
+ }
}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 2201c876bd..5737606aef 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.gcp.gcs;
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.OAuth2Credentials;
+import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
import com.google.cloud.NoCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
@@ -156,7 +157,16 @@ public class GCSFileIO implements DelegateFileIO {
// Explicitly configure an OAuth token.
AccessToken accessToken =
new AccessToken(token,
gcpProperties.oauth2TokenExpiresAt().orElse(null));
-
builder.setCredentials(OAuth2Credentials.create(accessToken));
+ if (gcpProperties.oauth2RefreshCredentialsEnabled()
+ &&
gcpProperties.oauth2RefreshCredentialsEndpoint().isPresent()) {
+ builder.setCredentials(
+ OAuth2CredentialsWithRefresh.newBuilder()
+ .setAccessToken(accessToken)
+
.setRefreshHandler(OAuth2RefreshCredentialsHandler.create(properties))
+ .build());
+ } else {
+
builder.setCredentials(OAuth2Credentials.create(accessToken));
+ }
});
return builder.build().getService();
diff --git
a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
new file mode 100644
index 0000000000..611e7baaec
--- /dev/null
+++
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
+
+public class OAuth2RefreshCredentialsHandler
+ implements OAuth2CredentialsWithRefresh.OAuth2RefreshHandler {
+ private final Map<String, String> properties;
+
+ private OAuth2RefreshCredentialsHandler(Map<String, String> properties) {
+ Preconditions.checkArgument(
+ null !=
properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT),
+ "Invalid credentials endpoint: null");
+ this.properties = properties;
+ }
+
+ @Override
+ public AccessToken refreshAccessToken() {
+ LoadCredentialsResponse response;
+ try (RESTClient client = httpClient()) {
+ response =
+ client.get(
+
properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT),
+ null,
+ LoadCredentialsResponse.class,
+ OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)),
+ ErrorHandlers.defaultErrorHandler());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<Credential> gcsCredentials =
+ response.credentials().stream()
+ .filter(c -> c.prefix().startsWith("gs"))
+ .collect(Collectors.toList());
+
+ Preconditions.checkState(!gcsCredentials.isEmpty(), "Invalid GCS
Credentials: empty");
+ Preconditions.checkState(
+ gcsCredentials.size() == 1,
+ "Invalid GCS Credentials: only one GCS credential should exist");
+
+ Credential gcsCredential = gcsCredentials.get(0);
+ checkCredential(gcsCredential, GCPProperties.GCS_OAUTH2_TOKEN);
+ checkCredential(gcsCredential, GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT);
+ String token = gcsCredential.config().get(GCPProperties.GCS_OAUTH2_TOKEN);
+ String expiresAt =
gcsCredential.config().get(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT);
+
+ return new AccessToken(token, new Date(Long.parseLong(expiresAt)));
+ }
+
+ private void checkCredential(Credential gcsCredential, String
gcsOauth2Token) {
+ Preconditions.checkState(
+ gcsCredential.config().containsKey(gcsOauth2Token),
+ "Invalid GCS Credentials: %s not set",
+ gcsOauth2Token);
+ }
+
+ public static OAuth2RefreshCredentialsHandler create(Map<String, String>
properties) {
+ return new OAuth2RefreshCredentialsHandler(properties);
+ }
+
+ private RESTClient httpClient() {
+ return HTTPClient.builder(properties)
+
.uri(properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT))
+ .build();
+ }
+}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java
b/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java
index c71b558287..61bd069f0c 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.gcp;
import static org.apache.iceberg.gcp.GCPProperties.GCS_NO_AUTH;
+import static
org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED;
+import static
org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT;
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
@@ -47,4 +49,32 @@ public class GCPPropertiesTest {
assertThat(gcpProperties.noAuth()).isTrue();
assertThat(gcpProperties.oauth2Token()).isNotPresent();
}
+
+ @Test
+ public void refreshCredentialsEndpointSet() {
+ GCPProperties gcpProperties =
+ new GCPProperties(
+ ImmutableMap.of(GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
"/v1/credentials"));
+ assertThat(gcpProperties.oauth2RefreshCredentialsEnabled()).isTrue();
+ assertThat(gcpProperties.oauth2RefreshCredentialsEndpoint())
+ .isPresent()
+ .get()
+ .isEqualTo("/v1/credentials");
+ }
+
+ @Test
+ public void refreshCredentialsEndpointSetButRefreshDisabled() {
+ GCPProperties gcpProperties =
+ new GCPProperties(
+ ImmutableMap.of(
+ GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
+ "/v1/credentials",
+ GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED,
+ "false"));
+ assertThat(gcpProperties.oauth2RefreshCredentialsEnabled()).isFalse();
+ assertThat(gcpProperties.oauth2RefreshCredentialsEndpoint())
+ .isPresent()
+ .get()
+ .isEqualTo("/v1/credentials");
+ }
}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
index fbc3fe7114..6302f664b7 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
@@ -19,11 +19,17 @@
package org.apache.iceberg.gcp.gcs;
import static java.lang.String.format;
+import static
org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED;
+import static
org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT;
+import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN;
+import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
+import com.google.auth.oauth2.OAuth2Credentials;
+import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
@@ -32,6 +38,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Random;
import java.util.stream.StreamSupport;
@@ -223,4 +231,43 @@ public class GCSFileIOTest {
.invoke("gs://foo/bar");
assertThat(result).isInstanceOf(GCSFileIO.class);
}
+
+ @Test
+ public void refreshCredentialsEndpointSet() {
+ Storage client;
+ try (GCSFileIO fileIO = new GCSFileIO()) {
+ fileIO.initialize(
+ ImmutableMap.of(
+ GCS_OAUTH2_TOKEN,
+ "gcsToken",
+ GCS_OAUTH2_TOKEN_EXPIRES_AT,
+ Long.toString(Instant.now().plus(5,
ChronoUnit.MINUTES).toEpochMilli()),
+ GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
+ "/v1/credentials"));
+ client = fileIO.client();
+ }
+
+ assertThat(client.getOptions().getCredentials())
+ .isInstanceOf(OAuth2CredentialsWithRefresh.class);
+ }
+
+ @Test
+ public void refreshCredentialsEndpointSetButRefreshDisabled() {
+ Storage client;
+ try (GCSFileIO fileIO = new GCSFileIO()) {
+ fileIO.initialize(
+ ImmutableMap.of(
+ GCS_OAUTH2_TOKEN,
+ "gcsTokenWithoutRefresh",
+ GCS_OAUTH2_TOKEN_EXPIRES_AT,
+ Long.toString(Instant.now().plus(5,
ChronoUnit.MINUTES).toEpochMilli()),
+ GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
+ "/v1/credentials",
+ GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED,
+ "false"));
+ client = fileIO.client();
+ }
+
+
assertThat(client.getOptions().getCredentials()).isInstanceOf(OAuth2Credentials.class);
+ }
}
diff --git
a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java
new file mode 100644
index 0000000000..c538745f27
--- /dev/null
+++
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+
+import com.google.auth.oauth2.AccessToken;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.iceberg.exceptions.RESTException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.HttpMethod;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
+import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponseParser;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+
+public class OAuth2RefreshCredentialsHandlerTest {
+ private static final int PORT = 3333;
+ private static final String URI =
String.format("http://127.0.0.1:%d/v1/credentials", PORT);
+ private static ClientAndServer mockServer;
+
+ @BeforeAll
+ public static void beforeAll() {
+ mockServer = startClientAndServer(PORT);
+ }
+
+ @AfterAll
+ public static void stopServer() {
+ mockServer.stop();
+ }
+
+ @BeforeEach
+ public void before() {
+ mockServer.reset();
+ }
+
+ @Test
+ public void invalidOrMissingUri() {
+ assertThatThrownBy(() ->
OAuth2RefreshCredentialsHandler.create(ImmutableMap.of()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid credentials endpoint: null");
+
+ assertThatThrownBy(
+ () ->
+ OAuth2RefreshCredentialsHandler.create(
+ ImmutableMap.of(
+
GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, "invalid uri"))
+ .refreshAccessToken())
+ .isInstanceOf(RESTException.class)
+ .hasMessageStartingWith("Failed to create request URI from base
invalid uri");
+ }
+
+ @Test
+ public void badRequest() {
+ HttpRequest mockRequest =
+
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+ HttpResponse mockResponse = HttpResponse.response().withStatusCode(400);
+ mockServer.when(mockRequest).respond(mockResponse);
+
+ OAuth2RefreshCredentialsHandler handler =
+ OAuth2RefreshCredentialsHandler.create(
+
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+ assertThatThrownBy(handler::refreshAccessToken)
+ .isInstanceOf(BadRequestException.class)
+ .hasMessageStartingWith("Malformed request");
+ }
+
+ @Test
+ public void noGcsCredentialInResponse() {
+ HttpRequest mockRequest =
+
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+ HttpResponse mockResponse =
+ HttpResponse.response(
+ LoadCredentialsResponseParser.toJson(
+ ImmutableLoadCredentialsResponse.builder().build()))
+ .withStatusCode(200);
+ mockServer.when(mockRequest).respond(mockResponse);
+
+ OAuth2RefreshCredentialsHandler handler =
+ OAuth2RefreshCredentialsHandler.create(
+
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+ assertThatThrownBy(handler::refreshAccessToken)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Invalid GCS Credentials: empty");
+ }
+
+ @Test
+ public void noGcsToken() {
+ HttpRequest mockRequest =
+
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+ Credential credential =
+ ImmutableCredential.builder()
+ .prefix("gs")
+ .config(ImmutableMap.of(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
"1000"))
+ .build();
+ HttpResponse mockResponse =
+ HttpResponse.response(
+ LoadCredentialsResponseParser.toJson(
+
ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build()))
+ .withStatusCode(200);
+ mockServer.when(mockRequest).respond(mockResponse);
+
+ OAuth2RefreshCredentialsHandler handler =
+ OAuth2RefreshCredentialsHandler.create(
+
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+ assertThatThrownBy(handler::refreshAccessToken)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Invalid GCS Credentials: gcs.oauth2.token not set");
+ }
+
+ @Test
+ public void tokenWithoutExpiration() {
+ HttpRequest mockRequest =
+
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+ Credential credential =
+ ImmutableCredential.builder()
+ .prefix("gs")
+ .config(ImmutableMap.of(GCPProperties.GCS_OAUTH2_TOKEN,
"gcsToken"))
+ .build();
+ HttpResponse mockResponse =
+ HttpResponse.response(
+ LoadCredentialsResponseParser.toJson(
+
ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build()))
+ .withStatusCode(200);
+ mockServer.when(mockRequest).respond(mockResponse);
+
+ OAuth2RefreshCredentialsHandler handler =
+ OAuth2RefreshCredentialsHandler.create(
+
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+ assertThatThrownBy(handler::refreshAccessToken)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Invalid GCS Credentials: gcs.oauth2.token-expires-at not
set");
+ }
+
+ @Test
+ public void tokenWithExpiration() {
+ HttpRequest mockRequest =
+
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+ Credential credential =
+ ImmutableCredential.builder()
+ .prefix("gs")
+ .config(
+ ImmutableMap.of(
+ GCPProperties.GCS_OAUTH2_TOKEN,
+ "gcsToken",
+ GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+ Long.toString(Instant.now().plus(5,
ChronoUnit.MINUTES).toEpochMilli())))
+ .build();
+ HttpResponse mockResponse =
+ HttpResponse.response(
+ LoadCredentialsResponseParser.toJson(
+
ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build()))
+ .withStatusCode(200);
+ mockServer.when(mockRequest).respond(mockResponse);
+
+ OAuth2RefreshCredentialsHandler handler =
+ OAuth2RefreshCredentialsHandler.create(
+
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+ AccessToken accessToken = handler.refreshAccessToken();
+ assertThat(accessToken.getTokenValue())
+ .isEqualTo(credential.config().get(GCPProperties.GCS_OAUTH2_TOKEN));
+ assertThat(accessToken.getExpirationTime().toInstant().toEpochMilli())
+ .isEqualTo(
+
Long.parseLong(credential.config().get(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT)));
+
+ // refresh always fetches a new token
+ AccessToken refreshedToken = handler.refreshAccessToken();
+ assertThat(refreshedToken).isNotSameAs(accessToken);
+
+ mockServer.verify(mockRequest, VerificationTimes.exactly(2));
+ }
+
+ @Test
+ public void multipleGcsCredentials() {
+ HttpRequest mockRequest =
+
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+ Credential credentialOne =
+ ImmutableCredential.builder()
+ .prefix("gs")
+ .config(
+ ImmutableMap.of(
+ GCPProperties.GCS_OAUTH2_TOKEN,
+ "gcsToken1",
+ GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+ Long.toString(Instant.now().plus(1,
ChronoUnit.MINUTES).toEpochMilli())))
+ .build();
+ Credential credentialTwo =
+ ImmutableCredential.builder()
+ .prefix("gs://my-custom-prefix/xyz/long-prefix")
+ .config(
+ ImmutableMap.of(
+ GCPProperties.GCS_OAUTH2_TOKEN,
+ "gcsToken2",
+ GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+ Long.toString(Instant.now().plus(2,
ChronoUnit.MINUTES).toEpochMilli())))
+ .build();
+ Credential credentialThree =
+ ImmutableCredential.builder()
+ .prefix("gs://my-custom-prefix/xyz")
+ .config(
+ ImmutableMap.of(
+ GCPProperties.GCS_OAUTH2_TOKEN,
+ "gcsToken3",
+ GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+ Long.toString(Instant.now().plus(3,
ChronoUnit.MINUTES).toEpochMilli())))
+ .build();
+ HttpResponse mockResponse =
+ HttpResponse.response(
+ LoadCredentialsResponseParser.toJson(
+ ImmutableLoadCredentialsResponse.builder()
+ .addCredentials(credentialOne, credentialTwo,
credentialThree)
+ .build()))
+ .withStatusCode(200);
+ mockServer.when(mockRequest).respond(mockResponse);
+
+ OAuth2RefreshCredentialsHandler handler =
+ OAuth2RefreshCredentialsHandler.create(
+
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+ assertThatThrownBy(handler::refreshAccessToken)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Invalid GCS Credentials: only one GCS credential should
exist");
+ }
+}