Repository: incubator-beam Updated Branches: refs/heads/master c314e670e -> ac0875de8
SparkRunner calls pipeline.run * Remove SparkStreamingPipelineOptions. * Run pipeline with Pipeline.run(). * Better EmbeddedKafka. * Avoid NPE if factory wasn't created. * Let EmbeddedKafka/Zookeeper discover ports on their own. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ac0875de Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ac0875de Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ac0875de Branch: refs/heads/master Commit: ac0875de84085e1298575d0887e83e5deee5f418 Parents: c314e67 Author: Sela <ans...@paypal.com> Authored: Wed Jul 27 23:11:37 2016 +0300 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Aug 3 22:31:48 2016 -0700 ---------------------------------------------------------------------- .../runners/spark/SparkPipelineOptions.java | 6 ++++ .../apache/beam/runners/spark/SparkRunner.java | 14 +++------ .../runners/spark/SparkRunnerRegistrar.java | 6 ++-- .../spark/SparkStreamingPipelineOptions.java | 32 -------------------- .../beam/runners/spark/TestSparkRunner.java | 2 -- .../apache/beam/runners/spark/DeDupTest.java | 2 +- .../beam/runners/spark/EmptyInputTest.java | 2 +- .../beam/runners/spark/SimpleWordCountTest.java | 4 +-- .../runners/spark/SparkRunnerRegistrarTest.java | 2 +- .../apache/beam/runners/spark/TfIdfTest.java | 2 +- .../beam/runners/spark/io/AvroPipelineTest.java | 2 +- .../beam/runners/spark/io/NumShardsTest.java | 2 +- .../io/hadoop/HadoopFileFormatPipelineTest.java | 2 +- .../spark/translation/CombineGloballyTest.java | 2 +- .../spark/translation/CombinePerKeyTest.java | 2 +- .../spark/translation/DoFnOutputTest.java | 6 ++-- .../translation/MultiOutputWordCountTest.java | 2 +- .../spark/translation/SerializationTest.java | 2 +- .../spark/translation/SideEffectsTest.java | 8 ++--- .../streaming/FlattenStreamingTest.java | 8 ++--- .../streaming/KafkaStreamingTest.java | 13 ++++---- .../streaming/SimpleStreamingWordCountTest.java | 8 ++--- .../streaming/utils/EmbeddedKafkaCluster.java | 4 ++- 23 files changed, 49 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 4bb2a57..6ef3741 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -33,4 +33,10 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions, @Default.String("local[1]") String getSparkMaster(); void setSparkMaster(String master); + + @Description("Timeout to wait (in msec) for a streaming execution to stop, -1 runs until " + + "execution is stopped") + @Default.Long(-1) + Long getTimeout(); + void setTimeout(Long batchInterval); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index dfda987..d994ec4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -69,8 +69,6 @@ import org.slf4j.LoggerFactory; * options.setSparkMaster("spark://host:port"); * EvaluationResult result = SparkRunner.create(options).run(p); * } - * - * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions} */ public final class SparkRunner extends PipelineRunner<EvaluationResult> { @@ -146,12 +144,6 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { @Override public EvaluationResult run(Pipeline pipeline) { try { - // validate streaming configuration - if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) { - throw new RuntimeException("A streaming job must be configured with " - + SparkStreamingPipelineOptions.class.getSimpleName() + ", found " - + mOptions.getClass().getSimpleName()); - } LOG.info("Executing pipeline using the SparkRunner."); JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions .getSparkMaster(), mOptions.getAppName()); @@ -179,6 +171,9 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { return ctxt; } else { + if (mOptions.getTimeout() > 0) { + LOG.info("Timeout is ignored by the SparkRunner in batch."); + } EvaluationContext ctxt = new EvaluationContext(jsc, pipeline); SparkPipelineTranslator translator = new TransformTranslator.Translator(); pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator)); @@ -210,9 +205,8 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { private EvaluationContext createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline, Duration batchDuration) { - SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions; JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration); - return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout()); + return new StreamingEvaluationContext(jsc, pipeline, jssc, mOptions.getTimeout()); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java index 2bed6a5..7a31753 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java @@ -49,15 +49,13 @@ public final class SparkRunnerRegistrar { } /** - * Registers the {@link SparkPipelineOptions} and {@link SparkStreamingPipelineOptions}. + * Registers the {@link SparkPipelineOptions}. */ @AutoService(PipelineOptionsRegistrar.class) public static class Options implements PipelineOptionsRegistrar { @Override public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? extends PipelineOptions>>of( - SparkPipelineOptions.class, - SparkStreamingPipelineOptions.class); + return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java deleted file mode 100644 index 5944acd..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark; - -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; - -/** - * Options used to configure Spark streaming. - */ -public interface SparkStreamingPipelineOptions extends SparkPipelineOptions { - @Description("Timeout to wait (in msec) for the streaming execution so stop, -1 runs until " - + "execution is stopped") - @Default.Long(-1) - Long getTimeout(); - void setTimeout(Long batchInterval); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index e2b953d..50ed5f3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -46,8 +46,6 @@ import org.apache.beam.sdk.values.POutput; * options.setSparkMaster("spark://host:port"); * EvaluationResult result = SparkRunner.create(options).run(p); * } - * - * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions} */ public final class TestSparkRunner extends PipelineRunner<EvaluationResult> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java index dcf04a7..9a16744 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java @@ -56,7 +56,7 @@ public class DeDupTest { PAssert.that(output).containsInAnyOrder(EXPECTED_SET); - EvaluationResult res = SparkRunner.create().run(p); + EvaluationResult res = (EvaluationResult) p.run(); res.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java index 7befec2..c2e331f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java @@ -49,7 +49,7 @@ public class EmptyInputTest { PCollection<String> inputWords = p.apply(Create.of(empty).withCoder(StringUtf8Coder.of())); PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords())); - EvaluationResult res = SparkRunner.create().run(p); + EvaluationResult res = (EvaluationResult) p.run(); assertEquals("", Iterables.getOnlyElement(res.get(output))); res.close(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index 6f5ce5e..441d92d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -67,7 +67,7 @@ public class SimpleWordCountTest { PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - EvaluationResult res = SparkRunner.create().run(p); + EvaluationResult res = (EvaluationResult) p.run(); res.close(); } @@ -87,7 +87,7 @@ public class SimpleWordCountTest { File outputFile = testFolder.newFile(); output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding()); - EvaluationResult res = SparkRunner.create().run(p); + EvaluationResult res = (EvaluationResult) p.run(); res.close(); assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java index 236251b..3ca9df4 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java @@ -41,7 +41,7 @@ public class SparkRunnerRegistrarTest { @Test public void testOptions() { assertEquals( - ImmutableList.of(SparkPipelineOptions.class, SparkStreamingPipelineOptions.class), + ImmutableList.of(SparkPipelineOptions.class), new SparkRunnerRegistrar.Options().getPipelineOptions()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java index e4a293f..074e6aa 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java @@ -72,7 +72,7 @@ public class TfIdfTest { PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d")); - EvaluationResult res = SparkRunner.create().run(pipeline); + EvaluationResult res = (EvaluationResult) pipeline.run(); res.close(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index 787292e..d862424 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -81,7 +81,7 @@ public class AvroPipelineTest { PCollection<GenericRecord> input = p.apply( AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema)); input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema)); - EvaluationResult res = SparkRunner.create().run(p); + EvaluationResult res = (EvaluationResult) p.run(); res.close(); List<GenericRecord> records = readGenericFile(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 36d8b67..9c65917 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -79,7 +79,7 @@ public class NumShardsTest { PCollection<String> output = inputWords.apply(new WordCount.CountWords()) .apply(MapElements.via(new WordCount.FormatAsTextFn())); output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt")); - EvaluationResult res = SparkRunner.create().run(p); + EvaluationResult res = (EvaluationResult) p.run(); res.close(); int count = 0; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index 6d09503..01aa839 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -92,7 +92,7 @@ public class HadoopFileFormatPipelineTest { HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(), outputFormatClass, IntWritable.class, Text.class); input.apply(write.withoutSharding()); - EvaluationResult res = SparkRunner.create().run(p); + EvaluationResult res = (EvaluationResult) p.run(); res.close(); IntWritable key = new IntWritable(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java index 798f55a..e4ef7d7 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java @@ -55,7 +55,7 @@ public class CombineGloballyTest { PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger())); - EvaluationResult res = SparkRunner.create().run(p); + EvaluationResult res = (EvaluationResult) p.run(); assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output))); res.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java index 2e477e9..dee9213 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java @@ -57,7 +57,7 @@ public class CombinePerKeyTest { Pipeline p = Pipeline.create(options); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>()); - EvaluationResult res = SparkRunner.create().run(p); + EvaluationResult res = (EvaluationResult) p.run(); Map<String, Long> actualCnts = new HashMap<>(); for (KV<String, Long> kv : res.get(cnts)) { actualCnts.put(kv.getKey(), kv.getValue()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java index 263ce99..e4b25bb 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java @@ -41,9 +41,9 @@ public class DoFnOutputTest implements Serializable { public void test() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkRunner.class); - Pipeline pipeline = Pipeline.create(options); + Pipeline p = Pipeline.create(options); - PCollection<String> strings = pipeline.apply(Create.of("a")); + PCollection<String> strings = p.apply(Create.of("a")); // Test that values written from startBundle() and finishBundle() are written to // the output PCollection<String> output = strings.apply(ParDo.of(new OldDoFn<String, String>() { @@ -63,7 +63,7 @@ public class DoFnOutputTest implements Serializable { PAssert.that(output).containsInAnyOrder("start", "a", "finish"); - EvaluationResult res = SparkRunner.create().run(pipeline); + EvaluationResult res = (EvaluationResult) p.run(); res.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index 739eec3..066521b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -84,7 +84,7 @@ public class MultiOutputWordCountTest { PCollection<Long> unique = luc.get(lowerCnts).apply( ApproximateUnique.<KV<String, Long>>globally(16)); - EvaluationResult res = SparkRunner.create().run(p); + EvaluationResult res = (EvaluationResult) p.run(); PAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn()))) .containsInAnyOrder(EXPECTED_LOWER_COUNTS); Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java index 5e96c46..fb97b8b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java @@ -140,7 +140,7 @@ public class SerializationTest { PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - EvaluationResult res = SparkRunner.create().run(p); + EvaluationResult res = (EvaluationResult) p.run(); res.close(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java index 5775565..6cefa49 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java @@ -50,11 +50,11 @@ public class SideEffectsTest implements Serializable { public void test() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkRunner.class); - Pipeline pipeline = Pipeline.create(options); + Pipeline p = Pipeline.create(options); - pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); + p.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - pipeline.apply(Create.of("a")).apply(ParDo.of(new OldDoFn<String, String>() { + p.apply(Create.of("a")).apply(ParDo.of(new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) throws Exception { throw new UserException(); @@ -62,7 +62,7 @@ public class SideEffectsTest implements Serializable { })); try { - pipeline.run(); + p.run(); fail("Run should thrown an exception"); } catch (RuntimeException e) { assertNotNull(e.getCause()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index ed7e9b7..deb1b6a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.spark.translation.streaming; import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkRunner; -import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; import org.apache.beam.sdk.Pipeline; @@ -57,8 +57,8 @@ public class FlattenStreamingTest { @Test public void testRun() throws Exception { - SparkStreamingPipelineOptions options = - PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); + SparkPipelineOptions options = + PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkRunner.class); options.setStreaming(true); options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval @@ -77,7 +77,7 @@ public class FlattenStreamingTest { PAssertStreaming.assertContents(union, EXPECTED_UNION); - EvaluationResult res = SparkRunner.create(options).run(p); + EvaluationResult res = (EvaluationResult) p.run(); res.close(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index c005f14..fa98ca3 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.spark.translation.streaming; import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkRunner; -import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; import org.apache.beam.runners.spark.io.KafkaIO; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; @@ -56,10 +56,9 @@ import kafka.serializer.StringDecoder; */ public class KafkaStreamingTest { private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER = - new EmbeddedKafkaCluster.EmbeddedZookeeper(17001); + new EmbeddedKafkaCluster.EmbeddedZookeeper(); private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER = - new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), - new Properties(), Collections.singletonList(6667)); + new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties()); private static final String TOPIC = "kafka_dataflow_test_topic"; private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of( "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4" @@ -89,8 +88,8 @@ public class KafkaStreamingTest { @Test public void testRun() throws Exception { // test read from Kafka - SparkStreamingPipelineOptions options = - PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); + SparkPipelineOptions options = + PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkRunner.class); options.setStreaming(true); options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval @@ -112,7 +111,7 @@ public class KafkaStreamingTest { PAssertStreaming.assertContents(formattedKV, EXPECTED); - EvaluationResult res = SparkRunner.create(options).run(p); + EvaluationResult res = (EvaluationResult) p.run(); res.close(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index 4fa03f6..5627056 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -19,8 +19,8 @@ package org.apache.beam.runners.spark.translation.streaming; import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkRunner; -import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; import org.apache.beam.runners.spark.examples.WordCount; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; @@ -54,8 +54,8 @@ public class SimpleStreamingWordCountTest implements Serializable { @Test public void testRun() throws Exception { - SparkStreamingPipelineOptions options = - PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); + SparkPipelineOptions options = + PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkRunner.class); options.setStreaming(true); options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval @@ -70,7 +70,7 @@ public class SimpleStreamingWordCountTest implements Serializable { .apply(MapElements.via(new WordCount.FormatAsTextFn())); PAssertStreaming.assertContents(output, EXPECTED_COUNTS); - EvaluationResult res = SparkRunner.create(options).run(p); + EvaluationResult res = (EvaluationResult) p.run(); res.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac0875de/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java index 0fec573..cd326ef 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java @@ -219,7 +219,9 @@ public class EmbeddedKafkaCluster { public void shutdown() { - factory.shutdown(); + if (factory != null) { + factory.shutdown(); + } try { TestUtils.deleteFile(snapshotDir); } catch (FileNotFoundException e) {