This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c1fc3698a38 Java GroupByEncryptedKey (#36217)
c1fc3698a38 is described below

commit c1fc3698a383b5b4086dc3b3cbc84c5b67fad9d6
Author: Danny McCormick <[email protected]>
AuthorDate: Sat Oct 4 08:04:29 2025 -0400

    Java GroupByEncryptedKey (#36217)
    
    * First pass at Java GBEK (AI generated)
    
    * Compile
    
    * Compiletest
    
    * checkstyle
    
    * tests passing
    
    * Move secret code into utils
    
    * Use secret manager from bom
    
    * Docs
    
    * Better docs
    
    * Updates
    
    * Update encryption mode
    
    * checkstyle
    
    * explicitly add dep
    
    * spotbugs: only create generator once
---
 .../resources/beam/checkstyle/suppressions.xml     |   2 +
 sdks/java/core/build.gradle                        |   5 +
 .../beam/sdk/transforms/GroupByEncryptedKey.java   | 254 +++++++++++++++++++++
 .../java/org/apache/beam/sdk/util/GcpSecret.java   |  58 +++++
 .../main/java/org/apache/beam/sdk/util/Secret.java |  36 +++
 .../sdk/transforms/GroupByEncryptedKeyTest.java    | 189 +++++++++++++++
 6 files changed, 544 insertions(+)

diff --git 
a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml 
b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
index e8d4e8888da..c103ab7f5b1 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
@@ -52,10 +52,12 @@
   <suppress id="ForbidNonVendoredGuava" 
files=".*it.*ResourceManagerTest\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*it.*TemplateClientTest\.java" 
/>
   <suppress id="ForbidNonVendoredGuava" files=".*it.*LT\.java" />
+  <suppress id="ForbidNonVendoredGuava" 
files=".*sdk.*core.*GroupByEncryptedKey.*" />
 
   <!-- gRPC/protobuf exceptions -->
   <!-- Non-vendored gRPC/protobuf imports are allowed for files that depend on 
libraries that expose gRPC/protobuf in its public API -->
   <suppress id="ForbidNonVendoredGrpcProtobuf" 
files=".*sdk.*extensions.*protobuf.*" />
+  <suppress id="ForbidNonVendoredGrpcProtobuf" 
files=".*sdk.*core.*GroupByEncryptedKeyTest.*" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*ml.*" 
/>
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*gcp.*" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" 
files=".*sdk.*io.*googleads.*DummyRateLimitPolicy\.java" />
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index e849ae59779..4a6d2f11973 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -100,9 +100,13 @@ dependencies {
   shadow library.java.snappy_java
   shadow library.java.joda_time
   implementation 
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+  implementation library.java.google_cloud_secret_manager
+  implementation library.java.proto_google_cloud_secret_manager_v1
+  implementation library.java.protobuf_java
   permitUnusedDeclared 
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
   provided library.java.json_org
   implementation library.java.everit_json_schema
+  implementation library.java.guava
   implementation library.java.snake_yaml
   shadowTest library.java.everit_json_schema
   provided library.java.junit
@@ -123,6 +127,7 @@ dependencies {
   shadowTest library.java.log4j
   shadowTest library.java.log4j2_api
   shadowTest library.java.jamm
+  shadowTest 'com.google.cloud:google-cloud-secretmanager:2.75.0'
   testRuntimeOnly library.java.slf4j_jdk14
 }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java
new file mode 100644
index 00000000000..e927efad44a
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.Arrays;
+import javax.crypto.Cipher;
+import javax.crypto.Mac;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.util.Secret;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A {@link PTransform} that provides a secure alternative to {@link
+ * org.apache.beam.sdk.transforms.GroupByKey}.
+ *
+ * <p>This transform encrypts the keys of the input {@link PCollection}, 
performs a {@link
+ * org.apache.beam.sdk.transforms.GroupByKey} on the encrypted keys, and then 
decrypts the keys in
+ * the output. This is useful when the keys contain sensitive data that should 
not be stored at rest
+ * by the runner.
+ *
+ * <p>The transform requires a {@link Secret} which returns a 32 byte secret. The same bytes are
+ * used to build the {@link SecretKeySpec} objects for both the HmacSHA256 digest of each key and
+ * the AES/GCM encryption of keys and values.
+ *
+ * <p>Note the following caveats: 1) Runners can implement arbitrary 
materialization steps, so this
+ * does not guarantee that the whole pipeline will not have unencrypted data 
at rest by itself. 2)
+ * If using this transform in streaming mode, this transform may not properly 
handle update
+ * compatibility checks around coders. This means that an improper update 
could lead to invalid
+ * coders, causing pipeline failure or data corruption. If you need to update, 
make sure that the
+ * input type passed into this transform does not change.
+ */
+public class GroupByEncryptedKey<K, V>
+    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> 
{
+
+  private final Secret hmacKey;
+
+  // Private: instances are obtained through the static create(Secret) factory.
+  private GroupByEncryptedKey(Secret hmacKey) {
+    this.hmacKey = hmacKey;
+  }
+
+  /**
+   * Creates a {@link GroupByEncryptedKey} transform.
+   *
+   * @param hmacKey The {@link Secret} whose bytes are used both as the HmacSHA256 key that derives
+   *     the grouping key and as the AES key that encrypts the original keys and values.
+   * @param <K> The type of the keys in the input PCollection.
+   * @param <V> The type of the values in the input PCollection.
+   * @return A {@link GroupByEncryptedKey} transform.
+   */
+  public static <K, V> GroupByEncryptedKey<K, V> create(Secret hmacKey) {
+    return new GroupByEncryptedKey<>(hmacKey);
+  }
+
+  /**
+   * Expands into three steps: encrypt every element, group by the HMAC of the encoded key, then
+   * decrypt each group back into the original key and value types.
+   *
+   * @param input The key/value pairs to group; its coder must be a {@link KvCoder}.
+   * @return The grouped pairs, encoded with the input's key coder and an {@link IterableCoder} of
+   *     the input's value coder.
+   * @throws IllegalStateException if the input coder is not a {@link KvCoder} or the key coder is
+   *     not deterministic.
+   */
+  @Override
+  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+    Coder<KV<K, V>> coder = input.getCoder();
+    if (!(coder instanceof KvCoder)) {
+      throw new IllegalStateException("GroupByEncryptedKey requires its input to use KvCoder");
+    }
+    KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder;
+    Coder<K> keyCoder = kvCoder.getKeyCoder();
+    Coder<V> valueCoder = kvCoder.getValueCoder();
+
+    // Grouping happens on an HMAC of the encoded key, so equal keys must encode identically.
+    try {
+      keyCoder.verifyDeterministic();
+    } catch (NonDeterministicException e) {
+      throw new IllegalStateException(
+          "the keyCoder of a GroupByEncryptedKey must be deterministic", e);
+    }
+
+    PCollection<KV<byte[], KV<byte[], byte[]>>> encrypted =
+        input.apply(
+            "EncryptMessage", ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder)));
+    PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>> grouped =
+        encrypted.apply(GroupByKey.create());
+    PCollection<KV<K, Iterable<V>>> decrypted =
+        grouped.apply(
+            "DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder)));
+
+    return decrypted.setCoder(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)));
+  }
+
+  /**
+   * A {@link DoFn} that encrypts the key and value of an element.
+   *
+   * <p>Each output is a KV pair whose key is the HMAC of the encoded key and whose value is a KV
+   * pair of the encrypted key and the encrypted value. Every encrypted payload is a freshly
+   * generated random IV followed by the AES/GCM ciphertext of the encoded bytes.
+   */
+  @SuppressWarnings("initialization.fields.uninitialized")
+  private static class EncryptMessage<K, V> extends DoFn<KV<K, V>, KV<byte[], KV<byte[], byte[]>>> {
+    /** Number of random IV bytes prepended to every ciphertext. */
+    private static final int IV_LENGTH_BYTES = 12;
+
+    /** Length of the GCM authentication tag, in bits. */
+    private static final int TAG_LENGTH_BITS = 128;
+
+    private final Secret hmacKey;
+    private final Coder<K> keyCoder;
+    private final Coder<V> valueCoder;
+    // The crypto objects are transient and rebuilt in setup() rather than serialized with the DoFn.
+    private transient Mac mac;
+    private transient Cipher cipher;
+    private transient SecretKeySpec secretKeySpec;
+    private transient java.security.SecureRandom generator;
+
+    EncryptMessage(Secret hmacKey, Coder<K> keyCoder, Coder<V> valueCoder) {
+      this.hmacKey = hmacKey;
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+    }
+
+    /**
+     * Fetches the secret once and prepares the HMAC, the AES/GCM cipher and the IV generator.
+     *
+     * @throws RuntimeException if the secret cannot be read or the algorithms are unavailable.
+     */
+    @Setup
+    public void setup() {
+      try {
+        byte[] secretBytes = this.hmacKey.getSecretBytes();
+        this.mac = Mac.getInstance("HmacSHA256");
+        this.mac.init(new SecretKeySpec(secretBytes, "HmacSHA256"));
+        this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
+        this.secretKeySpec = new SecretKeySpec(secretBytes, "AES");
+      } catch (Exception ex) {
+        throw new RuntimeException(
+            "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex);
+      }
+      this.generator = new java.security.SecureRandom();
+    }
+
+    /** Emits {@code KV(hmac(encodedKey), KV(encrypted key, encrypted value))} for the element. */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      byte[] encodedKey = encode(this.keyCoder, c.element().getKey());
+      byte[] encodedValue = encode(this.valueCoder, c.element().getValue());
+
+      // The HMAC is the key the downstream GroupByKey groups on; the ciphertexts cannot serve
+      // that purpose because each one is produced with a different random IV.
+      byte[] hmac = this.mac.doFinal(encodedKey);
+
+      c.output(KV.of(hmac, KV.of(encrypt(encodedKey), encrypt(encodedValue))));
+    }
+
+    /** Encrypts {@code plaintext} under a fresh random IV and returns the IV plus ciphertext. */
+    private byte[] encrypt(byte[] plaintext) throws Exception {
+      byte[] iv = new byte[IV_LENGTH_BYTES];
+      this.generator.nextBytes(iv);
+      this.cipher.init(
+          Cipher.ENCRYPT_MODE, this.secretKeySpec, new GCMParameterSpec(TAG_LENGTH_BITS, iv));
+      return com.google.common.primitives.Bytes.concat(iv, this.cipher.doFinal(plaintext));
+    }
+
+    /** Encodes {@code value} with {@code coder} into a new byte array. */
+    private <T> byte[] encode(Coder<T> coder, T value) throws Exception {
+      java.io.ByteArrayOutputStream os = new java.io.ByteArrayOutputStream();
+      coder.encode(value, os);
+      return os.toByteArray();
+    }
+  }
+
+  /**
+   * A {@link DoFn} that decrypts the keys and values of a grouped element.
+   *
+   * <p>Each input is a KV pair whose key is the HMAC of an encoded key and whose value is an
+   * iterable of KV pairs holding the encrypted key and the encrypted value. Every encrypted
+   * payload is read as a 12 byte IV followed by the AES/GCM ciphertext.
+   *
+   * <p>One output KV of decrypted key and decrypted values is emitted per distinct key found in
+   * the group. Entries are regrouped by their decrypted key bytes instead of trusting the HMAC
+   * alone, so different keys that share an HMAC are still emitted separately. Comparing the
+   * encoded bytes rather than the decoded objects keeps the regrouping correct for key types such
+   * as {@code byte[]} whose {@code equals} and {@code hashCode} are identity based.
+   */
+  @SuppressWarnings("initialization.fields.uninitialized")
+  private static class DecryptMessage<K, V>
+      extends DoFn<KV<byte[], Iterable<KV<byte[], byte[]>>>, KV<K, Iterable<V>>> {
+    /** Number of IV bytes that prefix every ciphertext. */
+    private static final int IV_LENGTH_BYTES = 12;
+
+    /** Length of the GCM authentication tag, in bits. */
+    private static final int TAG_LENGTH_BITS = 128;
+
+    private final Secret hmacKey;
+    private final Coder<K> keyCoder;
+    private final Coder<V> valueCoder;
+    // The crypto objects are transient and rebuilt in setup() rather than serialized with the DoFn.
+    private transient Cipher cipher;
+    private transient SecretKeySpec secretKeySpec;
+
+    DecryptMessage(Secret hmacKey, Coder<K> keyCoder, Coder<V> valueCoder) {
+      this.hmacKey = hmacKey;
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+    }
+
+    /**
+     * Fetches the secret once and prepares the AES/GCM cipher.
+     *
+     * @throws RuntimeException if the secret cannot be read or the algorithm is unavailable.
+     */
+    @Setup
+    public void setup() {
+      try {
+        this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
+        this.secretKeySpec = new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES");
+      } catch (Exception ex) {
+        throw new RuntimeException(
+            "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex);
+      }
+    }
+
+    /**
+     * Decrypts every entry of the group and emits one KV per distinct decrypted key.
+     *
+     * @throws RuntimeException if a decrypted key decodes to {@code null}.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      // ByteBuffer compares by content, so entries with identical encoded keys share one group
+      // regardless of how the decoded key type implements equals/hashCode.
+      java.util.Map<java.nio.ByteBuffer, KV<K, java.util.List<V>>> groups =
+          new java.util.LinkedHashMap<>();
+
+      for (KV<byte[], byte[]> encryptedKv : c.element().getValue()) {
+        byte[] decryptedKeyBytes = decrypt(encryptedKv.getKey());
+        java.nio.ByteBuffer groupId = java.nio.ByteBuffer.wrap(decryptedKeyBytes);
+
+        KV<K, java.util.List<V>> group = groups.get(groupId);
+        if (group == null) {
+          // Decode the key only the first time its bytes are seen.
+          K key = decode(this.keyCoder, decryptedKeyBytes);
+          if (key == null) {
+            // Report only the length: the bytes are the sensitive key material this transform
+            // exists to protect and must not end up in logs.
+            throw new RuntimeException(
+                "Found null key when decoding "
+                    + decryptedKeyBytes.length
+                    + " decrypted key bytes");
+          }
+          java.util.List<V> values = new java.util.ArrayList<>();
+          group = KV.of(key, values);
+          groups.put(groupId, group);
+        }
+
+        V value = decode(this.valueCoder, decrypt(encryptedKv.getValue()));
+        group.getValue().add(value);
+      }
+
+      for (KV<K, java.util.List<V>> group : groups.values()) {
+        c.output(KV.of(group.getKey(), group.getValue()));
+      }
+    }
+
+    /** Splits {@code ivAndCiphertext} into its IV prefix and ciphertext, then decrypts it. */
+    private byte[] decrypt(byte[] ivAndCiphertext) throws Exception {
+      byte[] iv = Arrays.copyOfRange(ivAndCiphertext, 0, IV_LENGTH_BYTES);
+      byte[] ciphertext =
+          Arrays.copyOfRange(ivAndCiphertext, IV_LENGTH_BYTES, ivAndCiphertext.length);
+      this.cipher.init(
+          Cipher.DECRYPT_MODE, this.secretKeySpec, new GCMParameterSpec(TAG_LENGTH_BITS, iv));
+      return this.cipher.doFinal(ciphertext);
+    }
+
+    /** Decodes {@code bytes} with {@code coder}. */
+    private <T> T decode(Coder<T> coder, byte[] bytes) throws Exception {
+      java.io.ByteArrayInputStream is = new java.io.ByteArrayInputStream(bytes);
+      return coder.decode(is);
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java
new file mode 100644
index 00000000000..80bc3a54535
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
+import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
+import com.google.cloud.secretmanager.v1.SecretVersionName;
+import java.io.IOException;
+
+/**
+ * A {@link Secret} implementation that retrieves secrets from Google Cloud Secret Manager.
+ *
+ * <p>The secret is fetched with a newly created client on every call to {@link
+ * #getSecretBytes()}.
+ */
+public class GcpSecret implements Secret {
+  private final String versionName;
+
+  /**
+   * Initializes a {@link GcpSecret} object.
+   *
+   * @param versionName The full version name of the secret in Google Cloud Secret Manager. For
+   *     example: {@code projects/<id>/secrets/<secret_name>/versions/1}. For more info, see
+   *     https://cloud.google.com/python/docs/reference/secretmanager/latest/google.cloud.secretmanager_v1beta1.services.secret_manager_service.SecretManagerServiceClient#google_cloud_secretmanager_v1beta1_services_secret_manager_service_SecretManagerServiceClient_access_secret_version
+   */
+  public GcpSecret(String versionName) {
+    this.versionName = versionName;
+  }
+
+  /**
+   * Returns the payload of the configured secret version as a byte array. Assumes that the
+   * current active service account has permissions to read the secret.
+   *
+   * @return The secret as a byte array.
+   * @throws RuntimeException if the Secret Manager client fails with an {@link IOException}.
+   */
+  @Override
+  public byte[] getSecretBytes() {
+    try (SecretManagerServiceClient secretManager = SecretManagerServiceClient.create()) {
+      AccessSecretVersionResponse accessed =
+          secretManager.accessSecretVersion(SecretVersionName.parse(versionName));
+      return accessed.getPayload().getData().toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to retrieve secret bytes", e);
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java
new file mode 100644
index 00000000000..fe476ef6cb1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+
+/**
+ * A secret management interface used for handling sensitive data.
+ *
+ * <p>This interface provides a generic way to handle secrets. Implementations of this interface
+ * should handle fetching secrets from a secret management system. The underlying secret management
+ * system should be able to return a valid byte array representing the secret.
+ *
+ * <p>The interface extends {@link Serializable}, so implementations must be serializable.
+ */
+public interface Secret extends Serializable {
+  /**
+   * Returns the secret as a byte array.
+   *
+   * @return The secret as a byte array.
+   */
+  byte[] getSecretBytes();
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java
new file mode 100644
index 00000000000..ba4c50e5a41
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.junit.Assert.assertThrows;
+
+import com.google.cloud.secretmanager.v1.ProjectName;
+import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
+import com.google.cloud.secretmanager.v1.SecretName;
+import com.google.cloud.secretmanager.v1.SecretPayload;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.GcpSecret;
+import org.apache.beam.sdk.util.Secret;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GroupByEncryptedKey}. */
+@RunWith(JUnit4.class)
+public class GroupByEncryptedKeyTest implements Serializable {
+
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  /** In-memory {@link Secret} with a fixed 32 byte value, so the test needs no secret service. */
+  private static class FakeSecret implements Secret {
+    // Encode with an explicit charset so the secret bytes do not depend on the platform default.
+    private final byte[] secret =
+        "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZc".getBytes(Charset.forName("UTF-8"));
+
+    @Override
+    public byte[] getSecretBytes() {
+      return secret;
+    }
+  }
+
+  /** Groups a small fixed input with an in-memory secret and checks the values per key. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGroupByKeyFakeSecret() {
+    List<KV<String, Integer>> pairs =
+        Arrays.asList(
+            KV.of("k1", 3),
+            KV.of("k5", Integer.MAX_VALUE),
+            KV.of("k5", Integer.MIN_VALUE),
+            KV.of("k2", 66),
+            KV.of("k1", 4),
+            KV.of("k2", -33),
+            KV.of("k3", 0));
+    KvCoder<String, Integer> pairCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+
+    PCollection<KV<String, Iterable<Integer>>> grouped =
+        p.apply(Create.of(pairs).withCoder(pairCoder))
+            .apply(GroupByEncryptedKey.<String, Integer>create(new FakeSecret()));
+
+    // Values arrive in no particular order, so sort them before comparing.
+    PCollection<KV<String, List<Integer>>> sorted =
+        grouped.apply("Sort", MapElements.via(new SortValues()));
+
+    PAssert.that(sorted)
+        .containsInAnyOrder(
+            KV.of("k1", Arrays.asList(3, 4)),
+            KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)),
+            KV.of("k2", Arrays.asList(-33, 66)),
+            KV.of("k3", Arrays.asList(0)));
+
+    p.run();
+  }
+
+  private static final String PROJECT_ID = "apache-beam-testing";
+  private static final String SECRET_ID = "gbek-test";
+  private static Secret gcpSecret;
+
+  /**
+   * Makes sure the test secret exists in Secret Manager, creating it with a random 32 byte payload
+   * when looking it up fails, and points {@code gcpSecret} at its latest version.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    ProjectName projectName = ProjectName.of(PROJECT_ID);
+    SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID);
+
+    // try-with-resources releases the client once the secret is in place.
+    try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
+      try {
+        client.getSecret(secretName);
+      } catch (Exception e) {
+        // Any lookup failure is treated as "secret missing": create it and add a first version.
+        com.google.cloud.secretmanager.v1.Secret secret =
+            com.google.cloud.secretmanager.v1.Secret.newBuilder()
+                .setReplication(
+                    com.google.cloud.secretmanager.v1.Replication.newBuilder()
+                        .setAutomatic(
+                            com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder()
+                                .build())
+                        .build())
+                .build();
+        client.createSecret(projectName, SECRET_ID, secret);
+        byte[] secretBytes = new byte[32];
+        new SecureRandom().nextBytes(secretBytes);
+        client.addSecretVersion(
+            secretName,
+            SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build());
+      }
+    }
+    gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest");
+  }
+
+  /** Deletes the test secret and releases the client used to do so. */
+  @AfterClass
+  public static void tearDown() throws IOException {
+    try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
+      client.deleteSecret(SecretName.of(PROJECT_ID, SECRET_ID));
+    }
+  }
+
+  /** Groups a small fixed input with the Secret Manager backed secret and checks the result. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGroupByKeyGcpSecret() {
+    List<KV<String, Integer>> pairs =
+        Arrays.asList(
+            KV.of("k1", 3),
+            KV.of("k5", Integer.MAX_VALUE),
+            KV.of("k5", Integer.MIN_VALUE),
+            KV.of("k2", 66),
+            KV.of("k1", 4),
+            KV.of("k2", -33),
+            KV.of("k3", 0));
+    KvCoder<String, Integer> pairCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+
+    PCollection<KV<String, Iterable<Integer>>> grouped =
+        p.apply(Create.of(pairs).withCoder(pairCoder))
+            .apply(GroupByEncryptedKey.<String, Integer>create(gcpSecret));
+
+    // Values arrive in no particular order, so sort them before comparing.
+    PCollection<KV<String, List<Integer>>> sorted =
+        grouped.apply("Sort", MapElements.via(new SortValues()));
+
+    PAssert.that(sorted)
+        .containsInAnyOrder(
+            KV.of("k1", Arrays.asList(3, 4)),
+            KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)),
+            KV.of("k2", Arrays.asList(-33, 66)),
+            KV.of("k3", Arrays.asList(0)));
+
+    p.run();
+  }
+
+  /** Running the pipeline with a malformed secret version name must fail. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGroupByKeyGcpSecretThrows() {
+    Secret invalidSecret = new GcpSecret("bad_path/versions/latest");
+    PCollection<KV<String, Integer>> input = p.apply(Create.of(KV.of("k1", 1)));
+    input.apply(GroupByEncryptedKey.<String, Integer>create(invalidSecret));
+
+    assertThrows(RuntimeException.class, () -> p.run());
+  }
+
+  /** Replaces the values of a grouped element with a list sorted in natural order. */
+  private static class SortValues
+      extends SimpleFunction<KV<String, Iterable<Integer>>, KV<String, List<Integer>>> {
+    @Override
+    public KV<String, List<Integer>> apply(KV<String, Iterable<Integer>> input) {
+      Iterable<Integer> unsorted = input.getValue();
+      List<Integer> ascending =
+          StreamSupport.stream(unsorted.spliterator(), false).sorted().collect(Collectors.toList());
+      return KV.of(input.getKey(), ascending);
+    }
+  }
+}

Reply via email to