This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 56fa9e5ebb GCP: KeyManagementClient implementation that works with 
Google Cloud KMS (#13334)
56fa9e5ebb is described below

commit 56fa9e5ebbde6d9432ec1ed1c6a52e7d81a1c634
Author: Adam Szita <40628386+sz...@users.noreply.github.com>
AuthorDate: Mon Aug 25 15:08:49 2025 +0200

    GCP: KeyManagementClient implementation that works with Google Cloud KMS 
(#13334)
---
 build.gradle                                       |  22 +++
 gcp-bundle/build.gradle                            |   1 +
 .../iceberg/gcp/TestKeyManagementClient.java       | 115 ++++++++++++++
 .../gcp/TestKeyManagementClientWithAppCreds.java   |  50 ++++++
 .../gcp/TestKeyManagementClientWithOAuth.java      |  63 ++++++++
 .../java/org/apache/iceberg/gcp/GCPAuthUtils.java  |  62 ++++++++
 .../apache/iceberg/gcp/GcpKeyManagementClient.java | 171 +++++++++++++++++++++
 .../apache/iceberg/gcp/gcs/PrefixedStorage.java    |  42 ++---
 8 files changed, 501 insertions(+), 25 deletions(-)

diff --git a/build.gradle b/build.gradle
index 64a57b6e6a..bfe0c8cef3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -712,6 +712,7 @@ project(':iceberg-gcp') {
 
     compileOnly platform(libs.google.libraries.bom)
     compileOnly "com.google.cloud:google-cloud-storage"
+    compileOnly "com.google.cloud:google-cloud-kms"
 
     testImplementation "com.google.cloud:google-cloud-nio"
 
@@ -729,6 +730,27 @@ project(':iceberg-gcp') {
     testImplementation libs.mockserver.client.java
     testImplementation libs.mockito.junit.jupiter
   }
+
+  sourceSets {
+    integration {
+      java.srcDir "$projectDir/src/integration/java"
+      resources.srcDir "$projectDir/src/integration/resources"
+      compileClasspath += main.output + test.output
+      runtimeClasspath += main.output + test.output
+    }
+  }
+
+  configurations {
+    integrationImplementation.extendsFrom testImplementation
+    integrationRuntime.extendsFrom testRuntimeOnly
+  }
+
+  task integrationTest(type: Test) {
+    useJUnitPlatform()
+    testClassesDirs = sourceSets.integration.output.classesDirs
+    classpath = sourceSets.integration.runtimeClasspath
+    jvmArgs += project.property('extraJvmArgs')
+  }
 }
 
 project(':iceberg-hive-metastore') {
diff --git a/gcp-bundle/build.gradle b/gcp-bundle/build.gradle
index cf7bb77a0f..99e4359bd6 100644
--- a/gcp-bundle/build.gradle
+++ b/gcp-bundle/build.gradle
@@ -28,6 +28,7 @@ project(":iceberg-gcp-bundle") {
     implementation "com.google.cloud:google-cloud-storage"
     implementation "com.google.cloud:google-cloud-bigquery"
     implementation "com.google.cloud:google-cloud-core"
+    implementation "com.google.cloud:google-cloud-kms"
   }
 
   shadowJar {
diff --git 
a/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClient.java 
b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClient.java
new file mode 100644
index 0000000000..1e02013b3a
--- /dev/null
+++ 
b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClient.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.api.gax.rpc.AlreadyExistsException;
+import com.google.cloud.kms.v1.CryptoKey;
+import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.cloud.kms.v1.CryptoKeyVersion;
+import com.google.cloud.kms.v1.CryptoKeyVersionName;
+import com.google.cloud.kms.v1.CryptoKeyVersionTemplate;
+import com.google.cloud.kms.v1.KeyManagementServiceClient;
+import com.google.cloud.kms.v1.KeyRing;
+import com.google.cloud.kms.v1.KeyRingName;
+import com.google.cloud.kms.v1.LocationName;
+import com.google.cloud.kms.v1.ProtectionLevel;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public abstract class TestKeyManagementClient {
+
+  private static final String LOCATION = "global";
+  private static final String KEY_RING_ID = "iceberg-gcp-it-keyring";
+  private static final String KEY_VERSION = "1";
+  private final String keyId = "iceberg-gcp-it-key-" + UUID.randomUUID();
+
+  private KeyManagementServiceClient kmsClient;
+  private String projectId;
+
+  protected abstract void init() throws IOException;
+
+  protected abstract Map<String, String> properties();
+
+  public void setKmsClient(KeyManagementServiceClient kmsClient) {
+    this.kmsClient = kmsClient;
+  }
+
+  public void setProjectId(String projectId) {
+    this.projectId = projectId;
+  }
+
+  @BeforeEach
+  public void before() throws IOException {
+    init();
+
+    LocationName locationName = LocationName.of(projectId, LOCATION);
+    try {
+      kmsClient.createKeyRing(locationName, KEY_RING_ID, 
KeyRing.newBuilder().build());
+    } catch (AlreadyExistsException e) {
+      // we're going to reuse - in GCP key rings are not removable anyway
+    }
+
+    CryptoKeyVersionTemplate keyVersionTemplate =
+        CryptoKeyVersionTemplate.newBuilder()
+            .setProtectionLevel(ProtectionLevel.SOFTWARE)
+            
.setAlgorithm(CryptoKeyVersion.CryptoKeyVersionAlgorithm.GOOGLE_SYMMETRIC_ENCRYPTION)
+            .build();
+
+    // API only allows 24 hr of key retention after a key destruction request
+    CryptoKey cryptoKey =
+        CryptoKey.newBuilder()
+            .setPurpose(CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT)
+            .setVersionTemplate(keyVersionTemplate)
+            .build();
+
+    kmsClient.createCryptoKey(KeyRingName.of(projectId, LOCATION, 
KEY_RING_ID), keyId, cryptoKey);
+  }
+
+  @AfterEach
+  public void after() {
+    CryptoKeyVersionName cryptoKeyVersionName =
+        CryptoKeyVersionName.of(projectId, LOCATION, KEY_RING_ID, keyId, 
KEY_VERSION);
+    kmsClient.destroyCryptoKeyVersion(cryptoKeyVersionName);
+
+    // key rings are not removable by design in GCP
+
+    kmsClient.close();
+  }
+
+  @Test
+  public void testKeyWrapping() {
+    String keyname = CryptoKeyName.of(projectId, LOCATION, KEY_RING_ID, 
keyId).toString();
+
+    try (GcpKeyManagementClient keyManagementClient = new 
GcpKeyManagementClient(); ) {
+      keyManagementClient.initialize(properties());
+
+      ByteBuffer key = ByteBuffer.wrap(new 
String("super-secret-table-master-key").getBytes());
+      ByteBuffer encryptedKey = keyManagementClient.wrapKey(key, keyname);
+
+      assertThat(keyManagementClient.unwrapKey(encryptedKey, 
keyname)).isEqualTo(key);
+    }
+  }
+}
diff --git 
a/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithAppCreds.java
 
b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithAppCreds.java
new file mode 100644
index 0000000000..ad508170e3
--- /dev/null
+++ 
b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithAppCreds.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp;
+
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.kms.v1.KeyManagementServiceClient;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables;
+
+@EnabledIfEnvironmentVariables({
+  @EnabledIfEnvironmentVariable(named = "GOOGLE_APPLICATION_CREDENTIALS", 
matches = ".*")
+})
+public class TestKeyManagementClientWithAppCreds extends 
TestKeyManagementClient {
+
+  @Override
+  protected void init() throws IOException {
+    GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
+    assumeThat(credentials instanceof ServiceAccountCredentials).isTrue();
+
+    setKmsClient(KeyManagementServiceClient.create());
+    setProjectId(((ServiceAccountCredentials) credentials).getProjectId());
+  }
+
+  @Override
+  protected Map<String, String> properties() {
+    return ImmutableMap.of();
+  }
+}
diff --git 
a/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithOAuth.java
 
b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithOAuth.java
new file mode 100644
index 0000000000..988b561ea0
--- /dev/null
+++ 
b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithOAuth.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.OAuth2Credentials;
+import com.google.cloud.kms.v1.KeyManagementServiceClient;
+import com.google.cloud.kms.v1.KeyManagementServiceSettings;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables;
+
+@EnabledIfEnvironmentVariables({
+  @EnabledIfEnvironmentVariable(
+      named = TestKeyManagementClientWithOAuth.GOOGLE_OAUTH_TOKEN,
+      matches = ".*"),
+  @EnabledIfEnvironmentVariable(
+      named = TestKeyManagementClientWithOAuth.GOOGLE_CLOUD_PROJECT_ID,
+      matches = ".*")
+})
+public class TestKeyManagementClientWithOAuth extends TestKeyManagementClient {
+
+  static final String GOOGLE_OAUTH_TOKEN = "GOOGLE_OAUTH_TOKEN";
+  static final String GOOGLE_CLOUD_PROJECT_ID = "GOOGLE_CLOUD_PROJECT_ID";
+  private String token;
+
+  @Override
+  protected void init() throws IOException {
+    token = System.getenv(GOOGLE_OAUTH_TOKEN);
+    AccessToken accessToken = new AccessToken(token, null);
+    KeyManagementServiceSettings settings =
+        KeyManagementServiceSettings.newBuilder()
+            .setCredentialsProvider(
+                
FixedCredentialsProvider.create(OAuth2Credentials.create(accessToken)))
+            .build();
+    setKmsClient(KeyManagementServiceClient.create(settings));
+    setProjectId(System.getenv(GOOGLE_CLOUD_PROJECT_ID));
+  }
+
+  @Override
+  protected Map<String, String> properties() {
+    return ImmutableMap.of(GCPProperties.GCS_OAUTH2_TOKEN, token);
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPAuthUtils.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/GCPAuthUtils.java
new file mode 100644
index 0000000000..881efcbfec
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPAuthUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp;
+
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.OAuth2Credentials;
+import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
+import java.util.Optional;
+import org.apache.iceberg.gcp.gcs.OAuth2RefreshCredentialsHandler;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public final class GCPAuthUtils {
+
+  private GCPAuthUtils() {}
+
+  public static OAuth2Credentials oauth2CredentialsFromGcpProperties(
+      GCPProperties gcpProperties, CloseableGroup closeableGroup) {
+
+    Optional<String> optionalToken = gcpProperties.oauth2Token();
+    Preconditions.checkState(
+        optionalToken.isPresent(), "OAuth2 token must be set to produce OAuth2 
Credentials");
+
+    // Explicitly configure an OAuth token
+    AccessToken accessToken =
+        new AccessToken(optionalToken.get(), 
gcpProperties.oauth2TokenExpiresAt().orElse(null));
+
+    if (gcpProperties.oauth2RefreshCredentialsEnabled()
+        && gcpProperties.oauth2RefreshCredentialsEndpoint().isPresent()) {
+
+      Preconditions.checkNotNull(
+          closeableGroup,
+          "Must specify a closeable group that handles closure of refresh 
handler.");
+      OAuth2RefreshCredentialsHandler oAuthRefreshHandler =
+          OAuth2RefreshCredentialsHandler.create(gcpProperties.properties());
+      closeableGroup.addCloseable(oAuthRefreshHandler);
+
+      return OAuth2CredentialsWithRefresh.newBuilder()
+          .setAccessToken(accessToken)
+          .setRefreshHandler(oAuthRefreshHandler)
+          .build();
+    } else {
+      return OAuth2Credentials.create(accessToken);
+    }
+  }
+}
diff --git 
a/gcp/src/main/java/org/apache/iceberg/gcp/GcpKeyManagementClient.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/GcpKeyManagementClient.java
new file mode 100644
index 0000000000..ff18d5234d
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/GcpKeyManagementClient.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.OAuth2Credentials;
+import com.google.cloud.kms.v1.DecryptRequest;
+import com.google.cloud.kms.v1.DecryptResponse;
+import com.google.cloud.kms.v1.EncryptRequest;
+import com.google.cloud.kms.v1.EncryptResponse;
+import com.google.cloud.kms.v1.KeyManagementServiceClient;
+import com.google.cloud.kms.v1.KeyManagementServiceSettings;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.encryption.KeyManagementClient;
+import org.apache.iceberg.io.CloseableGroup;
+
+/**
+ * Key management client implementation that uses Google Cloud Key Management. 
To be used for
+ * encrypting/decrypting keys with a KMS-managed master key (by referencing 
its key ID).
+ *
+ * <p>Uses {@link ByteStringShim} to ensure this class works with and without 
iceberg-gcp-bundle.
+ * Since the bundle relocates {@link com.google.protobuf.ByteString}, all 
related methods need to be
+ * loaded dynamically. During runtime if the relocated class is observed, it 
will be preferred over
+ * the original one.
+ */
+public class GcpKeyManagementClient implements KeyManagementClient {
+
+  private KeyManagementServiceClient kmsClient;
+  private CloseableGroup closeableGroup = new CloseableGroup();
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.setSuppressCloseFailure(true);
+
+    GCPProperties gcpProperties = new GCPProperties(properties);
+
+    try {
+      KeyManagementServiceSettings.Builder kmsBuilder = 
KeyManagementServiceSettings.newBuilder();
+      if (gcpProperties.oauth2Token().isPresent()) {
+        OAuth2Credentials oAuth2Credentials =
+            GCPAuthUtils.oauth2CredentialsFromGcpProperties(gcpProperties, 
closeableGroup);
+        
kmsBuilder.setCredentialsProvider(FixedCredentialsProvider.create(oAuth2Credentials));
+      }
+
+      // if not OAuth then defaults to 
GoogleCredentials.getApplicationDefault()
+      this.kmsClient = KeyManagementServiceClient.create(kmsBuilder.build());
+      closeableGroup.addCloseable(kmsClient);
+
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create GCP cloud KMS service 
client", e);
+    }
+  }
+
+  @Override
+  public ByteBuffer wrapKey(ByteBuffer key, String wrappingKeyId) {
+    EncryptRequest.Builder requestBuilder = 
EncryptRequest.newBuilder().setName(wrappingKeyId);
+    requestBuilder = ByteStringShim.setPlainText(requestBuilder, key);
+
+    EncryptRequest encryptRequest = requestBuilder.build();
+    EncryptResponse encryptResponse = kmsClient.encrypt(encryptRequest);
+
+    // ByteString.copyFrom() leaves the buffer at its end position; reset it
+    key.position(0);
+    return ByteBuffer.wrap(ByteStringShim.getCipherText(encryptResponse));
+  }
+
+  @Override
+  public ByteBuffer unwrapKey(ByteBuffer wrappedKey, String wrappingKeyId) {
+    DecryptRequest.Builder requestBuilder = 
DecryptRequest.newBuilder().setName(wrappingKeyId);
+    requestBuilder = ByteStringShim.setCipherText(requestBuilder, wrappedKey);
+
+    DecryptRequest decryptRequest = requestBuilder.build();
+    DecryptResponse decryptResponse = kmsClient.decrypt(decryptRequest);
+
+    // ByteString.copyFrom() leaves the buffer at its end position; reset it
+    wrappedKey.position(0);
+    return ByteBuffer.wrap(ByteStringShim.getPlainText(decryptResponse));
+  }
+
+  @Override
+  public void close() {
+    try {
+      closeableGroup.close();
+    } catch (IOException ioe) {
+      // closure exceptions already suppressed and logged in closeableGroup
+    }
+  }
+
+  private static final class ByteStringShim {
+    private static final String ORIGINAL_BYTE_STRING_CLASS_NAME = 
"com.google.protobuf.ByteString";
+    private static final String SHADED_BYTE_STRING_CLASS_NAME =
+        "org.apache.iceberg.gcp.shaded." + ORIGINAL_BYTE_STRING_CLASS_NAME;
+    private static final Class<?> BYTE_STRING_CLASS;
+
+    static {
+      Class<?> byteStringClass =
+          
DynClasses.builder().impl(SHADED_BYTE_STRING_CLASS_NAME).orNull().build();
+      if (byteStringClass == null) {
+        byteStringClass = 
DynClasses.builder().impl(ORIGINAL_BYTE_STRING_CLASS_NAME).build();
+      }
+
+      BYTE_STRING_CLASS = byteStringClass;
+    }
+
+    private static final DynMethods.UnboundMethod SET_PLAIN_TEXT_METHOD =
+        new DynMethods.Builder("setPlaintext")
+            .hiddenImpl(EncryptRequest.Builder.class, BYTE_STRING_CLASS)
+            .build();
+    private static final DynMethods.UnboundMethod SET_CIPHER_TEXT_METHOD =
+        new DynMethods.Builder("setCiphertext")
+            .hiddenImpl(DecryptRequest.Builder.class, BYTE_STRING_CLASS)
+            .build();
+    private static final DynMethods.UnboundMethod GET_PLAIN_TEXT_METHOD =
+        new 
DynMethods.Builder("getPlaintext").hiddenImpl(DecryptResponse.class).build();
+    private static final DynMethods.UnboundMethod GET_CIPHER_TEXT_METHOD =
+        new 
DynMethods.Builder("getCiphertext").hiddenImpl(EncryptResponse.class).build();
+    private static final DynMethods.StaticMethod COPY_FROM_BYTEBUFFER_METHOD =
+        new DynMethods.Builder("copyFrom")
+            .hiddenImpl(BYTE_STRING_CLASS, ByteBuffer.class)
+            .buildStatic();
+    private static final DynMethods.UnboundMethod TO_BYTE_ARRAY_METHOD =
+        new 
DynMethods.Builder("toByteArray").hiddenImpl(BYTE_STRING_CLASS).build();
+
+    static EncryptRequest.Builder setPlainText(
+        EncryptRequest.Builder requestBuilder, ByteBuffer key) {
+      // dynamic call: requestBuilder.setPlaintext(ByteString.copyFrom(key))
+      Object byteStringKey = COPY_FROM_BYTEBUFFER_METHOD.invoke(key);
+      return SET_PLAIN_TEXT_METHOD.invoke(requestBuilder, byteStringKey);
+    }
+
+    static DecryptRequest.Builder setCipherText(
+        DecryptRequest.Builder requestBuilder, ByteBuffer wrappedKey) {
+      // dynamic call: 
requestBuilder.setCiphertext(ByteString.copyFrom(wrappedKey))
+      Object byteStringWrappedKey = 
COPY_FROM_BYTEBUFFER_METHOD.invoke(wrappedKey);
+      return SET_CIPHER_TEXT_METHOD.invoke(requestBuilder, 
byteStringWrappedKey);
+    }
+
+    static byte[] getCipherText(EncryptResponse response) {
+      // dynamic call: response.getCiphertext().toByteArray()
+      Object byteStringEncryptedKey = GET_CIPHER_TEXT_METHOD.invoke(response);
+      return TO_BYTE_ARRAY_METHOD.invoke(byteStringEncryptedKey);
+    }
+
+    static byte[] getPlainText(DecryptResponse response) {
+      // dynamic call: response.getPlaintext().toByteArray()
+      Object byteStringDecryptedKey = GET_PLAIN_TEXT_METHOD.invoke(response);
+      return TO_BYTE_ARRAY_METHOD.invoke(byteStringDecryptedKey);
+    }
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
index d0a76ae9ad..e9db60b149 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
@@ -19,15 +19,16 @@
 package org.apache.iceberg.gcp.gcs;
 
 import com.google.api.gax.rpc.FixedHeaderProvider;
-import com.google.auth.oauth2.AccessToken;
-import com.google.auth.oauth2.OAuth2Credentials;
-import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
 import com.google.cloud.NoCredentials;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
 import org.apache.iceberg.EnvironmentContext;
+import org.apache.iceberg.gcp.GCPAuthUtils;
 import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -38,7 +39,7 @@ class PrefixedStorage implements AutoCloseable {
   private final String storagePrefix;
   private final GCPProperties gcpProperties;
   private SerializableSupplier<Storage> storage;
-  private OAuth2RefreshCredentialsHandler refreshHandler = null;
+  private CloseableGroup closeableGroup;
   private transient volatile Storage storageClient;
 
   PrefixedStorage(
@@ -70,25 +71,12 @@ class PrefixedStorage implements AutoCloseable {
               // Explicitly allow "no credentials" for testing purposes
               builder.setCredentials(NoCredentials.getInstance());
             }
-            gcpProperties
-                .oauth2Token()
-                .ifPresent(
-                    token -> {
-                      // Explicitly configure an OAuth token
-                      AccessToken accessToken =
-                          new AccessToken(token, 
gcpProperties.oauth2TokenExpiresAt().orElse(null));
-                      if (gcpProperties.oauth2RefreshCredentialsEnabled()
-                          && 
gcpProperties.oauth2RefreshCredentialsEndpoint().isPresent()) {
-                        this.refreshHandler = 
OAuth2RefreshCredentialsHandler.create(properties);
-                        builder.setCredentials(
-                            OAuth2CredentialsWithRefresh.newBuilder()
-                                .setAccessToken(accessToken)
-                                .setRefreshHandler(refreshHandler)
-                                .build());
-                      } else {
-                        
builder.setCredentials(OAuth2Credentials.create(accessToken));
-                      }
-                    });
+
+            if (gcpProperties.oauth2Token().isPresent()) {
+              this.closeableGroup = new CloseableGroup();
+              builder.setCredentials(
+                  
GCPAuthUtils.oauth2CredentialsFromGcpProperties(gcpProperties, closeableGroup));
+            }
 
             return builder.build().getService();
           };
@@ -117,8 +105,12 @@ class PrefixedStorage implements AutoCloseable {
 
   @Override
   public void close() {
-    if (null != refreshHandler) {
-      refreshHandler.close();
+    if (null != closeableGroup) {
+      try {
+        closeableGroup.close();
+      } catch (IOException ioe) {
+        throw new UncheckedIOException(ioe);
+      }
     }
 
     if (null != storage) {

Reply via email to