This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 71d68e0 [BEAM-7039] set up validatesPortableRunner tests for Spark new 9edd37c Merge pull request #8285: [BEAM-7039] Set up validatesPortableRunner tests for Spark 71d68e0 is described below commit 71d68e0234b35233e75277607ae02f091e59a415 Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Thu Mar 28 10:27:57 2019 -0700 [BEAM-7039] set up validatesPortableRunner tests for Spark --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 7 +- runners/spark/job-server/build.gradle | 122 +++++++++++++++++++++ settings.gradle | 2 + 3 files changed, 129 insertions(+), 2 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 38e0ba1..74bc02f 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -265,6 +265,8 @@ class BeamModulePlugin implements Plugin<Project> { } // Configuration for the classpath when running the test. Configuration testClasspathConfiguration + // Additional system properties. + Properties systemProperties = [] enum Environment { DOCKER, // Docker-based Harness execution @@ -1556,6 +1558,7 @@ class BeamModulePlugin implements Plugin<Project> { "--environmentCacheMillis=10000" ] def expansionPort = startingExpansionPortNumber.getAndDecrement() + config.systemProperties.put("expansionPort", expansionPort) beamTestPipelineOptions.addAll(config.pipelineOpts) if (config.environment == PortableValidatesRunnerConfiguration.Environment.EMBEDDED) { beamTestPipelineOptions += "--defaultEnvironmentType=EMBEDDED" @@ -1563,11 +1566,11 @@ class BeamModulePlugin implements Plugin<Project> { if (config.jobServerConfig) { beamTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}") } + config.systemProperties.put("beamTestPipelineOptions", JsonOutput.toJson(beamTestPipelineOptions)) project.tasks.create(name: name, type: Test) { group = "Verification" description = "Validates the PortableRunner with JobServer ${config.jobServerDriver}" - systemProperty "beamTestPipelineOptions", JsonOutput.toJson(beamTestPipelineOptions) - systemProperty "expansionPort", expansionPort + systemProperties config.systemProperties classpath = config.testClasspathConfiguration testClassesDirs = project.files(project.project(":beam-sdks-java-core").sourceSets.test.output.classesDirs, project.project(":beam-runners-core-java").sourceSets.test.output.classesDirs, project.project(":beam-runners-core-construction-java").sourceSets.test.output.classesDirs) maxParallelForks config.numParallelTests diff --git a/runners/spark/job-server/build.gradle b/runners/spark/job-server/build.gradle new file mode 100644 index 0000000..2ce34fe --- /dev/null +++ b/runners/spark/job-server/build.gradle @@ -0,0 +1,122 @@ +import org.apache.beam.gradle.BeamModulePlugin + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Spark Runner JobServer build file + */ + +apply plugin: 'org.apache.beam.module' +apply plugin: 'application' +// we need to set mainClassName before applying shadow plugin +mainClassName = "org.apache.beam.runners.spark.SparkJobServerDriver" + +applyJavaNature( + validateShadowJar: false, + exportJavadoc: false, + shadowClosure: { + append "reference.conf" + }, +) + +def sparkRunnerProject = ":${project.name.replace("-job-server", "")}" + +description = project(sparkRunnerProject).description + " :: Job Server" + +configurations { + validatesPortableRunner +} + +configurations.all { + exclude group: "org.slf4j", module: "slf4j-jdk14" +} + +dependencies { + compile project(path: sparkRunnerProject, configuration: "shadow") + compile project(path: sparkRunnerProject, configuration: "provided") + validatesPortableRunner project(path: sparkRunnerProject, configuration: "shadowTest") + validatesPortableRunner project(path: sparkRunnerProject, configuration: "provided") + validatesPortableRunner project(path: ":beam-sdks-java-core", configuration: "shadowTest") + validatesPortableRunner project(path: ":beam-runners-core-java", configuration: "shadowTest") + validatesPortableRunner project(path: ":beam-runners-reference-java", configuration: "shadowTest") + compile project(path: ":beam-sdks-java-extensions-google-cloud-platform-core", configuration: "shadow") +// TODO: Enable AWS and HDFS file system. +} + +// NOTE: runShadow must be used in order to run the job server. The standard run +// task will not work because the Spark runner classes only exist in the shadow +// jar. +runShadow { + args = [] + if (project.hasProperty('jobHost')) + args += ["--job-host=${project.property('jobHost')}"] + if (project.hasProperty('artifactsDir')) + args += ["--artifacts-dir=${project.property('artifactsDir')}"] + if (project.hasProperty('cleanArtifactsPerJob')) + args += ["--clean-artifacts-per-job=${project.property('cleanArtifactsPerJob')}"] + + // Enable remote debugging. + jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"] + if (project.hasProperty("logLevel")) + jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"] +} + +def portableValidatesRunnerTask(String name) { + createPortableValidatesRunnerTask( + name: "validatesPortableRunner${name}", + jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver", + jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0", + testClasspathConfiguration: configurations.validatesPortableRunner, + numParallelTests: 1, + environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED, + systemProperties: [ + "beam.spark.test.reuseSparkContext": "true", + "spark.ui.enabled": "false", + "spark.ui.showConsoleProgress": "false", + ], + testCategories: { + includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' + excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders' + excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB' + excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics' + excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' + excludeCategories 'org.apache.beam.sdk.testing.UsesCounterMetrics' + excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging' + excludeCategories 'org.apache.beam.sdk.testing.UsesDistributionMetrics' + excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage' + excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' + excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle' + excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' + excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' + excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' + // TODO re-enable when state is supported + excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo' + //SplitableDoFnTests + excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo' + excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs' + excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' + }, + ) +} + +project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch") + +task validatesPortableRunner() { + dependsOn validatesPortableRunnerBatch +} diff --git a/settings.gradle b/settings.gradle index 2aa3188..980acb8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -81,6 +81,8 @@ include "beam-runners-reference-job-server" project(":beam-runners-reference-job-server").dir = file("runners/reference/job-server") include "beam-runners-spark" project(":beam-runners-spark").dir = file("runners/spark") +include "beam-runners-spark-job-server" +project(":beam-runners-spark-job-server").dir = file("runners/spark/job-server") include "beam-runners-samza" project(":beam-runners-samza").dir = file("runners/samza") include "beam-runners-samza-job-server"