Repository: incubator-beam Updated Branches: refs/heads/master 8597a3cf4 -> 3c6e147d9
Spark tests: force spark runner They were somehow using the DirectRunner before. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937f58e6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937f58e6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937f58e6 Branch: refs/heads/master Commit: 937f58e67ed5010c86005a4e7e5613ea682a3a57 Parents: 5101158 Author: Dan Halperin <[email protected]> Authored: Mon Jul 25 21:45:06 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Tue Jul 26 10:52:28 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/runners/spark/io/AvroPipelineTest.java | 5 ++++- .../runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java | 5 ++++- .../beam/runners/spark/translation/CombinePerKeyTest.java | 5 ++++- .../runners/spark/translation/MultiOutputWordCountTest.java | 5 ++++- 4 files changed, 16 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index 4cce03d..787292e 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -24,6 +24,7 @@ import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.values.PCollection; @@ -74,7 +75,9 @@ public class AvroPipelineTest { savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); populateGenericFile(Lists.newArrayList(savedRecord), schema); - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(SparkRunner.class); + Pipeline p = Pipeline.create(options); PCollection<GenericRecord> input = p.apply( AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema)); input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index 4d1658f..6d09503 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -25,6 +25,7 @@ import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.coders.WritableCoder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -69,7 +70,9 @@ public class HadoopFileFormatPipelineTest { public void testSequenceFile() throws Exception { populateFile(); - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(SparkRunner.class); + Pipeline p = Pipeline.create(options); @SuppressWarnings("unchecked") Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass = (Class<? extends FileInputFormat<IntWritable, Text>>) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java index 65c6870..600217d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -51,7 +52,9 @@ public class CombinePerKeyTest { ImmutableList.of("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog"); @Test public void testRun() { - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(SparkRunner.class); + Pipeline p = Pipeline.create(options); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>()); EvaluationResult res = SparkRunner.create().run(p); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index 787691d..ded3eb2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -22,6 +22,7 @@ import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.testing.PAssert; @@ -68,7 +69,9 @@ public class MultiOutputWordCountTest { @Test public void testRun() throws Exception { - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(SparkRunner.class); + Pipeline p = Pipeline.create(options); PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+")); PCollection<String> w1 = p.apply(Create.of("Here are some words to count", "and some others")); PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words"));
