This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/simpler-spark-repro
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 82897f9221e4968184d633dd5df45c97dca24bda
Author: Danny Mccormick <[email protected]>
AuthorDate: Fri Oct 10 14:39:29 2025 -0400

    Try minimal repro to fix spark streaming job
---
 .../beam_PostCommit_Java_PVR_Spark3_Streaming.json |   2 +-
 .../apache/beam/sdk/transforms/GroupByKeyIT.java   | 235 ---------------------
 .../apache/beam/sdk/transforms/GroupByKeyTest.java | 108 ----------
 3 files changed, 1 insertion(+), 344 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json
index e0266d62f2e..f1ba03a243e 100644
--- a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json
+++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 4
+  "modification": 5
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java
deleted file mode 100644
index 1c8168a42a0..00000000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import com.google.cloud.secretmanager.v1.ProjectName;
-import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
-import com.google.cloud.secretmanager.v1.SecretName;
-import com.google.cloud.secretmanager.v1.SecretPayload;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
-import java.security.SecureRandom;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Integration test for GroupByKey transforms and some other transforms which 
use GBK. */
-@RunWith(JUnit4.class)
-public class GroupByKeyIT {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  private static final String PROJECT_ID = "apache-beam-testing";
-  private static final String SECRET_ID = "gbek-test";
-  private static String gcpSecretVersionName;
-  private static String secretId;
-
-  @BeforeClass
-  public static void setup() throws IOException {
-    secretId = String.format("%s-%d", SECRET_ID, new 
SecureRandom().nextInt(10000));
-    SecretManagerServiceClient client;
-    try {
-      client = SecretManagerServiceClient.create();
-    } catch (IOException e) {
-      gcpSecretVersionName = null;
-      return;
-    }
-    ProjectName projectName = ProjectName.of(PROJECT_ID);
-    SecretName secretName = SecretName.of(PROJECT_ID, secretId);
-
-    try {
-      client.getSecret(secretName);
-    } catch (Exception e) {
-      com.google.cloud.secretmanager.v1.Secret secret =
-          com.google.cloud.secretmanager.v1.Secret.newBuilder()
-              .setReplication(
-                  com.google.cloud.secretmanager.v1.Replication.newBuilder()
-                      .setAutomatic(
-                          
com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder()
-                              .build())
-                      .build())
-              .build();
-      client.createSecret(projectName, secretId, secret);
-      byte[] secretBytes = new byte[32];
-      new SecureRandom().nextBytes(secretBytes);
-      client.addSecretVersion(
-          secretName,
-          SecretPayload.newBuilder()
-              
.setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes)))
-              .build());
-    }
-    gcpSecretVersionName = secretName.toString() + "/versions/latest";
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    if (gcpSecretVersionName != null) {
-      SecretManagerServiceClient client = SecretManagerServiceClient.create();
-      SecretName secretName = SecretName.of(PROJECT_ID, secretId);
-      client.deleteSecret(secretName);
-    }
-  }
-
-  @Test
-  public void testGroupByKeyWithValidGcpSecretOption() throws Exception {
-    if (gcpSecretVersionName == null) {
-      // Skip test if we couldn't set up secret manager
-      return;
-    }
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.setGbek(String.format("type:gcpsecret;version_name:%s", 
gcpSecretVersionName));
-    Pipeline p = Pipeline.create(options);
-    List<KV<String, Integer>> ungroupedPairs =
-        Arrays.asList(
-            KV.of("k1", 3),
-            KV.of("k5", Integer.MAX_VALUE),
-            KV.of("k5", Integer.MIN_VALUE),
-            KV.of("k2", 66),
-            KV.of("k1", 4),
-            KV.of("k2", -33),
-            KV.of("k3", 0));
-
-    PCollection<KV<String, Integer>> input =
-        p.apply(
-            Create.of(ungroupedPairs)
-                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of())));
-
-    PCollection<KV<String, Iterable<Integer>>> output = 
input.apply(GroupByKey.create());
-
-    PAssert.that(output)
-        .containsInAnyOrder(
-            KV.of("k1", Arrays.asList(3, 4)),
-            KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)),
-            KV.of("k2", Arrays.asList(66, -33)),
-            KV.of("k3", Arrays.asList(0)));
-
-    p.run();
-  }
-
-  @Test
-  public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception {
-    if (gcpSecretVersionName == null) {
-      // Skip test if we couldn't set up secret manager
-      return;
-    }
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest");
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create());
-    thrown.expect(RuntimeException.class);
-    p.run();
-  }
-
-  // Redistribute depends on GBK under the hood and can have runner-specific 
implementations
-  @Test
-  public void testRedistributeWithValidGcpSecretOption() throws Exception {
-    if (gcpSecretVersionName == null) {
-      // Skip test if we couldn't set up secret manager
-      return;
-    }
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.setGbek(String.format("type:gcpsecret;version_name:%s", 
gcpSecretVersionName));
-    Pipeline p = Pipeline.create(options);
-
-    List<KV<String, Integer>> ungroupedPairs =
-        Arrays.asList(
-            KV.of("k1", 3),
-            KV.of("k5", Integer.MAX_VALUE),
-            KV.of("k5", Integer.MIN_VALUE),
-            KV.of("k2", 66),
-            KV.of("k1", 4),
-            KV.of("k2", -33),
-            KV.of("k3", 0));
-    PCollection<KV<String, Integer>> input =
-        p.apply(
-            Create.of(ungroupedPairs)
-                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of())));
-    PCollection<KV<String, Integer>> output = 
input.apply(Redistribute.byKey());
-    PAssert.that(output).containsInAnyOrder(ungroupedPairs);
-
-    p.run();
-  }
-
-  @Test
-  public void testRedistributeWithInvalidGcpSecretOption() throws Exception {
-    if (gcpSecretVersionName == null) {
-      // Skip test if we couldn't set up secret manager
-      return;
-    }
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest");
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of(KV.of("k1", 1))).apply(Redistribute.byKey());
-    thrown.expect(RuntimeException.class);
-    p.run();
-  }
-
-  // Combine.PerKey depends on GBK under the hood, but can be overriden by a 
runner. This can
-  // fail unless it is handled specially, so we should test it specifically
-  @Test
-  public void testCombinePerKeyWithValidGcpSecretOption() throws Exception {
-    if (gcpSecretVersionName == null) {
-      // Skip test if we couldn't set up secret manager
-      return;
-    }
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.setGbek(String.format("type:gcpsecret;version_name:%s", 
gcpSecretVersionName));
-    Pipeline p = Pipeline.create(options);
-
-    List<KV<String, Integer>> ungroupedPairs =
-        Arrays.asList(
-            KV.of("k1", 3), KV.of("k2", 66), KV.of("k1", 4), KV.of("k2", -33), 
KV.of("k3", 0));
-    List<KV<String, Integer>> sums = Arrays.asList(KV.of("k1", 7), KV.of("k2", 
33), KV.of("k3", 0));
-    PCollection<KV<String, Integer>> input =
-        p.apply(
-            Create.of(ungroupedPairs)
-                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of())));
-    PCollection<KV<String, Integer>> output = 
input.apply(Combine.perKey(Sum.ofIntegers()));
-    PAssert.that(output).containsInAnyOrder(sums);
-
-    p.run();
-  }
-
-  @Test
-  public void testCombinePerKeyWithInvalidGcpSecretOption() throws Exception {
-    if (gcpSecretVersionName == null) {
-      // Skip test if we couldn't set up secret manager
-      return;
-    }
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest");
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of(KV.of("k1", 1))).apply(Combine.perKey(Sum.ofIntegers()));
-    thrown.expect(RuntimeException.class);
-    p.run();
-  }
-}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index d9a3e3ed20d..5464838ad4d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -26,18 +26,12 @@ import static org.hamcrest.Matchers.empty;
 import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertThrows;
 
-import com.google.cloud.secretmanager.v1.ProjectName;
-import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
-import com.google.cloud.secretmanager.v1.SecretName;
-import com.google.cloud.secretmanager.v1.SecretPayload;
-import com.google.protobuf.ByteString;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -96,9 +90,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 import org.hamcrest.Matcher;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -117,57 +109,6 @@ public class GroupByKeyTest implements Serializable {
   /** Shared test base class with setup/teardown helpers. */
   public abstract static class SharedTestBase {
     @Rule public transient TestPipeline p = TestPipeline.create();
-
-    private static final String PROJECT_ID = "apache-beam-testing";
-    private static final String SECRET_ID = "gbek-test";
-    public static String gcpSecretVersionName;
-    private static String secretId;
-
-    @BeforeClass
-    public static void setup() throws IOException {
-      secretId = String.format("%s-%d", SECRET_ID, new 
SecureRandom().nextInt(10000));
-      SecretManagerServiceClient client;
-      try {
-        client = SecretManagerServiceClient.create();
-      } catch (IOException e) {
-        gcpSecretVersionName = null;
-        return;
-      }
-      ProjectName projectName = ProjectName.of(PROJECT_ID);
-      SecretName secretName = SecretName.of(PROJECT_ID, secretId);
-
-      try {
-        client.getSecret(secretName);
-      } catch (Exception e) {
-        com.google.cloud.secretmanager.v1.Secret secret =
-            com.google.cloud.secretmanager.v1.Secret.newBuilder()
-                .setReplication(
-                    com.google.cloud.secretmanager.v1.Replication.newBuilder()
-                        .setAutomatic(
-                            
com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder()
-                                .build())
-                        .build())
-                .build();
-        client.createSecret(projectName, secretId, secret);
-        byte[] secretBytes = new byte[32];
-        new SecureRandom().nextBytes(secretBytes);
-        client.addSecretVersion(
-            secretName,
-            SecretPayload.newBuilder()
-                
.setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes)))
-                .build());
-      }
-      gcpSecretVersionName = secretName.toString() + "/versions/latest";
-    }
-
-    @AfterClass
-    public static void tearDown() throws IOException {
-      if (gcpSecretVersionName != null) {
-        SecretManagerServiceClient client = 
SecretManagerServiceClient.create();
-        SecretName secretName = SecretName.of(PROJECT_ID, secretId);
-        client.deleteSecret(secretName);
-      }
-    }
   }
 
   /** Tests validating basic {@link GroupByKey} scenarios. */
@@ -673,55 +614,6 @@ public class GroupByKeyTest implements Serializable {
     public void testLargeKeys100MB() throws Exception {
       runLargeKeysTest(p, 100 << 20);
     }
-
-    @Test
-    @Category(NeedsRunner.class)
-    public void testGroupByKeyWithValidGcpSecretOption() {
-      if (gcpSecretVersionName == null) {
-        // Skip test if we couldn't set up secret manager
-        return;
-      }
-      List<KV<String, Integer>> ungroupedPairs =
-          Arrays.asList(
-              KV.of("k1", 3),
-              KV.of("k5", Integer.MAX_VALUE),
-              KV.of("k5", Integer.MIN_VALUE),
-              KV.of("k2", 66),
-              KV.of("k1", 4),
-              KV.of("k2", -33),
-              KV.of("k3", 0));
-
-      PCollection<KV<String, Integer>> input =
-          p.apply(
-              Create.of(ungroupedPairs)
-                  .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())));
-
-      p.getOptions().setGbek(String.format("type:gcpsecret;version_name:%s", 
gcpSecretVersionName));
-      PCollection<KV<String, Iterable<Integer>>> output = 
input.apply(GroupByKey.create());
-
-      SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> 
checker =
-          containsKvs(
-              kv("k1", 3, 4),
-              kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE),
-              kv("k2", 66, -33),
-              kv("k3", 0));
-      PAssert.that(output).satisfies(checker);
-      PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker);
-
-      p.run();
-    }
-
-    @Test
-    @Category(NeedsRunner.class)
-    public void testGroupByKeyWithInvalidGcpSecretOption() {
-      if (gcpSecretVersionName == null) {
-        // Skip test if we couldn't set up secret manager
-        return;
-      }
-      
p.getOptions().setGbek("type:gcpsecret;version_name:bad_path/versions/latest");
-      p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create());
-      assertThrows(RuntimeException.class, () -> p.run());
-    }
   }
 
   /** Tests validating GroupByKey behaviors with windowing. */

Reply via email to