Add a unit test for TextIO output to back the `mvn exec:exec` example provided in the README
Satisfy checkstyle Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/932e5b49 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/932e5b49 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/932e5b49 Branch: refs/heads/master Commit: 932e5b4976e2c5bf388768f7d3cdeb4da4e84e71 Parents: 1ca3b30 Author: Sela <[email protected]> Authored: Thu Apr 14 23:18:24 2016 +0300 Committer: Sela <[email protected]> Committed: Tue Apr 19 22:23:17 2016 +0300 ---------------------------------------------------------------------- .../spark/translation/EvaluationContext.java | 3 +- .../beam/runners/spark/SimpleWordCountTest.java | 35 ++++++++++++++++++-- 2 files changed, 35 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/932e5b49/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 531a6ce..6d49bd3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -262,7 +262,8 @@ public class EvaluationContext implements EvaluationResult { @Override public <T> Iterable<T> get(PCollection<T> pcollection) { - @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection); + @SuppressWarnings("unchecked") + RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection); Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection); return 
Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/932e5b49/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index faa4dbf..4e9c0b8 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -21,6 +21,7 @@ package org.apache.beam.runners.spark; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; @@ -31,11 +32,18 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.io.FileUtils; +import org.junit.rules.TemporaryFolder; +import org.junit.Rule; +import org.junit.Test; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; -import org.junit.Test; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import java.io.File; import java.util.Arrays; import java.util.List; import java.util.Set; @@ -50,7 +58,7 @@ public class SimpleWordCountTest { ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); @Test - public void testRun() throws Exception { + public void testInMem() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); 
options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); @@ -64,6 +72,29 @@ public class SimpleWordCountTest { res.close(); } + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + @Test + public void testOutputFile() throws Exception { + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + options.setRunner(SparkPipelineRunner.class); + Pipeline p = Pipeline.create(options); + PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder + .of()); + PCollection<String> output = inputWords.apply(new CountWords()); + + File outputFile = testFolder.newFile(); + output.apply( + TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding()); + + EvaluationResult res = SparkPipelineRunner.create().run(p); + res.close(); + + assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)), + containsInAnyOrder(EXPECTED_COUNT_SET.toArray())); + } + /** * A DoFn that tokenizes lines of text into individual words. */
