Repository: incubator-beam Updated Branches: refs/heads/master d9cdcadf5 -> 1a200a65d
Configure RunnableOnService tests for Spark runner, batch mode Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4254749b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4254749b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4254749b Branch: refs/heads/master Commit: 4254749bf103c4bb6f68e316768c0aa46d9f7df0 Parents: 5c17bfa Author: Kenneth Knowles <[email protected]> Authored: Thu May 5 15:11:07 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 16 11:19:19 2016 -0700 ---------------------------------------------------------------------- runners/spark/pom.xml | 112 +++++++++++++------ .../runners/spark/SparkRunnerRegistrar.java | 3 +- .../runners/spark/TestSparkPipelineRunner.java | 77 +++++++++++++ .../runners/spark/SparkRunnerRegistrarTest.java | 2 +- 4 files changed, 155 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4254749b/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index e7d0834..747464e 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -37,6 +37,62 @@ <spark.version>1.6.1</spark.version> </properties> + <profiles> + <profile> + <id>jacoco</id> + <build> + <plugins> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + </profile> + + <profile> + <!-- This profile adds execution of RunnableOnService integration tests + against a local Spark endpoint. 
--> + <id>runnable-on-service-tests</id> + <activation><activeByDefault>false</activeByDefault></activation> + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>runnable-on-service-tests</id> + <configuration> + <groups>org.apache.beam.sdk.testing.RunnableOnService</groups> + <parallel>none</parallel> + <failIfNoTests>true</failIfNoTests> + <dependenciesToScan> + <dependency>org.apache.beam:java-sdk-all</dependency> + </dependenciesToScan> + <excludes> + org.apache.beam.sdk.io.BoundedReadFromUnboundedSourceTest + </excludes> + <systemPropertyVariables> + <beamTestPipelineOptions> + [ + "--runner=org.apache.beam.runners.spark.TestSparkPipelineRunner", + "--streaming=false" + ] + </beamTestPipelineOptions> + <dataflow.spark.test.reuseSparkContext>true</dataflow.spark.test.reuseSparkContext> + </systemPropertyVariables> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </pluginManagement> + </build> + </profile> + </profiles> + <dependencies> <dependency> <groupId>org.apache.spark</groupId> @@ -122,6 +178,25 @@ <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>0.2.0-incubating-SNAPSHOT</version> + </dependency> + + <!-- Depend on test jar to scan for RunnableOnService tests --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> </dependencies> @@ -237,41 +312,4 @@ </plugins> </build> - <profiles> - <profile> - <id>jacoco</id> - <build> - <plugins> - <plugin> - <groupId>org.jacoco</groupId> - 
<artifactId>jacoco-maven-plugin</artifactId> - </plugin> - </plugins> - </build> - </profile> - - <profile> - <id>disable-runnable-on-service-tests</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <executions> - <execution> - <id>runnable-on-service-tests</id> - <configuration> - <skip>true</skip> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> - </profiles> - </project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4254749b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java index 9537ec6..baa2241 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java @@ -43,7 +43,8 @@ public final class SparkRunnerRegistrar { public static class Runner implements PipelineRunnerRegistrar { @Override public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { - return ImmutableList.<Class<? extends PipelineRunner<?>>>of(SparkPipelineRunner.class); + return ImmutableList.<Class<? 
extends PipelineRunner<?>>>of( + SparkPipelineRunner.class, TestSparkPipelineRunner.class); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4254749b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java new file mode 100644 index 0000000..d11d1c1 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +/** + * The SparkPipelineRunner translates operations defined on a pipeline to a representation + * executable by Spark, and then submits the job to Spark to be executed. If we wanted to run a + * dataflow pipeline with the default options of a single-threaded Spark instance in local mode, we + * would do the following: + * + * {@code + * Pipeline p = [logic for pipeline creation] + * EvaluationResult result = SparkPipelineRunner.create().run(p); + * } + * + * To create a pipeline runner to run against a different Spark cluster, with a custom master URL, + * we would do the following: + * + * {@code + * Pipeline p = [logic for pipeline creation] + * SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + * options.setSparkMaster("spark://host:port"); + * EvaluationResult result = SparkPipelineRunner.create(options).run(p); + * } + * + * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions}. + */ +public final class TestSparkPipelineRunner extends PipelineRunner<EvaluationResult> { + + private SparkPipelineRunner delegate; + + private TestSparkPipelineRunner(SparkPipelineOptions options) { + this.delegate = SparkPipelineRunner.fromOptions(options); + } + + public static TestSparkPipelineRunner fromOptions(PipelineOptions options) { + // Default options suffice to set it up as a test runner + SparkPipelineOptions sparkOptions = + PipelineOptionsValidator.validate(SparkPipelineOptions.class, options); + return new TestSparkPipelineRunner(sparkOptions); + } + + @Override + public <OutputT extends POutput, InputT extends PInput> 
OutputT apply(PTransform<InputT, OutputT> transform, InputT input) { + return delegate.apply(transform, input); + }; + + @Override + public EvaluationResult run(Pipeline pipeline) { + return delegate.run(pipeline); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4254749b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java index 88f4a06..d2e57aa 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java @@ -47,7 +47,7 @@ public class SparkRunnerRegistrarTest { @Test public void testRunners() { - assertEquals(ImmutableList.of(SparkPipelineRunner.class), + assertEquals(ImmutableList.of(SparkPipelineRunner.class, TestSparkPipelineRunner.class), new SparkRunnerRegistrar.Runner().getPipelineRunners()); }
