This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 75e3986 [BEAM-4176] Initial implementation for running portable
runner tests (#5935)
75e3986 is described below
commit 75e3986171ce68a6fc4c42689698576c407f05b1
Author: Ankur <[email protected]>
AuthorDate: Wed Jul 25 15:31:35 2018 -0700
[BEAM-4176] Initial implementation for running portable runner tests (#5935)
* TestPortableRunner for ValidatesRunner tests
* Dynamically assign jobServer host and port.
* Documentation formatting
* Disable Metrics for Flink Portable Runner
* Change to gradle files and check for pipeline completion for flink
validates runner test
* Review Comments
* Adding javadoc for default option factory
* Update lambda to handle 0 method args.
* Fix simple typo
* Use varargs in array based method declaration.
* Fixing Log message
---
runners/flink/job-server/build.gradle | 75 +++++++++++++++-
.../beam/runners/flink/FlinkJobInvocation.java | 2 +-
.../beam/runners/flink/FlinkJobServerDriver.java | 95 +++++++++++++++-----
runners/reference/java/build.gradle | 5 ++
.../testing/TestPortablePipelineOptions.java | 65 ++++++++++++++
.../reference/testing/TestPortableRunner.java | 100 +++++++++++++++++++++
6 files changed, 317 insertions(+), 25 deletions(-)
diff --git a/runners/flink/job-server/build.gradle
b/runners/flink/job-server/build.gradle
index 552e04c..c76a130 100644
--- a/runners/flink/job-server/build.gradle
+++ b/runners/flink/job-server/build.gradle
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import groovy.json.JsonOutput
apply plugin: org.apache.beam.gradle.BeamModulePlugin
applyJavaNature(
@@ -23,16 +24,32 @@ applyJavaNature(
},
)
+/*
+ * We must explicitly declare evaluationDependsOn for the following projects so
+ * that they are evaluated before this project. This is required because the
+ * ValidatesRunner test task created below reads their "sourceSets.test.output"
+ * directly.
+ */
+evaluationDependsOn(":beam-sdks-java-core")
+evaluationDependsOn(":beam-runners-core-java")
+
description = "Apache Beam :: Runners :: Flink :: Job Server"
apply plugin: "application"
mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver"
+// Test classpath (runner, SDK, runners-core and reference-runner test jars) used as the
+// classpath of the portable ValidatesRunner task created below.
+configurations {
+ validatesRunner
+}
+
dependencies {
compile project(path: ":beam-runners-flink_2.11", configuration: "shadow")
+ validatesRunner project(path: ":beam-runners-flink_2.11", configuration:
"shadowTest")
+ validatesRunner project(path: ":beam-sdks-java-core", configuration:
"shadowTest")
+ validatesRunner project(path: ":beam-runners-core-java", configuration:
"shadowTest")
+ validatesRunner project(path: ":beam-runners-reference-java", configuration:
"shadowTest")
compile project(path:
":beam-sdks-java-extensions-google-cloud-platform-core", configuration:
"shadow")
-// TODO: Enable AWS and HDPS file system.
+// TODO: Enable AWS and HDFS file system.
}
// NOTE: runShadow must be used in order to run the job server. The standard
run
@@ -45,3 +62,59 @@ runShadow {
// Enable remote debugging.
jvmArgs = ["-Xdebug",
"-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
}
+
+/** Named-argument settings for createPortableValidatesRunnerTask. */
+class PortableValidatesRunnerConfig {
+ // Name of the Gradle Test task to create.
+ String name
+ // Fully qualified class name of the job server driver to use.
+ String jobServerDriver
+ // Job server arguments, forwarded as --jobServerConfig when non-empty.
+ String jobServerConfig
+ // Selects which set of JUnit category exclusions applies (true: streaming, false: batch).
+ boolean streaming
+}
+
+// Creates a Test task that runs the Java SDK and runners-core ValidatesRunner suites through
+// TestPortableRunner against the given job server driver. Accepts the properties of
+// PortableValidatesRunnerConfig as named arguments.
+def createPortableValidatesRunnerTask = {
+  def config = it ? it as PortableValidatesRunnerConfig : new PortableValidatesRunnerConfig()
+
+  // Arguments handed to TestPipeline; an empty jobServerConfig contributes an empty entry.
+  def testPipelineArgs = [
+    "--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
+    "--jobServerDriver=${config.jobServerDriver}",
+    config.jobServerConfig ? "--jobServerConfig=${config.jobServerConfig}" : "",
+  ]
+
+  // Categories excluded in both modes, plus one mode-specific exclusion appended below.
+  def excludedCategories = [
+    'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders',
+    'org.apache.beam.sdk.testing.LargeKeys$Above100MB',
+    'org.apache.beam.sdk.testing.UsesCommittedMetrics',
+    'org.apache.beam.sdk.testing.UsesSchema',
+    'org.apache.beam.sdk.testing.UsesTestStream',
+  ]
+  if (config.streaming) {
+    excludedCategories << 'org.apache.beam.sdk.testing.UsesImpulse'
+  } else {
+    excludedCategories << 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
+  }
+
+  tasks.create(name: config.name, type: Test) {
+    group = "Verification"
+    description = "Validates the PortableRunner with JobServer ${config.jobServerDriver}"
+    systemProperty "beamTestPipelineOptions", JsonOutput.toJson(testPipelineArgs)
+    classpath = configurations.validatesRunner
+    testClassesDirs = files(
+        project(":beam-sdks-java-core").sourceSets.test.output.classesDirs,
+        project(":beam-runners-core-java").sourceSets.test.output.classesDirs)
+    maxParallelForks 1
+    useJUnit {
+      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+      excludeCategories(*excludedCategories)
+    }
+  }
+}
+
+// Batch (streaming: false) ValidatesRunner suite against the Flink job server, with no extra
+// job server arguments.
+createPortableValidatesRunnerTask(name: "validatesPortableRunner",
jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver",
jobServerConfig: "", streaming: false)
+
+// Umbrella task that runs the portable Flink ValidatesRunner suite(s) registered above.
+task validatesRunner {
+ group = "Verification"
+ description "Validates Portable Flink runner"
+ dependsOn validatesPortableRunner
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index 528a487..8855c3d 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -89,7 +89,7 @@ public class FlinkJobInvocation implements JobInvocation {
}
private PipelineResult runPipeline() throws Exception {
- MetricsEnvironment.setMetricsSupported(true);
+ MetricsEnvironment.setMetricsSupported(false);
LOG.info("Translating pipeline to Flink program.");
// Fused pipeline proto.
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 5930e2b..6d6685c 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink;
+import com.google.common.base.Strings;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -45,9 +46,12 @@ public class FlinkJobServerDriver implements Runnable {
private final ListeningExecutorService executor;
private final ServerConfiguration configuration;
private final ServerFactory serverFactory;
+ // Job service server; assigned by run()/start() and closed by stop().
+ private GrpcFnServer<InMemoryJobService> jobServer;
+ // Artifact staging server created together with the job service; closed by stop().
+ private GrpcFnServer<BeamFileSystemArtifactStagingService>
artifactStagingServer;
- private static class ServerConfiguration {
- @Option(name = "--job-host", required = true, usage = "The job server host
string")
+ /** Configuration for the jobServer. */
+ public static class ServerConfiguration {
+ @Option(name = "--job-host", usage = "The job server host string")
private String host = "";
@Option(name = "--artifacts-dir", usage = "The location to store staged
artifact files")
@@ -57,21 +61,11 @@ public class FlinkJobServerDriver implements Runnable {
private String flinkMasterUrl = "[auto]";
}
- public static void main(String[] args) throws IOException {
- ServerConfiguration configuration = new ServerConfiguration();
- CmdLineParser parser = new CmdLineParser(configuration);
- try {
- parser.parseArgument(args);
- } catch (CmdLineException e) {
- LOG.error("Unable to parse command line arguments.", e);
- printUsage(parser);
- return;
- }
+ /**
+ * Command-line entry point: registers the default file systems, builds a driver from {@code
+ * args} via {@link #fromParams(String[])} and runs the job server on the calling thread.
+ * Unparseable arguments surface as an {@link IllegalArgumentException} from fromParams.
+ */
+ public static void main(String[] args) throws Exception {
//TODO: Expose the fileSystem related options.
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
- FlinkJobServerDriver driver = fromConfig(configuration);
- driver.run();
+ fromParams(args).run();
}
private static void printUsage(CmdLineParser parser) {
@@ -81,6 +75,20 @@ public class FlinkJobServerDriver implements Runnable {
System.err.println();
}
+ /**
+ * Creates a driver from command-line style flags (see {@link ServerConfiguration}), e.g.
+ * {@code --job-host} and {@code --artifacts-dir}. On a parse error the usage text is printed
+ * before throwing.
+ *
+ * @param args the flags to parse
+ * @return a driver configured from the parsed flags
+ * @throws IllegalArgumentException if the flags cannot be parsed
+ */
+ public static FlinkJobServerDriver fromParams(String[] args) {
+ ServerConfiguration configuration = new ServerConfiguration();
+ CmdLineParser parser = new CmdLineParser(configuration);
+ try {
+ parser.parseArgument(args);
+ } catch (CmdLineException e) {
+ LOG.error("Unable to parse command line arguments.", e);
+ printUsage(parser);
+ throw new IllegalArgumentException("Unable to parse command line
arguments.", e);
+ }
+
+ return fromConfig(configuration);
+ }
+
public static FlinkJobServerDriver fromConfig(ServerConfiguration
configuration) {
ThreadFactory threadFactory =
new
ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
@@ -109,28 +117,64 @@ public class FlinkJobServerDriver implements Runnable {
@Override
public void run() {
try {
- GrpcFnServer<InMemoryJobService> server = createJobServer();
- server.getServer().awaitTermination();
+ jobServer = createJobServer();
+ jobServer.getServer().awaitTermination();
} catch (InterruptedException e) {
LOG.warn("Job server interrupted", e);
} catch (Exception e) {
LOG.warn("Exception during job server creation", e);
+ } finally {
+ stop();
+ }
+ }
+
+ /**
+ * Creates the job server without waiting for it to terminate; pair with {@link #stop()}.
+ *
+ * @return the URL of the job service endpoint
+ * @throws IOException if the job server cannot be created
+ */
+ // NOTE(review): a second call overwrites jobServer (and artifactStagingServer) without closing
+ // the previously created servers.
+ public String start() throws IOException {
+ jobServer = createJobServer();
+ return jobServer.getApiServiceDescriptor().getUrl();
+ }
+
+ /**
+ * Closes the job server and the artifact staging server, if present. Close failures are logged,
+ * not rethrown. Each field is cleared only after a successful close, so a later call skips
+ * servers already closed and retries any whose close failed.
+ */
+ public void stop() {
+ if (jobServer != null) {
+ try {
+ jobServer.close();
+ LOG.info("JobServer stopped on {}",
jobServer.getApiServiceDescriptor().getUrl());
+ jobServer = null;
+ } catch (Exception e) {
+ LOG.error("Error while closing the jobServer.", e);
+ }
+ }
+ if (artifactStagingServer != null) {
+ try {
+ artifactStagingServer.close();
+ LOG.info(
+ "ArtifactStagingServer stopped on {}",
+ artifactStagingServer.getApiServiceDescriptor().getUrl());
+ artifactStagingServer = null;
+ } catch (Exception e) {
+ LOG.error("Error while closing the artifactStagingServer.", e);
+ }
}
}
+ /**
+ * Creates the job service (which also creates the artifact staging server) and exposes it over
+ * gRPC. It binds to {@code --job-host} when that is set; otherwise a port is allocated.
+ */
private GrpcFnServer<InMemoryJobService> createJobServer() throws
IOException {
InMemoryJobService service = createJobService();
- Endpoints.ApiServiceDescriptor descriptor =
-
Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build();
- return GrpcFnServer.create(service, descriptor, serverFactory);
+ GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
+ if (Strings.isNullOrEmpty(configuration.host)) {
+ jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor(service,
serverFactory);
+ } else {
+ Endpoints.ApiServiceDescriptor descriptor =
+
Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build();
+ jobServiceGrpcFnServer = GrpcFnServer.create(service, descriptor,
serverFactory);
+ }
+ LOG.info("JobServer started on {}",
jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
+ return jobServiceGrpcFnServer;
}
private InMemoryJobService createJobService() throws IOException {
- GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
- createArtifactStagingService();
+ artifactStagingServer = createArtifactStagingService();
JobInvoker invoker = createJobInvoker();
return InMemoryJobService.create(
- artifactStagingService.getApiServiceDescriptor(),
+ artifactStagingServer.getApiServiceDescriptor(),
(String session) -> {
try {
return
BeamFileSystemArtifactStagingService.generateStagingSessionToken(
@@ -145,7 +189,12 @@ public class FlinkJobServerDriver implements Runnable {
+ /** Starts a gRPC artifact staging server on an allocated port and logs its endpoint URL. */
private GrpcFnServer<BeamFileSystemArtifactStagingService>
createArtifactStagingService()
throws IOException {
BeamFileSystemArtifactStagingService service = new
BeamFileSystemArtifactStagingService();
- return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
+ GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
+ GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
+ LOG.info(
+ "ArtifactStagingService started on {}",
+ artifactStagingService.getApiServiceDescriptor().getUrl());
+ return artifactStagingService;
}
private JobInvoker createJobInvoker() throws IOException {
diff --git a/runners/reference/java/build.gradle
b/runners/reference/java/build.gradle
index 30fe3cd..7bbaed0 100644
--- a/runners/reference/java/build.gradle
+++ b/runners/reference/java/build.gradle
@@ -24,7 +24,12 @@ ext.summary = """A Java implementation of the Beam Model
which utilizes the port
framework to execute user-definied functions."""
+// NOTE(review): nothing in this change resolves or adds to this configuration; confirm it is
+// still needed.
+configurations {
+ validatesRunner
+}
+
dependencies {
+ compile library.java.hamcrest_library
shadow project(path: ":beam-model-pipeline", configuration: "shadow")
shadow project(path: ":beam-runners-core-construction-java", configuration:
"shadow")
shadow project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
diff --git
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java
new file mode 100644
index 0000000..c4bcba1
--- /dev/null
+++
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference.testing;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/** Options for {@link TestPortableRunner}. */
+public interface TestPortablePipelineOptions extends TestPipelineOptions, PortablePipelineOptions {
+
+  /**
+   * Job server driver class. {@link TestPortableRunner} instantiates it through a static {@code
+   * fromParams(String[])} factory and then invokes its {@code start()} and {@code stop()} methods.
+   */
+  @Required
+  @Description(
+      "Fully qualified class name of TestJobServiceDriver capable of managing the JobService.")
+  Class<?> getJobServerDriver();
+
+  void setJobServerDriver(Class<?> jobServerDriver);
+
+  /** Arguments handed to the driver's {@code fromParams} factory; empty unless configured. */
+  @Description("String containing comma separated arguments for the JobServer.")
+  @Default.InstanceFactory(DefaultJobServerConfigFactory.class)
+  String[] getJobServerConfig();
+
+  void setJobServerConfig(String... jobServerConfig);
+
+  /** Supplies an empty argument array as the default for {@link #getJobServerConfig()}. */
+  class DefaultJobServerConfigFactory implements DefaultValueFactory<String[]> {
+
+    @Override
+    public String[] create(PipelineOptions options) {
+      return new String[0];
+    }
+  }
+
+  /** Registers {@link TestPortablePipelineOptions} via {@link PipelineOptionsRegistrar}. */
+  @AutoService(PipelineOptionsRegistrar.class)
+  class TestPortablePipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.of(TestPortablePipelineOptions.class);
+    }
+  }
+}
diff --git
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java
new file mode 100644
index 0000000..d7295f2
--- /dev/null
+++
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.lang.reflect.InvocationTargetException;
+import org.apache.beam.runners.reference.PortableRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.hamcrest.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TestPortableRunner} is a pipeline runner that wraps a {@link
PortableRunner} when running
+ * tests against the {@link TestPipeline}.
+ *
+ * <p>This runner requires a JobServerDriver with following methods.
+ *
+ * <ul>
+ * <li>public static Object fromParams(String... params)
+ * <li>public String start() // Start JobServer and returns the JobServer
host and port.
+ * <li>public void stop() // Stop the JobServer and free all resources.
+ * </ul>
+ *
+ * @see TestPipeline
+ */
+public class TestPortableRunner extends PipelineRunner<PipelineResult> {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestPortableRunner.class);
+ private final PortablePipelineOptions options;
+
+ private TestPortableRunner(PortablePipelineOptions options) {
+ this.options = options;
+ }
+
+ public static TestPortableRunner fromOptions(PipelineOptions options) {
+ return new TestPortableRunner(options.as(PortablePipelineOptions.class));
+ }
+
+ @Override
+ public PipelineResult run(Pipeline pipeline) {
+ TestPortablePipelineOptions testPortablePipelineOptions =
+ options.as(TestPortablePipelineOptions.class);
+ String jobServerHostPort;
+ Object jobServerDriver;
+ Class<?> jobServerDriverClass =
testPortablePipelineOptions.getJobServerDriver();
+ String[] parameters = testPortablePipelineOptions.getJobServerConfig();
+ try {
+ jobServerDriver =
+ InstanceBuilder.ofType(jobServerDriverClass)
+ .fromFactoryMethod("fromParams")
+ .withArg(String[].class, parameters)
+ .build();
+ jobServerHostPort = (String)
jobServerDriverClass.getMethod("start").invoke(jobServerDriver);
+ } catch (IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ try {
+ PortablePipelineOptions portableOptions =
options.as(PortablePipelineOptions.class);
+ portableOptions.setRunner(PortableRunner.class);
+ portableOptions.setJobEndpoint(jobServerHostPort);
+ PortableRunner runner = PortableRunner.fromOptions(portableOptions);
+ PipelineResult result = runner.run(pipeline);
+ assertThat("Pipeline did not succeed.", result.waitUntilFinish(),
Matchers.is(State.DONE));
+ return result;
+ } finally {
+ try {
+ jobServerDriverClass.getMethod("stop").invoke(jobServerDriver);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
+ LOG.error(
+ String.format(
+ "Provided JobServiceDriver %s does not implement stop().",
jobServerDriverClass),
+ e);
+ }
+ }
+ }
+}