Fix a few Coverity inspection results plus more IntelliJ results
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c603d1c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c603d1c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c603d1c Branch: refs/heads/master Commit: 1c603d1c526e0610a3ad4edee9afe4f7ee4fd1e8 Parents: 8779701 Author: Sean Owen <so...@cloudera.com> Authored: Thu Oct 8 22:35:25 2015 +0100 Committer: Tom White <t...@cloudera.com> Committed: Thu Mar 10 11:15:16 2016 +0000 ---------------------------------------------------------------------- .../java/com/cloudera/dataflow/hadoop/WritableCoder.java | 11 ++++++++--- .../com/cloudera/dataflow/spark/BroadcastHelper.java | 4 ++++ .../main/java/com/cloudera/dataflow/spark/ByteArray.java | 2 +- .../com/cloudera/dataflow/spark/EvaluationContext.java | 4 ++-- .../com/cloudera/dataflow/spark/SparkProcessContext.java | 5 ++--- .../com/cloudera/dataflow/spark/TransformTranslator.java | 4 ++-- .../com/cloudera/dataflow/spark/CombineGloballyTest.java | 2 +- .../dataflow/spark/MultiOutputWordCountTest.java | 5 +++-- .../com/cloudera/dataflow/spark/SerializationTest.java | 4 ++-- .../com/cloudera/dataflow/spark/SideEffectsTest.java | 5 ++--- 10 files changed, 27 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java index ea47109..759fb58 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java @@ -20,6 +20,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.InvocationTargetException; import java.util.List; import com.fasterxml.jackson.annotation.JsonCreator; @@ -56,7 +57,9 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> { */ public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) { if (clazz.equals(NullWritable.class)) { - return (WritableCoder<T>) NullWritableCoder.of(); + @SuppressWarnings("unchecked") + WritableCoder<T> result = (WritableCoder<T>) NullWritableCoder.of(); + return result; } return new WritableCoder<>(clazz); } @@ -87,11 +90,13 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> { @Override public T decode(InputStream inStream, Context context) throws IOException { try { - T t = type.newInstance(); + T t = type.getConstructor().newInstance(); t.readFields(new DataInputStream(inStream)); return t; - } catch (InstantiationException | IllegalAccessException e) { + } catch (NoSuchMethodException | InstantiationException | IllegalAccessException e) { throw new CoderException("unable to deserialize record", e); + } catch (InvocationTargetException ite) { + throw new CoderException("unable to deserialize record", ite.getCause()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java index 27d23eb..6ef70f3 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java @@ -62,6 +62,7 @@ abstract class BroadcastHelper<T> implements Serializable { this.value = value; } + @Override public synchronized T getValue() { if (value == null) { value = bcast.getValue(); @@ -69,6 +70,7 @@ abstract class BroadcastHelper<T> implements Serializable { return value; } + @Override public void broadcast(JavaSparkContext jsc) { this.bcast = jsc.broadcast(value); } @@ -90,6 +92,7 @@ abstract class BroadcastHelper<T> implements Serializable { this.coder = coder; } + @Override public synchronized T getValue() { if (value == null) { value = deserialize(); @@ -97,6 +100,7 @@ abstract class BroadcastHelper<T> implements Serializable { return value; } + @Override public void broadcast(JavaSparkContext jsc) { this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java index 1db0a8b..06db572 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java @@ -23,7 +23,7 @@ class ByteArray implements Serializable, Comparable<ByteArray> { private final byte[] value; - public ByteArray(byte[] value) { + ByteArray(byte[] value) { this.value = value; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java index 649cbe9..eb9554f 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java @@ -78,12 +78,12 @@ public class EvaluationContext implements EvaluationResult { private Coder<T> coder; private JavaRDDLike<T, ?> rdd; - public RDDHolder(Iterable<T> values, Coder<T> coder) { + RDDHolder(Iterable<T> values, Coder<T> coder) { this.values = values; this.coder = coder; } - public RDDHolder(JavaRDDLike<T, ?> rdd) { + RDDHolder(JavaRDDLike<T, ?> rdd) { this.rdd = rdd; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java index 7777f21..ee2235a 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java @@ -61,7 +61,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext { } void setup() { - super.setupDelegateAggregators(); + setupDelegateAggregators(); } @Override @@ -190,7 +190,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext { private Iterator<V> outputIterator; private boolean calledFinish = false; - public ProcCtxtIterator(Iterator<I> iterator, DoFn<I, O> doFn) { + ProcCtxtIterator(Iterator<I> iterator, DoFn<I, O> doFn) { this.inputIterator = iterator; this.doFn = doFn; this.outputIterator = getOutputIterator(); @@ -215,7 +215,6 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext { throw new IllegalStateException(e); } outputIterator = getOutputIterator(); - continue; // try to consume outputIterator from start of loop } else { // no more input to consume, but finishBundle can produce more output if (!calledFinish) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java index dfb01f1..4537aa4 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java @@ -124,8 +124,8 @@ public final class TransformTranslator { (JavaRDDLike<KV<K, V>, ?>) context.getInputRDD(transform); @SuppressWarnings("unchecked") KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder(); - final Coder<K> keyCoder = coder.getKeyCoder(); - final Coder<V> valueCoder = coder.getValueCoder(); + Coder<K> keyCoder = coder.getKeyCoder(); + Coder<V> valueCoder = coder.getValueCoder(); // Use coders to convert objects in the PCollection to byte arrays, so they // can be transferred over the network for the shuffle. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java index be5f6dc..51ba905 100644 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java +++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java @@ -66,7 +66,7 @@ public class CombineGloballyTest { StringBuilder sb = new StringBuilder(); for (StringBuilder accum : accumulators) { if (accum != null) { - sb.append(accum.toString()); + sb.append(accum); } } return sb; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java index bf2ecdc..179816d 100644 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java @@ -65,6 +65,7 @@ public class MultiOutputWordCountTest { EvaluationResult res = SparkPipelineRunner.create().run(p); Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts)); + Assert.assertEquals("and", actualLower.iterator().next().getKey()); Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts)); Assert.assertEquals("Here", actualUpper.iterator().next().getKey()); Iterable<Long> actualUniqCount = res.get(unique); @@ -85,9 +86,9 @@ public class MultiOutputWordCountTest { */ static class ExtractWordsFn extends DoFn<String, String> { - private Aggregator<Integer, Integer> totalWords = createAggregator("totalWords", + private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords", new Sum.SumIntegerFn()); - private Aggregator<Integer, Integer> maxWordLength = createAggregator("maxWordLength", + private final Aggregator<Integer, Integer> maxWordLength = createAggregator("maxWordLength", new Max.MaxIntegerFn()); private final PCollectionView<String> regex; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java index bd1a4e8..a8edb3a 100644 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java +++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java @@ -43,7 +43,7 @@ import org.junit.Test; public class SerializationTest { public static class StringHolder { // not serializable - private String string; + private final String string; public StringHolder(String string) { this.string = string; @@ -71,7 +71,7 @@ public class SerializationTest { public static class StringHolderUtf8Coder extends AtomicCoder<StringHolder> { - private StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of(); + private final StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of(); @Override public void encode(StringHolder value, OutputStream outStream, Context context) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java index 7292bf0..666737d 100644 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java +++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java @@ -37,7 +37,7 @@ public class SideEffectsTest implements Serializable { pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); PCollection<String> strings = pipeline.apply(Create.of("a")); - PCollection<String> output = strings.apply(ParDo.of(new DoFn<String, String>() { + strings.apply(ParDo.of(new DoFn<String, String>() { @Override public void processElement(ProcessContext c) throws Exception { throw new IllegalStateException("Side effect"); @@ -49,7 +49,6 @@ public class SideEffectsTest implements Serializable { fail("Run should thrown an exception"); } catch (Exception e) { // expected - e.printStackTrace(); } } -} +} \ No newline at end of file