http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java
new file mode 100644
index 0000000..44d8e0f
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.net.URI;
+
+import static org.junit.Assert.*;
+
+/**
+ * Checks that an exception thrown from user code inside a {@code DoFn} surfaces to the caller of
+ * {@code Pipeline.run()} as the cause of a {@link RuntimeException}.
+ */
+public class SideEffectsTest implements Serializable {
+
+  /** Marker exception thrown by the {@code DoFn} so the test can recognise it as the cause. */
+  static class UserException extends RuntimeException {
+  }
+
+  /**
+   * Sets the {@code SparkContextFactory.TEST_REUSE_SPARK_CONTEXT} system property to "true"
+   * before the test runs. Per the TODO in {@link #test()}, this exists only to support the
+   * Spark version check.
+   */
+  @Before
+  public void setup() {
+    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "true");
+  }
+
+  /** Sets the {@code SparkContextFactory.TEST_REUSE_SPARK_CONTEXT} property back to "false". */
+  @After
+  public void teardown() {
+    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "false");
+  }
+
+  /**
+   * Runs a one-element pipeline whose {@code DoFn} always throws {@link UserException} and
+   * asserts that {@code run()} fails with a {@link RuntimeException} that has a non-null cause.
+   * On Spark versions other than 1.3.x and 1.4.x the cause must be the {@link UserException}.
+   */
+  @Test
+  public void test() throws Exception {
+    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    options.setRunner(SparkPipelineRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+
+    pipeline.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        throw new UserException();
+      }
+    }));
+
+    try {
+      pipeline.run();
+      // fail() throws an AssertionError, which is not a RuntimeException, so the catch
+      // block below does not swallow it.
+      fail("Run should throw an exception");
+    } catch (RuntimeException e) {
+      assertNotNull(e.getCause());
+
+      // TODO: remove the version check (and the setup and teardown methods) when we no
+      // longer support Spark 1.3 or 1.4
+      String version = SparkContextFactory
+          .getSparkContext(options.getSparkMaster(), options.getAppName())
+          .version();
+      if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
+        assertTrue(e.getCause() instanceof UserException);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
new file mode 100644
index 0000000..f930855
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * Counts the words of a small in-memory input with the Spark runner and checks the formatted
+ * "word: count" output.
+ */
+public class SimpleWordCountTest {
+  private static final String[] WORDS_ARRAY = {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+  private static final Set<String> EXPECTED_COUNT_SET =
+      ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+
+  @Test
+  public void testRun() throws Exception {
+    SparkPipelineOptions sparkOptions = SparkPipelineOptionsFactory.create();
+    sparkOptions.setRunner(SparkPipelineRunner.class);
+    Pipeline pipeline = Pipeline.create(sparkOptions);
+
+    PCollection<String> lines = pipeline.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+    PCollection<String> formattedCounts = lines.apply(new CountWords());
+
+    DataflowAssert.that(formattedCounts).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    EvaluationResult result = SparkPipelineRunner.create().run(pipeline);
+    result.close();
+  }
+
+  /**
+   * Splits each input line on runs of characters that are neither letters nor apostrophes and
+   * emits every non-empty token.
+   */
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      String[] tokens = WORD_BOUNDARY.split(c.element());
+
+      // A line that yields no tokens at all is counted in the "emptyLines" aggregator
+      // (visible in the monitoring UI when run using DataflowPipelineRunner); there is
+      // nothing to emit for it.
+      if (tokens.length == 0) {
+        emptyLines.addValue(1L);
+        return;
+      }
+
+      // Emit every token, skipping the empty strings the split can leave behind.
+      for (String token : tokens) {
+        if (token.isEmpty()) {
+          continue;
+        }
+        c.output(token);
+      }
+    }
+  }
+
+  /**
+   * Renders a (word, count) pair as the string "word: count".
+   */
+  private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      KV<String, Long> wordCount = c.element();
+      c.output(wordCount.getKey() + ": " + wordCount.getValue());
+    }
+  }
+
+  /**
+   * Composite transform: tokenizes lines into words, counts each distinct word, and formats
+   * every count as a printable string.
+   */
+  public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
+    @Override
+    public PCollection<String> apply(PCollection<String> lines) {
+      // Step 1: lines -> individual words.
+      PCollection<String> tokens = lines.apply(ParDo.of(new ExtractWordsFn()));
+
+      // Step 2: words -> (word, occurrences).
+      PCollection<KV<String, Long>> occurrences = tokens.apply(Count.<String>perElement());
+
+      // Step 3: (word, occurrences) -> "word: occurrences".
+      PCollection<String> formatted = occurrences.apply(ParDo.of(new FormatCountsFn()));
+      return formatted;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java
new file mode 100644
index 0000000..3fc3ecc
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks the Spark master reported by options obtained from
+ * {@code SparkPipelineOptionsFactory.create()}, both untouched and after an explicit override.
+ */
+public class TestSparkPipelineOptionsFactory {
+
+  @Test
+  public void testDefaultCreateMethod() {
+    SparkPipelineOptions defaults = SparkPipelineOptionsFactory.create();
+    String master = defaults.getSparkMaster();
+    Assert.assertEquals("local[1]", master);
+  }
+
+  @Test
+  public void testSettingCustomOptions() {
+    SparkPipelineOptions customized = SparkPipelineOptionsFactory.create();
+    customized.setSparkMaster("spark://207.184.161.138:7077");
+    String master = customized.getSparkMaster();
+    Assert.assertEquals("spark://207.184.161.138:7077", master);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
new file mode 100644
index 0000000..0a36c9e
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.cloud.dataflow.examples.complete.TfIdf;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.Keys;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.Arrays;
+
+/**
+ * Exercises the SDK's {@code TfIdf.ComputeTfIdf} transform on the Spark runner and checks the
+ * set of distinct words it produces.
+ */
+public class TfIdfTest {
+
+  @Test
+  public void testTfIdf() throws Exception {
+    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    p.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+
+    // Three tiny documents keyed by URI.
+    PCollection<KV<URI, String>> documents = p.apply(Create.of(
+        KV.of(new URI("x"), "a b c d"),
+        KV.of(new URI("y"), "a b c"),
+        KV.of(new URI("z"), "a m n")));
+
+    PCollection<KV<String, KV<URI, Double>>> tfIdfByWord =
+        documents.apply(new TfIdf.ComputeTfIdf());
+
+    // Only the distinct words (the keys) are checked, not the scores.
+    PCollection<String> allWords = tfIdfByWord.apply(Keys.<String>create());
+    PCollection<String> distinctWords = allWords.apply(RemoveDuplicates.<String>create());
+
+    DataflowAssert.that(distinctWords)
+        .containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));
+
+    EvaluationResult result = SparkPipelineRunner.create().run(p);
+    result.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java
new file mode 100644
index 0000000..f759fe9
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.api.client.repackaged.com.google.common.base.Joiner;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Charsets;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A test for the transforms registered in TransformTranslator.
+ * Builds a regular Dataflow pipeline with each of the mapped
+ * transforms, and makes sure that they work when the pipeline is
+ * executed in Spark.
+ */
+public class TransformTranslatorTest {
+
+  @Rule
+  public TestName name = new TestName();
+
+  private DirectPipelineRunner directRunner;
+  private SparkPipelineRunner sparkRunner;
+  /** Per-test output directory under target/test-data; always ends with File.separator. */
+  private String testDataDirName;
+
+  /**
+   * Creates both runners and a fresh, empty output directory named after the running test
+   * method.
+   *
+   * @throws IOException if the old directory cannot be deleted or the new one cannot be created
+   */
+  @Before
+  public void init() throws IOException {
+    sparkRunner = SparkPipelineRunner.create();
+    directRunner = DirectPipelineRunner.createForTest();
+    testDataDirName = Joiner.on(File.separator).join("target", "test-data", name.getMethodName())
+        + File.separator;
+    File testDataDir = new File(testDataDirName);
+    FileUtils.deleteDirectory(testDataDir);
+    // Fail here with a clear message rather than later inside the pipeline write.
+    if (!testDataDir.mkdirs() && !testDataDir.isDirectory()) {
+      throw new IOException("Could not create test data directory " + testDataDir);
+    }
+  }
+
+  /**
+   * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
+   * in DirectPipelineRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark
+   * transforms. Finally it makes sure that the results are the same for both runs.
+   */
+  @Test
+  public void testTextIOReadAndWriteTransforms() throws IOException {
+    String directOut = runPipeline("direct", directRunner);
+    String sparkOut = runPipeline("spark", sparkRunner);
+
+    List<String> directOutput =
+        Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
+
+    List<String> sparkOutput =
+        Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);
+
+    // sort output to get a stable result (PCollections are not ordered)
+    Collections.sort(directOutput);
+    Collections.sort(sparkOutput);
+
+    Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray());
+  }
+
+  /**
+   * Reads src/test/resources/test_text.txt and writes it back out with the given runner.
+   *
+   * @param runnerName label appended to the output file prefix to keep the two runs apart
+   * @param runner the runner that executes the pipeline
+   * @return the output file prefix that was passed to TextIO.Write
+   */
+  private String runPipeline(String runnerName, PipelineRunner<?> runner) {
+    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    // testDataDirName already ends with a separator, so concatenate directly; joining with
+    // another separator would put a doubled separator into the path.
+    String outFile = testDataDirName + "test_text_out_" + runnerName;
+    PCollection<String> lines = p.apply(TextIO.Read.from("src/test/resources/test_text.txt"));
+    lines.apply(TextIO.Write.to(outFile));
+    runner.run(p);
+    return outFile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java
new file mode 100644
index 0000000..eb88542
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.joda.time.Duration;
+import org.junit.Test;
+
+/**
+ * Word count over timestamped input split into one-minute fixed windows. The first three lines
+ * carry timestamp 60000 and the last three 120000, so each word is counted once per window.
+ */
+public class WindowedWordCountTest {
+  private static final String[] WORDS_ARRAY = {
+          "hi there", "hi", "hi sue bob",
+          "hi sue", "", "bob hi"};
+  private static final Long[] TIMESTAMPS_ARRAY = {
+          60000L, 60000L, 60000L,
+          120000L, 120000L, 120000L};
+  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+  private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY);
+  // A list rather than a set: "sue: 1" and "bob: 1" are expected once per window.
+  private static final List<String> EXPECTED_COUNT_SET =
+          ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1",
+                  "hi: 2", "sue: 1", "bob: 1");
+
+  @Test
+  public void testRun() throws Exception {
+    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    options.setRunner(SparkPipelineRunner.class);
+    // Build the pipeline from the Spark options configured above (as SimpleWordCountTest
+    // does) instead of discarding them in favour of fresh default options.
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
+            .setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedWords = inputWords
+            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+
+    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+
+    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
new file mode 100644
index 0000000..ad7256c
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.coders;
+
+import com.google.cloud.dataflow.sdk.testing.CoderProperties;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for WritableCoder: a value must equal itself after being encoded and
+ * decoded.
+ */
+public class WritableCoderTest {
+
+  @Test
+  public void testIntWritableEncoding() throws Exception {
+    IntWritable fortyTwo = new IntWritable(42);
+    WritableCoder<IntWritable> intCoder = WritableCoder.of(IntWritable.class);
+    CoderProperties.coderDecodeEncodeEqual(intCoder, fortyTwo);
+  }
+
+  @Test
+  public void testNullWritableEncoding() throws Exception {
+    WritableCoder<NullWritable> nullCoder = WritableCoder.of(NullWritable.class);
+    NullWritable singleton = NullWritable.get();
+    CoderProperties.coderDecodeEncodeEqual(nullCoder, singleton);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
new file mode 100644
index 0000000..73dd2d3
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.Lists;
+import com.google.common.io.Resources;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+
+public class AvroPipelineTest {
+
+  private File inputFile;
+  private File outputDir;
+
+  @Rule
+  public final TemporaryFolder tmpDir = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException {
+    inputFile = tmpDir.newFile("test.avro");
+    outputDir = tmpDir.newFolder("out");
+    outputDir.delete();
+  }
+
+  @Test
+  public void testGeneric() throws Exception {
+    Schema schema = new 
Schema.Parser().parse(Resources.getResource("person.avsc").openStream());
+    GenericRecord savedRecord = new GenericData.Record(schema);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), schema);
+
+    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PCollection<GenericRecord> input = p.apply(
+        AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
+    
input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+
+    List<GenericRecord> records = readGenericFile();
+    assertEquals(Lists.newArrayList(savedRecord), records);
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema 
schema) throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(this.inputFile);
+    GenericDatumWriter<GenericRecord> genericDatumWriter = new 
GenericDatumWriter<>(schema);
+
+    try (DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<>(genericDatumWriter)) {
+      dataFileWriter.create(schema, outputStream);
+      for (GenericRecord record : genericRecords) {
+        dataFileWriter.append(record);
+      }
+    }
+    outputStream.close();
+  }
+
+  private List<GenericRecord> readGenericFile() throws IOException {
+    List<GenericRecord> records = Lists.newArrayList();
+    GenericDatumReader<GenericRecord> genericDatumReader = new 
GenericDatumReader<>();
+    try (DataFileReader<GenericRecord> dataFileReader =
+             new DataFileReader<>(new File(outputDir + "-00000-of-00001"), 
genericDatumReader)) {
+      for (GenericRecord record : dataFileReader) {
+        records.add(record);
+      }
+    }
+    return records;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
new file mode 100644
index 0000000..39525b2
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io;
+
+import com.google.cloud.dataflow.examples.WordCount;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkPipelineOptionsFactory;
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class NumShardsTest {
+
+  private static final String[] WORDS_ARRAY = {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+
+  private File outputDir;
+
+  @Rule
+  public final TemporaryFolder tmpDir = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException {
+    outputDir = tmpDir.newFolder("out");
+    outputDir.delete();
+  }
+
+  @Test
+  public void testText() throws Exception {
+    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    options.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords = 
p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+    
output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+
+    int count = 0;
+    Set<String> expected = Sets.newHashSet("hi: 5", "there: 1", "sue: 2", 
"bob: 2");
+    for (File f : tmpDir.getRoot().listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().matches("out-.*\\.txt");
+      }
+    })) {
+      count++;
+      for (String line : Files.readLines(f, Charsets.UTF_8)) {
+        assertTrue(line + " not found", expected.remove(line));
+      }
+    }
+    assertEquals(3, count);
+    assertTrue(expected.isEmpty());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
new file mode 100644
index 0000000..7a9be8b
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io.hadoop;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class HadoopFileFormatPipelineTest {
+
+  private File inputFile;
+  private File outputFile;
+
+  @Rule
+  public final TemporaryFolder tmpDir = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException {
+    inputFile = tmpDir.newFile("test.seq");
+    outputFile = tmpDir.newFolder("out");
+    outputFile.delete();
+  }
+
+  @Test
+  public void testSequenceFile() throws Exception {
+    populateFile();
+
+    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    @SuppressWarnings("unchecked")
+    Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
+        (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) 
SequenceFileInputFormat.class;
+    HadoopIO.Read.Bound<IntWritable,Text> read =
+        HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, 
IntWritable.class, Text.class);
+    PCollection<KV<IntWritable, Text>> input = p.apply(read);
+    @SuppressWarnings("unchecked")
+    Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass =
+        (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) 
TemplatedSequenceFileOutputFormat.class;
+    @SuppressWarnings("unchecked")
+    HadoopIO.Write.Bound<IntWritable,Text> write = 
HadoopIO.Write.to(outputFile.getAbsolutePath(),
+        outputFormatClass, IntWritable.class, Text.class);
+    input.apply(write.withoutSharding());
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+
+    IntWritable key = new IntWritable();
+    Text value = new Text();
+    try (Reader reader = new Reader(new Configuration(), Reader.file(new 
Path(outputFile.toURI())))) {
+      int i = 0;
+      while (reader.next(key, value)) {
+        assertEquals(i, key.get());
+        assertEquals("value-" + i, value.toString());
+        i++;
+      }
+    }
+  }
+
+  private void populateFile() throws IOException {
+    IntWritable key = new IntWritable();
+    Text value = new Text();
+    try (Writer writer = SequenceFile.createWriter(
+        new Configuration(),
+        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+        Writer.file(new Path(this.inputFile.toURI())))) {
+      for (int i = 0; i < 5; i++) {
+        key.set(i);
+        value.set("value-" + i);
+        writer.append(key, value);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
new file mode 100644
index 0000000..b1d35d5
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io.hadoop;
+
+import org.junit.Test;
+
+import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
+import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
+import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
+import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
+import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks the static helpers of {@code ShardNameBuilder} against fixed inputs and expected
+ * strings.
+ */
+public class ShardNameBuilderTest {
+
+  /** Template with a shard-number and a shard-count placeholder. */
+  private static final String SHARD_OF_COUNT = "-S-of-N";
+
+  /** Template that starts with a path separator. */
+  private static final String PART_FILE = "/part-SSSSS";
+
+  /** The count placeholder is replaced; the shard-number placeholder is left untouched. */
+  @Test
+  public void testReplaceShardCount() {
+    assertEquals("", replaceShardCount("", 6));
+    assertEquals("-S-of-6", replaceShardCount("-S-of-N", 6));
+    assertEquals("-SS-of-06", replaceShardCount("-SS-of-NN", 6));
+    assertEquals("-S-of-60", replaceShardCount("-S-of-N", 60));
+    assertEquals("-SS-of-60", replaceShardCount("-SS-of-NN", 60));
+    assertEquals("/part-SSSSS", replaceShardCount("/part-SSSSS", 6));
+  }
+
+  /** The shard-number placeholder is replaced in templates whose count is already filled in. */
+  @Test
+  public void testReplaceShardNumber() {
+    assertEquals("", replaceShardNumber("", 5));
+    assertEquals("-5-of-6", replaceShardNumber("-S-of-6", 5));
+    assertEquals("-05-of-06", replaceShardNumber("-SS-of-06", 5));
+    assertEquals("-59-of-60", replaceShardNumber("-S-of-60", 59));
+    assertEquals("-59-of-60", replaceShardNumber("-SS-of-60", 59));
+    assertEquals("/part-00005", replaceShardNumber("/part-SSSSS", 5));
+  }
+
+  /** Expected output directory for each path prefix and template combination. */
+  @Test
+  public void testGetOutputDirectory() {
+    assertEquals("./", getOutputDirectory("foo", SHARD_OF_COUNT));
+    assertEquals("foo", getOutputDirectory("foo/bar", SHARD_OF_COUNT));
+    assertEquals("/foo", getOutputDirectory("/foo/bar", SHARD_OF_COUNT));
+    assertEquals("hdfs://foo/", getOutputDirectory("hdfs://foo/bar", SHARD_OF_COUNT));
+    assertEquals("foo/bar", getOutputDirectory("foo/bar", PART_FILE));
+    assertEquals("/foo/bar", getOutputDirectory("/foo/bar", PART_FILE));
+    assertEquals("hdfs://foo/bar", getOutputDirectory("hdfs://foo/bar", PART_FILE));
+  }
+
+  /** Expected file prefix for each path prefix and template combination. */
+  @Test
+  public void testGetOutputFilePrefix() {
+    assertEquals("foo", getOutputFilePrefix("foo", SHARD_OF_COUNT));
+    assertEquals("bar", getOutputFilePrefix("foo/bar", SHARD_OF_COUNT));
+    assertEquals("bar", getOutputFilePrefix("/foo/bar", SHARD_OF_COUNT));
+    assertEquals("bar", getOutputFilePrefix("hdfs://foo/bar", SHARD_OF_COUNT));
+    assertEquals("", getOutputFilePrefix("foo/bar", PART_FILE));
+    assertEquals("", getOutputFilePrefix("/foo/bar", PART_FILE));
+    assertEquals("", getOutputFilePrefix("hdfs://foo/bar", PART_FILE));
+  }
+
+  /** Expected file template for each path prefix and template combination. */
+  @Test
+  public void testGetOutputFileTemplate() {
+    assertEquals("-S-of-N", getOutputFileTemplate("foo", SHARD_OF_COUNT));
+    assertEquals("-S-of-N", getOutputFileTemplate("foo/bar", SHARD_OF_COUNT));
+    assertEquals("-S-of-N", getOutputFileTemplate("/foo/bar", SHARD_OF_COUNT));
+    assertEquals("-S-of-N", getOutputFileTemplate("hdfs://foo/bar", SHARD_OF_COUNT));
+    assertEquals("part-SSSSS", getOutputFileTemplate("foo/bar", PART_FILE));
+    assertEquals("part-SSSSS", getOutputFileTemplate("/foo/bar", PART_FILE));
+    assertEquals("part-SSSSS", getOutputFileTemplate("hdfs://foo/bar", PART_FILE));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java
new file mode 100644
index 0000000..828b26e
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+
+import org.apache.beam.runners.spark.io.CreateStream;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.streaming.utils.DataflowAssertStreaming;
+
+import org.joda.time.Duration;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test Flatten (union) implementation for streaming: two single-batch streams are windowed the
+ * same way, flattened, and the result must contain the elements of both.
+ */
+public class FlattenStreamingTest {
+
+  private static final String[] FIRST_WORDS = {"one", "two", "three", "four"};
+  private static final String[] SECOND_WORDS = {"five", "six", "seven", "eight"};
+  private static final String[] EXPECTED_UNION = {
+      "one", "two", "three", "four", "five", "six", "seven", "eight"};
+  private static final long TEST_TIMEOUT_MSEC = 1000L;
+
+  /** Creates a stream from a single batch of words and puts it into one-second fixed windows. */
+  private static PCollection<String> windowedStream(Pipeline pipeline, String[] words) {
+    List<Iterable<String>> queue =
+        Collections.<Iterable<String>>singletonList(Arrays.asList(words));
+    PCollection<String> stream =
+        pipeline.apply(CreateStream.fromQueue(queue)).setCoder(StringUtf8Coder.of());
+    return stream.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+  }
+
+  /** Flattens the two windowed streams and asserts on the union through the failure counter. */
+  @Test
+  public void testRun() throws Exception {
+    SparkStreamingPipelineOptions options =
+        SparkStreamingPipelineOptionsFactory.create();
+    options.setAppName(this.getClass().getSimpleName());
+    options.setRunner(SparkPipelineRunner.class);
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
+    Pipeline pipeline = Pipeline.create(options);
+
+    PCollection<String> first = windowedStream(pipeline, FIRST_WORDS);
+    PCollection<String> second = windowedStream(pipeline, SECOND_WORDS);
+    PCollection<String> union =
+        PCollectionList.of(first).and(second).apply(Flatten.<String>pCollections());
+
+    DataflowAssert.thatIterable(union.apply(View.<String>asIterable()))
+        .containsInAnyOrder(EXPECTED_UNION);
+
+    EvaluationResult result = SparkPipelineRunner.create(options).run(pipeline);
+    result.close();
+
+    DataflowAssertStreaming.assertNoFailures(result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java
new file mode 100644
index 0000000..e9e685b
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.beam.runners.spark.io.KafkaIO;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.streaming.utils.DataflowAssertStreaming;
+import org.apache.beam.runners.spark.streaming.utils.EmbeddedKafkaCluster;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import kafka.serializer.StringDecoder;
+
+/**
+ * Test Kafka as input: messages published to an embedded broker are read back through
+ * {@code KafkaIO.Read} and checked as formatted {@code key,value} strings.
+ */
+public class KafkaStreamingTest {
+  private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
+      new EmbeddedKafkaCluster.EmbeddedZookeeper(17001);
+  private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
+      new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(),
+          new Properties(), Collections.singletonList(6667));
+  private static final String TOPIC = "kafka_dataflow_test_topic";
+  private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
+      "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
+  );
+  private static final Set<String> EXPECTED = ImmutableSet.of(
+      "k1,v1", "k2,v2", "k3,v3", "k4,v4"
+  );
+  private static final long TEST_TIMEOUT_MSEC = 1000L;
+
+  /**
+   * Starts the embedded ZooKeeper and Kafka broker, then publishes {@code KAFKA_MESSAGES} to
+   * {@code TOPIC}.
+   *
+   * @throws IOException if the embedded ZooKeeper cannot be started
+   */
+  @BeforeClass
+  public static void init() throws IOException {
+    EMBEDDED_ZOOKEEPER.startup();
+    EMBEDDED_KAFKA_CLUSTER.startup();
+
+    // write to Kafka
+    Properties producerProps = new Properties();
+    producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
+    producerProps.put("request.required.acks", 1);
+    producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
+    Serializer<String> stringSerializer = new StringSerializer();
+    // The diamond lets the compiler check the serializer types, so no unchecked-warning
+    // suppression is needed as it was for the raw constructor call.
+    try (KafkaProducer<String, String> kafkaProducer =
+        new KafkaProducer<>(producerProps, stringSerializer, stringSerializer)) {
+      for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) {
+        kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
+      }
+    }
+  }
+
+  /** Reads the topic from its smallest offset and asserts on the formatted key/value pairs. */
+  @Test
+  public void testRun() throws Exception {
+    // test read from Kafka
+    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    options.setAppName(this.getClass().getSimpleName());
+    options.setRunner(SparkPipelineRunner.class);
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
+    Pipeline p = Pipeline.create(options);
+
+    Map<String, String> kafkaParams = ImmutableMap.of(
+        "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
+        "auto.offset.reset", "smallest"
+    );
+
+    PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(StringDecoder.class,
+        StringDecoder.class, String.class, String.class, Collections.singleton(TOPIC),
+        kafkaParams));
+    PCollection<KV<String, String>> windowedWords = kafkaInput
+        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1))));
+
+    PCollection<String> formattedKV = windowedWords.apply(ParDo.of(new FormatKVFn()));
+
+    DataflowAssert.thatIterable(formattedKV.apply(View.<String>asIterable()))
+        .containsInAnyOrder(EXPECTED);
+
+    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    res.close();
+
+    DataflowAssertStreaming.assertNoFailures(res);
+  }
+
+  /** Stops the broker first, then ZooKeeper. */
+  @AfterClass
+  public static void tearDown() {
+    try {
+      EMBEDDED_KAFKA_CLUSTER.shutdown();
+    } finally {
+      // ZooKeeper must be stopped even if the broker shutdown throws.
+      EMBEDDED_ZOOKEEPER.shutdown();
+    }
+  }
+
+  /** Formats each key/value pair as {@code key,value}. */
+  private static class FormatKVFn extends DoFn<KV<String, String>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getKey() + "," + c.element().getValue());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/SimpleStreamingWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/SimpleStreamingWordCountTest.java
new file mode 100644
index 0000000..9a0609d
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/SimpleStreamingWordCountTest.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.beam.runners.spark.io.CreateStream;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SimpleWordCountTest;
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.streaming.utils.DataflowAssertStreaming;
+
+import org.joda.time.Duration;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Runs {@code SimpleWordCountTest.CountWords} over a single-batch stream in one-second fixed
+ * windows and checks the formatted counts.
+ */
+public class SimpleStreamingWordCountTest {
+
+  private static final List<String> INPUT_LINES = Arrays.asList(
+      "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi");
+  private static final List<Iterable<String>> INPUT_QUEUE =
+      Collections.<Iterable<String>>singletonList(INPUT_LINES);
+  private static final Set<String> EXPECTED_COUNT_SET =
+      ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+  private static final long TEST_TIMEOUT_MSEC = 1000L;
+
+  /** Counts the words of the queued batch and asserts on them through the failure counter. */
+  @Test
+  public void testRun() throws Exception {
+    SparkStreamingPipelineOptions options =
+        SparkStreamingPipelineOptionsFactory.create();
+    options.setAppName(this.getClass().getSimpleName());
+    options.setRunner(SparkPipelineRunner.class);
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
+    Pipeline pipeline = Pipeline.create(options);
+
+    PCollection<String> lines =
+        pipeline.apply(CreateStream.fromQueue(INPUT_QUEUE)).setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedLines =
+        lines.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+    PCollection<String> counts = windowedLines.apply(new SimpleWordCountTest.CountWords());
+
+    DataflowAssert.thatIterable(counts.apply(View.<String>asIterable()))
+        .containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    EvaluationResult result = SparkPipelineRunner.create(options).run(pipeline);
+    result.close();
+
+    DataflowAssertStreaming.assertNoFailures(result);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/DataflowAssertStreaming.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/DataflowAssertStreaming.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/DataflowAssertStreaming.java
new file mode 100644
index 0000000..19759d7
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/DataflowAssertStreaming.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming.utils;
+
+import org.apache.beam.runners.spark.EvaluationResult;
+
+import org.junit.Assert;
+
+/**
+ * Since DataflowAssert doesn't propagate assert exceptions, use Aggregators 
to assert streaming
+ * success/failure counters.
+ */
+public final class DataflowAssertStreaming {
+  /**
+   * Copied aggregator names from {@link 
com.google.cloud.dataflow.sdk.testing.DataflowAssert}
+   */
+  static final String SUCCESS_COUNTER = "DataflowAssertSuccess";
+  static final String FAILURE_COUNTER = "DataflowAssertFailure";
+
+  private DataflowAssertStreaming() {
+  }
+
+  public static void assertNoFailures(EvaluationResult res) {
+    int failures = res.getAggregatorValue(FAILURE_COUNTER, Integer.class);
+    Assert.assertEquals("Found " + failures + " failures, see the log for 
details", 0, failures);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/EmbeddedKafkaCluster.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/EmbeddedKafkaCluster.java
new file mode 100644
index 0000000..333453a
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/EmbeddedKafkaCluster.java
@@ -0,0 +1,314 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming.utils;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * https://gist.github.com/fjavieralba/7930018
+ */
+public class EmbeddedKafkaCluster {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
+
+  private final List<Integer> ports;
+  private final String zkConnection;
+  private final Properties baseProperties;
+
+  private final String brokerList;
+
+  private final List<KafkaServer> brokers;
+  private final List<File> logDirs;
+
+  public EmbeddedKafkaCluster(String zkConnection) {
+    this(zkConnection, new Properties());
+  }
+
+  public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties) {
+    this(zkConnection, baseProperties, Collections.singletonList(-1));
+  }
+
+  public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties, 
List<Integer> ports) {
+    this.zkConnection = zkConnection;
+    this.ports = resolvePorts(ports);
+    this.baseProperties = baseProperties;
+
+    this.brokers = new ArrayList<>();
+    this.logDirs = new ArrayList<>();
+
+    this.brokerList = constructBrokerList(this.ports);
+  }
+
+  private static List<Integer> resolvePorts(List<Integer> ports) {
+    List<Integer> resolvedPorts = new ArrayList<>();
+    for (Integer port : ports) {
+      resolvedPorts.add(resolvePort(port));
+    }
+    return resolvedPorts;
+  }
+
+  private static int resolvePort(int port) {
+    if (port == -1) {
+      return TestUtils.getAvailablePort();
+    }
+    return port;
+  }
+
+  private static String constructBrokerList(List<Integer> ports) {
+    StringBuilder sb = new StringBuilder();
+    for (Integer port : ports) {
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append("localhost:").append(port);
+    }
+    return sb.toString();
+  }
+
+  public void startup() {
+    for (int i = 0; i < ports.size(); i++) {
+      Integer port = ports.get(i);
+      File logDir = TestUtils.constructTempDir("kafka-local");
+
+      Properties properties = new Properties();
+      properties.putAll(baseProperties);
+      properties.setProperty("zookeeper.connect", zkConnection);
+      properties.setProperty("broker.id", String.valueOf(i + 1));
+      properties.setProperty("host.name", "localhost");
+      properties.setProperty("port", Integer.toString(port));
+      properties.setProperty("log.dir", logDir.getAbsolutePath());
+      properties.setProperty("log.flush.interval.messages", String.valueOf(1));
+
+      KafkaServer broker = startBroker(properties);
+
+      brokers.add(broker);
+      logDirs.add(logDir);
+    }
+  }
+
+
+  private static KafkaServer startBroker(Properties props) {
+    KafkaServer server = new KafkaServer(new KafkaConfig(props), new 
SystemTime());
+    server.startup();
+    return server;
+  }
+
+  public Properties getProps() {
+    Properties props = new Properties();
+    props.putAll(baseProperties);
+    props.put("metadata.broker.list", brokerList);
+    props.put("zookeeper.connect", zkConnection);
+    return props;
+  }
+
+  public String getBrokerList() {
+    return brokerList;
+  }
+
+  public List<Integer> getPorts() {
+    return ports;
+  }
+
+  public String getZkConnection() {
+    return zkConnection;
+  }
+
+  public void shutdown() {
+    for (KafkaServer broker : brokers) {
+      try {
+        broker.shutdown();
+      } catch (Exception e) {
+        LOG.warn("{}", e.getMessage(), e);
+      }
+    }
+    for (File logDir : logDirs) {
+      try {
+        TestUtils.deleteFile(logDir);
+      } catch (FileNotFoundException e) {
+        LOG.warn("{}", e.getMessage(), e);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "EmbeddedKafkaCluster{" + "brokerList='" + brokerList + "'}";
+  }
+
+  /**
+   * A ZooKeeper server run inside the current JVM.
+   *
+   * <p>{@link #startup()} binds a connection factory to {@code localhost} on the configured
+   * port and starts a {@link ZooKeeperServer} that stores its snapshot and log data in freshly
+   * created temporary directories. {@link #shutdown()} stops the factory and removes those
+   * directories.
+   */
+  public static class EmbeddedZookeeper {
+    private int port = -1;
+    private int tickTime = 500;
+
+    private ServerCnxnFactory factory;
+    private File snapshotDir;
+    private File logDir;
+
+    /** Creates an instance on an automatically selected port with a tick time of 500. */
+    public EmbeddedZookeeper() {
+      this(-1);
+    }
+
+    /**
+     * Creates an instance with a tick time of 500.
+     *
+     * @param port port to listen on, or {@code -1} to have an available port selected
+     */
+    public EmbeddedZookeeper(int port) {
+      this(port, 500);
+    }
+
+    /**
+     * Creates an instance.
+     *
+     * @param port port to listen on, or {@code -1} to have an available port selected
+     * @param tickTime tick time handed to the {@link ZooKeeperServer} on startup
+     */
+    public EmbeddedZookeeper(int port, int tickTime) {
+      this.port = resolvePort(port);
+      this.tickTime = tickTime;
+    }
+
+    /** Returns {@code port} unchanged unless it is {@code -1}, in which case a free port. */
+    private static int resolvePort(int port) {
+      if (port == -1) {
+        return TestUtils.getAvailablePort();
+      }
+      return port;
+    }
+
+    /**
+     * Starts the server on {@code localhost} at the configured port, creating new temporary
+     * snapshot and log directories.
+     *
+     * @throws IOException if the connection factory cannot be created, or if the calling thread
+     *     is interrupted while the server starts (the interrupt flag is set again before
+     *     throwing)
+     */
+    public void startup() throws IOException {
+      // The port may have been reset to -1 through setPort() after construction.
+      if (this.port == -1) {
+        this.port = TestUtils.getAvailablePort();
+      }
+      this.factory =
+          NIOServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port), 1024);
+      this.snapshotDir = TestUtils.constructTempDir("embedded-zk/snapshot");
+      this.logDir = TestUtils.constructTempDir("embedded-zk/log");
+
+      try {
+        factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
+      } catch (InterruptedException e) {
+        // Keep the interruption visible to callers that only see the IOException.
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    }
+
+    /**
+     * Stops the connection factory and deletes the temporary directories.
+     *
+     * <p>Safe to call when {@link #startup()} was never invoked or did not complete, and safe
+     * to call more than once: anything that was not created is skipped.
+     */
+    public void shutdown() {
+      if (factory != null) {
+        factory.shutdown();
+        factory = null;
+      }
+      deleteIfPresent(snapshotDir);
+      snapshotDir = null;
+      deleteIfPresent(logDir);
+      logDir = null;
+    }
+
+    /** Best-effort recursive delete; a {@code null} or already-missing directory is ignored. */
+    private static void deleteIfPresent(File dir) {
+      if (dir == null) {
+        return;
+      }
+      try {
+        TestUtils.deleteFile(dir);
+      } catch (FileNotFoundException ignored) {
+        // Already gone, which is the state we want.
+      }
+    }
+
+    /** Returns the connection string, {@code localhost:<port>}. */
+    public String getConnection() {
+      return "localhost:" + port;
+    }
+
+    /** Sets the port; {@code -1} makes the next {@link #startup()} select an available one. */
+    public void setPort(int port) {
+      this.port = port;
+    }
+
+    /** Sets the tick time used by the next {@link #startup()}. */
+    public void setTickTime(int tickTime) {
+      this.tickTime = tickTime;
+    }
+
+    /** Returns the configured port. */
+    public int getPort() {
+      return port;
+    }
+
+    /** Returns the configured tick time. */
+    public int getTickTime() {
+      return tickTime;
+    }
+
+    @Override
+    public String toString() {
+      return "EmbeddedZookeeper{" + "connection=" + getConnection() + "}";
+    }
+  }
+
+  /** {@link Time} implementation that delegates to the JVM's system clock and thread sleep. */
+  static class SystemTime implements Time {
+    /** Returns {@link System#currentTimeMillis()}. */
+    @Override
+    public long milliseconds() {
+      return System.currentTimeMillis();
+    }
+
+    /** Returns {@link System#nanoTime()}. */
+    @Override
+    public long nanoseconds() {
+      return System.nanoTime();
+    }
+
+    /**
+     * Sleeps for {@code ms} milliseconds. If the thread is interrupted the method returns early
+     * without throwing, leaving the thread's interrupt flag set.
+     */
+    @Override
+    public void sleep(long ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        // Returning early is fine here, but the interrupt must not be lost:
+        // re-assert it so code further up the stack can react to it.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  static final class TestUtils {
+    private static final Random RANDOM = new Random();
+
+    private TestUtils() {
+    }
+
+    static File constructTempDir(String dirPrefix) {
+      File file = new File(System.getProperty("java.io.tmpdir"), dirPrefix + 
RANDOM.nextInt
+              (10000000));
+      if (!file.mkdirs()) {
+        throw new RuntimeException("could not create temp directory: " + 
file.getAbsolutePath());
+      }
+      file.deleteOnExit();
+      return file;
+    }
+
+    static int getAvailablePort() {
+      try {
+        try (ServerSocket socket = new ServerSocket(0)) {
+          return socket.getLocalPort();
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException("Cannot find available port: " + 
e.getMessage(), e);
+      }
+    }
+
+    static boolean deleteFile(File path) throws FileNotFoundException {
+      if (!path.exists()) {
+        throw new FileNotFoundException(path.getAbsolutePath());
+      }
+      boolean ret = true;
+      if (path.isDirectory()) {
+        for (File f : path.listFiles()) {
+          ret = ret && deleteFile(f);
+        }
+      }
+      return ret && path.delete();
+    }
+  }
+}

Reply via email to