This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch users/damccorm/simpler-spark-repro
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to
refs/heads/users/damccorm/simpler-spark-repro by this push:
new 9dbfcc3184a add back ITs to see if it works
9dbfcc3184a is described below
commit 9dbfcc3184a0b514ae51234623aa16f7041ceffa
Author: Danny Mccormick <[email protected]>
AuthorDate: Fri Oct 10 15:02:26 2025 -0400
add back ITs to see if it works
---
.../apache/beam/sdk/transforms/GroupByKeyIT.java | 235 +++++++++++++++++++++
1 file changed, 235 insertions(+)
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java
new file mode 100644
index 00000000000..1c8168a42a0
--- /dev/null
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.cloud.secretmanager.v1.ProjectName;
+import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
+import com.google.cloud.secretmanager.v1.SecretName;
+import com.google.cloud.secretmanager.v1.SecretPayload;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration test for GroupByKey transforms and some other transforms which
use GBK. */
+@RunWith(JUnit4.class)
+public class GroupByKeyIT {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private static final String PROJECT_ID = "apache-beam-testing";
+ private static final String SECRET_ID = "gbek-test";
+ private static String gcpSecretVersionName;
+ private static String secretId;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ secretId = String.format("%s-%d", SECRET_ID, new
SecureRandom().nextInt(10000));
+ SecretManagerServiceClient client;
+ try {
+ client = SecretManagerServiceClient.create();
+ } catch (IOException e) {
+ gcpSecretVersionName = null;
+ return;
+ }
+ ProjectName projectName = ProjectName.of(PROJECT_ID);
+ SecretName secretName = SecretName.of(PROJECT_ID, secretId);
+
+ try {
+ client.getSecret(secretName);
+ } catch (Exception e) {
+ com.google.cloud.secretmanager.v1.Secret secret =
+ com.google.cloud.secretmanager.v1.Secret.newBuilder()
+ .setReplication(
+ com.google.cloud.secretmanager.v1.Replication.newBuilder()
+ .setAutomatic(
+
com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder()
+ .build())
+ .build())
+ .build();
+ client.createSecret(projectName, secretId, secret);
+ byte[] secretBytes = new byte[32];
+ new SecureRandom().nextBytes(secretBytes);
+ client.addSecretVersion(
+ secretName,
+ SecretPayload.newBuilder()
+
.setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes)))
+ .build());
+ }
+ gcpSecretVersionName = secretName.toString() + "/versions/latest";
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (gcpSecretVersionName != null) {
+ SecretManagerServiceClient client = SecretManagerServiceClient.create();
+ SecretName secretName = SecretName.of(PROJECT_ID, secretId);
+ client.deleteSecret(secretName);
+ }
+ }
+
+ @Test
+ public void testGroupByKeyWithValidGcpSecretOption() throws Exception {
+ if (gcpSecretVersionName == null) {
+ // Skip test if we couldn't set up secret manager
+ return;
+ }
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ options.setGbek(String.format("type:gcpsecret;version_name:%s",
gcpSecretVersionName));
+ Pipeline p = Pipeline.create(options);
+ List<KV<String, Integer>> ungroupedPairs =
+ Arrays.asList(
+ KV.of("k1", 3),
+ KV.of("k5", Integer.MAX_VALUE),
+ KV.of("k5", Integer.MIN_VALUE),
+ KV.of("k2", 66),
+ KV.of("k1", 4),
+ KV.of("k2", -33),
+ KV.of("k3", 0));
+
+ PCollection<KV<String, Integer>> input =
+ p.apply(
+ Create.of(ungroupedPairs)
+ .withCoder(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of())));
+
+ PCollection<KV<String, Iterable<Integer>>> output =
input.apply(GroupByKey.create());
+
+ PAssert.that(output)
+ .containsInAnyOrder(
+ KV.of("k1", Arrays.asList(3, 4)),
+ KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)),
+ KV.of("k2", Arrays.asList(66, -33)),
+ KV.of("k3", Arrays.asList(0)));
+
+ p.run();
+ }
+
+ @Test
+ public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception {
+ if (gcpSecretVersionName == null) {
+ // Skip test if we couldn't set up secret manager
+ return;
+ }
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest");
+ Pipeline p = Pipeline.create(options);
+ p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create());
+ thrown.expect(RuntimeException.class);
+ p.run();
+ }
+
+ // Redistribute depends on GBK under the hood and can have runner-specific
implementations
+ @Test
+ public void testRedistributeWithValidGcpSecretOption() throws Exception {
+ if (gcpSecretVersionName == null) {
+ // Skip test if we couldn't set up secret manager
+ return;
+ }
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ options.setGbek(String.format("type:gcpsecret;version_name:%s",
gcpSecretVersionName));
+ Pipeline p = Pipeline.create(options);
+
+ List<KV<String, Integer>> ungroupedPairs =
+ Arrays.asList(
+ KV.of("k1", 3),
+ KV.of("k5", Integer.MAX_VALUE),
+ KV.of("k5", Integer.MIN_VALUE),
+ KV.of("k2", 66),
+ KV.of("k1", 4),
+ KV.of("k2", -33),
+ KV.of("k3", 0));
+ PCollection<KV<String, Integer>> input =
+ p.apply(
+ Create.of(ungroupedPairs)
+ .withCoder(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of())));
+ PCollection<KV<String, Integer>> output =
input.apply(Redistribute.byKey());
+ PAssert.that(output).containsInAnyOrder(ungroupedPairs);
+
+ p.run();
+ }
+
+ @Test
+ public void testRedistributeWithInvalidGcpSecretOption() throws Exception {
+ if (gcpSecretVersionName == null) {
+ // Skip test if we couldn't set up secret manager
+ return;
+ }
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest");
+ Pipeline p = Pipeline.create(options);
+ p.apply(Create.of(KV.of("k1", 1))).apply(Redistribute.byKey());
+ thrown.expect(RuntimeException.class);
+ p.run();
+ }
+
+ // Combine.PerKey depends on GBK under the hood, but can be overriden by a
runner. This can
+ // fail unless it is handled specially, so we should test it specifically
+ @Test
+ public void testCombinePerKeyWithValidGcpSecretOption() throws Exception {
+ if (gcpSecretVersionName == null) {
+ // Skip test if we couldn't set up secret manager
+ return;
+ }
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ options.setGbek(String.format("type:gcpsecret;version_name:%s",
gcpSecretVersionName));
+ Pipeline p = Pipeline.create(options);
+
+ List<KV<String, Integer>> ungroupedPairs =
+ Arrays.asList(
+ KV.of("k1", 3), KV.of("k2", 66), KV.of("k1", 4), KV.of("k2", -33),
KV.of("k3", 0));
+ List<KV<String, Integer>> sums = Arrays.asList(KV.of("k1", 7), KV.of("k2",
33), KV.of("k3", 0));
+ PCollection<KV<String, Integer>> input =
+ p.apply(
+ Create.of(ungroupedPairs)
+ .withCoder(KvCoder.of(StringUtf8Coder.of(),
VarIntCoder.of())));
+ PCollection<KV<String, Integer>> output =
input.apply(Combine.perKey(Sum.ofIntegers()));
+ PAssert.that(output).containsInAnyOrder(sums);
+
+ p.run();
+ }
+
+ @Test
+ public void testCombinePerKeyWithInvalidGcpSecretOption() throws Exception {
+ if (gcpSecretVersionName == null) {
+ // Skip test if we couldn't set up secret manager
+ return;
+ }
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest");
+ Pipeline p = Pipeline.create(options);
+ p.apply(Create.of(KV.of("k1", 1))).apply(Combine.perKey(Sum.ofIntegers()));
+ thrown.expect(RuntimeException.class);
+ p.run();
+ }
+}