Repository: incubator-beam
Updated Branches:
  refs/heads/master 8597a3cf4 -> 3c6e147d9


Spark tests: force spark runner

These tests created their pipelines from default PipelineOptions without
setting a runner, so they were apparently using the DirectRunner before.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937f58e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937f58e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937f58e6

Branch: refs/heads/master
Commit: 937f58e67ed5010c86005a4e7e5613ea682a3a57
Parents: 5101158
Author: Dan Halperin <[email protected]>
Authored: Mon Jul 25 21:45:06 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Tue Jul 26 10:52:28 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/spark/io/AvroPipelineTest.java | 5 ++++-
 .../runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java   | 5 ++++-
 .../beam/runners/spark/translation/CombinePerKeyTest.java       | 5 ++++-
 .../runners/spark/translation/MultiOutputWordCountTest.java     | 5 ++++-
 4 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 4cce03d..787292e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -24,6 +24,7 @@ import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -74,7 +75,9 @@ public class AvroPipelineTest {
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
     populateGenericFile(Lists.newArrayList(savedRecord), schema);
 
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(SparkRunner.class);
+    Pipeline p = Pipeline.create(options);
     PCollection<GenericRecord> input = p.apply(
         AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
     input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index 4d1658f..6d09503 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.coders.WritableCoder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -69,7 +70,9 @@ public class HadoopFileFormatPipelineTest {
   public void testSequenceFile() throws Exception {
     populateFile();
 
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(SparkRunner.class);
+    Pipeline p = Pipeline.create(options);
     @SuppressWarnings("unchecked")
     Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
         (Class<? extends FileInputFormat<IntWritable, Text>>)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 65c6870..600217d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -51,7 +52,9 @@ public class CombinePerKeyTest {
         ImmutableList.of("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog");
     @Test
     public void testRun() {
-        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+        PipelineOptions options = PipelineOptionsFactory.create();
+        options.setRunner(SparkRunner.class);
+        Pipeline p = Pipeline.create(options);
         PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
         PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
         EvaluationResult res = SparkRunner.create().run(p);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 787691d..ded3eb2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.testing.PAssert;
@@ -68,7 +69,9 @@ public class MultiOutputWordCountTest {
 
   @Test
   public void testRun() throws Exception {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(SparkRunner.class);
+    Pipeline p = Pipeline.create(options);
     PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+"));
     PCollection<String> w1 = p.apply(Create.of("Here are some words to count", "and some others"));
     PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words"));

Reply via email to