This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/java-gbek in repository https://gitbox.apache.org/repos/asf/beam.git
commit b98663c2fa8644364c639b9e08de94fc7fae47ec Author: Danny Mccormick <[email protected]> AuthorDate: Fri Sep 19 14:03:00 2025 -0400 First pass at Java GBEK (AI generated) --- sdks/java/core/build.gradle | 1 + .../org/apache/beam/sdk/transforms/GcpSecret.java | 52 +++++++ .../beam/sdk/transforms/GroupByEncryptedKey.java | 138 +++++++++++++++++ .../org/apache/beam/sdk/transforms/Secret.java | 31 ++++ .../sdk/transforms/GroupByEncryptedKeyTest.java | 169 +++++++++++++++++++++ 5 files changed, 391 insertions(+) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index e849ae59779..8ca227a6ae8 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -100,6 +100,7 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + implementation 'com.google.cloud:google-cloud-secretmanager' permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java new file mode 100644 index 00000000000..00271dd7ad8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretVersionName; +import java.io.IOException; + +/** + * A secret manager implementation that retrieves secrets from Google Cloud Secret Manager. + */ +public class GcpSecret implements Secret { + private final String version_name; + + /** + * Initializes a GcpSecret object. + * + * @param version_name The full version name of the secret in Google Cloud Secret Manager. For + * example: projects/<id>/secrets/<secret_name>/versions/1. 
For more info, see + * https://cloud.google.com/python/docs/reference/secretmanager/latest/google.cloud.secretmanager_v1beta1.services.secret_manager_service.SecretManagerServiceClient#google_cloud_secretmanager_v1beta1_services_secret_manager_service_SecretManagerServiceClient_access_secret_version + */ + public GcpSecret(String version_name) { + this.version_name = version_name; + } + + @Override + public byte[] getSecretBytes() { + try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { + SecretVersionName secretVersionName = SecretVersionName.parse(version_name); + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + return response.getPayload().getData().toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to retrieve secret bytes", e); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java new file mode 100644 index 00000000000..29fe57e5d85 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import javax.crypto.Cipher; +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * A {@link PTransform} that provides a secure alternative to {@link GroupByKey}. + * + * <p>This transform encrypts the keys of the input {@link PCollection}, performs a {@link + * GroupByKey} on the encrypted keys, and then decrypts the keys in the output. This is useful when + * the keys contain sensitive data that should not be stored at rest by the runner. + */ +public class GroupByEncryptedKey<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { + + private final Secret hmacKey; + + private GroupByEncryptedKey(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + public static <K, V> GroupByEncryptedKey<K, V> create(Secret hmacKey) { + return new GroupByEncryptedKey<>(hmacKey); + } + + @Override + public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { + return input + .apply("EncryptMessage", ParDo.of(new _EncryptMessage<>())) + .apply(GroupByKey.create()) + .apply("DecryptMessage", ParDo.of(new _DecryptMessage<>())); + } + + private static class _EncryptMessage<K, V> extends DoFn<KV<K, V>, KV<byte[], KV<byte[], byte[]>>> { + private final Secret hmacKey; + private transient Mac mac; + private transient Cipher cipher; + + _EncryptMessage(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + @Setup + public void setup() throws Exception { + mac = Mac.getInstance("HmacSHA256"); + mac.init(new SecretKeySpec(hmacKey.getSecretBytes(), "HmacSHA256")); + cipher = Cipher.getInstance("AES"); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + } + + 
@ProcessElement + public void processElement(ProcessContext c) throws Exception { + Coder<K> keyCoder = ((KvCoder<K, V>) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder(); + Coder<V> valueCoder = ((KvCoder<K, V>) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder(); + + byte[] encodedKey = encode(keyCoder, c.element().getKey()); + byte[] encodedValue = encode(valueCoder, c.element().getValue()); + + byte[] hmac = mac.doFinal(encodedKey); + byte[] encryptedKey = cipher.doFinal(encodedKey); + byte[] encryptedValue = cipher.doFinal(encodedValue); + + c.output(KV.of(hmac, KV.of(encryptedKey, encryptedValue))); + } + + private <T> byte[] encode(Coder<T> coder, T value) throws Exception { + java.io.ByteArrayOutputStream os = new java.io.ByteArrayOutputStream(); + coder.encode(value, os); + return os.toByteArray(); + } + } + + private static class _DecryptMessage<K, V> + extends DoFn<KV<byte[], Iterable<KV<byte[], byte[]>>>, KV<K, Iterable<V>>> { + private final Secret hmacKey; + private transient Cipher cipher; + + _DecryptMessage(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + @Setup + public void setup() throws Exception { + cipher = Cipher.getInstance("AES"); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Coder<K> keyCoder = ((KvCoder<K, V>) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder(); + Coder<V> valueCoder = ((KvCoder<K, V>) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder(); + + java.util.Map<K, java.util.List<V>> decryptedKvs = new java.util.HashMap<>(); + for (KV<byte[], byte[]> encryptedKv : c.element().getValue()) { + byte[] decryptedKeyBytes = cipher.doFinal(encryptedKv.getKey()); + K key = decode(keyCoder, decryptedKeyBytes); + + if (!decryptedKvs.containsKey(key)) { + decryptedKvs.put(key, 
new java.util.ArrayList<>()); + } + byte[] decryptedValueBytes = cipher.doFinal(encryptedKv.getValue()); + V value = decode(valueCoder, decryptedValueBytes); + decryptedKvs.get(key).add(value); + } + + for (java.util.Map.Entry<K, java.util.List<V>> entry : decryptedKvs.entrySet()) { + c.output(KV.of(entry.getKey(), entry.getValue())); + } + } + + private <T> T decode(Coder<T> coder, byte[] bytes) throws Exception { + java.io.ByteArrayInputStream is = new java.io.ByteArrayInputStream(bytes); + return coder.decode(is); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java new file mode 100644 index 00000000000..ae34fb6ee63 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; + +/** + * A secret management interface used for handling sensitive data. + * + * <p>This interface provides a generic way to handle secrets. Implementations of this interface + * should handle fetching secrets from a secret management system. 
+ */ +public interface Secret extends Serializable { + /** Returns the secret as a byte array; implementations such as GcpSecret contact an external service on each call. */ + byte[] getSecretBytes(); +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java new file mode 100644 index 00000000000..5860448140f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.security.SecureRandom; +import org.junit.After; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GroupByEncryptedKey}. 
*/ +@RunWith(JUnit4.class) +public class GroupByEncryptedKeyTest implements Serializable { + + @Rule public transient TestPipeline p = TestPipeline.create(); + + private static class FakeSecret implements Secret { + private final byte[] secret = "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZcyZrnZQiQ-uE=".getBytes(); + + @Override + public byte[] getSecretBytes() { + return secret; + } + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyFakeSecret() { + List<KV<String, Integer>> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection<KV<String, Integer>> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection<KV<String, Iterable<Integer>>> output = + input.apply(GroupByEncryptedKey.create(new FakeSecret())); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + private Secret gcpSecret; + + @Before + public void setup() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + 
client.createSecret(projectName, SECRET_ID, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); + } + + @After + public void tearDown() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + client.deleteSecret(secretName); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyGcpSecret() { + List<KV<String, Integer>> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection<KV<String, Integer>> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection<KV<String, Iterable<Integer>>> output = + input.apply(GroupByEncryptedKey.create(gcpSecret)); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyGcpSecretThrows() { + Secret gcpSecret = new GcpSecret("bad_path/versions/latest"); + p.apply(Create.of(KV.of("k1", 1))) + .apply(GroupByEncryptedKey.create(gcpSecret)); + assertThrows(RuntimeException.class, () -> p.run()); + } +}
