[
https://issues.apache.org/jira/browse/BEAM-4318?focusedWorklogId=110609&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110609
]
ASF GitHub Bot logged work on BEAM-4318:
----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Jun/18 13:16
Start Date: 11/Jun/18 13:16
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #5430: [BEAM-4318] Enforce ErrorProne analysis in Spark runner project
URL: https://github.com/apache/beam/pull/5430
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:
diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index b236173479e..cf7edd3c94f 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -19,7 +19,7 @@
import groovy.json.JsonOutput
apply from: project(":").file("build_rules.gradle")
-applyJavaNature()
+applyJavaNature(failOnWarning: true)
description = "Apache Beam :: Runners :: Spark"
@@ -53,6 +53,8 @@ test {
}
dependencies {
+ compileOnly library.java.findbugs_annotations
+ testCompileOnly library.java.findbugs_annotations
shadow project(path: ":beam-model-pipeline", configuration: "shadow")
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow project(path: ":beam-runners-core-construction-java", configuration:
"shadow")
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
index 7bd5a81bb86..2579f460776 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -83,8 +83,8 @@ private boolean shouldDebug(final TransformHierarchy.Node node) {
.stream()
.anyMatch(
debugTransform ->
- debugTransform.getNode().equals(node) && debugTransform.isComposite())
- && shouldDebug(node.getEnclosingNode());
+ (debugTransform.getNode().equals(node) && debugTransform.isComposite())
+ && shouldDebug(node.getEnclosingNode()));
}
@Override
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 6c74146bd61..06fc5b0c82a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -84,8 +84,8 @@ public SparkPipelineResult run(Pipeline pipeline) {
SparkNativePipelineVisitor visitor;
if (options.isStreaming()
- || options instanceof TestSparkPipelineOptions
- && ((TestSparkPipelineOptions) options).isForceStreaming()) {
+ || (options instanceof TestSparkPipelineOptions
+ && ((TestSparkPipelineOptions) options).isForceStreaming())) {
SparkPipelineTranslator streamingTranslator =
new StreamingTransformTranslator.Translator(translator);
EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
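Both precedence hunks above are fixes for ErrorProne's OperatorPrecedence check: since && binds tighter than ||, the added parentheses do not change behavior, they only make the grouping explicit. A minimal sketch with hypothetical flag names:

{code:java}
// && already binds before ||, so both expressions evaluate identically;
// the explicit parentheses just spare readers the precedence lookup.
public class PrecedenceDemo {
  static boolean shouldStream(boolean streaming, boolean isTest, boolean forced) {
    boolean implicit = streaming || isTest && forced;
    boolean explicit = streaming || (isTest && forced);
    return implicit == explicit; // always true
  }
}
{code}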
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index cacb2c42f11..5a488d4118d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -96,6 +96,7 @@ public static TestSparkRunner fromOptions(PipelineOptions options) {
}
@Override
+ @SuppressWarnings("Finally")
public SparkPipelineResult run(Pipeline pipeline) {
// Default options suffice to set it up as a test runner
TestSparkPipelineOptions testSparkOptions =
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
index e23392e58e3..d334c84c015 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
@@ -23,8 +23,8 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -68,7 +68,7 @@ private CoderHelpers() {
* @return List of bytes representing serialized objects.
*/
public static <T> List<byte[]> toByteArrays(Iterable<T> values, Coder<T> coder) {
- List<byte[]> res = new LinkedList<>();
+ List<byte[]> res = new ArrayList<>();
for (T value : values) {
res.add(toByteArray(value, coder));
}
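The LinkedList-to-ArrayList swap addresses ErrorProne's JdkObsolete check. For an append-then-iterate workload like toByteArrays, ArrayList is the better default: contiguous storage and no per-element node allocation. A sketch of the pattern under those assumptions:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Append-and-iterate usage: ArrayList replaces LinkedList with no API change
// at the call sites, since both are consumed through the List interface.
public class JdkObsoleteDemo {
  static List<byte[]> collect(Iterable<byte[]> values) {
    List<byte[]> res = new ArrayList<>(); // was: new LinkedList<>()
    for (byte[] value : values) {
      res.add(value);
    }
    return res;
  }
}
{code}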
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
index 0cf4951dc4f..fb4c6bc4bc4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
@@ -64,7 +64,7 @@ public StatelessJavaSerializer() {
this(null, null);
}
- @SuppressWarnings("unchecked")
+ @Override
public void write(Kryo kryo, Output output, Object object) {
try {
ObjectOutputStream objectStream = new ObjectOutputStream(output);
@@ -75,7 +75,7 @@ public void write(Kryo kryo, Output output, Object object) {
}
}
- @SuppressWarnings("unchecked")
+ @Override
public Object read (Kryo kryo, Input input, Class type) {
try {
return new ObjectInputStreamWithClassLoader(input, kryo.getClassLoader()).readObject();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 20bd6e2b93f..f3005c269bd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -44,6 +44,7 @@
* of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
* pipeline.
*/
+ @SuppressWarnings("StringSplitter")
public static class ExtractWordsFn extends DoFn<String, String> {
private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index 6e3a53b6aca..e122d05013b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -20,10 +20,10 @@
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
+import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
-import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
@@ -90,8 +90,8 @@
public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
private final Duration batchDuration;
- private final Queue<Iterable<TimestampedValue<T>>> batches = new LinkedList<>();
- private final Deque<SparkWatermarks> times = new LinkedList<>();
+ private final Queue<Iterable<TimestampedValue<T>>> batches = new ArrayDeque<>();
+ private final Deque<SparkWatermarks> times = new ArrayDeque<>();
private final Coder<T> coder;
private Instant initialSystemTime;
private final boolean forceWatermarkSync;
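Same JdkObsolete check here, with ArrayDeque as the replacement because the fields are used through the Queue and Deque interfaces, both of which ArrayDeque implements. One behavioral caveat worth flagging: ArrayDeque rejects null elements, which LinkedList allowed. A small sketch:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;

// ArrayDeque serves as both a Queue and a Deque, so it drops in for
// LinkedList here; note it throws NullPointerException on null elements.
public class DequeDemo {
  public static void main(String[] args) {
    Queue<String> batches = new ArrayDeque<>();
    Deque<String> times = new ArrayDeque<>();
    batches.offer("batch-0");
    times.addLast("watermark-0");
    System.out.println(batches.poll() + " / " + times.peekFirst());
  }
}
{code}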
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index efb8e50cd2f..257362a1854 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.spark.io;
+import com.google.common.base.Splitter;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
@@ -128,7 +129,8 @@
private static <T> String getSourceName(Source<T> source, int id) {
StringBuilder sb = new StringBuilder();
- for (String s: source.getClass().getSimpleName().replace("$", "").split("(?=[A-Z])")) {
+ for (String s: Splitter.onPattern("(?=[A-Z])")
+ .split(source.getClass().getSimpleName().replace("$", ""))) {
String trimmed = s.trim();
if (!trimmed.isEmpty()) {
sb.append(trimmed).append(" ");
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index 91cefba2bb5..19aa1bc273f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -282,6 +282,7 @@ public TimestampCombiner getTimestampCombiner() {
}
}
+ @SuppressWarnings("TypeParameterShadowing")
private class SparkCombiningState<K, InputT, AccumT, OutputT>
extends AbstractState<AccumT>
implements CombiningState<InputT, AccumT, OutputT> {
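The suppression above targets ErrorProne's TypeParameterShadowing check: SparkCombiningState redeclares type parameters already declared on the enclosing class, so identically named type variables inside the inner class bind to the inner declaration. A minimal sketch of the shadowing:

{code:java}
// Inner's K is a fresh type variable that hides Outer's K; the identical
// spelling is what TypeParameterShadowing warns about.
public class Outer<K> {
  @SuppressWarnings("TypeParameterShadowing")
  class Inner<K, V> {
    K key; // binds to Inner's K, not Outer's
  }
}
{code}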
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index ce7795bb0da..e54e9f01271 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -211,7 +211,7 @@ public void computeOutputs() {
* @param <T> Type of object to return.
* @return Native object.
*/
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("TypeParameterUnusedInFormals")
public <T> T get(PValue value) {
if (pobjects.containsKey(value)) {
T result = (T) pobjects.get(value);
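TypeParameterUnusedInFormals flags methods whose type parameter appears only in the return type: the compiler cannot infer T from the arguments, so the method must perform an unchecked cast that can fail at the call site instead. Since get(PValue) is established runner API, the warning is suppressed rather than the signature redesigned. A sketch of the shape the check objects to, with hypothetical names:

{code:java}
import java.util.HashMap;
import java.util.Map;

// T appears only in the return type, so the cast below is unchecked and any
// mismatch surfaces as a ClassCastException at the caller, not here.
public class LookupDemo {
  private final Map<String, Object> values = new HashMap<>();

  @SuppressWarnings({"unchecked", "TypeParameterUnusedInFormals"})
  <T> T get(String key) {
    return (T) values.get(key);
  }
}
{code}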
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/Checkpoint.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/Checkpoint.java
index a7427b2648d..600376c3689 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/Checkpoint.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/Checkpoint.java
@@ -77,7 +77,7 @@ public static void writeObject(FileSystem fileSystem, Path checkpointFilePath, O
return is != null ? IOUtils.toByteArray(is) : null;
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("TypeParameterUnusedInFormals")
public static <T> T readObject(FileSystem fileSystem, Path checkpointfilePath)
throws IOException, ClassNotFoundException {
byte[] bytes = read(fileSystem, checkpointfilePath);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 5b562877719..0ead27c8dc5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -377,7 +377,7 @@ public String toNativeString() {
private static <InputT, OutputT>
TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>> parDo() {
return new TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>>() {
- public void evaluate(
+ @Override public void evaluate(
final ParDo.MultiOutput<InputT, OutputT> transform, final EvaluationContext context) {
final DoFn<InputT, OutputT> doFn = transform.getFn();
rejectSplittable(doFn);
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index 7bfc980304a..92191179e78 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -74,7 +74,7 @@ public void test() throws IOException {
/**
* Traverses the Pipeline to check if the input is indeed a {@link Read.Unbounded}.
*/
- private class UnboundedReadDetector extends Pipeline.PipelineVisitor.Defaults {
+ private static class UnboundedReadDetector extends Pipeline.PipelineVisitor.Defaults {
private boolean isUnbounded = false;
@Override
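Marking UnboundedReadDetector static addresses ErrorProne's ClassCanBeStatic check: a nested class that never touches its enclosing instance should be static, since a non-static inner class carries a hidden reference to the outer object. In miniature:

{code:java}
// A non-static nested class captures Enclosing.this even when it never uses
// it; declaring it static drops that hidden reference.
public class Enclosing {
  class Holds {       // implicit reference to Enclosing.this
  }

  static class Free { // no hidden reference; preferred when possible
  }
}
{code}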
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index e798cd11de9..72a3ab1fb14 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -22,9 +22,7 @@
import static org.junit.Assert.fail;
import com.google.common.collect.ImmutableSet;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
+import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -45,8 +43,8 @@
private static final String[] WORDS_ARRAY = {
"hi there", "hi", "hi sue bob",
"hi sue", "", "bob hi"};
- private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
- private static final Set<String> EXPECTED_COUNT_SET =
+ private static final ImmutableList<String> WORDS = ImmutableList.copyOf(WORDS_ARRAY);
+ private static final ImmutableSet<String> EXPECTED_COUNT_SET =
ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
private static final String PROVIDED_CONTEXT_EXCEPTION =
"The provided Spark context was not created or was stopped";
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
index 5b75d400816..7849451ce82 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
@@ -41,7 +41,7 @@ public InMemoryMetrics(final Properties properties,
internalMetricRegistry = metricRegistry;
}
- @SuppressWarnings({"unchecked", "WeakerAccess"})
+ @SuppressWarnings("TypeParameterUnusedInFormals")
public static <T> T valueOf(final String name) {
final T retVal;
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
index 0f15c8cbf3e..c58569e562b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -23,9 +23,7 @@
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableSet;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
+import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.spark.ReuseSparkContextRule;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
@@ -63,9 +61,8 @@
@Rule
public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();
- private static final List<String> WORDS = Arrays
- .asList("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi");
- private static final Set<String> EXPECTED_COUNTS = ImmutableSet
+ private static final ImmutableList<String> WORDS = ImmutableList.of("hi
there", "hi", "hi sue bob", "hi sue", "", "bob hi");
+ private static final ImmutableSet<String> EXPECTED_COUNTS = ImmutableSet
.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
@Test
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java
index 04f1319ccaa..4bf9ef33356 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java
@@ -44,12 +44,16 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A test that verifies that metrics push system works in spark runner.
*/
public class SparkMetricsPusherTest {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkMetricsPusherTest.class);
+
@Rule
public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();
@@ -108,7 +112,7 @@ public void processElement(ProcessContext context) {
counter.inc();
context.output(context.element());
} catch (Exception e) {
- e.printStackTrace();
+ LOG.warn("Exception caught" + e);
}
}
}
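The hunk above replaces printStackTrace with an SLF4J logger. The SLF4J convention, shown here on a hypothetical logger, is to pass the Throwable as the final argument so the full stack trace is recorded rather than just its toString():

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Passing the exception as the last argument logs the full stack trace;
// concatenating it into the message would log only exception.toString().
public class LoggingDemo {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingDemo.class);

  void handle(Exception e) {
    LOG.warn("Exception caught", e);
  }
}
{code}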
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index d8363a33de1..d0804079a9d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -126,6 +126,7 @@ public void init() {
}
}
+ @SuppressWarnings("FutureReturnValueIgnored")
private static void produce(Map<String, Instant> messages) {
Properties producerProps = new Properties();
producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
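The suppression covers ErrorProne's FutureReturnValueIgnored check: the Kafka producer's send() returns a Future, and dropping it means a failed send is silently lost, which is acceptable in this test helper. What the check guards against, shown on a hypothetical executor:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// A dropped Future swallows its task's exception; keeping it and calling
// get() is what surfaces the failure.
public class FutureDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.submit(() -> 1 / 0);                 // flagged: ArithmeticException is lost
    Future<Integer> f = pool.submit(() -> 2); // kept: get() would rethrow failures
    System.out.println(f.get());
    pool.shutdown();
  }
}
{code}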
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 110609)
Time Spent: 2h 50m (was: 2h 40m)
> Enforce ErrorProne analysis in Spark runner project
> ---------------------------------------------------
>
> Key: BEAM-4318
> URL: https://issues.apache.org/jira/browse/BEAM-4318
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Scott Wegner
> Assignee: Teng Peng
> Priority: Minor
> Labels: errorprone, starter
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build
> process, but only as warnings. ErrorProne errors are generally useful and
> easy to fix. Some work was done to [make sdks-java-core
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add
> enforcement. This task is to clean up ErrorProne warnings and add enforcement in
> {{beam-runners-spark}}. Additional context discussed on the [dev
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development
> environment.
> # Run the following command to compile and run ErrorProne analysis on the
> project: {{./gradlew :beam-runners-spark:assemble}}
> # Fix each ErrorProne warning from the {{runners/spark}} project.
> # In {{runners/spark/build.gradle}}, add {{failOnWarning: true}} to the call
> to {{applyJavaNature()}}
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach
> out|https://beam.apache.org/community/contact-us/] with questions or for code
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)