Fix a few Coverity inspection results plus more IntelliJ results

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c603d1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c603d1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c603d1c

Branch: refs/heads/master
Commit: 1c603d1c526e0610a3ad4edee9afe4f7ee4fd1e8
Parents: 8779701
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Oct 8 22:35:25 2015 +0100
Committer: Tom White <t...@cloudera.com>
Committed: Thu Mar 10 11:15:16 2016 +0000

----------------------------------------------------------------------
 .../java/com/cloudera/dataflow/hadoop/WritableCoder.java | 11 ++++++++---
 .../com/cloudera/dataflow/spark/BroadcastHelper.java     |  4 ++++
 .../main/java/com/cloudera/dataflow/spark/ByteArray.java |  2 +-
 .../com/cloudera/dataflow/spark/EvaluationContext.java   |  4 ++--
 .../com/cloudera/dataflow/spark/SparkProcessContext.java |  5 ++---
 .../com/cloudera/dataflow/spark/TransformTranslator.java |  4 ++--
 .../com/cloudera/dataflow/spark/CombineGloballyTest.java |  2 +-
 .../dataflow/spark/MultiOutputWordCountTest.java         |  5 +++--
 .../com/cloudera/dataflow/spark/SerializationTest.java   |  4 ++--
 .../com/cloudera/dataflow/spark/SideEffectsTest.java     |  5 ++---
 10 files changed, 27 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
index ea47109..759fb58 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
@@ -20,6 +20,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -56,7 +57,9 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> {
    */
   public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
     if (clazz.equals(NullWritable.class)) {
-      return (WritableCoder<T>) NullWritableCoder.of();
+      @SuppressWarnings("unchecked")
+      WritableCoder<T> result = (WritableCoder<T>) NullWritableCoder.of();
+      return result;
     }
     return new WritableCoder<>(clazz);
   }
@@ -87,11 +90,13 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> {
   @Override
   public T decode(InputStream inStream, Context context) throws IOException {
     try {
-      T t = type.newInstance();
+      T t = type.getConstructor().newInstance();
       t.readFields(new DataInputStream(inStream));
       return t;
-    } catch (InstantiationException | IllegalAccessException e) {
+    } catch (NoSuchMethodException | InstantiationException | IllegalAccessException e) {
       throw new CoderException("unable to deserialize record", e);
+    } catch (InvocationTargetException ite) {
+      throw new CoderException("unable to deserialize record", ite.getCause());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
index 27d23eb..6ef70f3 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
@@ -62,6 +62,7 @@ abstract class BroadcastHelper<T> implements Serializable {
       this.value = value;
     }
 
+    @Override
     public synchronized T getValue() {
       if (value == null) {
         value = bcast.getValue();
@@ -69,6 +70,7 @@ abstract class BroadcastHelper<T> implements Serializable {
       return value;
     }
 
+    @Override
     public void broadcast(JavaSparkContext jsc) {
       this.bcast = jsc.broadcast(value);
     }
@@ -90,6 +92,7 @@ abstract class BroadcastHelper<T> implements Serializable {
       this.coder = coder;
     }
 
+    @Override
     public synchronized T getValue() {
       if (value == null) {
         value = deserialize();
@@ -97,6 +100,7 @@ abstract class BroadcastHelper<T> implements Serializable {
       return value;
     }
 
+    @Override
     public void broadcast(JavaSparkContext jsc) {
       this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java
index 1db0a8b..06db572 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java
@@ -23,7 +23,7 @@ class ByteArray implements Serializable, Comparable<ByteArray> {
 
   private final byte[] value;
 
-  public ByteArray(byte[] value) {
+  ByteArray(byte[] value) {
     this.value = value;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
index 649cbe9..eb9554f 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
@@ -78,12 +78,12 @@ public class EvaluationContext implements EvaluationResult {
     private Coder<T> coder;
     private JavaRDDLike<T, ?> rdd;
 
-    public RDDHolder(Iterable<T> values, Coder<T> coder) {
+    RDDHolder(Iterable<T> values, Coder<T> coder) {
       this.values = values;
       this.coder = coder;
     }
 
-    public RDDHolder(JavaRDDLike<T, ?> rdd) {
+    RDDHolder(JavaRDDLike<T, ?> rdd) {
       this.rdd = rdd;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index 7777f21..ee2235a 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -61,7 +61,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
   }
 
   void setup() {
-    super.setupDelegateAggregators();
+    setupDelegateAggregators();
   }
 
   @Override
@@ -190,7 +190,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
     private Iterator<V> outputIterator;
     private boolean calledFinish = false;
 
-    public ProcCtxtIterator(Iterator<I> iterator, DoFn<I, O> doFn) {
+    ProcCtxtIterator(Iterator<I> iterator, DoFn<I, O> doFn) {
       this.inputIterator = iterator;
       this.doFn = doFn;
       this.outputIterator = getOutputIterator();
@@ -215,7 +215,6 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
             throw new IllegalStateException(e);
           }
           outputIterator = getOutputIterator();
-          continue; // try to consume outputIterator from start of loop
         } else {
           // no more input to consume, but finishBundle can produce more output
           if (!calledFinish) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
index dfb01f1..4537aa4 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
@@ -124,8 +124,8 @@ public final class TransformTranslator {
             (JavaRDDLike<KV<K, V>, ?>) context.getInputRDD(transform);
         @SuppressWarnings("unchecked")
         KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
-        final Coder<K> keyCoder = coder.getKeyCoder();
-        final Coder<V> valueCoder = coder.getValueCoder();
+        Coder<K> keyCoder = coder.getKeyCoder();
+        Coder<V> valueCoder = coder.getValueCoder();
 
         // Use coders to convert objects in the PCollection to byte arrays, so they
         // can be transferred over the network for the shuffle.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
index be5f6dc..51ba905 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
@@ -66,7 +66,7 @@ public class CombineGloballyTest {
       StringBuilder sb = new StringBuilder();
       for (StringBuilder accum : accumulators) {
         if (accum != null) {
-          sb.append(accum.toString());
+          sb.append(accum);
         }
       }
       return sb;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
index bf2ecdc..179816d 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
@@ -65,6 +65,7 @@ public class MultiOutputWordCountTest {
 
     EvaluationResult res = SparkPipelineRunner.create().run(p);
     Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts));
+    Assert.assertEquals("and", actualLower.iterator().next().getKey());
     Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
     Assert.assertEquals("Here", actualUpper.iterator().next().getKey());
     Iterable<Long> actualUniqCount = res.get(unique);
@@ -85,9 +86,9 @@ public class MultiOutputWordCountTest {
    */
   static class ExtractWordsFn extends DoFn<String, String> {
 
-    private Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
+    private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
         new Sum.SumIntegerFn());
-    private Aggregator<Integer, Integer> maxWordLength = createAggregator("maxWordLength",
+    private final Aggregator<Integer, Integer> maxWordLength = createAggregator("maxWordLength",
         new Max.MaxIntegerFn());
     private final PCollectionView<String> regex;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
index bd1a4e8..a8edb3a 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
@@ -43,7 +43,7 @@ import org.junit.Test;
 public class SerializationTest {
 
   public static class StringHolder { // not serializable
-    private String string;
+    private final String string;
 
     public StringHolder(String string) {
       this.string = string;
@@ -71,7 +71,7 @@ public class SerializationTest {
 
   public static class StringHolderUtf8Coder extends AtomicCoder<StringHolder> {
 
-    private StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of();
+    private final StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of();
 
     @Override
     public void encode(StringHolder value, OutputStream outStream, Context context) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c603d1c/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
index 7292bf0..666737d 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
@@ -37,7 +37,7 @@ public class SideEffectsTest implements Serializable {
     pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
 
     PCollection<String> strings = pipeline.apply(Create.of("a"));
-    PCollection<String> output = strings.apply(ParDo.of(new DoFn<String, String>() {
+    strings.apply(ParDo.of(new DoFn<String, String>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {
         throw new IllegalStateException("Side effect");
@@ -49,7 +49,6 @@ public class SideEffectsTest implements Serializable {
       fail("Run should thrown an exception");
     } catch (Exception e) {
       // expected
-      e.printStackTrace();
     }
   }
-}
+}
\ No newline at end of file

Reply via email to