This is an automated email from the ASF dual-hosted git repository.

vterentev pushed a commit to branch cherrypick-35394
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6c42f85c07ddf2559c1ff77cdbb75089812cd23e
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Mon Jun 23 13:48:04 2025 +0400

    Fix suppressed Spotbugs issues
    
    (cherry picked from commit 2feb407ea0ef8b5962ca6061955d14acb404803a)
---
 .../complete/datatokenization/utils/SchemasUtils.java        |  8 +++++---
 .../beam/examples/subprocess/SubProcessPipelineOptions.java  |  2 +-
 .../subprocess/configuration/SubProcessConfiguration.java    | 12 +++++-------
 .../apache/beam/examples/kotlin/cookbook/TriggerExample.kt   |  3 +--
 .../org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java   |  5 ++---
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java |  7 ++++---
 .../java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java |  6 ++----
 7 files changed, 20 insertions(+), 23 deletions(-)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java
index bf3253769cc..4d1a6cb66ad 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.fromTableSchema;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.api.services.bigquery.model.TableSchema;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
@@ -88,13 +87,16 @@ public class SchemasUtils {
     jsonBeamSchema = BigQueryHelpers.toJsonString(schema.getFields());
   }
 
-  @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION")
   private void validateSchemaTypes(TableSchema bigQuerySchema) {
+    if (bigQuerySchema == null) {
+      LOG.error("Provided BigQuery schema is null. Please check your input.");
+      return;
+    }
     try {
       beamSchema = fromTableSchema(bigQuerySchema);
     } catch (UnsupportedOperationException exception) {
       LOG.error("Check json schema, {}", exception.getMessage());
-    } catch (NullPointerException npe) {
+    } catch (Exception e) {
+      LOG.error("Missing schema keywords, please check what all required fields presented");
     }
   }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java
index b98644d2e23..3cd4238df4f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java
@@ -82,7 +82,7 @@ public interface SubProcessPipelineOptions extends PipelineOptions {
       configuration.setWorkerPath(subProcessPipelineOptions.getWorkerPath());
       configuration.setWaitTime(subProcessPipelineOptions.getWaitTime());
       configuration.setOnlyUpLoadLogsOnError(subProcessPipelineOptions.getOnlyUpLoadLogsOnError());
-      configuration.concurrency = subProcessPipelineOptions.getConcurrency();
+      configuration.setConcurrency(subProcessPipelineOptions.getConcurrency());
 
       return configuration;
     }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java
index 11e44d0da53..98013110c33 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.examples.subprocess.configuration;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Serializable;
 
 /**
@@ -25,23 +24,22 @@ import java.io.Serializable;
  * are copied from the Options to all them to be Serializable.
  */
 @SuppressWarnings({"serial", "nullness"}) // TODO(https://github.com/apache/beam/issues/20497)
-@SuppressFBWarnings("PA_PUBLIC_PRIMITIVE_ATTRIBUTE") // TODO(#35312)
 public class SubProcessConfiguration implements Serializable {
 
   // Source GCS directory where the C++ library is located gs://bucket/tests
-  public String sourcePath;
+  private String sourcePath;
 
   // Working directory for the process I/O
-  public String workerPath;
+  private String workerPath;
 
   // The maximum time to wait for the sub-process to complete
-  public Integer waitTime;
+  private Integer waitTime;
 
   // "As sub-processes can be heavy weight match the concurrency level to num cores on the machines"
-  public Integer concurrency;
+  private Integer concurrency;
 
   // Should log files only be uploaded if error
-  public Boolean onlyUpLoadLogsOnError;
+  private Boolean onlyUpLoadLogsOnError;
 
   public Boolean getOnlyUpLoadLogsOnError() {
     return onlyUpLoadLogsOnError;
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
index c5038f4441c..4afa7d0dfc7 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
@@ -486,10 +486,8 @@ object TriggerExample {
 
         @ProcessElement
         @Throws(Exception::class)
-        @SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE")
         fun processElement(c: ProcessContext) {
             var timestamp = Instant.now()
-            val random = Random()
             if (random.nextDouble() < THRESHOLD) {
                 val range = MAX_DELAY - MIN_DELAY
                 val delayInMinutes = random.nextInt(range) + MIN_DELAY
@@ -504,6 +502,7 @@ object TriggerExample {
             // MIN_DELAY and MAX_DELAY in minutes.
             private const val MIN_DELAY = 1
             private const val MAX_DELAY = 100
+            private val random = Random()
         }
     }
 
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
index da919682ff2..93e7ff2b663 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
@@ -23,7 +23,6 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -238,6 +237,7 @@ class KafkaExactlyOnceSink<K, V>
   private static class ExactlyOnceWriter<K, V>
       extends DoFn<KV<Integer, Iterable<KV<Long, TimestampedValue<ProducerRecord<K, V>>>>>, Void> {
 
+    private static final Random RANDOM = new Random();
     private static final String NEXT_ID = "nextId";
     private static final String MIN_BUFFERED_ID = "minBufferedId";
     private static final String OUT_OF_ORDER_BUFFER = "outOfOrderBuffer";
@@ -551,7 +551,6 @@ class KafkaExactlyOnceSink<K, V>
       }
     }
 
-    @SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") // TODO(#35312)
     private ShardWriter<K, V> initShardWriter(
         int shard, ValueState<String> writerIdState, long nextId) throws IOException {
 
@@ -586,7 +585,7 @@ class KafkaExactlyOnceSink<K, V>
           writerId =
               String.format(
                   "%X - %s",
-                  new Random().nextInt(Integer.MAX_VALUE),
+                  RANDOM.nextInt(Integer.MAX_VALUE),
                   DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
                       .withZone(DateTimeZone.UTC)
                       .print(DateTimeUtils.currentTimeMillis()));
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
index a0f041d22e8..1352d6bd864 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.kafka;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -40,6 +39,9 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * KafkaIO.ReadSourceDescriptors}.
  */
 public final class KafkaIOUtils {
+
+  private static final Random RANDOM = new Random();
+
   // A set of config defaults.
   static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
       ImmutableMap.of(
@@ -99,7 +101,6 @@ public final class KafkaIOUtils {
     return config;
   }
 
-  @SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") // TODO(#35312)
   static Map<String, Object> getOffsetConsumerConfig(
       String name, @Nullable Map<String, Object> offsetConfig, Map<String, Object> consumerConfig) {
     Map<String, Object> offsetConsumerConfig = new HashMap<>(consumerConfig);
@@ -110,7 +111,7 @@ public final class KafkaIOUtils {
     String offsetGroupId =
         String.format(
             "%s_offset_consumer_%d_%s",
-            name, new Random().nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
+            name, RANDOM.nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
     offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
 
     if (offsetConfig != null) {
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java
index 5ccf170bd75..78e0d4ff5f3 100644
--- a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.io.synthetic.delay.SyntheticDelay.delay;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.io.synthetic.delay.SyntheticDelay;
@@ -53,6 +52,7 @@ import org.joda.time.Duration;
  */
 public class SyntheticStep extends DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>> {
 
+  private static final Random RANDOM = new Random();
   private final Options options;
 
   // used when maxWorkerThroughput is set
@@ -75,13 +75,11 @@ public class SyntheticStep extends DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>>
                 }
               });
 
-  @SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") // TODO(#35312)
   public SyntheticStep(Options options) {
     options.validate();
     this.options = options;
-    Random rand = new Random();
     // use a random id so that a pipeline could have multiple SyntheticSteps
-    this.idAndThroughput = KV.of(rand.nextLong(), options.maxWorkerThroughput);
+    this.idAndThroughput = KV.of(RANDOM.nextLong(), options.maxWorkerThroughput);
   }
 
   @ProcessElement

Reply via email to