Repository: incubator-beam
Updated Branches:
  refs/heads/master ff825b077 -> 0f3b05335


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
index 1c83700..afcca93 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
@@ -18,13 +18,6 @@
 package org.apache.beam.runners.spark.translation.streaming;
 
 
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.Pipeline;
@@ -35,6 +28,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -43,6 +37,13 @@ import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaDStreamLike;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
 
 /**
  * Streaming evaluation context helps to handle streaming.
@@ -179,11 +180,11 @@ public class StreamingEvaluationContext extends EvaluationContext {
 
   //---------------- override in order to expose in package
   @Override
-  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
+  protected <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
     return super.getInput(transform);
   }
   @Override
-  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
+  protected <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
     return super.getOutput(transform);
   }
 

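The hunk above also renames the single-letter type parameters I and O to the descriptive InputT and OutputT used elsewhere in the Beam codebase. A minimal, self-contained sketch of that naming style in plain Java (ContextSketch and the Function-based signatures are illustrative stand-ins, not code from this patch):

    import java.util.function.Function;

    /** Illustrative only: descriptive type-parameter names in place of single letters. */
    abstract class ContextSketch {
      // <I> becomes <InputT> and <O> becomes <OutputT>; behaviour is unchanged,
      // only the generic bounds are renamed for readability.
      protected abstract <InputT> InputT getInput(Function<InputT, ?> transform);
      protected abstract <OutputT> OutputT getOutput(Function<?, OutputT> transform);
    }
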
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index d9daeb0..c1ecc43 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -17,18 +17,6 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.api.client.util.Lists;
-import com.google.api.client.util.Maps;
-import com.google.api.client.util.Sets;
-import com.google.common.reflect.TypeToken;
-import kafka.serializer.Decoder;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.io.KafkaIO;
@@ -58,6 +46,12 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PDone;
+
+import com.google.api.client.util.Lists;
+import com.google.api.client.util.Maps;
+import com.google.api.client.util.Sets;
+import com.google.common.reflect.TypeToken;
+
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.VoidFunction;
@@ -68,6 +62,15 @@ import org.apache.spark.streaming.api.java.JavaDStreamLike;
 import org.apache.spark.streaming.api.java.JavaPairInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.kafka.KafkaUtils;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import kafka.serializer.Decoder;
 import scala.Tuple2;
 
 
@@ -173,14 +176,14 @@ public final class StreamingTransformTranslator {
     };
   }
 
-  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform(
+  private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> rddTransform(
       final SparkPipelineTranslator rddTranslator) {
-    return new TransformEvaluator<PT>() {
+    return new TransformEvaluator<TransformT>() {
       @SuppressWarnings("unchecked")
       @Override
-      public void evaluate(PT transform, EvaluationContext context) {
-        TransformEvaluator<PT> rddEvaluator =
-            rddTranslator.translate((Class<PT>) transform.getClass());
+      public void evaluate(TransformT transform, EvaluationContext context) {
+        TransformEvaluator<TransformT> rddEvaluator =
+            rddTranslator.translate((Class<TransformT>) transform.getClass());
 
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         if (sec.hasStream(transform)) {
@@ -203,19 +206,20 @@ public final class StreamingTransformTranslator {
   * RDD transform function If the transformation function doesn't have an input, create a fake one
    * as an empty RDD.
    *
-   * @param <PT> PTransform type
+   * @param <TransformT> PTransform type
    */
-  private static final class RDDTransform<PT extends PTransform<?, ?>>
+  private static final class RDDTransform<TransformT extends PTransform<?, ?>>
      implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
 
     private final StreamingEvaluationContext context;
     private final AppliedPTransform<?, ?, ?> appliedPTransform;
-    private final TransformEvaluator<PT> rddEvaluator;
-    private final PT transform;
+    private final TransformEvaluator<TransformT> rddEvaluator;
+    private final TransformT transform;
 
 
-    private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator,
-        PT transform) {
+    private RDDTransform(StreamingEvaluationContext context,
+                         TransformEvaluator<TransformT> rddEvaluator,
+        TransformT transform) {
       this.context = context;
       this.appliedPTransform = context.getCurrentTransform();
       this.rddEvaluator = rddEvaluator;
@@ -243,13 +247,13 @@ public final class StreamingTransformTranslator {
   }
 
   @SuppressWarnings("unchecked")
-  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD(
+  private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> foreachRDD(
       final SparkPipelineTranslator rddTranslator) {
-    return new TransformEvaluator<PT>() {
+    return new TransformEvaluator<TransformT>() {
       @Override
-      public void evaluate(PT transform, EvaluationContext context) {
-        TransformEvaluator<PT> rddEvaluator =
-            rddTranslator.translate((Class<PT>) transform.getClass());
+      public void evaluate(TransformT transform, EvaluationContext context) {
+        TransformEvaluator<TransformT> rddEvaluator =
+            rddTranslator.translate((Class<TransformT>) transform.getClass());
 
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         if (sec.hasStream(transform)) {
@@ -268,19 +272,19 @@ public final class StreamingTransformTranslator {
   /**
    * RDD output function.
    *
-   * @param <PT> PTransform type
+   * @param <TransformT> PTransform type
    */
-  private static final class RDDOutputOperator<PT extends PTransform<?, ?>>
+  private static final class RDDOutputOperator<TransformT extends PTransform<?, ?>>
       implements VoidFunction<JavaRDD<WindowedValue<Object>>> {
 
     private final StreamingEvaluationContext context;
     private final AppliedPTransform<?, ?, ?> appliedPTransform;
-    private final TransformEvaluator<PT> rddEvaluator;
-    private final PT transform;
+    private final TransformEvaluator<TransformT> rddEvaluator;
+    private final TransformT transform;
 
 
     private RDDOutputOperator(StreamingEvaluationContext context,
-        TransformEvaluator<PT> rddEvaluator, PT transform) {
+                              TransformEvaluator<TransformT> rddEvaluator, TransformT transform) {
       this.context = context;
       this.appliedPTransform = context.getCurrentTransform();
       this.rddEvaluator = rddEvaluator;
@@ -325,7 +329,7 @@ public final class StreamingTransformTranslator {
         //--- then we apply windowing to the elements
         DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
         DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn,
-            ((StreamingEvaluationContext)context).getRuntimeContext(), null);
+            ((StreamingEvaluationContext) context).getRuntimeContext(), null);
         @SuppressWarnings("unchecked")
        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
             (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
@@ -361,9 +365,10 @@ public final class StreamingTransformTranslator {
   }
 
   @SuppressWarnings("unchecked")
-  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
-      getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) {
-    TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
+  private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+      getTransformEvaluator(Class<TransformT> clazz, SparkPipelineTranslator rddTranslator) {
+    TransformEvaluator<TransformT> transform =
+        (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
     if (transform == null) {
       if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
        throw new UnsupportedOperationException("Dataflow transformation " + clazz
@@ -383,7 +388,8 @@ public final class StreamingTransformTranslator {
     return transform;
   }
 
-  private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) {
+  private static <TransformT extends PTransform<?, ?>> Class<?>
+  getPTransformOutputClazz(Class<TransformT> clazz) {
    Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
     return TypeToken.of(clazz).resolveType(types[1]).getRawType();
   }
@@ -407,7 +413,8 @@ public final class StreamingTransformTranslator {
     }
 
     @Override
-    public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
+    public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+    translate(Class<TransformT> clazz) {
       return getTransformEvaluator(clazz, rddTranslator);
     }
   }

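For readers skimming the rename above: rddTransform and foreachRDD both return an anonymous TransformEvaluator parameterized by the new TransformT bound and delegate to a batch (RDD) translator keyed by the transform's class. A compile-on-its-own sketch of that shape, with the Beam and Spark types replaced by stand-ins (EvaluatorSketch and TranslatorSketch are hypothetical names, not patch code):

    /** Illustrative stand-in for Beam's TransformEvaluator. */
    interface EvaluatorSketch<TransformT> {
      void evaluate(TransformT transform);
    }

    final class TranslatorSketch {
      /** Returns an anonymous evaluator over the renamed TransformT bound. */
      static <TransformT> EvaluatorSketch<TransformT> rddTransform() {
        return new EvaluatorSketch<TransformT>() {
          @Override
          public void evaluate(TransformT transform) {
            // The real code looks up a batch evaluator via transform.getClass()
            // and applies it to each micro-batch RDD of the DStream.
          }
        };
      }
    }
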
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
index 8c018d3..6e36102 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
@@ -54,12 +54,12 @@ public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.E
 
   // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
   @Override
-  protected <PT extends PTransform<? super PInput, POutput>> void
+  protected <TransformT extends PTransform<? super PInput, POutput>> void
       doVisitTransform(TransformTreeNode node) {
     @SuppressWarnings("unchecked")
-    PT transform = (PT) node.getTransform();
+    TransformT transform = (TransformT) node.getTransform();
     @SuppressWarnings("unchecked")
-    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
+    Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
     if (transformClass.isAssignableFrom(Window.Bound.class)) {
       WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
       if (windowFn instanceof FixedWindows) {

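The context comment above ("use the smallest window ... as Spark streaming's batch duration") is the detector's whole job. A rough sketch of that rule using java.time (the runner itself works with Spark/Joda durations, and the default below is an assumption for illustration only):

    import java.time.Duration;

    /** Illustrative only: keep the smallest fixed/sliding window seen as the batch interval. */
    class BatchDurationSketch {
      private Duration batchDuration = Duration.ofSeconds(1); // assumed starting value

      void proposeWindow(Duration windowDuration) {
        // Each detected fixed or sliding window proposes its size; the minimum wins.
        if (windowDuration.compareTo(batchDuration) < 0) {
          batchDuration = windowDuration;
        }
      }

      Duration batchDuration() {
        return batchDuration;
      }
    }
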
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
index 1824a9d..d3fa05a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
@@ -18,17 +18,21 @@
 
 package org.apache.beam.runners.spark.util;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.sdk.coders.Coder;
+
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Broadcast helper.
+ */
 public abstract class BroadcastHelper<T> implements Serializable {
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
index b1254d4..8c493f5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.runners.spark.util;
 
+import com.google.common.primitives.UnsignedBytes;
+
 import java.io.Serializable;
 import java.util.Arrays;
 
-import com.google.common.primitives.UnsignedBytes;
-
+/**
+ * Serializable byte array.
+ */
 public class ByteArray implements Serializable, Comparable<ByteArray> {
 
   private final byte[] value;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
index 9a8aa2e..654614a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
@@ -24,6 +24,9 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PInput;
 
+/**
+ * A {@link PTransform} wrapping another transform.
+ */
 public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PCollection<T>> {
   private PTransform<PInput, PCollection<T>> transform;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index f9b00cc..7b25e34 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -35,6 +35,9 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.List;
 
+/**
+ * Empty input test.
+ */
 public class EmptyInputTest {
 
   @Test
@@ -51,6 +54,9 @@ public class EmptyInputTest {
     res.close();
   }
 
+  /**
+   * Concat words serializable function used in the test.
+   */
  public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
     @Override
     public String apply(Iterable<String> input) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 4e9c0b8..eee120e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -18,10 +18,13 @@
 
 package org.apache.beam.runners.spark;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
@@ -32,16 +35,14 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.io.FileUtils;
-import org.junit.rules.TemporaryFolder;
-import org.junit.Rule;
-import org.junit.Test;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.util.Arrays;
@@ -49,6 +50,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+/**
+ * Simple word count test.
+ */
 public class SimpleWordCountTest {
   private static final String[] WORDS_ARRAY = {
       "hi there", "hi", "hi sue bob",
@@ -133,6 +137,9 @@ public class SimpleWordCountTest {
     }
   }
 
+  /**
+   * A {@link PTransform} counting words.
+   */
  public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
     @Override
     public PCollection<String> apply(PCollection<String> lines) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 3643bac..88f4a06 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -18,19 +18,21 @@
 
 package org.apache.beam.runners.spark;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.util.ServiceLoader;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
 /**
  * Test {@link SparkRunnerRegistrar}.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 693e2c6..f358878 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -47,6 +47,9 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.List;
 
+/**
+ * Avro pipeline test.
+ */
 public class AvroPipelineTest {
 
   private File inputFile;
@@ -82,7 +85,8 @@ public class AvroPipelineTest {
     assertEquals(Lists.newArrayList(savedRecord), records);
   }
 
-  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+  private void populateGenericFile(List<GenericRecord> genericRecords,
+                                   Schema schema) throws IOException {
     FileOutputStream outputStream = new FileOutputStream(this.inputFile);
    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<>(schema);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 85eeabd..8ce35c4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -49,6 +49,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+/**
+ * Number of shards test.
+ */
 public class NumShardsTest {
 
   private static final String[] WORDS_ARRAY = {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index 0c8c6fc..eaa508c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -47,6 +47,9 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 
+/**
+ * Pipeline on the Hadoop file format test.
+ */
 public class HadoopFileFormatPipelineTest {
 
   private File inputFile;
@@ -69,16 +72,21 @@ public class HadoopFileFormatPipelineTest {
     Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
     @SuppressWarnings("unchecked")
     Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
-        (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class;
-    HadoopIO.Read.Bound<IntWritable,Text> read =
-        HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class);
+        (Class<? extends FileInputFormat<IntWritable, Text>>)
+            (Class<?>) SequenceFileInputFormat.class;
+    HadoopIO.Read.Bound<IntWritable, Text> read =
+        HadoopIO.Read.from(inputFile.getAbsolutePath(),
+            inputFormatClass,
+            IntWritable.class,
+            Text.class);
     PCollection<KV<IntWritable, Text>> input = p.apply(read)
        .setCoder(KvCoder.of(WritableCoder.of(IntWritable.class), WritableCoder.of(Text.class)));
     @SuppressWarnings("unchecked")
     Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass =
-        (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class;
+        (Class<? extends FileOutputFormat<IntWritable, Text>>)
+            (Class<?>) TemplatedSequenceFileOutputFormat.class;
     @SuppressWarnings("unchecked")
-    HadoopIO.Write.Bound<IntWritable,Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
+    HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
         outputFormatClass, IntWritable.class, Text.class);
     input.apply(write.withoutSharding());
     EvaluationResult res = SparkPipelineRunner.create().run(p);
@@ -86,7 +94,8 @@ public class HadoopFileFormatPipelineTest {
 
     IntWritable key = new IntWritable();
     Text value = new Text();
-    try (Reader reader = new Reader(new Configuration(), Reader.file(new Path(outputFile.toURI())))) {
+    try (Reader reader = new Reader(new Configuration(),
+        Reader.file(new Path(outputFile.toURI())))) {
       int i = 0;
       while (reader.next(key, value)) {
         assertEquals(i, key.get());

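The reflowed casts above use the usual trick of going through a raw Class<?> to obtain a parameterized class token. A self-contained sketch of that idiom with collection types standing in for the Hadoop format classes (ClassTokenSketch is illustrative, not patch code):

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative only: narrowing a raw Class literal to a parameterized token. */
    class ClassTokenSketch {
      @SuppressWarnings("unchecked")
      static Class<? extends List<String>> stringListClass() {
        // Casting through the Class<?> wildcard lets the compiler accept the
        // narrowed token, leaving only an unchecked warning (suppressed above).
        return (Class<? extends List<String>>) (Class<?>) ArrayList.class;
      }
    }
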
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
index 55991a4..e1620db 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
@@ -28,6 +28,9 @@ import static org.junit.Assert.assertEquals;
 
 import org.junit.Test;
 
+/**
+ * Test on the {@link ShardNameBuilder}.
+ */
 public class ShardNameBuilderTest {
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index a644673..ac64540 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -37,6 +37,9 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.List;
 
+/**
+ * Combine globally test.
+ */
 public class CombineGloballyTest {
 
   private static final String[] WORDS_ARRAY = {
@@ -53,10 +56,14 @@ public class CombineGloballyTest {
    PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
 
     EvaluationResult res = SparkPipelineRunner.create().run(p);
-    assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output)));
+    assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi",
+        Iterables.getOnlyElement(res.get(output)));
     res.close();
   }
 
+  /**
+   * Word merger combine function used in the test.
+   */
  public static class WordMerger extends Combine.CombineFn<String, StringBuilder, String> {
 
     @Override
@@ -83,7 +90,7 @@ public class CombineGloballyTest {
 
     @Override
     public String extractOutput(StringBuilder accumulator) {
-      return accumulator != null ? accumulator.toString(): "";
+      return accumulator != null ? accumulator.toString() : "";
     }
 
     private static StringBuilder combine(StringBuilder accum, String datum) {

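WordMerger above follows the usual accumulate-then-extract shape of a combine function; the hunk only adds Javadoc and tidies the ternary in extractOutput. A plain-Java sketch of that shape (no Beam types; the exact accumulation details are assumed for illustration):

    /** Illustrative only: accumulate strings with commas, tolerate a null accumulator. */
    class WordMergerSketch {
      StringBuilder addInput(StringBuilder accum, String input) {
        // Start a new accumulator on the first element, otherwise append with a comma.
        return accum == null ? new StringBuilder(input) : accum.append(',').append(input);
      }

      String extractOutput(StringBuilder accumulator) {
        // Mirrors the tidied ternary above: a null accumulator becomes the empty string.
        return accumulator != null ? accumulator.toString() : "";
      }
    }
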
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 4e0bc5d..4e6c888 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -42,6 +42,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Combine per key function test.
+ */
 public class CombinePerKeyTest {
 
     private static final List<String> WORDS =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index ca97a96..0334bfe 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -33,6 +33,9 @@ import org.junit.Test;
 
 import java.io.Serializable;
 
+/**
+ * DoFn output test.
+ */
 public class DoFnOutputTest implements Serializable {
   @Test
   public void test() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 6a862c9..3402bb4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -52,6 +52,9 @@ import org.junit.Test;
 
 import java.util.Set;
 
+/**
+ * Multi-output word count test.
+ */
 public class MultiOutputWordCountTest {
 
   private static final TupleTag<String> upper = new TupleTag<>();
@@ -128,6 +131,9 @@ public class MultiOutputWordCountTest {
     }
   }
 
+  /**
+   * Count words {@link PTransform} used in the test.
+   */
  public static class CountWords extends PTransform<PCollection<String>, PCollectionTuple> {
 
     private final PCollectionView<String> regex;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 75d3fb2..22a2241 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -51,8 +51,14 @@ import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+/**
+ * Serialization test.
+ */
 public class SerializationTest {
 
+  /**
+   * Simple String holder.
+   */
   public static class StringHolder { // not serializable
     private final String string;
 
@@ -80,12 +86,17 @@ public class SerializationTest {
     }
   }
 
+  /**
+   * Simple UTF-8 coder for the String holder.
+   */
   public static class StringHolderUtf8Coder extends AtomicCoder<StringHolder> {
 
     private final StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of();
 
     @Override
-    public void encode(StringHolder value, OutputStream outStream, Context context) throws IOException {
+    public void encode(StringHolder value,
+                       OutputStream outStream,
+                       Context context) throws IOException {
       stringUtf8Coder.encode(value.toString(), outStream, context);
     }
 
@@ -171,7 +182,8 @@ public class SerializationTest {
     }
   }
 
-  private static class CountWords extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> {
+  private static class CountWords
+      extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> {
     @Override
     public PCollection<StringHolder> apply(PCollection<StringHolder> lines) {
 

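StringHolderUtf8Coder above simply delegates encode/decode to StringUtf8Coder; the hunk only rewraps the encode signature. A minimal sketch of the encode step with Beam's coder Context dropped and a plain String standing in for the holder (assumptions only, not the coder's real wire format):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    /** Illustrative only: write a value as raw UTF-8 bytes to the output stream. */
    class Utf8EncodeSketch {
      void encode(String value, OutputStream outStream) throws IOException {
        outStream.write(value.getBytes(StandardCharsets.UTF_8));
      }
    }
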
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 14abbfc..5674900 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -38,6 +38,9 @@ import org.junit.Test;
 import java.io.Serializable;
 import java.net.URI;
 
+/**
+ * Side effects test.
+ */
 public class SideEffectsTest implements Serializable {
 
   static class UserException extends RuntimeException {
@@ -66,7 +69,8 @@ public class SideEffectsTest implements Serializable {
 
      // TODO: remove the version check (and the setup and teardown methods) when we no
       // longer support Spark 1.3 or 1.4
-      String version = SparkContextFactory.getSparkContext(options.getSparkMaster(), options.getAppName()).version();
+      String version = SparkContextFactory.getSparkContext(options.getSparkMaster(),
+          options.getAppName()).version();
       if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
         assertTrue(e.getCause() instanceof UserException);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
index bf18486..59888c2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
@@ -24,6 +24,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Assert;
 import org.junit.Test;
 
+/**
+ * Simple test on the Spark runner pipeline options.
+ */
 public class SparkPipelineOptionsTest {
   @Test
   public void testDefaultCreateMethod() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 0db8913..8062658 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -39,6 +39,9 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.List;
 
+/**
+ * Windowed word count test.
+ */
 public class WindowedWordCountTest {
   private static final String[] WORDS_ARRAY = {
       "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 9152d72..15b2f39 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -63,7 +63,7 @@ public class FlattenStreamingTest {
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
-    options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
 
     PCollection<String> w1 =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index e1ff227..fd75e74 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -99,7 +99,7 @@ public class KafkaStreamingTest {
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
-    options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
 
     Map<String, String> kafkaParams = ImmutableMap.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index ef224da..28133ca 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -42,6 +42,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+/**
+ * Simple word count streaming test.
+ */
 public class SimpleStreamingWordCountTest {
 
   private static final String[] WORDS_ARRAY = {
@@ -58,7 +61,7 @@ public class SimpleStreamingWordCountTest {
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
-    options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
 
     PCollection<String> inputWords =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
index 2ade467..0fec573 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
@@ -39,6 +39,7 @@ import kafka.server.KafkaServer;
 import kafka.utils.Time;
 
 /**
+ * Embedded Kafka cluster.
  * https://gist.github.com/fjavieralba/7930018
  */
 public class EmbeddedKafkaCluster {
@@ -169,6 +170,9 @@ public class EmbeddedKafkaCluster {
     return "EmbeddedKafkaCluster{" + "brokerList='" + brokerList + "'}";
   }
 
+  /**
+   * Embedded Zookeeper.
+   */
   public static class EmbeddedZookeeper {
     private int port = -1;
     private int tickTime = 500;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 041cc50..3d8fc32 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -26,8 +26,9 @@ import org.junit.Assert;
  * success/failure counters.
  */
 public final class PAssertStreaming {
+
   /**
-   * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}
+   * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}.
    */
   static final String SUCCESS_COUNTER = "PAssertSuccess";
   static final String FAILURE_COUNTER = "PAssertFailure";

