This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c1fc3698a38 Java GroupByEncryptedKey (#36217)
c1fc3698a38 is described below
commit c1fc3698a383b5b4086dc3b3cbc84c5b67fad9d6
Author: Danny McCormick <[email protected]>
AuthorDate: Sat Oct 4 08:04:29 2025 -0400
Java GroupByEncryptedKey (#36217)
* First pass at Java GBEK (AI generated)
* Compile
* Compiletest
* checkstyle
* tests passing
* Move secret code into utils
* Use secret manager from bom
* Docs
* Better docs
* Updates
* Update encryption mode
* checkstyle
* explicitly add dep
* spotbugs: only create generator once
---
.../resources/beam/checkstyle/suppressions.xml | 2 +
sdks/java/core/build.gradle | 5 +
.../beam/sdk/transforms/GroupByEncryptedKey.java | 254 +++++++++++++++++++++
.../java/org/apache/beam/sdk/util/GcpSecret.java | 58 +++++
.../main/java/org/apache/beam/sdk/util/Secret.java | 36 +++
.../sdk/transforms/GroupByEncryptedKeyTest.java | 189 +++++++++++++++
6 files changed, 544 insertions(+)
diff --git
a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
index e8d4e8888da..c103ab7f5b1 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
@@ -52,10 +52,12 @@
<suppress id="ForbidNonVendoredGuava"
files=".*it.*ResourceManagerTest\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*it.*TemplateClientTest\.java"
/>
<suppress id="ForbidNonVendoredGuava" files=".*it.*LT\.java" />
+ <suppress id="ForbidNonVendoredGuava"
files=".*sdk.*core.*GroupByEncryptedKey.*" />
<!-- gRPC/protobuf exceptions -->
<!-- Non-vendored gRPC/protobuf imports are allowed for files that depend on
libraries that expose gRPC/protobuf in its public API -->
<suppress id="ForbidNonVendoredGrpcProtobuf"
files=".*sdk.*extensions.*protobuf.*" />
+ <suppress id="ForbidNonVendoredGrpcProtobuf"
files=".*sdk.*core.*GroupByEncryptedKeyTest.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*ml.*"
/>
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*gcp.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf"
files=".*sdk.*io.*googleads.*DummyRateLimitPolicy\.java" />
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index e849ae59779..4a6d2f11973 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -100,9 +100,13 @@ dependencies {
shadow library.java.snappy_java
shadow library.java.joda_time
implementation
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+ implementation library.java.google_cloud_secret_manager
+ implementation library.java.proto_google_cloud_secret_manager_v1
+ implementation library.java.protobuf_java
permitUnusedDeclared
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
provided library.java.json_org
implementation library.java.everit_json_schema
+ implementation library.java.guava
implementation library.java.snake_yaml
shadowTest library.java.everit_json_schema
provided library.java.junit
@@ -123,6 +127,7 @@ dependencies {
shadowTest library.java.log4j
shadowTest library.java.log4j2_api
shadowTest library.java.jamm
+ shadowTest 'com.google.cloud:google-cloud-secretmanager:2.75.0'
testRuntimeOnly library.java.slf4j_jdk14
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java
new file mode 100644
index 00000000000..e927efad44a
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.crypto.Cipher;
import javax.crypto.Mac;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.util.Secret;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A {@link PTransform} that provides a secure alternative to {@link
+ * org.apache.beam.sdk.transforms.GroupByKey}.
+ *
+ * <p>This transform encrypts the keys of the input {@link PCollection},
performs a {@link
+ * org.apache.beam.sdk.transforms.GroupByKey} on the encrypted keys, and then
decrypts the keys in
+ * the output. This is useful when the keys contain sensitive data that should
not be stored at rest
+ * by the runner.
+ *
+ * <p>The transform requires a {@link Secret} which returns a 32 byte secret
which can be used to
+ * generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm.
+ *
+ * <p>Note the following caveats: 1) Runners can implement arbitrary
materialization steps, so this
+ * does not guarantee that the whole pipeline will not have unencrypted data
at rest by itself. 2)
+ * If using this transform in streaming mode, this transform may not properly
handle update
+ * compatibility checks around coders. This means that an improper update
could lead to invalid
+ * coders, causing pipeline failure or data corruption. If you need to update,
make sure that the
+ * input type passed into this transform does not change.
+ */
+public class GroupByEncryptedKey<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
{
+
+ private final Secret hmacKey;
+
+ private GroupByEncryptedKey(Secret hmacKey) {
+ this.hmacKey = hmacKey;
+ }
+
+ /**
+ * Creates a {@link GroupByEncryptedKey} transform.
+ *
+ * @param hmacKey The {@link Secret} key to use for encryption.
+ * @param <K> The type of the keys in the input PCollection.
+ * @param <V> The type of the values in the input PCollection.
+ * @return A {@link GroupByEncryptedKey} transform.
+ */
+ public static <K, V> GroupByEncryptedKey<K, V> create(Secret hmacKey) {
+ return new GroupByEncryptedKey<>(hmacKey);
+ }
+
+ @Override
+ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+ Coder<KV<K, V>> inputCoder = input.getCoder();
+ if (!(inputCoder instanceof KvCoder)) {
+ throw new IllegalStateException("GroupByEncryptedKey requires its input
to use KvCoder");
+ }
+ KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder;
+ Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+
+ try {
+ keyCoder.verifyDeterministic();
+ } catch (NonDeterministicException e) {
+ throw new IllegalStateException(
+ "the keyCoder of a GroupByEncryptedKey must be deterministic", e);
+ }
+
+ Coder<V> valueCoder = inputKvCoder.getValueCoder();
+
+ PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>> grouped =
+ input
+ .apply(
+ "EncryptMessage",
+ ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder,
valueCoder)))
+ .apply(GroupByKey.create());
+
+ return grouped
+ .apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey,
keyCoder, valueCoder)))
+ .setCoder(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)));
+ }
+
+ /**
+ * A {@link PTransform} that encrypts the key and value of an element.
+ *
+ * <p>The resulting PCollection will be a KV pair with the key being the
HMAC of the encoded key,
+ * and the value being a KV pair of the encrypted key and value.
+ */
+ @SuppressWarnings("initialization.fields.uninitialized")
+ private static class EncryptMessage<K, V> extends DoFn<KV<K, V>, KV<byte[],
KV<byte[], byte[]>>> {
+ private final Secret hmacKey;
+ private final Coder<K> keyCoder;
+ private final Coder<V> valueCoder;
+ private transient Mac mac;
+ private transient Cipher cipher;
+ private transient SecretKeySpec secretKeySpec;
+ private transient java.security.SecureRandom generator;
+
+ EncryptMessage(Secret hmacKey, Coder<K> keyCoder, Coder<V> valueCoder) {
+ this.hmacKey = hmacKey;
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ }
+
+ @Setup
+ public void setup() {
+ try {
+ byte[] secretBytes = this.hmacKey.getSecretBytes();
+ this.mac = Mac.getInstance("HmacSHA256");
+ this.mac.init(new SecretKeySpec(secretBytes, "HmacSHA256"));
+ this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
+ this.secretKeySpec = new SecretKeySpec(secretBytes, "AES");
+ } catch (Exception ex) {
+ throw new RuntimeException(
+ "Failed to initialize cryptography libraries needed for
GroupByEncryptedKey", ex);
+ }
+ this.generator = new java.security.SecureRandom();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ byte[] encodedKey = encode(this.keyCoder, c.element().getKey());
+ byte[] encodedValue = encode(this.valueCoder, c.element().getValue());
+
+ byte[] hmac = this.mac.doFinal(encodedKey);
+
+ byte[] keyIv = new byte[12];
+ byte[] valueIv = new byte[12];
+ this.generator.nextBytes(keyIv);
+ this.generator.nextBytes(valueIv);
+ GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(128, keyIv);
+ this.cipher.init(Cipher.ENCRYPT_MODE, this.secretKeySpec,
gcmParameterSpec);
+ byte[] encryptedKey = this.cipher.doFinal(encodedKey);
+ gcmParameterSpec = new GCMParameterSpec(128, valueIv);
+ this.cipher.init(Cipher.ENCRYPT_MODE, this.secretKeySpec,
gcmParameterSpec);
+ byte[] encryptedValue = this.cipher.doFinal(encodedValue);
+
+ c.output(
+ KV.of(
+ hmac,
+ KV.of(
+ com.google.common.primitives.Bytes.concat(keyIv,
encryptedKey),
+ com.google.common.primitives.Bytes.concat(valueIv,
encryptedValue))));
+ }
+
+ private <T> byte[] encode(Coder<T> coder, T value) throws Exception {
+ java.io.ByteArrayOutputStream os = new java.io.ByteArrayOutputStream();
+ coder.encode(value, os);
+ return os.toByteArray();
+ }
+ }
+
+ /**
+ * A {@link PTransform} that decrypts the key and values of an element.
+ *
+ * <p>The input PCollection will be a KV pair with the key being the HMAC of
the encoded key, and
+ * the value being a list of KV pairs of the encrypted key and value.
+ *
+ * <p>This will return a tuple containing the decrypted key and a list of
decrypted values.
+ *
+ * <p>Since there is some loss of precision in the HMAC encoding of the key
(but not the key
+ * encryption), there is some extra work done here to ensure that all
key/value pairs are mapped
+ * out appropriately.
+ */
+ @SuppressWarnings("initialization.fields.uninitialized")
+ private static class DecryptMessage<K, V>
+ extends DoFn<KV<byte[], Iterable<KV<byte[], byte[]>>>, KV<K,
Iterable<V>>> {
+ private final Secret hmacKey;
+ private final Coder<K> keyCoder;
+ private final Coder<V> valueCoder;
+ private transient Cipher cipher;
+ private transient SecretKeySpec secretKeySpec;
+
+ DecryptMessage(Secret hmacKey, Coder<K> keyCoder, Coder<V> valueCoder) {
+ this.hmacKey = hmacKey;
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ }
+
+ @Setup
+ public void setup() {
+ try {
+ this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
+ this.secretKeySpec = new SecretKeySpec(this.hmacKey.getSecretBytes(),
"AES");
+ } catch (Exception ex) {
+ throw new RuntimeException(
+ "Failed to initialize cryptography libraries needed for
GroupByEncryptedKey", ex);
+ }
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ java.util.Map<K, java.util.List<V>> decryptedKvs = new
java.util.HashMap<>();
+ for (KV<byte[], byte[]> encryptedKv : c.element().getValue()) {
+ byte[] iv = Arrays.copyOfRange(encryptedKv.getKey(), 0, 12);
+ GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(128, iv);
+ this.cipher.init(Cipher.DECRYPT_MODE, this.secretKeySpec,
gcmParameterSpec);
+
+ byte[] encryptedKey =
+ Arrays.copyOfRange(encryptedKv.getKey(), 12,
encryptedKv.getKey().length);
+ byte[] decryptedKeyBytes = this.cipher.doFinal(encryptedKey);
+ K key = decode(this.keyCoder, decryptedKeyBytes);
+
+ if (key != null) {
+ if (!decryptedKvs.containsKey(key)) {
+ decryptedKvs.put(key, new java.util.ArrayList<>());
+ }
+
+ iv = Arrays.copyOfRange(encryptedKv.getValue(), 0, 12);
+ gcmParameterSpec = new GCMParameterSpec(128, iv);
+ this.cipher.init(Cipher.DECRYPT_MODE, this.secretKeySpec,
gcmParameterSpec);
+
+ byte[] encryptedValue =
+ Arrays.copyOfRange(encryptedKv.getValue(), 12,
encryptedKv.getValue().length);
+ byte[] decryptedValueBytes = this.cipher.doFinal(encryptedValue);
+ V value = decode(this.valueCoder, decryptedValueBytes);
+ decryptedKvs.get(key).add(value);
+ } else {
+ throw new RuntimeException(
+ "Found null key when decoding " +
Arrays.toString(decryptedKeyBytes));
+ }
+ }
+
+ for (java.util.Map.Entry<K, java.util.List<V>> entry :
decryptedKvs.entrySet()) {
+ c.output(KV.of(entry.getKey(), entry.getValue()));
+ }
+ }
+
+ private <T> T decode(Coder<T> coder, byte[] bytes) throws Exception {
+ java.io.ByteArrayInputStream is = new
java.io.ByteArrayInputStream(bytes);
+ return coder.decode(is);
+ }
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java
new file mode 100644
index 00000000000..80bc3a54535
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
+import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
+import com.google.cloud.secretmanager.v1.SecretVersionName;
+import java.io.IOException;
+
+/**
+ * A {@link Secret} manager implementation that retrieves secrets from Google
Cloud Secret Manager.
+ */
+public class GcpSecret implements Secret {
+ private final String versionName;
+
+ /**
+ * Initializes a {@link GcpSecret} object.
+ *
+ * @param versionName The full version name of the secret in Google Cloud
Secret Manager. For
+ * example: projects/<id>/secrets/<secret_name>/versions/1. For more
info, see
+ *
https://cloud.google.com/python/docs/reference/secretmanager/latest/google.cloud.secretmanager_v1beta1.services.secret_manager_service.SecretManagerServiceClient#google_cloud_secretmanager_v1beta1_services_secret_manager_service_SecretManagerServiceClient_access_secret_version
+ */
+ public GcpSecret(String versionName) {
+ this.versionName = versionName;
+ }
+
+ /**
+ * Returns the secret as a byte array. Assumes that the current active
service account has
+ * permissions to read the secret.
+ *
+ * @return The secret as a byte array.
+ */
+ @Override
+ public byte[] getSecretBytes() {
+ try (SecretManagerServiceClient client =
SecretManagerServiceClient.create()) {
+ SecretVersionName secretVersionName =
SecretVersionName.parse(versionName);
+ AccessSecretVersionResponse response =
client.accessSecretVersion(secretVersionName);
+ return response.getPayload().getData().toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to retrieve secret bytes", e);
+ }
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java
new file mode 100644
index 00000000000..fe476ef6cb1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+
/**
 * Supplies secret material, such as an encryption key, to transforms that need it.
 *
 * <p>An implementation typically knows how to look a secret up in some secret management system
 * and returns it as raw bytes. Because the interface extends {@link Serializable}, implementations
 * can be captured by transforms and shipped along with them.
 */
public interface Secret extends Serializable {

  /**
   * Looks up the secret.
   *
   * @return the raw bytes of the secret
   */
  byte[] getSecretBytes();
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java
new file mode 100644
index 00000000000..ba4c50e5a41
--- /dev/null
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.junit.Assert.assertThrows;
+
+import com.google.cloud.secretmanager.v1.ProjectName;
+import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
+import com.google.cloud.secretmanager.v1.SecretName;
+import com.google.cloud.secretmanager.v1.SecretPayload;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.GcpSecret;
+import org.apache.beam.sdk.util.Secret;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GroupByEncryptedKey}. */
+@RunWith(JUnit4.class)
+public class GroupByEncryptedKeyTest implements Serializable {
+
+ @Rule public transient TestPipeline p = TestPipeline.create();
+
+ private static class FakeSecret implements Secret {
+ private final byte[] secret =
+ "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZc".getBytes(Charset.defaultCharset());
+
+ @Override
+ public byte[] getSecretBytes() {
+ return secret;
+ }
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testGroupByKeyFakeSecret() {
+ List<KV<String, Integer>> ungroupedPairs =
+ Arrays.asList(
+ KV.of("k1", 3),
+ KV.of("k5", Integer.MAX_VALUE),
+ KV.of("k5", Integer.MIN_VALUE),
+ KV.of("k2", 66),
+ KV.of("k1", 4),
+ KV.of("k2", -33),
+ KV.of("k3", 0));
+
+ PCollection<KV<String, Integer>> input =
+ p.apply(
+ Create.of(ungroupedPairs)
+ .withCoder(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of())));
+
+ PCollection<KV<String, Iterable<Integer>>> output =
+ input.apply(GroupByEncryptedKey.<String, Integer>create(new
FakeSecret()));
+
+ PAssert.that(output.apply("Sort", MapElements.via(new SortValues())))
+ .containsInAnyOrder(
+ KV.of("k1", Arrays.asList(3, 4)),
+ KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)),
+ KV.of("k2", Arrays.asList(-33, 66)),
+ KV.of("k3", Arrays.asList(0)));
+
+ p.run();
+ }
+
+ private static final String PROJECT_ID = "apache-beam-testing";
+ private static final String SECRET_ID = "gbek-test";
+ private static Secret gcpSecret;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ SecretManagerServiceClient client = SecretManagerServiceClient.create();
+ ProjectName projectName = ProjectName.of(PROJECT_ID);
+ SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID);
+
+ try {
+ client.getSecret(secretName);
+ } catch (Exception e) {
+ com.google.cloud.secretmanager.v1.Secret secret =
+ com.google.cloud.secretmanager.v1.Secret.newBuilder()
+ .setReplication(
+ com.google.cloud.secretmanager.v1.Replication.newBuilder()
+ .setAutomatic(
+
com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder()
+ .build())
+ .build())
+ .build();
+ client.createSecret(projectName, SECRET_ID, secret);
+ byte[] secretBytes = new byte[32];
+ new SecureRandom().nextBytes(secretBytes);
+ client.addSecretVersion(
+ secretName,
SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build());
+ }
+ gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest");
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ SecretManagerServiceClient client = SecretManagerServiceClient.create();
+ SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID);
+ client.deleteSecret(secretName);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testGroupByKeyGcpSecret() {
+ List<KV<String, Integer>> ungroupedPairs =
+ Arrays.asList(
+ KV.of("k1", 3),
+ KV.of("k5", Integer.MAX_VALUE),
+ KV.of("k5", Integer.MIN_VALUE),
+ KV.of("k2", 66),
+ KV.of("k1", 4),
+ KV.of("k2", -33),
+ KV.of("k3", 0));
+
+ PCollection<KV<String, Integer>> input =
+ p.apply(
+ Create.of(ungroupedPairs)
+ .withCoder(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of())));
+
+ PCollection<KV<String, Iterable<Integer>>> output =
+ input.apply(GroupByEncryptedKey.<String, Integer>create(gcpSecret));
+
+ PAssert.that(output.apply("Sort", MapElements.via(new SortValues())))
+ .containsInAnyOrder(
+ KV.of("k1", Arrays.asList(3, 4)),
+ KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)),
+ KV.of("k2", Arrays.asList(-33, 66)),
+ KV.of("k3", Arrays.asList(0)));
+
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testGroupByKeyGcpSecretThrows() {
+ Secret gcpSecret = new GcpSecret("bad_path/versions/latest");
+ p.apply(Create.of(KV.of("k1", 1)))
+ .apply(GroupByEncryptedKey.<String, Integer>create(gcpSecret));
+ assertThrows(RuntimeException.class, () -> p.run());
+ }
+
+ private static class SortValues
+ extends SimpleFunction<KV<String, Iterable<Integer>>, KV<String,
List<Integer>>> {
+ @Override
+ public KV<String, List<Integer>> apply(KV<String, Iterable<Integer>>
input) {
+ List<Integer> sorted =
+ StreamSupport.stream(input.getValue().spliterator(), false)
+ .sorted()
+ .collect(Collectors.toList());
+ return KV.of(input.getKey(), sorted);
+ }
+ }
+}