This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 5359bea71b GCS: Refresh vended credentials (#11282)
5359bea71b is described below

commit 5359bea71bceaad35897ac82573d73e3c7e47f71
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Oct 30 07:09:42 2024 +0100

    GCS: Refresh vended credentials (#11282)
---
 build.gradle                                       |   3 +
 .../java/org/apache/iceberg/gcp/GCPProperties.java |  20 ++
 .../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java |  12 +-
 .../gcp/gcs/OAuth2RefreshCredentialsHandler.java   |  99 ++++++++
 .../org/apache/iceberg/gcp/GCPPropertiesTest.java  |  30 +++
 .../org/apache/iceberg/gcp/gcs/GCSFileIOTest.java  |  47 ++++
 .../gcs/OAuth2RefreshCredentialsHandlerTest.java   | 264 +++++++++++++++++++++
 7 files changed, 474 insertions(+), 1 deletion(-)

diff --git a/build.gradle b/build.gradle
index fe8392279e..1a05f83f7d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -644,6 +644,7 @@ project(':iceberg-gcp') {
     testImplementation "com.google.cloud:google-cloud-nio"
 
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
+    testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
 
     testImplementation(libs.hadoop2.common) {
       exclude group: 'org.apache.avro', module: 'avro'
@@ -652,6 +653,8 @@ project(':iceberg-gcp') {
       exclude group: 'com.google.code.gson', module: 'gson'
     }
     testImplementation libs.esotericsoftware.kryo
+    testImplementation libs.mockserver.netty
+    testImplementation libs.mockserver.client.java
   }
 }
 
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
index 4f60e2f91f..c03906ae5d 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
@@ -43,6 +43,12 @@ public class GCPProperties implements Serializable {
   public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = 
"gcs.oauth2.token-expires-at";
   // Boolean to explicitly configure "no authentication" for testing purposes 
using a GCS emulator
   public static final String GCS_NO_AUTH = "gcs.no-auth";
+  public static final String GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT =
+      "gcs.oauth2.refresh-credentials-endpoint";
+
+  /** Controls whether vended credentials should be refreshed or not. Defaults 
to true. */
+  public static final String GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED =
+      "gcs.oauth2.refresh-credentials-enabled";
 
   /** Configure the batch size used when deleting multiple files from a given 
GCS bucket */
   public static final String GCS_DELETE_BATCH_SIZE = "gcs.delete.batch-size";
@@ -67,6 +73,8 @@ public class GCPProperties implements Serializable {
   private boolean gcsNoAuth;
   private String gcsOAuth2Token;
   private Date gcsOAuth2TokenExpiresAt;
+  private String gcsOauth2RefreshCredentialsEndpoint;
+  private boolean gcsOauth2RefreshCredentialsEnabled;
 
   private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;
 
@@ -95,6 +103,10 @@ public class GCPProperties implements Serializable {
       gcsOAuth2TokenExpiresAt =
           new 
Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT)));
     }
+
+    gcsOauth2RefreshCredentialsEndpoint = 
properties.get(GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT);
+    gcsOauth2RefreshCredentialsEnabled =
+        PropertyUtil.propertyAsBoolean(properties, 
GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED, true);
     gcsNoAuth = Boolean.parseBoolean(properties.getOrDefault(GCS_NO_AUTH, 
"false"));
     Preconditions.checkState(
         !(gcsOAuth2Token != null && gcsNoAuth),
@@ -154,4 +166,12 @@ public class GCPProperties implements Serializable {
   public int deleteBatchSize() {
     return gcsDeleteBatchSize;
   }
+
+  public Optional<String> oauth2RefreshCredentialsEndpoint() {
+    return Optional.ofNullable(gcsOauth2RefreshCredentialsEndpoint);
+  }
+
+  public boolean oauth2RefreshCredentialsEnabled() {
+    return gcsOauth2RefreshCredentialsEnabled;
+  }
 }
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 2201c876bd..5737606aef 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.gcp.gcs;
 
 import com.google.auth.oauth2.AccessToken;
 import com.google.auth.oauth2.OAuth2Credentials;
+import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
 import com.google.cloud.NoCredentials;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
@@ -156,7 +157,16 @@ public class GCSFileIO implements DelegateFileIO {
                     // Explicitly configure an OAuth token.
                     AccessToken accessToken =
                         new AccessToken(token, 
gcpProperties.oauth2TokenExpiresAt().orElse(null));
-                    
builder.setCredentials(OAuth2Credentials.create(accessToken));
+                    if (gcpProperties.oauth2RefreshCredentialsEnabled()
+                        && 
gcpProperties.oauth2RefreshCredentialsEndpoint().isPresent()) {
+                      builder.setCredentials(
+                          OAuth2CredentialsWithRefresh.newBuilder()
+                              .setAccessToken(accessToken)
+                              
.setRefreshHandler(OAuth2RefreshCredentialsHandler.create(properties))
+                              .build());
+                    } else {
+                      
builder.setCredentials(OAuth2Credentials.create(accessToken));
+                    }
                   });
 
           return builder.build().getService();
diff --git 
a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
 
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
new file mode 100644
index 0000000000..611e7baaec
--- /dev/null
+++ 
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
+
+public class OAuth2RefreshCredentialsHandler
+    implements OAuth2CredentialsWithRefresh.OAuth2RefreshHandler {
+  private final Map<String, String> properties;
+
+  private OAuth2RefreshCredentialsHandler(Map<String, String> properties) {
+    Preconditions.checkArgument(
+        null != 
properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT),
+        "Invalid credentials endpoint: null");
+    this.properties = properties;
+  }
+
+  @Override
+  public AccessToken refreshAccessToken() {
+    LoadCredentialsResponse response;
+    try (RESTClient client = httpClient()) {
+      response =
+          client.get(
+              
properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT),
+              null,
+              LoadCredentialsResponse.class,
+              OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)),
+              ErrorHandlers.defaultErrorHandler());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    List<Credential> gcsCredentials =
+        response.credentials().stream()
+            .filter(c -> c.prefix().startsWith("gs"))
+            .collect(Collectors.toList());
+
+    Preconditions.checkState(!gcsCredentials.isEmpty(), "Invalid GCS 
Credentials: empty");
+    Preconditions.checkState(
+        gcsCredentials.size() == 1,
+        "Invalid GCS Credentials: only one GCS credential should exist");
+
+    Credential gcsCredential = gcsCredentials.get(0);
+    checkCredential(gcsCredential, GCPProperties.GCS_OAUTH2_TOKEN);
+    checkCredential(gcsCredential, GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT);
+    String token = gcsCredential.config().get(GCPProperties.GCS_OAUTH2_TOKEN);
+    String expiresAt = 
gcsCredential.config().get(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT);
+
+    return new AccessToken(token, new Date(Long.parseLong(expiresAt)));
+  }
+
+  private void checkCredential(Credential gcsCredential, String 
gcsOauth2Token) {
+    Preconditions.checkState(
+        gcsCredential.config().containsKey(gcsOauth2Token),
+        "Invalid GCS Credentials: %s not set",
+        gcsOauth2Token);
+  }
+
+  public static OAuth2RefreshCredentialsHandler create(Map<String, String> 
properties) {
+    return new OAuth2RefreshCredentialsHandler(properties);
+  }
+
+  private RESTClient httpClient() {
+    return HTTPClient.builder(properties)
+        
.uri(properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT))
+        .build();
+  }
+}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java 
b/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java
index c71b558287..61bd069f0c 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.gcp;
 
 import static org.apache.iceberg.gcp.GCPProperties.GCS_NO_AUTH;
+import static 
org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED;
+import static 
org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT;
 import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
@@ -47,4 +49,32 @@ public class GCPPropertiesTest {
     assertThat(gcpProperties.noAuth()).isTrue();
     assertThat(gcpProperties.oauth2Token()).isNotPresent();
   }
+
+  @Test
+  public void refreshCredentialsEndpointSet() {
+    GCPProperties gcpProperties =
+        new GCPProperties(
+            ImmutableMap.of(GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, 
"/v1/credentials"));
+    assertThat(gcpProperties.oauth2RefreshCredentialsEnabled()).isTrue();
+    assertThat(gcpProperties.oauth2RefreshCredentialsEndpoint())
+        .isPresent()
+        .get()
+        .isEqualTo("/v1/credentials");
+  }
+
+  @Test
+  public void refreshCredentialsEndpointSetButRefreshDisabled() {
+    GCPProperties gcpProperties =
+        new GCPProperties(
+            ImmutableMap.of(
+                GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
+                "/v1/credentials",
+                GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED,
+                "false"));
+    assertThat(gcpProperties.oauth2RefreshCredentialsEnabled()).isFalse();
+    assertThat(gcpProperties.oauth2RefreshCredentialsEndpoint())
+        .isPresent()
+        .get()
+        .isEqualTo("/v1/credentials");
+  }
 }
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java 
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
index fbc3fe7114..6302f664b7 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
@@ -19,11 +19,17 @@
 package org.apache.iceberg.gcp.gcs;
 
 import static java.lang.String.format;
+import static 
org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED;
+import static 
org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT;
+import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN;
+import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 
+import com.google.auth.oauth2.OAuth2Credentials;
+import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
@@ -32,6 +38,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Random;
 import java.util.stream.StreamSupport;
@@ -223,4 +231,43 @@ public class GCSFileIOTest {
             .invoke("gs://foo/bar");
     assertThat(result).isInstanceOf(GCSFileIO.class);
   }
+
+  @Test
+  public void refreshCredentialsEndpointSet() {
+    Storage client;
+    try (GCSFileIO fileIO = new GCSFileIO()) {
+      fileIO.initialize(
+          ImmutableMap.of(
+              GCS_OAUTH2_TOKEN,
+              "gcsToken",
+              GCS_OAUTH2_TOKEN_EXPIRES_AT,
+              Long.toString(Instant.now().plus(5, 
ChronoUnit.MINUTES).toEpochMilli()),
+              GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
+              "/v1/credentials"));
+      client = fileIO.client();
+    }
+
+    assertThat(client.getOptions().getCredentials())
+        .isInstanceOf(OAuth2CredentialsWithRefresh.class);
+  }
+
+  @Test
+  public void refreshCredentialsEndpointSetButRefreshDisabled() {
+    Storage client;
+    try (GCSFileIO fileIO = new GCSFileIO()) {
+      fileIO.initialize(
+          ImmutableMap.of(
+              GCS_OAUTH2_TOKEN,
+              "gcsTokenWithoutRefresh",
+              GCS_OAUTH2_TOKEN_EXPIRES_AT,
+              Long.toString(Instant.now().plus(5, 
ChronoUnit.MINUTES).toEpochMilli()),
+              GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
+              "/v1/credentials",
+              GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED,
+              "false"));
+      client = fileIO.client();
+    }
+
+    
assertThat(client.getOptions().getCredentials()).isInstanceOf(OAuth2Credentials.class);
+  }
 }
diff --git 
a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java
 
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java
new file mode 100644
index 0000000000..c538745f27
--- /dev/null
+++ 
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+
+import com.google.auth.oauth2.AccessToken;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.iceberg.exceptions.RESTException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.HttpMethod;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
+import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
+import org.apache.iceberg.rest.responses.LoadCredentialsResponseParser;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+
+public class OAuth2RefreshCredentialsHandlerTest {
+  private static final int PORT = 3333;
+  private static final String URI = 
String.format("http://127.0.0.1:%d/v1/credentials";, PORT);
+  private static ClientAndServer mockServer;
+
+  @BeforeAll
+  public static void beforeAll() {
+    mockServer = startClientAndServer(PORT);
+  }
+
+  @AfterAll
+  public static void stopServer() {
+    mockServer.stop();
+  }
+
+  @BeforeEach
+  public void before() {
+    mockServer.reset();
+  }
+
+  @Test
+  public void invalidOrMissingUri() {
+    assertThatThrownBy(() -> 
OAuth2RefreshCredentialsHandler.create(ImmutableMap.of()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid credentials endpoint: null");
+
+    assertThatThrownBy(
+            () ->
+                OAuth2RefreshCredentialsHandler.create(
+                        ImmutableMap.of(
+                            
GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, "invalid uri"))
+                    .refreshAccessToken())
+        .isInstanceOf(RESTException.class)
+        .hasMessageStartingWith("Failed to create request URI from base 
invalid uri");
+  }
+
+  @Test
+  public void badRequest() {
+    HttpRequest mockRequest =
+        
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+    HttpResponse mockResponse = HttpResponse.response().withStatusCode(400);
+    mockServer.when(mockRequest).respond(mockResponse);
+
+    OAuth2RefreshCredentialsHandler handler =
+        OAuth2RefreshCredentialsHandler.create(
+            
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+    assertThatThrownBy(handler::refreshAccessToken)
+        .isInstanceOf(BadRequestException.class)
+        .hasMessageStartingWith("Malformed request");
+  }
+
+  @Test
+  public void noGcsCredentialInResponse() {
+    HttpRequest mockRequest =
+        
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+    HttpResponse mockResponse =
+        HttpResponse.response(
+                LoadCredentialsResponseParser.toJson(
+                    ImmutableLoadCredentialsResponse.builder().build()))
+            .withStatusCode(200);
+    mockServer.when(mockRequest).respond(mockResponse);
+
+    OAuth2RefreshCredentialsHandler handler =
+        OAuth2RefreshCredentialsHandler.create(
+            
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+    assertThatThrownBy(handler::refreshAccessToken)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Invalid GCS Credentials: empty");
+  }
+
+  @Test
+  public void noGcsToken() {
+    HttpRequest mockRequest =
+        
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+    Credential credential =
+        ImmutableCredential.builder()
+            .prefix("gs")
+            .config(ImmutableMap.of(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT, 
"1000"))
+            .build();
+    HttpResponse mockResponse =
+        HttpResponse.response(
+                LoadCredentialsResponseParser.toJson(
+                    
ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build()))
+            .withStatusCode(200);
+    mockServer.when(mockRequest).respond(mockResponse);
+
+    OAuth2RefreshCredentialsHandler handler =
+        OAuth2RefreshCredentialsHandler.create(
+            
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+    assertThatThrownBy(handler::refreshAccessToken)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Invalid GCS Credentials: gcs.oauth2.token not set");
+  }
+
+  @Test
+  public void tokenWithoutExpiration() {
+    HttpRequest mockRequest =
+        
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+    Credential credential =
+        ImmutableCredential.builder()
+            .prefix("gs")
+            .config(ImmutableMap.of(GCPProperties.GCS_OAUTH2_TOKEN, 
"gcsToken"))
+            .build();
+    HttpResponse mockResponse =
+        HttpResponse.response(
+                LoadCredentialsResponseParser.toJson(
+                    
ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build()))
+            .withStatusCode(200);
+    mockServer.when(mockRequest).respond(mockResponse);
+
+    OAuth2RefreshCredentialsHandler handler =
+        OAuth2RefreshCredentialsHandler.create(
+            
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+    assertThatThrownBy(handler::refreshAccessToken)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Invalid GCS Credentials: gcs.oauth2.token-expires-at not 
set");
+  }
+
+  @Test
+  public void tokenWithExpiration() {
+    HttpRequest mockRequest =
+        
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+    Credential credential =
+        ImmutableCredential.builder()
+            .prefix("gs")
+            .config(
+                ImmutableMap.of(
+                    GCPProperties.GCS_OAUTH2_TOKEN,
+                    "gcsToken",
+                    GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+                    Long.toString(Instant.now().plus(5, 
ChronoUnit.MINUTES).toEpochMilli())))
+            .build();
+    HttpResponse mockResponse =
+        HttpResponse.response(
+                LoadCredentialsResponseParser.toJson(
+                    
ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build()))
+            .withStatusCode(200);
+    mockServer.when(mockRequest).respond(mockResponse);
+
+    OAuth2RefreshCredentialsHandler handler =
+        OAuth2RefreshCredentialsHandler.create(
+            
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+    AccessToken accessToken = handler.refreshAccessToken();
+    assertThat(accessToken.getTokenValue())
+        .isEqualTo(credential.config().get(GCPProperties.GCS_OAUTH2_TOKEN));
+    assertThat(accessToken.getExpirationTime().toInstant().toEpochMilli())
+        .isEqualTo(
+            
Long.parseLong(credential.config().get(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT)));
+
+    // refresh always fetches a new token
+    AccessToken refreshedToken = handler.refreshAccessToken();
+    assertThat(refreshedToken).isNotSameAs(accessToken);
+
+    mockServer.verify(mockRequest, VerificationTimes.exactly(2));
+  }
+
+  @Test
+  public void multipleGcsCredentials() {
+    HttpRequest mockRequest =
+        
HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name());
+
+    Credential credentialOne =
+        ImmutableCredential.builder()
+            .prefix("gs")
+            .config(
+                ImmutableMap.of(
+                    GCPProperties.GCS_OAUTH2_TOKEN,
+                    "gcsToken1",
+                    GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+                    Long.toString(Instant.now().plus(1, 
ChronoUnit.MINUTES).toEpochMilli())))
+            .build();
+    Credential credentialTwo =
+        ImmutableCredential.builder()
+            .prefix("gs://my-custom-prefix/xyz/long-prefix")
+            .config(
+                ImmutableMap.of(
+                    GCPProperties.GCS_OAUTH2_TOKEN,
+                    "gcsToken2",
+                    GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+                    Long.toString(Instant.now().plus(2, 
ChronoUnit.MINUTES).toEpochMilli())))
+            .build();
+    Credential credentialThree =
+        ImmutableCredential.builder()
+            .prefix("gs://my-custom-prefix/xyz")
+            .config(
+                ImmutableMap.of(
+                    GCPProperties.GCS_OAUTH2_TOKEN,
+                    "gcsToken3",
+                    GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT,
+                    Long.toString(Instant.now().plus(3, 
ChronoUnit.MINUTES).toEpochMilli())))
+            .build();
+    HttpResponse mockResponse =
+        HttpResponse.response(
+                LoadCredentialsResponseParser.toJson(
+                    ImmutableLoadCredentialsResponse.builder()
+                        .addCredentials(credentialOne, credentialTwo, 
credentialThree)
+                        .build()))
+            .withStatusCode(200);
+    mockServer.when(mockRequest).respond(mockResponse);
+
+    OAuth2RefreshCredentialsHandler handler =
+        OAuth2RefreshCredentialsHandler.create(
+            
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
+
+    assertThatThrownBy(handler::refreshAccessToken)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Invalid GCS Credentials: only one GCS credential should 
exist");
+  }
+}

Reply via email to