This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/simpler-spark-repro in repository https://gitbox.apache.org/repos/asf/beam.git
commit 82897f9221e4968184d633dd5df45c97dca24bda Author: Danny Mccormick <[email protected]> AuthorDate: Fri Oct 10 14:39:29 2025 -0400 Try minimal repro to fix spark streaming job --- .../beam_PostCommit_Java_PVR_Spark3_Streaming.json | 2 +- .../apache/beam/sdk/transforms/GroupByKeyIT.java | 235 --------------------- .../apache/beam/sdk/transforms/GroupByKeyTest.java | 108 ---------- 3 files changed, 1 insertion(+), 344 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json index e0266d62f2e..f1ba03a243e 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 4 + "modification": 5 } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java deleted file mode 100644 index 1c8168a42a0..00000000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import com.google.cloud.secretmanager.v1.ProjectName; -import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; -import com.google.cloud.secretmanager.v1.SecretName; -import com.google.cloud.secretmanager.v1.SecretPayload; -import com.google.protobuf.ByteString; -import java.io.IOException; -import java.security.SecureRandom; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Integration test for GroupByKey transforms and some other transforms which use GBK. */ -@RunWith(JUnit4.class) -public class GroupByKeyIT { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private static final String PROJECT_ID = "apache-beam-testing"; - private static final String SECRET_ID = "gbek-test"; - private static String gcpSecretVersionName; - private static String secretId; - - @BeforeClass - public static void setup() throws IOException { - secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); - SecretManagerServiceClient client; - try { - client = SecretManagerServiceClient.create(); - } catch (IOException e) { - gcpSecretVersionName = null; - return; - } - ProjectName projectName = ProjectName.of(PROJECT_ID); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - - try { - client.getSecret(secretName); - } catch (Exception e) { - com.google.cloud.secretmanager.v1.Secret secret = - com.google.cloud.secretmanager.v1.Secret.newBuilder() - .setReplication( - com.google.cloud.secretmanager.v1.Replication.newBuilder() - .setAutomatic( - com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() - .build()) - .build()) - .build(); - client.createSecret(projectName, secretId, secret); - byte[] secretBytes = new byte[32]; - new SecureRandom().nextBytes(secretBytes); - client.addSecretVersion( - secretName, - SecretPayload.newBuilder() - .setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) - .build()); - } - gcpSecretVersionName = secretName.toString() + "/versions/latest"; - } - - @AfterClass - public static void tearDown() throws IOException { - if (gcpSecretVersionName != null) { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - client.deleteSecret(secretName); - } - } - - @Test - public void testGroupByKeyWithValidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - Pipeline p = Pipeline.create(options); - List<KV<String, Integer>> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - - PCollection<KV<String, Integer>> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); - - PCollection<KV<String, Iterable<Integer>>> output = input.apply(GroupByKey.create()); - - PAssert.that(output) - .containsInAnyOrder( - KV.of("k1", Arrays.asList(3, 4)), - KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), - KV.of("k2", Arrays.asList(66, -33)), - KV.of("k3", Arrays.asList(0))); - - p.run(); - } - - @Test - public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - Pipeline p = Pipeline.create(options); - p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); - thrown.expect(RuntimeException.class); - p.run(); - } - - // Redistribute depends on GBK under the hood and can have runner-specific implementations - @Test - public void testRedistributeWithValidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - Pipeline p = Pipeline.create(options); - - List<KV<String, Integer>> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - PCollection<KV<String, Integer>> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); - PCollection<KV<String, Integer>> output = input.apply(Redistribute.byKey()); - PAssert.that(output).containsInAnyOrder(ungroupedPairs); - - p.run(); - } - - @Test - public void testRedistributeWithInvalidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - Pipeline p = Pipeline.create(options); - p.apply(Create.of(KV.of("k1", 1))).apply(Redistribute.byKey()); - thrown.expect(RuntimeException.class); - p.run(); - } - - // Combine.PerKey depends on GBK under the hood, but can be overriden by a runner. This can - // fail unless it is handled specially, so we should test it specifically - @Test - public void testCombinePerKeyWithValidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - Pipeline p = Pipeline.create(options); - - List<KV<String, Integer>> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), KV.of("k2", 66), KV.of("k1", 4), KV.of("k2", -33), KV.of("k3", 0)); - List<KV<String, Integer>> sums = Arrays.asList(KV.of("k1", 7), KV.of("k2", 33), KV.of("k3", 0)); - PCollection<KV<String, Integer>> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); - PCollection<KV<String, Integer>> output = input.apply(Combine.perKey(Sum.ofIntegers())); - PAssert.that(output).containsInAnyOrder(sums); - - p.run(); - } - - @Test - public void testCombinePerKeyWithInvalidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - Pipeline p = Pipeline.create(options); - p.apply(Create.of(KV.of("k1", 1))).apply(Combine.perKey(Sum.ofIntegers())); - thrown.expect(RuntimeException.class); - p.run(); - } -} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index d9a3e3ed20d..5464838ad4d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -26,18 +26,12 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertThrows; -import com.google.cloud.secretmanager.v1.ProjectName; -import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; -import com.google.cloud.secretmanager.v1.SecretName; -import com.google.cloud.secretmanager.v1.SecretPayload; -import com.google.protobuf.ByteString; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -96,9 +90,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.hamcrest.Matcher; import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -117,57 +109,6 @@ public class GroupByKeyTest implements Serializable { /** Shared test base class with setup/teardown helpers. */ public abstract static class SharedTestBase { @Rule public transient TestPipeline p = TestPipeline.create(); - - private static final String PROJECT_ID = "apache-beam-testing"; - private static final String SECRET_ID = "gbek-test"; - public static String gcpSecretVersionName; - private static String secretId; - - @BeforeClass - public static void setup() throws IOException { - secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); - SecretManagerServiceClient client; - try { - client = SecretManagerServiceClient.create(); - } catch (IOException e) { - gcpSecretVersionName = null; - return; - } - ProjectName projectName = ProjectName.of(PROJECT_ID); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - - try { - client.getSecret(secretName); - } catch (Exception e) { - com.google.cloud.secretmanager.v1.Secret secret = - com.google.cloud.secretmanager.v1.Secret.newBuilder() - .setReplication( - com.google.cloud.secretmanager.v1.Replication.newBuilder() - .setAutomatic( - com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() - .build()) - .build()) - .build(); - client.createSecret(projectName, secretId, secret); - byte[] secretBytes = new byte[32]; - new SecureRandom().nextBytes(secretBytes); - client.addSecretVersion( - secretName, - SecretPayload.newBuilder() - .setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) - .build()); - } - gcpSecretVersionName = secretName.toString() + "/versions/latest"; - } - - @AfterClass - public static void tearDown() throws IOException { - if (gcpSecretVersionName != null) { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - client.deleteSecret(secretName); - } - } } /** Tests validating basic {@link GroupByKey} scenarios. */ @@ -673,55 +614,6 @@ public class GroupByKeyTest implements Serializable { public void testLargeKeys100MB() throws Exception { runLargeKeysTest(p, 100 << 20); } - - @Test - @Category(NeedsRunner.class) - public void testGroupByKeyWithValidGcpSecretOption() { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - List<KV<String, Integer>> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - - PCollection<KV<String, Integer>> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); - - p.getOptions().setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - PCollection<KV<String, Iterable<Integer>>> output = input.apply(GroupByKey.create()); - - SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> checker = - containsKvs( - kv("k1", 3, 4), - kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), - kv("k2", 66, -33), - kv("k3", 0)); - PAssert.that(output).satisfies(checker); - PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker); - - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testGroupByKeyWithInvalidGcpSecretOption() { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - p.getOptions().setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); - assertThrows(RuntimeException.class, () -> p.run()); - } } /** Tests validating GroupByKey behaviors with windowing. */
