This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 71d68e0 [BEAM-7039] set up validatesPortableRunner tests for Spark
new 9edd37c Merge pull request #8285: [BEAM-7039] Set up validatesPortableRunner tests for Spark
71d68e0 is described below
commit 71d68e0234b35233e75277607ae02f091e59a415
Author: Kyle Weaver <[email protected]>
AuthorDate: Thu Mar 28 10:27:57 2019 -0700
[BEAM-7039] set up validatesPortableRunner tests for Spark
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 7 +-
runners/spark/job-server/build.gradle | 122 +++++++++++++++++++++
settings.gradle | 2 +
3 files changed, 129 insertions(+), 2 deletions(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 38e0ba1..74bc02f 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -265,6 +265,8 @@ class BeamModulePlugin implements Plugin<Project> {
}
// Configuration for the classpath when running the test.
Configuration testClasspathConfiguration
+ // Additional system properties.
+ Properties systemProperties = []
enum Environment {
DOCKER, // Docker-based Harness execution
@@ -1556,6 +1558,7 @@ class BeamModulePlugin implements Plugin<Project> {
"--environmentCacheMillis=10000"
]
def expansionPort = startingExpansionPortNumber.getAndDecrement()
+ config.systemProperties.put("expansionPort", expansionPort)
beamTestPipelineOptions.addAll(config.pipelineOpts)
if (config.environment ==
PortableValidatesRunnerConfiguration.Environment.EMBEDDED) {
beamTestPipelineOptions += "--defaultEnvironmentType=EMBEDDED"
@@ -1563,11 +1566,11 @@ class BeamModulePlugin implements Plugin<Project> {
if (config.jobServerConfig) {
beamTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}")
}
+ config.systemProperties.put("beamTestPipelineOptions",
JsonOutput.toJson(beamTestPipelineOptions))
project.tasks.create(name: name, type: Test) {
group = "Verification"
description = "Validates the PortableRunner with JobServer
${config.jobServerDriver}"
- systemProperty "beamTestPipelineOptions",
JsonOutput.toJson(beamTestPipelineOptions)
- systemProperty "expansionPort", expansionPort
+ systemProperties config.systemProperties
classpath = config.testClasspathConfiguration
testClassesDirs =
project.files(project.project(":beam-sdks-java-core").sourceSets.test.output.classesDirs,
project.project(":beam-runners-core-java").sourceSets.test.output.classesDirs,
project.project(":beam-runners-core-construction-java").sourceSets.test.output.classesDirs)
maxParallelForks config.numParallelTests
diff --git a/runners/spark/job-server/build.gradle b/runners/spark/job-server/build.gradle
new file mode 100644
index 0000000..2ce34fe
--- /dev/null
+++ b/runners/spark/job-server/build.gradle
@@ -0,0 +1,122 @@
+import org.apache.beam.gradle.BeamModulePlugin
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Spark Runner JobServer build file
+ */
+
+apply plugin: 'org.apache.beam.module'
+apply plugin: 'application'
+// we need to set mainClassName before applying shadow plugin
+mainClassName = "org.apache.beam.runners.spark.SparkJobServerDriver"
+
+applyJavaNature(
+ validateShadowJar: false,
+ exportJavadoc: false,
+ shadowClosure: {
+ append "reference.conf"
+ },
+)
+
+def sparkRunnerProject = ":${project.name.replace("-job-server", "")}"
+
+description = project(sparkRunnerProject).description + " :: Job Server"
+
+configurations {
+ validatesPortableRunner
+}
+
+configurations.all {
+ exclude group: "org.slf4j", module: "slf4j-jdk14"
+}
+
+dependencies {
+ compile project(path: sparkRunnerProject, configuration: "shadow")
+ compile project(path: sparkRunnerProject, configuration: "provided")
+ validatesPortableRunner project(path: sparkRunnerProject, configuration:
"shadowTest")
+ validatesPortableRunner project(path: sparkRunnerProject, configuration:
"provided")
+ validatesPortableRunner project(path: ":beam-sdks-java-core", configuration:
"shadowTest")
+ validatesPortableRunner project(path: ":beam-runners-core-java",
configuration: "shadowTest")
+ validatesPortableRunner project(path: ":beam-runners-reference-java",
configuration: "shadowTest")
+ compile project(path:
":beam-sdks-java-extensions-google-cloud-platform-core", configuration:
"shadow")
+// TODO: Enable AWS and HDFS file system.
+}
+
+// NOTE: runShadow must be used in order to run the job server. The standard run
+// task will not work because the Spark runner classes only exist in the shadow
+// jar.
+runShadow {
+ args = []
+ if (project.hasProperty('jobHost'))
+ args += ["--job-host=${project.property('jobHost')}"]
+ if (project.hasProperty('artifactsDir'))
+ args += ["--artifacts-dir=${project.property('artifactsDir')}"]
+ if (project.hasProperty('cleanArtifactsPerJob'))
+ args +=
["--clean-artifacts-per-job=${project.property('cleanArtifactsPerJob')}"]
+
+ // Enable remote debugging.
+ jvmArgs = ["-Xdebug",
"-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
+ if (project.hasProperty("logLevel"))
+ jvmArgs +=
["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
+}
+
+def portableValidatesRunnerTask(String name) {
+ createPortableValidatesRunnerTask(
+ name: "validatesPortableRunner${name}",
+ jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
+ jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0",
+ testClasspathConfiguration: configurations.validatesPortableRunner,
+ numParallelTests: 1,
+ environment:
BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
+ systemProperties: [
+ "beam.spark.test.reuseSparkContext": "true",
+ "spark.ui.enabled": "false",
+ "spark.ui.showConsoleProgress": "false",
+ ],
+ testCategories: {
+ includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+ excludeCategories
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+ excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesCounterMetrics'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesDistributionMetrics'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+ // TODO re-enable when state is supported
+ excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
+ //SplitableDoFnTests
+ excludeCategories
'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
+ excludeCategories
'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
+ excludeCategories
'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+ },
+ )
+}
+
+project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch")
+
+task validatesPortableRunner() {
+ dependsOn validatesPortableRunnerBatch
+}
diff --git a/settings.gradle b/settings.gradle
index 2aa3188..980acb8 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -81,6 +81,8 @@ include "beam-runners-reference-job-server"
project(":beam-runners-reference-job-server").dir =
file("runners/reference/job-server")
include "beam-runners-spark"
project(":beam-runners-spark").dir = file("runners/spark")
+include "beam-runners-spark-job-server"
+project(":beam-runners-spark-job-server").dir =
file("runners/spark/job-server")
include "beam-runners-samza"
project(":beam-runners-samza").dir = file("runners/samza")
include "beam-runners-samza-job-server"