Repository: incubator-beam Updated Branches: refs/heads/master ff825b077 -> 0f3b05335
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java index 1c83700..afcca93 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java @@ -18,13 +18,6 @@ package org.apache.beam.runners.spark.translation.streaming; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; - import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.Pipeline; @@ -35,6 +28,7 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; + import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; import org.apache.spark.api.java.JavaSparkContext; @@ -43,6 +37,13 @@ import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; + /** * Streaming evaluation context helps to handle streaming. 
@@ -179,11 +180,11 @@ public class StreamingEvaluationContext extends EvaluationContext { //---------------- override in order to expose in package @Override - protected <I extends PInput> I getInput(PTransform<I, ?> transform) { + protected <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) { return super.getInput(transform); } @Override - protected <O extends POutput> O getOutput(PTransform<?, O> transform) { + protected <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) { return super.getOutput(transform); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index d9daeb0..c1ecc43 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -17,18 +17,6 @@ */ package org.apache.beam.runners.spark.translation.streaming; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import com.google.api.client.util.Lists; -import com.google.api.client.util.Maps; -import com.google.api.client.util.Sets; -import com.google.common.reflect.TypeToken; -import kafka.serializer.Decoder; import org.apache.beam.runners.spark.io.ConsoleIO; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.io.KafkaIO; @@ -58,6 +46,12 @@ import 
org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PDone; + +import com.google.api.client.util.Lists; +import com.google.api.client.util.Maps; +import com.google.api.client.util.Sets; +import com.google.common.reflect.TypeToken; + import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; @@ -68,6 +62,15 @@ import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import kafka.serializer.Decoder; import scala.Tuple2; @@ -173,14 +176,14 @@ public final class StreamingTransformTranslator { }; } - private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform( + private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> rddTransform( final SparkPipelineTranslator rddTranslator) { - return new TransformEvaluator<PT>() { + return new TransformEvaluator<TransformT>() { @SuppressWarnings("unchecked") @Override - public void evaluate(PT transform, EvaluationContext context) { - TransformEvaluator<PT> rddEvaluator = - rddTranslator.translate((Class<PT>) transform.getClass()); + public void evaluate(TransformT transform, EvaluationContext context) { + TransformEvaluator<TransformT> rddEvaluator = + rddTranslator.translate((Class<TransformT>) transform.getClass()); StreamingEvaluationContext sec = (StreamingEvaluationContext) context; if (sec.hasStream(transform)) { @@ -203,19 +206,20 @@ public final class StreamingTransformTranslator { * RDD transform function If the 
transformation function doesn't have an input, create a fake one * as an empty RDD. * - * @param <PT> PTransform type + * @param <TransformT> PTransform type */ - private static final class RDDTransform<PT extends PTransform<?, ?>> + private static final class RDDTransform<TransformT extends PTransform<?, ?>> implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> { private final StreamingEvaluationContext context; private final AppliedPTransform<?, ?, ?> appliedPTransform; - private final TransformEvaluator<PT> rddEvaluator; - private final PT transform; + private final TransformEvaluator<TransformT> rddEvaluator; + private final TransformT transform; - private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator, - PT transform) { + private RDDTransform(StreamingEvaluationContext context, + TransformEvaluator<TransformT> rddEvaluator, + TransformT transform) { this.context = context; this.appliedPTransform = context.getCurrentTransform(); this.rddEvaluator = rddEvaluator; @@ -243,13 +247,13 @@ public final class StreamingTransformTranslator { } @SuppressWarnings("unchecked") - private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD( + private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> foreachRDD( final SparkPipelineTranslator rddTranslator) { - return new TransformEvaluator<PT>() { + return new TransformEvaluator<TransformT>() { @Override - public void evaluate(PT transform, EvaluationContext context) { - TransformEvaluator<PT> rddEvaluator = - rddTranslator.translate((Class<PT>) transform.getClass()); + public void evaluate(TransformT transform, EvaluationContext context) { + TransformEvaluator<TransformT> rddEvaluator = + rddTranslator.translate((Class<TransformT>) transform.getClass()); StreamingEvaluationContext sec = (StreamingEvaluationContext) context; if (sec.hasStream(transform)) { @@ -268,19 +272,19 @@ public final class 
StreamingTransformTranslator { /** * RDD output function. * - * @param <PT> PTransform type + * @param <TransformT> PTransform type */ - private static final class RDDOutputOperator<PT extends PTransform<?, ?>> + private static final class RDDOutputOperator<TransformT extends PTransform<?, ?>> implements VoidFunction<JavaRDD<WindowedValue<Object>>> { private final StreamingEvaluationContext context; private final AppliedPTransform<?, ?, ?> appliedPTransform; - private final TransformEvaluator<PT> rddEvaluator; - private final PT transform; + private final TransformEvaluator<TransformT> rddEvaluator; + private final TransformT transform; private RDDOutputOperator(StreamingEvaluationContext context, - TransformEvaluator<PT> rddEvaluator, PT transform) { + TransformEvaluator<TransformT> rddEvaluator, TransformT transform) { this.context = context; this.appliedPTransform = context.getCurrentTransform(); this.rddEvaluator = rddEvaluator; @@ -325,7 +329,7 @@ public final class StreamingTransformTranslator { //--- then we apply windowing to the elements DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn, - ((StreamingEvaluationContext)context).getRuntimeContext(), null); + ((StreamingEvaluationContext) context).getRuntimeContext(), null); @SuppressWarnings("unchecked") JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream = (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>) @@ -361,9 +365,10 @@ public final class StreamingTransformTranslator { } @SuppressWarnings("unchecked") - private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> - getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) { - TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz); + private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> + getTransformEvaluator(Class<TransformT> clazz, SparkPipelineTranslator 
rddTranslator) { + TransformEvaluator<TransformT> transform = + (TransformEvaluator<TransformT>) EVALUATORS.get(clazz); if (transform == null) { if (UNSUPPORTED_EVALUATORS.contains(clazz)) { throw new UnsupportedOperationException("Dataflow transformation " + clazz @@ -383,7 +388,8 @@ public final class StreamingTransformTranslator { return transform; } - private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) { + private static <TransformT extends PTransform<?, ?>> Class<?> + getPTransformOutputClazz(Class<TransformT> clazz) { Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments(); return TypeToken.of(clazz).resolveType(types[1]).getRawType(); } @@ -407,7 +413,8 @@ public final class StreamingTransformTranslator { } @Override - public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) { + public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> + translate(Class<TransformT> clazz) { return getTransformEvaluator(clazz, rddTranslator); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java index 8c018d3..6e36102 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java @@ -54,12 +54,12 @@ public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.E // Use the smallest window (fixed or 
sliding) as Spark streaming's batch duration @Override - protected <PT extends PTransform<? super PInput, POutput>> void + protected <TransformT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformTreeNode node) { @SuppressWarnings("unchecked") - PT transform = (PT) node.getTransform(); + TransformT transform = (TransformT) node.getTransform(); @SuppressWarnings("unchecked") - Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass(); + Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass(); if (transformClass.isAssignableFrom(Window.Bound.class)) { WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform); if (windowFn instanceof FixedWindows) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java index 1824a9d..d3fa05a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java @@ -18,17 +18,21 @@ package org.apache.beam.runners.spark.util; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.Serializable; - import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.sdk.coders.Coder; + import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.Serializable; + +/** + * Broadcast helper. 
+ */ public abstract class BroadcastHelper<T> implements Serializable { /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java index b1254d4..8c493f5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java @@ -17,11 +17,14 @@ */ package org.apache.beam.runners.spark.util; +import com.google.common.primitives.UnsignedBytes; + import java.io.Serializable; import java.util.Arrays; -import com.google.common.primitives.UnsignedBytes; - +/** + * Serializable byte array. + */ public class ByteArray implements Serializable, Comparable<ByteArray> { private final byte[] value; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java index 9a8aa2e..654614a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java @@ -24,6 +24,9 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PInput; +/** + * A {@link PTransform} wrapping another transform. 
+ */ public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PCollection<T>> { private PTransform<PInput, PCollection<T>> transform; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java index f9b00cc..7b25e34 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java @@ -35,6 +35,9 @@ import org.junit.Test; import java.util.Collections; import java.util.List; +/** + * Empty input test. + */ public class EmptyInputTest { @Test @@ -51,6 +54,9 @@ public class EmptyInputTest { res.close(); } + /** + * Concat words serializable function used in test. 
+ */ public static class ConcatWords implements SerializableFunction<Iterable<String>, String> { @Override public String apply(Iterable<String> input) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index 4e9c0b8..eee120e 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -18,10 +18,13 @@ package org.apache.beam.runners.spark; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; @@ -32,16 +35,14 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.commons.io.FileUtils; -import org.junit.rules.TemporaryFolder; -import org.junit.Rule; -import org.junit.Test; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; +import org.apache.commons.io.FileUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.util.Arrays; @@ 
-49,6 +50,9 @@ import java.util.List; import java.util.Set; import java.util.regex.Pattern; +/** + * Simple word count test. + */ public class SimpleWordCountTest { private static final String[] WORDS_ARRAY = { "hi there", "hi", "hi sue bob", @@ -133,6 +137,9 @@ public class SimpleWordCountTest { } } + /** + * A {@link PTransform} counting words. + */ public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> apply(PCollection<String> lines) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java index 3643bac..88f4a06 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java @@ -18,19 +18,21 @@ package org.apache.beam.runners.spark; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + import org.apache.beam.sdk.options.PipelineOptionsRegistrar; import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.util.ServiceLoader; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - /** * Test {@link SparkRunnerRegistrar}. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index 693e2c6..f358878 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -47,6 +47,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.List; +/** + * Avro pipeline test. + */ public class AvroPipelineTest { private File inputFile; @@ -82,7 +85,8 @@ public class AvroPipelineTest { assertEquals(Lists.newArrayList(savedRecord), records); } - private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException { + private void populateGenericFile(List<GenericRecord> genericRecords, + Schema schema) throws IOException { FileOutputStream outputStream = new FileOutputStream(this.inputFile); GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<>(schema); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 85eeabd..8ce35c4 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -49,6 +49,9 @@ import java.util.Arrays; import java.util.List; import java.util.Set; +/** + * Number of shards 
test. + */ public class NumShardsTest { private static final String[] WORDS_ARRAY = { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index 0c8c6fc..eaa508c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -47,6 +47,9 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; +/** + * Pipeline on the Hadoop file format test. + */ public class HadoopFileFormatPipelineTest { private File inputFile; @@ -69,16 +72,21 @@ public class HadoopFileFormatPipelineTest { Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); @SuppressWarnings("unchecked") Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass = - (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class; - HadoopIO.Read.Bound<IntWritable,Text> read = - HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class); + (Class<? extends FileInputFormat<IntWritable, Text>>) + (Class<?>) SequenceFileInputFormat.class; + HadoopIO.Read.Bound<IntWritable, Text> read = + HadoopIO.Read.from(inputFile.getAbsolutePath(), + inputFormatClass, + IntWritable.class, + Text.class); PCollection<KV<IntWritable, Text>> input = p.apply(read) .setCoder(KvCoder.of(WritableCoder.of(IntWritable.class), WritableCoder.of(Text.class))); @SuppressWarnings("unchecked") Class<? 
extends FileOutputFormat<IntWritable, Text>> outputFormatClass = - (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class; + (Class<? extends FileOutputFormat<IntWritable, Text>>) + (Class<?>) TemplatedSequenceFileOutputFormat.class; @SuppressWarnings("unchecked") - HadoopIO.Write.Bound<IntWritable,Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(), + HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(), outputFormatClass, IntWritable.class, Text.class); input.apply(write.withoutSharding()); EvaluationResult res = SparkPipelineRunner.create().run(p); @@ -86,7 +94,8 @@ public class HadoopFileFormatPipelineTest { IntWritable key = new IntWritable(); Text value = new Text(); - try (Reader reader = new Reader(new Configuration(), Reader.file(new Path(outputFile.toURI())))) { + try (Reader reader = new Reader(new Configuration(), + Reader.file(new Path(outputFile.toURI())))) { int i = 0; while (reader.next(key, value)) { assertEquals(i, key.get()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java index 55991a4..e1620db 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java @@ -28,6 +28,9 @@ import static org.junit.Assert.assertEquals; import org.junit.Test; +/** + * Test on the {@link ShardNameBuilder}. 
+ */ public class ShardNameBuilderTest { @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java index a644673..ac64540 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java @@ -37,6 +37,9 @@ import org.junit.Test; import java.util.Arrays; import java.util.List; +/** + * Combine globally test. + */ public class CombineGloballyTest { private static final String[] WORDS_ARRAY = { @@ -53,10 +56,14 @@ public class CombineGloballyTest { PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger())); EvaluationResult res = SparkPipelineRunner.create().run(p); - assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output))); + assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", + Iterables.getOnlyElement(res.get(output))); res.close(); } + /** + * Word merger combine function used in the test. + */ public static class WordMerger extends Combine.CombineFn<String, StringBuilder, String> { @Override @@ -83,7 +90,7 @@ public class CombineGloballyTest { @Override public String extractOutput(StringBuilder accumulator) { - return accumulator != null ? accumulator.toString(): ""; + return accumulator != null ? 
accumulator.toString() : ""; } private static StringBuilder combine(StringBuilder accum, String datum) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java index 4e0bc5d..4e6c888 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java @@ -42,6 +42,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +/** + * Combine per key function test. + */ public class CombinePerKeyTest { private static final List<String> WORDS = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java index ca97a96..0334bfe 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java @@ -33,6 +33,9 @@ import org.junit.Test; import java.io.Serializable; +/** + * DoFn output test. 
+ */ public class DoFnOutputTest implements Serializable { @Test public void test() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index 6a862c9..3402bb4 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -52,6 +52,9 @@ import org.junit.Test; import java.util.Set; +/** + * Multi-output word count test. + */ public class MultiOutputWordCountTest { private static final TupleTag<String> upper = new TupleTag<>(); @@ -128,6 +131,9 @@ public class MultiOutputWordCountTest { } } + /** + * Count words {@link PTransform} used in the test. 
+ */ public static class CountWords extends PTransform<PCollection<String>, PCollectionTuple> { private final PCollectionView<String> regex; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java index 75d3fb2..22a2241 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java @@ -51,8 +51,14 @@ import java.util.List; import java.util.Set; import java.util.regex.Pattern; +/** + * Serialization test. + */ public class SerializationTest { + /** + * Simple String holder. + */ public static class StringHolder { // not serializable private final String string; @@ -80,12 +86,17 @@ public class SerializationTest { } } + /** + * Simple String holder with UTF-8 encoding. 
+ */ public static class StringHolderUtf8Coder extends AtomicCoder<StringHolder> { private final StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of(); @Override - public void encode(StringHolder value, OutputStream outStream, Context context) throws IOException { + public void encode(StringHolder value, + OutputStream outStream, + Context context) throws IOException { stringUtf8Coder.encode(value.toString(), outStream, context); } @@ -171,7 +182,8 @@ public class SerializationTest { } } - private static class CountWords extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> { + private static class CountWords + extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> { @Override public PCollection<StringHolder> apply(PCollection<StringHolder> lines) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java index 14abbfc..5674900 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java @@ -38,6 +38,9 @@ import org.junit.Test; import java.io.Serializable; import java.net.URI; +/** + * Side effects test. 
+ */ public class SideEffectsTest implements Serializable { static class UserException extends RuntimeException { @@ -66,7 +69,8 @@ public class SideEffectsTest implements Serializable { // TODO: remove the version check (and the setup and teardown methods) when we no // longer support Spark 1.3 or 1.4 - String version = SparkContextFactory.getSparkContext(options.getSparkMaster(), options.getAppName()).version(); + String version = SparkContextFactory.getSparkContext(options.getSparkMaster(), + options.getAppName()).version(); if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) { assertTrue(e.getCause() instanceof UserException); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java index bf18486..59888c2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java @@ -24,6 +24,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Assert; import org.junit.Test; +/** + * Simple test on the Spark runner pipeline options. 
+ */ public class SparkPipelineOptionsTest { @Test public void testDefaultCreateMethod() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java index 0db8913..8062658 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java @@ -39,6 +39,9 @@ import org.junit.Test; import java.util.Arrays; import java.util.List; +/** + * Windowed word count test. + */ public class WindowedWordCountTest { private static final String[] WORDS_ARRAY = { "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"}; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index 9152d72..15b2f39 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -63,7 +63,7 @@ public class FlattenStreamingTest { PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); options.setAppName(this.getClass().getSimpleName()); options.setRunner(SparkPipelineRunner.class); - 
options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval + options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval Pipeline p = Pipeline.create(options); PCollection<String> w1 = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index e1ff227..fd75e74 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -99,7 +99,7 @@ public class KafkaStreamingTest { PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); options.setAppName(this.getClass().getSimpleName()); options.setRunner(SparkPipelineRunner.class); - options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval + options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval Pipeline p = Pipeline.create(options); Map<String, String> kafkaParams = ImmutableMap.of( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index ef224da..28133ca 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -42,6 +42,9 @@ import java.util.Collections; import java.util.List; import java.util.Set; +/** + * Simple word count streaming test. + */ public class SimpleStreamingWordCountTest { private static final String[] WORDS_ARRAY = { @@ -58,7 +61,7 @@ public class SimpleStreamingWordCountTest { PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); options.setAppName(this.getClass().getSimpleName()); options.setRunner(SparkPipelineRunner.class); - options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval + options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval Pipeline p = Pipeline.create(options); PCollection<String> inputWords = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java index 2ade467..0fec573 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java @@ -39,6 +39,7 @@ import kafka.server.KafkaServer; import kafka.utils.Time; /** + * Embedded Kafka cluster. * https://gist.github.com/fjavieralba/7930018 */ public class EmbeddedKafkaCluster { @@ -169,6 +170,9 @@ public class EmbeddedKafkaCluster { return "EmbeddedKafkaCluster{" + "brokerList='" + brokerList + "'}"; } + /** + * Embedded Zookeeper. 
+ */ public static class EmbeddedZookeeper { private int port = -1; private int tickTime = 500; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java index 041cc50..3d8fc32 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java @@ -26,8 +26,9 @@ import org.junit.Assert; * success/failure counters. */ public final class PAssertStreaming { + /** - * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert} + * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}. */ static final String SUCCESS_COUNTER = "PAssertSuccess"; static final String FAILURE_COUNTER = "PAssertFailure";
