Repository: incubator-beam
Updated Branches:
  refs/heads/master d9cdcadf5 -> 1a200a65d


Configure RunnableOnService tests for Spark runner, batch mode


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4254749b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4254749b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4254749b

Branch: refs/heads/master
Commit: 4254749bf103c4bb6f68e316768c0aa46d9f7df0
Parents: 5c17bfa
Author: Kenneth Knowles <[email protected]>
Authored: Thu May 5 15:11:07 2016 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 16 11:19:19 2016 -0700

----------------------------------------------------------------------
 runners/spark/pom.xml                           | 112 +++++++++++++------
 .../runners/spark/SparkRunnerRegistrar.java     |   3 +-
 .../runners/spark/TestSparkPipelineRunner.java  |  77 +++++++++++++
 .../runners/spark/SparkRunnerRegistrarTest.java |   2 +-
 4 files changed, 155 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4254749b/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index e7d0834..747464e 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -37,6 +37,62 @@
     <spark.version>1.6.1</spark.version>
   </properties>
 
+  <profiles>
+    <profile>
+      <id>jacoco</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.jacoco</groupId>
+            <artifactId>jacoco-maven-plugin</artifactId>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <profile>
+      <!-- This profile adds execution of RunnableOnService integration tests 
+           against a local Spark endpoint. -->
+      <id>runnable-on-service-tests</id>
+      <activation><activeByDefault>false</activeByDefault></activation>
+      <build>
+        <pluginManagement>
+          <plugins>
+            <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
+              <artifactId>maven-surefire-plugin</artifactId>
+              <executions>
+                <execution>
+                  <id>runnable-on-service-tests</id>
+                  <configuration>
+                    
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                    <parallel>none</parallel>
+                    <failIfNoTests>true</failIfNoTests>
+                    <dependenciesToScan>
+                      <dependency>org.apache.beam:java-sdk-all</dependency>
+                    </dependenciesToScan>
+                    <excludes>
+                      org.apache.beam.sdk.io.BoundedReadFromUnboundedSourceTest
+                    </excludes>
+                    <systemPropertyVariables>
+                      <beamTestPipelineOptions>
+                        [
+                          
"--runner=org.apache.beam.runners.spark.TestSparkPipelineRunner",
+                          "--streaming=false"
+                        ]
+                      </beamTestPipelineOptions>
+                      
<dataflow.spark.test.reuseSparkContext>true</dataflow.spark.test.reuseSparkContext>
+                    </systemPropertyVariables>
+                  </configuration>
+                </execution>
+              </executions>
+            </plugin>
+          </plugins>
+        </pluginManagement>
+      </build>
+    </profile>
+  </profiles>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -122,6 +178,25 @@
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-direct-java</artifactId>
       <version>0.2.0-incubating-SNAPSHOT</version>
+    </dependency>
+
+    <!-- Depend on test jar to scan for RunnableOnService tests -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>
@@ -237,41 +312,4 @@
     </plugins>
   </build>
 
-  <profiles>
-    <profile>
-      <id>jacoco</id>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.jacoco</groupId>
-            <artifactId>jacoco-maven-plugin</artifactId>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-
-    <profile>
-      <id>disable-runnable-on-service-tests</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>runnable-on-service-tests</id>
-                <configuration>
-                  <skip>true</skip>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4254749b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index 9537ec6..baa2241 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -43,7 +43,8 @@ public final class SparkRunnerRegistrar {
   public static class Runner implements PipelineRunnerRegistrar {
     @Override
     public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends 
PipelineRunner<?>>>of(SparkPipelineRunner.class);
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+          SparkPipelineRunner.class, TestSparkPipelineRunner.class);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4254749b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
new file mode 100644
index 0000000..d11d1c1
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * The SparkPipelineRunner translate operations defined on a pipeline to a 
representation executable
+ * by Spark, and then submitting the job to Spark to be executed. If we wanted 
to run a dataflow
+ * pipeline with the default options of a single threaded spark instance in 
local mode, we would do
+ * the following:
+ *
+ * {@code
+ * Pipeline p = [logic for pipeline creation]
+ * EvaluationResult result = SparkPipelineRunner.create().run(p);
+ * }
+ *
+ * To create a pipeline runner to run against a different spark cluster, with 
a custom master url we
+ * would do the following:
+ *
+ * {@code
+ * Pipeline p = [logic for pipeline creation]
+ * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+ * options.setSparkMaster("spark://host:port");
+ * EvaluationResult result = SparkPipelineRunner.create(options).run(p);
+ * }
+ *
+ * To create a Spark streaming pipeline runner use {@link 
SparkStreamingPipelineOptions}
+ */
+public final class TestSparkPipelineRunner extends 
PipelineRunner<EvaluationResult> {
+
+  private SparkPipelineRunner delegate;
+
+  private TestSparkPipelineRunner(SparkPipelineOptions options) {
+    this.delegate = SparkPipelineRunner.fromOptions(options);
+  }
+
+  public static TestSparkPipelineRunner fromOptions(PipelineOptions options) {
+    // Default options suffice to set it up as a test runner
+    SparkPipelineOptions sparkOptions =
+        PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+    return new TestSparkPipelineRunner(sparkOptions);
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput>
+      OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
+    return delegate.apply(transform, input);
+  };
+
+  @Override
+  public EvaluationResult run(Pipeline pipeline) {
+    return delegate.run(pipeline);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4254749b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 88f4a06..d2e57aa 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -47,7 +47,7 @@ public class SparkRunnerRegistrarTest {
 
   @Test
   public void testRunners() {
-    assertEquals(ImmutableList.of(SparkPipelineRunner.class),
+    assertEquals(ImmutableList.of(SparkPipelineRunner.class, 
TestSparkPipelineRunner.class),
         new SparkRunnerRegistrar.Runner().getPipelineRunners());
   }
 

Reply via email to