Repository: beam
Updated Branches:
  refs/heads/master 9dad73c29 -> ed7b82e7e


[BEAM-1405] Refactor to remove repeated code from test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92707b9a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92707b9a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92707b9a

Branch: refs/heads/master
Commit: 92707b9a07b7bc367b375fb25293554f1de25d87
Parents: 9dad73c
Author: Ismaël Mejía <[email protected]>
Authored: Tue Feb 7 14:41:57 2017 +0100
Committer: Ismaël Mejía <[email protected]>
Committed: Tue Feb 7 14:46:40 2017 +0100

----------------------------------------------------------------------
 .../runners/spark/ProvidedSparkContextTest.java | 70 ++++++++------------
 1 file changed, 29 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/92707b9a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index 2982844..00c894d 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.spark;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.ImmutableSet;
@@ -48,15 +49,6 @@ public class ProvidedSparkContextTest {
     private static final String PROVIDED_CONTEXT_EXCEPTION =
             "The provided Spark context was not created or was stopped";
 
-    private SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) {
-        final SparkContextOptions options = 
PipelineOptionsFactory.as(SparkContextOptions.class);
-        options.setRunner(SparkRunner.class);
-        options.setUsesProvidedSparkContext(true);
-        options.setProvidedSparkContext(jsc);
-        options.setEnableSparkMetricSinks(false);
-        return options;
-    }
-
     /**
      * Provide a context and call pipeline run.
      * @throws Exception
@@ -64,20 +56,7 @@ public class ProvidedSparkContextTest {
     @Test
     public void testWithProvidedContext() throws Exception {
         JavaSparkContext jsc = new JavaSparkContext("local[*]", 
"Existing_Context");
-
-        SparkContextOptions options = getSparkContextOptions(jsc);
-
-        Pipeline p = Pipeline.create(options);
-        PCollection<String> inputWords = 
p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
-                .of()));
-        PCollection<String> output = inputWords.apply(new 
WordCount.CountWords())
-                .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-        PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
-        // Run test from pipeline
-        p.run().waitUntilFinish();
-
+        testWithValidProvidedContext(jsc);
         jsc.stop();
     }
 
@@ -87,8 +66,22 @@ public class ProvidedSparkContextTest {
      */
     @Test
     public void testWithNullContext() throws Exception {
-        JavaSparkContext jsc = null;
+        testWithInvalidContext(null);
+    }
+
+    /**
+     * A SparkRunner with a stopped provided Spark context cannot run 
pipelines.
+     * @throws Exception
+     */
+    @Test
+    public void testWithStoppedProvidedContext() throws Exception {
+        JavaSparkContext jsc = new JavaSparkContext("local[*]", 
"Existing_Context");
+        // Stop the provided Spark context directly
+        jsc.stop();
+        testWithInvalidContext(jsc);
+    }
 
+    private void testWithValidProvidedContext(JavaSparkContext jsc) throws 
Exception {
         SparkContextOptions options = getSparkContextOptions(jsc);
 
         Pipeline p = Pipeline.create(options);
@@ -99,24 +92,11 @@ public class ProvidedSparkContextTest {
 
         PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
-        try {
-            p.run().waitUntilFinish();
-            fail("Should throw an exception when The provided Spark context is 
null");
-        } catch (RuntimeException e){
-            assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION));
-        }
+        // Run test from pipeline
+        p.run().waitUntilFinish();
     }
 
-    /**
-     * A SparkRunner with a stopped provided Spark context cannot run 
pipelines.
-     * @throws Exception
-     */
-    @Test
-    public void testWithStoppedProvidedContext() throws Exception {
-        JavaSparkContext jsc = new JavaSparkContext("local[*]", 
"Existing_Context");
-        // Stop the provided Spark context directly
-        jsc.stop();
-
+    private void testWithInvalidContext(JavaSparkContext jsc) {
         SparkContextOptions options = getSparkContextOptions(jsc);
 
         Pipeline p = Pipeline.create(options);
@@ -129,10 +109,18 @@ public class ProvidedSparkContextTest {
 
         try {
             p.run().waitUntilFinish();
-            fail("Should throw an exception when The provided Spark context is 
stopped");
+            fail("Should throw an exception when The provided Spark context is 
null or stopped");
         } catch (RuntimeException e){
             assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION));
         }
     }
 
+    private static SparkContextOptions getSparkContextOptions(JavaSparkContext 
jsc) {
+        final SparkContextOptions options = 
PipelineOptionsFactory.as(SparkContextOptions.class);
+        options.setRunner(SparkRunner.class);
+        options.setUsesProvidedSparkContext(true);
+        options.setProvidedSparkContext(jsc);
+        options.setEnableSparkMetricSinks(false);
+        return options;
+    }
 }

Reply via email to