This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push: new 56fa9e5ebb GCP: KeyManagementClient implementation that works with Google Cloud KMS (#13334) 56fa9e5ebb is described below commit 56fa9e5ebbde6d9432ec1ed1c6a52e7d81a1c634 Author: Adam Szita <40628386+sz...@users.noreply.github.com> AuthorDate: Mon Aug 25 15:08:49 2025 +0200 GCP: KeyManagementClient implementation that works with Google Cloud KMS (#13334) --- build.gradle | 22 +++ gcp-bundle/build.gradle | 1 + .../iceberg/gcp/TestKeyManagementClient.java | 115 ++++++++++++++ .../gcp/TestKeyManagementClientWithAppCreds.java | 50 ++++++ .../gcp/TestKeyManagementClientWithOAuth.java | 63 ++++++++ .../java/org/apache/iceberg/gcp/GCPAuthUtils.java | 62 ++++++++ .../apache/iceberg/gcp/GcpKeyManagementClient.java | 171 +++++++++++++++++++++ .../apache/iceberg/gcp/gcs/PrefixedStorage.java | 42 ++--- 8 files changed, 501 insertions(+), 25 deletions(-) diff --git a/build.gradle b/build.gradle index 64a57b6e6a..bfe0c8cef3 100644 --- a/build.gradle +++ b/build.gradle @@ -712,6 +712,7 @@ project(':iceberg-gcp') { compileOnly platform(libs.google.libraries.bom) compileOnly "com.google.cloud:google-cloud-storage" + compileOnly "com.google.cloud:google-cloud-kms" testImplementation "com.google.cloud:google-cloud-nio" @@ -729,6 +730,27 @@ project(':iceberg-gcp') { testImplementation libs.mockserver.client.java testImplementation libs.mockito.junit.jupiter } + + sourceSets { + integration { + java.srcDir "$projectDir/src/integration/java" + resources.srcDir "$projectDir/src/integration/resources" + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + } + } + + configurations { + integrationImplementation.extendsFrom testImplementation + integrationRuntime.extendsFrom testRuntimeOnly + } + + task integrationTest(type: Test) { + useJUnitPlatform() + testClassesDirs = sourceSets.integration.output.classesDirs + classpath = sourceSets.integration.runtimeClasspath + jvmArgs 
+= project.property('extraJvmArgs') + } } project(':iceberg-hive-metastore') { diff --git a/gcp-bundle/build.gradle b/gcp-bundle/build.gradle index cf7bb77a0f..99e4359bd6 100644 --- a/gcp-bundle/build.gradle +++ b/gcp-bundle/build.gradle @@ -28,6 +28,7 @@ project(":iceberg-gcp-bundle") { implementation "com.google.cloud:google-cloud-storage" implementation "com.google.cloud:google-cloud-bigquery" implementation "com.google.cloud:google-cloud-core" + implementation "com.google.cloud:google-cloud-kms" } shadowJar { diff --git a/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClient.java b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClient.java new file mode 100644 index 0000000000..1e02013b3a --- /dev/null +++ b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClient.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.gcp; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.cloud.kms.v1.CryptoKey; +import com.google.cloud.kms.v1.CryptoKeyName; +import com.google.cloud.kms.v1.CryptoKeyVersion; +import com.google.cloud.kms.v1.CryptoKeyVersionName; +import com.google.cloud.kms.v1.CryptoKeyVersionTemplate; +import com.google.cloud.kms.v1.KeyManagementServiceClient; +import com.google.cloud.kms.v1.KeyRing; +import com.google.cloud.kms.v1.KeyRingName; +import com.google.cloud.kms.v1.LocationName; +import com.google.cloud.kms.v1.ProtectionLevel; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public abstract class TestKeyManagementClient { + + private static final String LOCATION = "global"; + private static final String KEY_RING_ID = "iceberg-gcp-it-keyring"; + private static final String KEY_VERSION = "1"; + private final String keyId = "iceberg-gcp-it-key-" + UUID.randomUUID(); + + private KeyManagementServiceClient kmsClient; + private String projectId; + + protected abstract void init() throws IOException; + + protected abstract Map<String, String> properties(); + + public void setKmsClient(KeyManagementServiceClient kmsClient) { + this.kmsClient = kmsClient; + } + + public void setProjectId(String projectId) { + this.projectId = projectId; + } + + @BeforeEach + public void before() throws IOException { + init(); + + LocationName locationName = LocationName.of(projectId, LOCATION); + try { + kmsClient.createKeyRing(locationName, KEY_RING_ID, KeyRing.newBuilder().build()); + } catch (AlreadyExistsException e) { + // we're going to reuse - in GCP key rings are not removable anyway + } + + CryptoKeyVersionTemplate keyVersionTemplate = + CryptoKeyVersionTemplate.newBuilder() + 
.setProtectionLevel(ProtectionLevel.SOFTWARE) + .setAlgorithm(CryptoKeyVersion.CryptoKeyVersionAlgorithm.GOOGLE_SYMMETRIC_ENCRYPTION) + .build(); + + // API only allows 24 hr of key retention after a key destruction request + CryptoKey cryptoKey = + CryptoKey.newBuilder() + .setPurpose(CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT) + .setVersionTemplate(keyVersionTemplate) + .build(); + + kmsClient.createCryptoKey(KeyRingName.of(projectId, LOCATION, KEY_RING_ID), keyId, cryptoKey); + } + + @AfterEach + public void after() { + CryptoKeyVersionName cryptoKeyVersionName = + CryptoKeyVersionName.of(projectId, LOCATION, KEY_RING_ID, keyId, KEY_VERSION); + kmsClient.destroyCryptoKeyVersion(cryptoKeyVersionName); + + // key rings are not removable by design in GCP + + kmsClient.close(); + } + + @Test + public void testKeyWrapping() { + String keyname = CryptoKeyName.of(projectId, LOCATION, KEY_RING_ID, keyId).toString(); + + try (GcpKeyManagementClient keyManagementClient = new GcpKeyManagementClient(); ) { + keyManagementClient.initialize(properties()); + + ByteBuffer key = ByteBuffer.wrap(new String("super-secret-table-master-key").getBytes()); + ByteBuffer encryptedKey = keyManagementClient.wrapKey(key, keyname); + + assertThat(keyManagementClient.unwrapKey(encryptedKey, keyname)).isEqualTo(key); + } + } +} diff --git a/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithAppCreds.java b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithAppCreds.java new file mode 100644 index 0000000000..ad508170e3 --- /dev/null +++ b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithAppCreds.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp; + +import static org.assertj.core.api.Assumptions.assumeThat; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.cloud.kms.v1.KeyManagementServiceClient; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables; + +@EnabledIfEnvironmentVariables({ + @EnabledIfEnvironmentVariable(named = "GOOGLE_APPLICATION_CREDENTIALS", matches = ".*") +}) +public class TestKeyManagementClientWithAppCreds extends TestKeyManagementClient { + + @Override + protected void init() throws IOException { + GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); + assumeThat(credentials instanceof ServiceAccountCredentials).isTrue(); + + setKmsClient(KeyManagementServiceClient.create()); + setProjectId(((ServiceAccountCredentials) credentials).getProjectId()); + } + + @Override + protected Map<String, String> properties() { + return ImmutableMap.of(); + } +} diff --git a/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithOAuth.java b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithOAuth.java new file mode 100644 index 
0000000000..988b561ea0 --- /dev/null +++ b/gcp/src/integration/java/org/apache/iceberg/gcp/TestKeyManagementClientWithOAuth.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.OAuth2Credentials; +import com.google.cloud.kms.v1.KeyManagementServiceClient; +import com.google.cloud.kms.v1.KeyManagementServiceSettings; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables; + +@EnabledIfEnvironmentVariables({ + @EnabledIfEnvironmentVariable( + named = TestKeyManagementClientWithOAuth.GOOGLE_OAUTH_TOKEN, + matches = ".*"), + @EnabledIfEnvironmentVariable( + named = TestKeyManagementClientWithOAuth.GOOGLE_CLOUD_PROJECT_ID, + matches = ".*") +}) +public class TestKeyManagementClientWithOAuth extends TestKeyManagementClient { + + static final String GOOGLE_OAUTH_TOKEN = "GOOGLE_OAUTH_TOKEN"; + static final String 
GOOGLE_CLOUD_PROJECT_ID = "GOOGLE_CLOUD_PROJECT_ID"; + private String token; + + @Override + protected void init() throws IOException { + token = System.getenv(GOOGLE_OAUTH_TOKEN); + AccessToken accessToken = new AccessToken(token, null); + KeyManagementServiceSettings settings = + KeyManagementServiceSettings.newBuilder() + .setCredentialsProvider( + FixedCredentialsProvider.create(OAuth2Credentials.create(accessToken))) + .build(); + setKmsClient(KeyManagementServiceClient.create(settings)); + setProjectId(System.getenv(GOOGLE_CLOUD_PROJECT_ID)); + } + + @Override + protected Map<String, String> properties() { + return ImmutableMap.of(GCPProperties.GCS_OAUTH2_TOKEN, token); + } +} diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPAuthUtils.java b/gcp/src/main/java/org/apache/iceberg/gcp/GCPAuthUtils.java new file mode 100644 index 0000000000..881efcbfec --- /dev/null +++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPAuthUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.gcp; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.OAuth2Credentials; +import com.google.auth.oauth2.OAuth2CredentialsWithRefresh; +import java.util.Optional; +import org.apache.iceberg.gcp.gcs.OAuth2RefreshCredentialsHandler; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public final class GCPAuthUtils { + + private GCPAuthUtils() {} + + public static OAuth2Credentials oauth2CredentialsFromGcpProperties( + GCPProperties gcpProperties, CloseableGroup closeableGroup) { + + Optional<String> optionalToken = gcpProperties.oauth2Token(); + Preconditions.checkState( + optionalToken.isPresent(), "OAuth2 token must be set to produce OAuth2 Credentials"); + + // Explicitly configure an OAuth token + AccessToken accessToken = + new AccessToken(optionalToken.get(), gcpProperties.oauth2TokenExpiresAt().orElse(null)); + + if (gcpProperties.oauth2RefreshCredentialsEnabled() + && gcpProperties.oauth2RefreshCredentialsEndpoint().isPresent()) { + + Preconditions.checkNotNull( + closeableGroup, + "Must specify a closeable group that handles closure of refresh handler."); + OAuth2RefreshCredentialsHandler oAuthRefreshHandler = + OAuth2RefreshCredentialsHandler.create(gcpProperties.properties()); + closeableGroup.addCloseable(oAuthRefreshHandler); + + return OAuth2CredentialsWithRefresh.newBuilder() + .setAccessToken(accessToken) + .setRefreshHandler(oAuthRefreshHandler) + .build(); + } else { + return OAuth2Credentials.create(accessToken); + } + } +} diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GcpKeyManagementClient.java b/gcp/src/main/java/org/apache/iceberg/gcp/GcpKeyManagementClient.java new file mode 100644 index 0000000000..ff18d5234d --- /dev/null +++ b/gcp/src/main/java/org/apache/iceberg/gcp/GcpKeyManagementClient.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.auth.oauth2.OAuth2Credentials; +import com.google.cloud.kms.v1.DecryptRequest; +import com.google.cloud.kms.v1.DecryptResponse; +import com.google.cloud.kms.v1.EncryptRequest; +import com.google.cloud.kms.v1.EncryptResponse; +import com.google.cloud.kms.v1.KeyManagementServiceClient; +import com.google.cloud.kms.v1.KeyManagementServiceSettings; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.encryption.KeyManagementClient; +import org.apache.iceberg.io.CloseableGroup; + +/** + * Key management client implementation that uses Google Cloud Key Management. To be used for + * encrypting/decrypting keys with a KMS-managed master key (by referencing its key ID) + * + * <p>Uses {@link ByteStringShim} to ensure this class works with and without iceberg-gcp-bundle. + * Since the bundle relocates {@link com.google.protobuf.ByteString}, all related methods need to be + * loaded dynamically. During runtime if the relocated class is observed, it will be preferred over + * the original one. 
+ */ +public class GcpKeyManagementClient implements KeyManagementClient { + + private KeyManagementServiceClient kmsClient; + private CloseableGroup closeableGroup = new CloseableGroup(); + + @Override + public void initialize(Map<String, String> properties) { + this.closeableGroup = new CloseableGroup(); + closeableGroup.setSuppressCloseFailure(true); + + GCPProperties gcpProperties = new GCPProperties(properties); + + try { + KeyManagementServiceSettings.Builder kmsBuilder = KeyManagementServiceSettings.newBuilder(); + if (gcpProperties.oauth2Token().isPresent()) { + OAuth2Credentials oAuth2Credentials = + GCPAuthUtils.oauth2CredentialsFromGcpProperties(gcpProperties, closeableGroup); + kmsBuilder.setCredentialsProvider(FixedCredentialsProvider.create(oAuth2Credentials)); + } + + // if not OAuth then defaults to GoogleCredentials.getApplicationDefault() + this.kmsClient = KeyManagementServiceClient.create(kmsBuilder.build()); + closeableGroup.addCloseable(kmsClient); + + } catch (IOException e) { + throw new RuntimeException("Failed to create GCP cloud KMS service client", e); + } + } + + @Override + public ByteBuffer wrapKey(ByteBuffer key, String wrappingKeyId) { + EncryptRequest.Builder requestBuilder = EncryptRequest.newBuilder().setName(wrappingKeyId); + requestBuilder = ByteStringShim.setPlainText(requestBuilder, key); + + EncryptRequest encryptRequest = requestBuilder.build(); + EncryptResponse encryptResponse = kmsClient.encrypt(encryptRequest); + + // ByteString.copyFrom() leaves the ByteBuffer at its end position, so the position needs to be reset + key.position(0); + return ByteBuffer.wrap(ByteStringShim.getCipherText(encryptResponse)); + } + + @Override + public ByteBuffer unwrapKey(ByteBuffer wrappedKey, String wrappingKeyId) { + DecryptRequest.Builder requestBuilder = DecryptRequest.newBuilder().setName(wrappingKeyId); + requestBuilder = ByteStringShim.setCipherText(requestBuilder, wrappedKey); + + DecryptRequest decryptRequest = requestBuilder.build(); + DecryptResponse 
decryptResponse = kmsClient.decrypt(decryptRequest); + + // ByteString.copyFrom() leaves the ByteBuffer at its end position, so the position needs to be reset + wrappedKey.position(0); + return ByteBuffer.wrap(ByteStringShim.getPlainText(decryptResponse)); + } + + @Override + public void close() { + try { + closeableGroup.close(); + } catch (IOException ioe) { + // closure exceptions already suppressed and logged in closeableGroup + } + } + + private static final class ByteStringShim { + private static final String ORIGINAL_BYTE_STRING_CLASS_NAME = "com.google.protobuf.ByteString"; + private static final String SHADED_BYTE_STRING_CLASS_NAME = + "org.apache.iceberg.gcp.shaded." + ORIGINAL_BYTE_STRING_CLASS_NAME; + private static final Class<?> BYTE_STRING_CLASS; + + static { + Class<?> byteStringClass = + DynClasses.builder().impl(SHADED_BYTE_STRING_CLASS_NAME).orNull().build(); + if (byteStringClass == null) { + byteStringClass = DynClasses.builder().impl(ORIGINAL_BYTE_STRING_CLASS_NAME).build(); + } + + BYTE_STRING_CLASS = byteStringClass; + } + + private static final DynMethods.UnboundMethod SET_PLAIN_TEXT_METHOD = + new DynMethods.Builder("setPlaintext") + .hiddenImpl(EncryptRequest.Builder.class, BYTE_STRING_CLASS) + .build(); + private static final DynMethods.UnboundMethod SET_CIPHER_TEXT_METHOD = + new DynMethods.Builder("setCiphertext") + .hiddenImpl(DecryptRequest.Builder.class, BYTE_STRING_CLASS) + .build(); + private static final DynMethods.UnboundMethod GET_PLAIN_TEXT_METHOD = + new DynMethods.Builder("getPlaintext").hiddenImpl(DecryptResponse.class).build(); + private static final DynMethods.UnboundMethod GET_CIPHER_TEXT_METHOD = + new DynMethods.Builder("getCiphertext").hiddenImpl(EncryptResponse.class).build(); + private static final DynMethods.StaticMethod COPY_FROM_BYTEBUFFER_METHOD = + new DynMethods.Builder("copyFrom") + .hiddenImpl(BYTE_STRING_CLASS, ByteBuffer.class) + .buildStatic(); + private static final DynMethods.UnboundMethod TO_BYTE_ARRAY_METHOD = + new 
DynMethods.Builder("toByteArray").hiddenImpl(BYTE_STRING_CLASS).build(); + + static EncryptRequest.Builder setPlainText( + EncryptRequest.Builder requestBuilder, ByteBuffer key) { + // dynamic call: requestBuilder.setPlaintext(ByteString.copyFrom(key)) + Object byteStringKey = COPY_FROM_BYTEBUFFER_METHOD.invoke(key); + return SET_PLAIN_TEXT_METHOD.invoke(requestBuilder, byteStringKey); + } + + static DecryptRequest.Builder setCipherText( + DecryptRequest.Builder requestBuilder, ByteBuffer wrappedKey) { + // dynamic call: requestBuilder.setCiphertext(ByteString.copyFrom(wrappedKey)) + Object byteStringWrappedKey = COPY_FROM_BYTEBUFFER_METHOD.invoke(wrappedKey); + return SET_CIPHER_TEXT_METHOD.invoke(requestBuilder, byteStringWrappedKey); + } + + static byte[] getCipherText(EncryptResponse response) { + // dynamic call: response.getCiphertext().toByteArray() + Object byteStringEncryptedKey = GET_CIPHER_TEXT_METHOD.invoke(response); + return TO_BYTE_ARRAY_METHOD.invoke(byteStringEncryptedKey); + } + + static byte[] getPlainText(DecryptResponse response) { + // dynamic call: response.getPlaintext().toByteArray() + Object byteStringDecryptedKey = GET_PLAIN_TEXT_METHOD.invoke(response); + return TO_BYTE_ARRAY_METHOD.invoke(byteStringDecryptedKey); + } + } +} diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java index d0a76ae9ad..e9db60b149 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java @@ -19,15 +19,16 @@ package org.apache.iceberg.gcp.gcs; import com.google.api.gax.rpc.FixedHeaderProvider; -import com.google.auth.oauth2.AccessToken; -import com.google.auth.oauth2.OAuth2Credentials; -import com.google.auth.oauth2.OAuth2CredentialsWithRefresh; import com.google.cloud.NoCredentials; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; +import 
java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import org.apache.iceberg.EnvironmentContext; +import org.apache.iceberg.gcp.GCPAuthUtils; import org.apache.iceberg.gcp.GCPProperties; +import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -38,7 +39,7 @@ class PrefixedStorage implements AutoCloseable { private final String storagePrefix; private final GCPProperties gcpProperties; private SerializableSupplier<Storage> storage; - private OAuth2RefreshCredentialsHandler refreshHandler = null; + private CloseableGroup closeableGroup; private transient volatile Storage storageClient; PrefixedStorage( @@ -70,25 +71,12 @@ class PrefixedStorage implements AutoCloseable { // Explicitly allow "no credentials" for testing purposes builder.setCredentials(NoCredentials.getInstance()); } - gcpProperties - .oauth2Token() - .ifPresent( - token -> { - // Explicitly configure an OAuth token - AccessToken accessToken = - new AccessToken(token, gcpProperties.oauth2TokenExpiresAt().orElse(null)); - if (gcpProperties.oauth2RefreshCredentialsEnabled() - && gcpProperties.oauth2RefreshCredentialsEndpoint().isPresent()) { - this.refreshHandler = OAuth2RefreshCredentialsHandler.create(properties); - builder.setCredentials( - OAuth2CredentialsWithRefresh.newBuilder() - .setAccessToken(accessToken) - .setRefreshHandler(refreshHandler) - .build()); - } else { - builder.setCredentials(OAuth2Credentials.create(accessToken)); - } - }); + + if (gcpProperties.oauth2Token().isPresent()) { + this.closeableGroup = new CloseableGroup(); + builder.setCredentials( + GCPAuthUtils.oauth2CredentialsFromGcpProperties(gcpProperties, closeableGroup)); + } return builder.build().getService(); }; @@ -117,8 +105,12 @@ class PrefixedStorage implements AutoCloseable { 
@Override public void close() { - if (null != refreshHandler) { - refreshHandler.close(); + if (null != closeableGroup) { + try { + closeableGroup.close(); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } } if (null != storage) {