This is an automated email from the ASF dual-hosted git repository. vterentev pushed a commit to branch cherrypick-35394 in repository https://gitbox.apache.org/repos/asf/beam.git
commit 6c42f85c07ddf2559c1ff77cdbb75089812cd23e Author: Vitaly Terentyev <[email protected]> AuthorDate: Mon Jun 23 13:48:04 2025 +0400 Fix suppressed Spotbugs issues (cherry picked from commit 2feb407ea0ef8b5962ca6061955d14acb404803a) --- .../complete/datatokenization/utils/SchemasUtils.java | 8 +++++--- .../beam/examples/subprocess/SubProcessPipelineOptions.java | 2 +- .../subprocess/configuration/SubProcessConfiguration.java | 12 +++++------- .../apache/beam/examples/kotlin/cookbook/TriggerExample.kt | 3 +-- .../org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java | 5 ++--- .../main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java | 7 ++++--- .../java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java | 6 ++---- 7 files changed, 20 insertions(+), 23 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java index bf3253769cc..4d1a6cb66ad 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java @@ -21,7 +21,6 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.fromTableSchema; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import com.google.api.services.bigquery.model.TableSchema; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.io.InputStream; import java.io.Reader; @@ -88,13 +87,16 @@ public class SchemasUtils { jsonBeamSchema = BigQueryHelpers.toJsonString(schema.getFields()); } - @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION") private void validateSchemaTypes(TableSchema bigQuerySchema) { + if (bigQuerySchema == null) { + LOG.error("Provided BigQuery schema is null. Please check your input."); + return; + } try { beamSchema = fromTableSchema(bigQuerySchema); } catch (UnsupportedOperationException exception) { LOG.error("Check json schema, {}", exception.getMessage()); - } catch (NullPointerException npe) { + } catch (Exception e) { LOG.error("Missing schema keywords, please check what all required fields presented"); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java index b98644d2e23..3cd4238df4f 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java @@ -82,7 +82,7 @@ public interface SubProcessPipelineOptions extends PipelineOptions { configuration.setWorkerPath(subProcessPipelineOptions.getWorkerPath()); configuration.setWaitTime(subProcessPipelineOptions.getWaitTime()); configuration.setOnlyUpLoadLogsOnError(subProcessPipelineOptions.getOnlyUpLoadLogsOnError()); - configuration.concurrency = subProcessPipelineOptions.getConcurrency(); + configuration.setConcurrency(subProcessPipelineOptions.getConcurrency()); return configuration; } diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java index 11e44d0da53..98013110c33 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java @@ -17,7 +17,6 @@ */ package org.apache.beam.examples.subprocess.configuration; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.Serializable; /** @@ -25,23 +24,22 @@ import java.io.Serializable; * are copied from the Options to all them to be Serializable. */ @SuppressWarnings({"serial", "nullness"}) // TODO(https://github.com/apache/beam/issues/20497) -@SuppressFBWarnings("PA_PUBLIC_PRIMITIVE_ATTRIBUTE") // TODO(#35312) public class SubProcessConfiguration implements Serializable { // Source GCS directory where the C++ library is located gs://bucket/tests - public String sourcePath; + private String sourcePath; // Working directory for the process I/O - public String workerPath; + private String workerPath; // The maximum time to wait for the sub-process to complete - public Integer waitTime; + private Integer waitTime; // "As sub-processes can be heavy weight match the concurrency level to num cores on the machines" - public Integer concurrency; + private Integer concurrency; // Should log files only be uploaded if error - public Boolean onlyUpLoadLogsOnError; + private Boolean onlyUpLoadLogsOnError; public Boolean getOnlyUpLoadLogsOnError() { return onlyUpLoadLogsOnError; diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt index c5038f4441c..4afa7d0dfc7 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt @@ -486,10 +486,8 @@ object TriggerExample { @ProcessElement @Throws(Exception::class) - @SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") fun processElement(c: ProcessContext) { var timestamp = Instant.now() - val random = Random() if (random.nextDouble() < THRESHOLD) { val range = MAX_DELAY - MIN_DELAY val delayInMinutes = random.nextInt(range) + MIN_DELAY @@ -504,6 +502,7 @@ object TriggerExample { // MIN_DELAY and MAX_DELAY in minutes. private const val MIN_DELAY = 1 private const val MAX_DELAY = 100 + private val random = Random() } } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java index da919682ff2..93e7ff2b663 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java @@ -23,7 +23,6 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; @@ -238,6 +237,7 @@ class KafkaExactlyOnceSink<K, V> private static class ExactlyOnceWriter<K, V> extends DoFn<KV<Integer, Iterable<KV<Long, TimestampedValue<ProducerRecord<K, V>>>>>, Void> { + private static final Random RANDOM = new Random(); private static final String NEXT_ID = "nextId"; private static final String MIN_BUFFERED_ID = "minBufferedId"; private static final String OUT_OF_ORDER_BUFFER = "outOfOrderBuffer"; @@ -551,7 +551,6 @@ class KafkaExactlyOnceSink<K, V> } } - @SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") // TODO(#35312) private ShardWriter<K, V> initShardWriter( int shard, ValueState<String> writerIdState, long nextId) throws IOException { @@ -586,7 +585,7 @@ class KafkaExactlyOnceSink<K, V> writerId = String.format( "%X - %s", - new Random().nextInt(Integer.MAX_VALUE), + RANDOM.nextInt(Integer.MAX_VALUE), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") .withZone(DateTimeZone.UTC) .print(DateTimeUtils.currentTimeMillis())); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java index a0f041d22e8..1352d6bd864 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.kafka; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -40,6 +39,9 @@ import org.checkerframework.checker.nullness.qual.Nullable; * KafkaIO.ReadSourceDescriptors}. */ public final class KafkaIOUtils { + + private static final Random RANDOM = new Random(); + // A set of config defaults. static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES = ImmutableMap.of( @@ -99,7 +101,6 @@ public final class KafkaIOUtils { return config; } - @SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") // TODO(#35312) static Map<String, Object> getOffsetConsumerConfig( String name, @Nullable Map<String, Object> offsetConfig, Map<String, Object> consumerConfig) { Map<String, Object> offsetConsumerConfig = new HashMap<>(consumerConfig); @@ -110,7 +111,7 @@ public final class KafkaIOUtils { String offsetGroupId = String.format( "%s_offset_consumer_%d_%s", - name, new Random().nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId)); + name, RANDOM.nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId)); offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId); if (offsetConfig != null) { diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java index 5ccf170bd75..78e0d4ff5f3 100644 --- a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java +++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java @@ -21,7 +21,6 @@ import static org.apache.beam.sdk.io.synthetic.delay.SyntheticDelay.delay; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import com.fasterxml.jackson.annotation.JsonProperty; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.io.synthetic.delay.SyntheticDelay; @@ -53,6 +52,7 @@ import org.joda.time.Duration; */ public class SyntheticStep extends DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>> { + private static final Random RANDOM = new Random(); private final Options options; // used when maxWorkerThroughput is set @@ -75,13 +75,11 @@ public class SyntheticStep extends DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>> } }); - @SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") // TODO(#35312) public SyntheticStep(Options options) { options.validate(); this.options = options; - Random rand = new Random(); // use a random id so that a pipeline could have multiple SyntheticSteps - this.idAndThroughput = KV.of(rand.nextLong(), options.maxWorkerThroughput); + this.idAndThroughput = KV.of(RANDOM.nextLong(), options.maxWorkerThroughput); } @ProcessElement
