Repository: beam Updated Branches: refs/heads/master 9dad73c29 -> ed7b82e7e
[BEAM-1405] Refactor to remove repeated code from test Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92707b9a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92707b9a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92707b9a Branch: refs/heads/master Commit: 92707b9a07b7bc367b375fb25293554f1de25d87 Parents: 9dad73c Author: Ismaël Mejía <[email protected]> Authored: Tue Feb 7 14:41:57 2017 +0100 Committer: Ismaël Mejía <[email protected]> Committed: Tue Feb 7 14:46:40 2017 +0100 ---------------------------------------------------------------------- .../runners/spark/ProvidedSparkContextTest.java | 70 ++++++++------------ 1 file changed, 29 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/92707b9a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index 2982844..00c894d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.spark; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import com.google.common.collect.ImmutableSet; @@ -48,15 +49,6 @@ public class ProvidedSparkContextTest { private static final String PROVIDED_CONTEXT_EXCEPTION = "The provided Spark context was not created or was stopped"; - private SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) { - final SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); - 
options.setRunner(SparkRunner.class); - options.setUsesProvidedSparkContext(true); - options.setProvidedSparkContext(jsc); - options.setEnableSparkMetricSinks(false); - return options; - } - /** * Provide a context and call pipeline run. * @throws Exception @@ -64,20 +56,7 @@ public class ProvidedSparkContextTest { @Test public void testWithProvidedContext() throws Exception { JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); - - SparkContextOptions options = getSparkContextOptions(jsc); - - Pipeline p = Pipeline.create(options); - PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder - .of())); - PCollection<String> output = inputWords.apply(new WordCount.CountWords()) - .apply(MapElements.via(new WordCount.FormatAsTextFn())); - - PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - - // Run test from pipeline - p.run().waitUntilFinish(); - + testWithValidProvidedContext(jsc); jsc.stop(); } @@ -87,8 +66,22 @@ public class ProvidedSparkContextTest { */ @Test public void testWithNullContext() throws Exception { - JavaSparkContext jsc = null; + testWithInvalidContext(null); + } + + /** + * A SparkRunner with a stopped provided Spark context cannot run pipelines. 
+ * @throws Exception + */ + @Test + public void testWithStoppedProvidedContext() throws Exception { + JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); + // Stop the provided Spark context directly + jsc.stop(); + testWithInvalidContext(jsc); + } + private void testWithValidProvidedContext(JavaSparkContext jsc) throws Exception { SparkContextOptions options = getSparkContextOptions(jsc); Pipeline p = Pipeline.create(options); @@ -99,24 +92,11 @@ public class ProvidedSparkContextTest { PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - try { - p.run().waitUntilFinish(); - fail("Should throw an exception when The provided Spark context is null"); - } catch (RuntimeException e){ - assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION)); - } + // Run test from pipeline + p.run().waitUntilFinish(); } - /** - * A SparkRunner with a stopped provided Spark context cannot run pipelines. - * @throws Exception - */ - @Test - public void testWithStoppedProvidedContext() throws Exception { - JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); - // Stop the provided Spark context directly - jsc.stop(); - + private void testWithInvalidContext(JavaSparkContext jsc) { SparkContextOptions options = getSparkContextOptions(jsc); Pipeline p = Pipeline.create(options); @@ -129,10 +109,18 @@ public class ProvidedSparkContextTest { try { p.run().waitUntilFinish(); - fail("Should throw an exception when The provided Spark context is stopped"); + fail("Should throw an exception when The provided Spark context is null or stopped"); } catch (RuntimeException e){ assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION)); } } + private static SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) { + final SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); + options.setRunner(SparkRunner.class); + options.setUsesProvidedSparkContext(true); + options.setProvidedSparkContext(jsc); + 
options.setEnableSparkMetricSinks(false); + return options; + } }
