This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/java-gbek
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b98663c2fa8644364c639b9e08de94fc7fae47ec
Author: Danny Mccormick <[email protected]>
AuthorDate: Fri Sep 19 14:03:00 2025 -0400

    First pass at Java GBEK (AI generated)
---
 sdks/java/core/build.gradle                        |   1 +
 .../org/apache/beam/sdk/transforms/GcpSecret.java  |  52 +++++++
 .../beam/sdk/transforms/GroupByEncryptedKey.java   | 138 +++++++++++++++++
 .../org/apache/beam/sdk/transforms/Secret.java     |  31 ++++
 .../sdk/transforms/GroupByEncryptedKeyTest.java    | 169 +++++++++++++++++++++
 5 files changed, 391 insertions(+)

diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index e849ae59779..8ca227a6ae8 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -100,6 +100,7 @@ dependencies {
   shadow library.java.snappy_java
   shadow library.java.joda_time
   implementation 
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+  // NOTE(review): this adds a Google Cloud client library to the core SDK module. The other
+  // entries in this block reference library.java.* aliases; consider defining one for
+  // consistency, and consider whether GcpSecret (and this dependency) belong in a
+  // GCP-specific module instead - TODO confirm with maintainers. No version is given, so it is
+  // presumably resolved through the enforcedPlatform google_cloud_platform_libraries_bom above.
+  implementation 'com.google.cloud:google-cloud-secretmanager'
   permitUnusedDeclared 
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
   provided library.java.json_org
   implementation library.java.everit_json_schema
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java
new file mode 100644
index 00000000000..00271dd7ad8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
+import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
+import com.google.cloud.secretmanager.v1.SecretVersionName;
+import java.io.IOException;
+
+/**
+ * A secret manager implementation that retrieves secrets from Google Cloud 
Secret Manager.
+ */
+public class GcpSecret implements Secret {
+  private final String version_name;
+
+  /**
+   * Initializes a GcpSecret object.
+   *
+   * @param version_name The full version name of the secret in Google Cloud 
Secret Manager. For
+   *     example: projects/<id>/secrets/<secret_name>/versions/1. For more 
info, see
+   *     
https://cloud.google.com/python/docs/reference/secretmanager/latest/google.cloud.secretmanager_v1beta1.services.secret_manager_service.SecretManagerServiceClient#google_cloud_secretmanager_v1beta1_services_secret_manager_service_SecretManagerServiceClient_access_secret_version
+   */
+  public GcpSecret(String version_name) {
+    this.version_name = version_name;
+  }
+
+  @Override
+  public byte[] getSecretBytes() {
+    try (SecretManagerServiceClient client = 
SecretManagerServiceClient.create()) {
+      SecretVersionName secretVersionName = 
SecretVersionName.parse(version_name);
+      AccessSecretVersionResponse response = 
client.accessSecretVersion(secretVersionName);
+      return response.getPayload().getData().toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to retrieve secret bytes", e);
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java
new file mode 100644
index 00000000000..29fe57e5d85
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.crypto.Cipher;
import javax.crypto.Mac;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A {@link PTransform} that provides a secure alternative to {@link 
GroupByKey}.
+ *
+ * <p>This transform encrypts the keys of the input {@link PCollection}, 
performs a {@link
+ * GroupByKey} on the encrypted keys, and then decrypts the keys in the 
output. This is useful when
+ * the keys contain sensitive data that should not be stored at rest by the 
runner.
+ */
+public class GroupByEncryptedKey<K, V>
+    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> 
{
+
+  private final Secret hmacKey;
+
+  private GroupByEncryptedKey(Secret hmacKey) {
+    this.hmacKey = hmacKey;
+  }
+
+  public static <K, V> GroupByEncryptedKey<K, V> create(Secret hmacKey) {
+    return new GroupByEncryptedKey<>(hmacKey);
+  }
+
+  @Override
+  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+    return input
+        .apply("EncryptMessage", ParDo.of(new _EncryptMessage<>()))
+        .apply(GroupByKey.create())
+        .apply("DecryptMessage", ParDo.of(new _DecryptMessage<>()));
+  }
+
+  private static class _EncryptMessage<K, V> extends DoFn<KV<K, V>, KV<byte[], 
KV<byte[], byte[]>>> {
+    private final Secret hmacKey;
+    private transient Mac mac;
+    private transient Cipher cipher;
+
+    _EncryptMessage(Secret hmacKey) {
+      this.hmacKey = hmacKey;
+    }
+
+    @Setup
+    public void setup() throws Exception {
+      mac = Mac.getInstance("HmacSHA256");
+      mac.init(new SecretKeySpec(hmacKey.getSecretBytes(), "HmacSHA256"));
+      cipher = Cipher.getInstance("AES");
+      cipher.init(Cipher.ENCRYPT_MODE, new 
SecretKeySpec(hmacKey.getSecretBytes(), "AES"));
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      Coder<K> keyCoder = ((KvCoder<K, V>) 
c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder();
+      Coder<V> valueCoder = ((KvCoder<K, V>) 
c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder();
+
+      byte[] encodedKey = encode(keyCoder, c.element().getKey());
+      byte[] encodedValue = encode(valueCoder, c.element().getValue());
+
+      byte[] hmac = mac.doFinal(encodedKey);
+      byte[] encryptedKey = cipher.doFinal(encodedKey);
+      byte[] encryptedValue = cipher.doFinal(encodedValue);
+
+      c.output(KV.of(hmac, KV.of(encryptedKey, encryptedValue)));
+    }
+
+    private <T> byte[] encode(Coder<T> coder, T value) throws Exception {
+      java.io.ByteArrayOutputStream os = new java.io.ByteArrayOutputStream();
+      coder.encode(value, os);
+      return os.toByteArray();
+    }
+  }
+
+  private static class _DecryptMessage<K, V>
+      extends DoFn<KV<byte[], Iterable<KV<byte[], byte[]>>>, KV<K, 
Iterable<V>>> {
+    private final Secret hmacKey;
+    private transient Cipher cipher;
+
+    _DecryptMessage(Secret hmacKey) {
+      this.hmacKey = hmacKey;
+    }
+
+    @Setup
+    public void setup() throws Exception {
+      cipher = Cipher.getInstance("AES");
+      cipher.init(Cipher.DECRYPT_MODE, new 
SecretKeySpec(hmacKey.getSecretBytes(), "AES"));
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      Coder<K> keyCoder = ((KvCoder<K, V>) 
c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder();
+      Coder<V> valueCoder = ((KvCoder<K, V>) 
c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder();
+
+      java.util.Map<K, java.util.List<V>> decryptedKvs = new 
java.util.HashMap<>();
+      for (KV<byte[], byte[]> encryptedKv : c.element().getValue()) {
+        byte[] decryptedKeyBytes = cipher.doFinal(encryptedKv.getKey());
+        K key = decode(keyCoder, decryptedKeyBytes);
+
+        if (!decryptedKvs.containsKey(key)) {
+          decryptedKvs.put(key, new java.util.ArrayList<>());
+        }
+        byte[] decryptedValueBytes = cipher.doFinal(encryptedKv.getValue());
+        V value = decode(valueCoder, decryptedValueBytes);
+        decryptedKvs.get(key).add(value);
+      }
+
+      for (java.util.Map.Entry<K, java.util.List<V>> entry : 
decryptedKvs.entrySet()) {
+        c.output(KV.of(entry.getKey(), entry.getValue()));
+      }
+    }
+
+    private <T> T decode(Coder<T> coder, byte[] bytes) throws Exception {
+      java.io.ByteArrayInputStream is = new 
java.io.ByteArrayInputStream(bytes);
+      return coder.decode(is);
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java
new file mode 100644
index 00000000000..ae34fb6ee63
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.io.Serializable;
+
/**
 * Source of secret material, such as a key used to protect sensitive pipeline data.
 *
 * <p>Implementations obtain the secret from some secret management system. The interface extends
 * {@link Serializable} so that instances can be stored in transforms and shipped to workers.
 * Implementations should therefore keep only serializable state, for example a resource name as
 * {@code GcpSecret} does.
 */
public interface Secret extends Serializable {

  /**
   * Retrieves the secret.
   *
   * @return the raw bytes of the secret
   */
  byte[] getSecretBytes();
}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java
new file mode 100644
index 00000000000..5860448140f
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import com.google.cloud.secretmanager.v1.ProjectName;
+import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
+import com.google.cloud.secretmanager.v1.SecretName;
+import com.google.cloud.secretmanager.v1.SecretPayload;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.security.SecureRandom;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GroupByEncryptedKey}. */
+@RunWith(JUnit4.class)
+public class GroupByEncryptedKeyTest implements Serializable {
+
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  private static class FakeSecret implements Secret {
+    private final byte[] secret = 
"aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZcyZrnZQiQ-uE=".getBytes();
+
+    @Override
+    public byte[] getSecretBytes() {
+      return secret;
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGroupByKeyFakeSecret() {
+    List<KV<String, Integer>> ungroupedPairs =
+        Arrays.asList(
+            KV.of("k1", 3),
+            KV.of("k5", Integer.MAX_VALUE),
+            KV.of("k5", Integer.MIN_VALUE),
+            KV.of("k2", 66),
+            KV.of("k1", 4),
+            KV.of("k2", -33),
+            KV.of("k3", 0));
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(
+            Create.of(ungroupedPairs)
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of())));
+
+    PCollection<KV<String, Iterable<Integer>>> output =
+        input.apply(GroupByEncryptedKey.create(new FakeSecret()));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            KV.of("k1", Arrays.asList(3, 4)),
+            KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)),
+            KV.of("k2", Arrays.asList(66, -33)),
+            KV.of("k3", Arrays.asList(0)));
+
+    p.run();
+  }
+
+  private static final String PROJECT_ID = "apache-beam-testing";
+  private static final String SECRET_ID = "gbek-test";
+  private Secret gcpSecret;
+
+  @Before
+  public void setup() throws IOException {
+    SecretManagerServiceClient client = SecretManagerServiceClient.create();
+    ProjectName projectName = ProjectName.of(PROJECT_ID);
+    SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID);
+
+    try {
+      client.getSecret(secretName);
+    } catch (Exception e) {
+      com.google.cloud.secretmanager.v1.Secret secret =
+          com.google.cloud.secretmanager.v1.Secret.newBuilder()
+              .setReplication(
+                  com.google.cloud.secretmanager.v1.Replication.newBuilder()
+                      .setAutomatic(
+                          
com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder()
+                              .build())
+                      .build())
+              .build();
+      client.createSecret(projectName, SECRET_ID, secret);
+      byte[] secretBytes = new byte[32];
+      new SecureRandom().nextBytes(secretBytes);
+      client.addSecretVersion(
+          secretName, 
SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build());
+    }
+    gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest");
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SecretManagerServiceClient client = SecretManagerServiceClient.create();
+    SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID);
+    client.deleteSecret(secretName);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGroupByKeyGcpSecret() {
+    List<KV<String, Integer>> ungroupedPairs =
+        Arrays.asList(
+            KV.of("k1", 3),
+            KV.of("k5", Integer.MAX_VALUE),
+            KV.of("k5", Integer.MIN_VALUE),
+            KV.of("k2", 66),
+            KV.of("k1", 4),
+            KV.of("k2", -33),
+            KV.of("k3", 0));
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(
+            Create.of(ungroupedPairs)
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of())));
+
+    PCollection<KV<String, Iterable<Integer>>> output =
+        input.apply(GroupByEncryptedKey.create(gcpSecret));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            KV.of("k1", Arrays.asList(3, 4)),
+            KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)),
+            KV.of("k2", Arrays.asList(66, -33)),
+            KV.of("k3", Arrays.asList(0)));
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGroupByKeyGcpSecretThrows() {
+    Secret gcpSecret = new GcpSecret("bad_path/versions/latest");
+    p.apply(Create.of(KV.of("k1", 1)))
+        .apply(GroupByEncryptedKey.create(gcpSecret));
+    assertThrows(RuntimeException.class, () -> p.run());
+  }
+}

Reply via email to