This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 75e3986  [BEAM-4176] Initial implementation for running portable 
runner tests (#5935)
75e3986 is described below

commit 75e3986171ce68a6fc4c42689698576c407f05b1
Author: Ankur <[email protected]>
AuthorDate: Wed Jul 25 15:31:35 2018 -0700

    [BEAM-4176] Initial implementation for running portable runner tests (#5935)
    
    * TestPortableRunner for validate runner tests
    
    * Dynamically assign jobServer host and port.
    
    * Documentation formatting
    
    * Disable Metrics for Flink Portable Runner
    
    * Change to gradle files and check for pipeline completion for flink 
validates runner test
    
    * Review Comments
    
    * Adding javadoc for default option factory
    
    * Update lambda to handle 0 method args.
    
    * Fix simple typo
    
    * Use varargs in array based method declaration.
    
    * Fixing Log message
---
 runners/flink/job-server/build.gradle              |  75 +++++++++++++++-
 .../beam/runners/flink/FlinkJobInvocation.java     |   2 +-
 .../beam/runners/flink/FlinkJobServerDriver.java   |  95 +++++++++++++++-----
 runners/reference/java/build.gradle                |   5 ++
 .../testing/TestPortablePipelineOptions.java       |  65 ++++++++++++++
 .../reference/testing/TestPortableRunner.java      | 100 +++++++++++++++++++++
 6 files changed, 317 insertions(+), 25 deletions(-)

diff --git a/runners/flink/job-server/build.gradle 
b/runners/flink/job-server/build.gradle
index 552e04c..c76a130 100644
--- a/runners/flink/job-server/build.gradle
+++ b/runners/flink/job-server/build.gradle
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import groovy.json.JsonOutput
 
 apply plugin: org.apache.beam.gradle.BeamModulePlugin
 applyJavaNature(
@@ -23,16 +24,32 @@ applyJavaNature(
   },
 )
 
+/*
+ * We need to rely on manually specifying these evaluationDependsOn to ensure 
that
+ * the following projects are evaluated before we evaluate this project. This 
is because
+ * we are attempting to reference the "sourceSets.test.output" directly.
+ */
+evaluationDependsOn(":beam-sdks-java-core")
+evaluationDependsOn(":beam-runners-core-java")
+
 description = "Apache Beam :: Runners :: Flink :: Job Server"
 
 apply plugin: "application"
 
 mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver"
 
+configurations {
+  validatesRunner
+}
+
 dependencies {
   compile project(path: ":beam-runners-flink_2.11", configuration: "shadow")
+  validatesRunner project(path: ":beam-runners-flink_2.11", configuration: 
"shadowTest")
+  validatesRunner project(path: ":beam-sdks-java-core", configuration: 
"shadowTest")
+  validatesRunner project(path: ":beam-runners-core-java", configuration: 
"shadowTest")
+  validatesRunner project(path: ":beam-runners-reference-java", configuration: 
"shadowTest")
   compile project(path: 
":beam-sdks-java-extensions-google-cloud-platform-core", configuration: 
"shadow")
-//  TODO: Enable AWS and HDPS file system.
+//  TODO: Enable AWS and HDFS file system.
 }
 
 // NOTE: runShadow must be used in order to run the job server. The standard 
run
@@ -45,3 +62,59 @@ runShadow {
   // Enable remote debugging.
   jvmArgs = ["-Xdebug", 
"-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
 }
+
+class PortableValidatesRunnerConfig {
+  // Task name for validate runner case.
+  String name
+  // Fully qualified JobServerClass name to use.
+  String jobServerDriver
+  // A string representing the jobServer Configuration.
+  String jobServerConfig
+  // Flag to include tests for streaming or batch.
+  boolean streaming
+}
+
+def createPortableValidatesRunnerTask = {
+  def config = it ? it as PortableValidatesRunnerConfig : new 
PortableValidatesRunnerConfig()
+  tasks.create(name: config.name, type: Test) {
+    group = "Verification"
+    description = "Validates the PortableRunner with JobServer 
${config.jobServerDriver}"
+    systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+            
"--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
+            "--jobServerDriver=${config.jobServerDriver}",
+            config.jobServerConfig ? 
"--jobServerConfig=${config.jobServerConfig}" : "",
+    ])
+    classpath = configurations.validatesRunner
+    testClassesDirs = 
files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs, 
project(":beam-runners-core-java").sourceSets.test.output.classesDirs)
+    maxParallelForks 1
+    if (config.streaming) {
+      useJUnit {
+        includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+        excludeCategories 
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+        excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesSchema'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+      }
+    } else {
+      useJUnit {
+        includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+        excludeCategories 
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+        excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesSchema'
+        excludeCategories 
'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+      }
+    }
+  }
+}
+
+createPortableValidatesRunnerTask(name: "validatesPortableRunner", 
jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver", 
jobServerConfig: "", streaming: false)
+
+task validatesRunner {
+  group = "Verification"
+  description "Validates Portable Flink runner"
+  dependsOn validatesPortableRunner
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index 528a487..8855c3d 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -89,7 +89,7 @@ public class FlinkJobInvocation implements JobInvocation {
   }
 
   private PipelineResult runPipeline() throws Exception {
-    MetricsEnvironment.setMetricsSupported(true);
+    MetricsEnvironment.setMetricsSupported(false);
 
     LOG.info("Translating pipeline to Flink program.");
     // Fused pipeline proto.
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 5930e2b..6d6685c 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
+import com.google.common.base.Strings;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -45,9 +46,12 @@ public class FlinkJobServerDriver implements Runnable {
   private final ListeningExecutorService executor;
   private final ServerConfiguration configuration;
   private final ServerFactory serverFactory;
+  private GrpcFnServer<InMemoryJobService> jobServer;
+  private GrpcFnServer<BeamFileSystemArtifactStagingService> 
artifactStagingServer;
 
-  private static class ServerConfiguration {
-    @Option(name = "--job-host", required = true, usage = "The job server host 
string")
+  /** Configuration for the jobServer. */
+  public static class ServerConfiguration {
+    @Option(name = "--job-host", usage = "The job server host string")
     private String host = "";
 
     @Option(name = "--artifacts-dir", usage = "The location to store staged 
artifact files")
@@ -57,21 +61,11 @@ public class FlinkJobServerDriver implements Runnable {
     private String flinkMasterUrl = "[auto]";
   }
 
-  public static void main(String[] args) throws IOException {
-    ServerConfiguration configuration = new ServerConfiguration();
-    CmdLineParser parser = new CmdLineParser(configuration);
-    try {
-      parser.parseArgument(args);
-    } catch (CmdLineException e) {
-      LOG.error("Unable to parse command line arguments.", e);
-      printUsage(parser);
-      return;
-    }
+  public static void main(String[] args) throws Exception {
     //TODO: Expose the fileSystem related options.
     // Register standard file systems.
     FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
-    FlinkJobServerDriver driver = fromConfig(configuration);
-    driver.run();
+    fromParams(args).run();
   }
 
   private static void printUsage(CmdLineParser parser) {
@@ -81,6 +75,20 @@ public class FlinkJobServerDriver implements Runnable {
     System.err.println();
   }
 
+  public static FlinkJobServerDriver fromParams(String[] args) {
+    ServerConfiguration configuration = new ServerConfiguration();
+    CmdLineParser parser = new CmdLineParser(configuration);
+    try {
+      parser.parseArgument(args);
+    } catch (CmdLineException e) {
+      LOG.error("Unable to parse command line arguments.", e);
+      printUsage(parser);
+      throw new IllegalArgumentException("Unable to parse command line 
arguments.", e);
+    }
+
+    return fromConfig(configuration);
+  }
+
   public static FlinkJobServerDriver fromConfig(ServerConfiguration 
configuration) {
     ThreadFactory threadFactory =
         new 
ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
@@ -109,28 +117,64 @@ public class FlinkJobServerDriver implements Runnable {
   @Override
   public void run() {
     try {
-      GrpcFnServer<InMemoryJobService> server = createJobServer();
-      server.getServer().awaitTermination();
+      jobServer = createJobServer();
+      jobServer.getServer().awaitTermination();
     } catch (InterruptedException e) {
       LOG.warn("Job server interrupted", e);
     } catch (Exception e) {
       LOG.warn("Exception during job server creation", e);
+    } finally {
+      stop();
+    }
+  }
+
+  public String start() throws IOException {
+    jobServer = createJobServer();
+    return jobServer.getApiServiceDescriptor().getUrl();
+  }
+
+  public void stop() {
+    if (jobServer != null) {
+      try {
+        jobServer.close();
+        LOG.info("JobServer stopped on {}", 
jobServer.getApiServiceDescriptor().getUrl());
+        jobServer = null;
+      } catch (Exception e) {
+        LOG.error("Error while closing the jobServer.", e);
+      }
+    }
+    if (artifactStagingServer != null) {
+      try {
+        artifactStagingServer.close();
+        LOG.info(
+            "ArtifactStagingServer stopped on {}",
+            artifactStagingServer.getApiServiceDescriptor().getUrl());
+        artifactStagingServer = null;
+      } catch (Exception e) {
+        LOG.error("Error while closing the artifactStagingServer.", e);
+      }
     }
   }
 
   private GrpcFnServer<InMemoryJobService> createJobServer() throws 
IOException {
     InMemoryJobService service = createJobService();
-    Endpoints.ApiServiceDescriptor descriptor =
-        
Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build();
-    return GrpcFnServer.create(service, descriptor, serverFactory);
+    GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
+    if (Strings.isNullOrEmpty(configuration.host)) {
+      jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor(service, 
serverFactory);
+    } else {
+      Endpoints.ApiServiceDescriptor descriptor =
+          
Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build();
+      jobServiceGrpcFnServer = GrpcFnServer.create(service, descriptor, 
serverFactory);
+    }
+    LOG.info("JobServer started on {}", 
jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
+    return jobServiceGrpcFnServer;
   }
 
   private InMemoryJobService createJobService() throws IOException {
-    GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
-        createArtifactStagingService();
+    artifactStagingServer = createArtifactStagingService();
     JobInvoker invoker = createJobInvoker();
     return InMemoryJobService.create(
-        artifactStagingService.getApiServiceDescriptor(),
+        artifactStagingServer.getApiServiceDescriptor(),
         (String session) -> {
           try {
             return 
BeamFileSystemArtifactStagingService.generateStagingSessionToken(
@@ -145,7 +189,12 @@ public class FlinkJobServerDriver implements Runnable {
   private GrpcFnServer<BeamFileSystemArtifactStagingService> 
createArtifactStagingService()
       throws IOException {
     BeamFileSystemArtifactStagingService service = new 
BeamFileSystemArtifactStagingService();
-    return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
+    GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
+        GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
+    LOG.info(
+        "ArtifactStagingService started on {}",
+        artifactStagingService.getApiServiceDescriptor().getUrl());
+    return artifactStagingService;
   }
 
   private JobInvoker createJobInvoker() throws IOException {
diff --git a/runners/reference/java/build.gradle 
b/runners/reference/java/build.gradle
index 30fe3cd..7bbaed0 100644
--- a/runners/reference/java/build.gradle
+++ b/runners/reference/java/build.gradle
@@ -24,7 +24,12 @@ ext.summary = """A Java implementation of the Beam Model 
which utilizes the port
 framework to execute user-definied functions."""
 
 
+configurations {
+  validatesRunner
+}
+
 dependencies {
+  compile library.java.hamcrest_library
   shadow project(path: ":beam-model-pipeline", configuration: "shadow")
   shadow project(path: ":beam-runners-core-construction-java", configuration: 
"shadow")
   shadow project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
diff --git 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java
 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java
new file mode 100644
index 0000000..c4bcba1
--- /dev/null
+++ 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference.testing;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/** Options for {@link TestPortableRunner}. */
+public interface TestPortablePipelineOptions extends TestPipelineOptions, 
PortablePipelineOptions {
+
+  @Required
+  @Description(
+      "Fully qualified class name of TestJobServiceDriver capable of managing 
the JobService.")
+  Class getJobServerDriver();
+
+  void setJobServerDriver(Class jobServerDriver);
+
+  @Description("String containing comma separated arguments for the 
JobServer.")
+  @Default.InstanceFactory(DefaultJobServerConfigFactory.class)
+  String[] getJobServerConfig();
+
+  void setJobServerConfig(String... jobServerConfig);
+
+  /** Factory for default config. */
+  class DefaultJobServerConfigFactory implements DefaultValueFactory<String[]> 
{
+
+    @Override
+    public String[] create(PipelineOptions options) {
+      return new String[0];
+    }
+  }
+
+  /** Register {@link TestPortablePipelineOptions}. */
+  @AutoService(PipelineOptionsRegistrar.class)
+  class TestPortablePipelineOptionsRegistrar implements 
PipelineOptionsRegistrar {
+
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.of(TestPortablePipelineOptions.class);
+    }
+  }
+}
diff --git 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java
 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java
new file mode 100644
index 0000000..d7295f2
--- /dev/null
+++ 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.lang.reflect.InvocationTargetException;
+import org.apache.beam.runners.reference.PortableRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.hamcrest.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TestPortableRunner} is a pipeline runner that wraps a {@link 
PortableRunner} when running
+ * tests against the {@link TestPipeline}.
+ *
+ * <p>This runner requires a JobServerDriver with following methods.
+ *
+ * <ul>
+ *   <li>public static Object fromParams(String... params)
+ *   <li>public String start() // Start JobServer and returns the JobServer 
host and port.
+ *   <li>public void stop() // Stop the JobServer and free all resources.
+ * </ul>
+ *
+ * @see TestPipeline
+ */
+public class TestPortableRunner extends PipelineRunner<PipelineResult> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestPortableRunner.class);
+  private final PortablePipelineOptions options;
+
+  private TestPortableRunner(PortablePipelineOptions options) {
+    this.options = options;
+  }
+
+  public static TestPortableRunner fromOptions(PipelineOptions options) {
+    return new TestPortableRunner(options.as(PortablePipelineOptions.class));
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    TestPortablePipelineOptions testPortablePipelineOptions =
+        options.as(TestPortablePipelineOptions.class);
+    String jobServerHostPort;
+    Object jobServerDriver;
+    Class<?> jobServerDriverClass = 
testPortablePipelineOptions.getJobServerDriver();
+    String[] parameters = testPortablePipelineOptions.getJobServerConfig();
+    try {
+      jobServerDriver =
+          InstanceBuilder.ofType(jobServerDriverClass)
+              .fromFactoryMethod("fromParams")
+              .withArg(String[].class, parameters)
+              .build();
+      jobServerHostPort = (String) 
jobServerDriverClass.getMethod("start").invoke(jobServerDriver);
+    } catch (IllegalAccessException | NoSuchMethodException | 
InvocationTargetException e) {
+      throw new IllegalArgumentException(e);
+    }
+
+    try {
+      PortablePipelineOptions portableOptions = 
options.as(PortablePipelineOptions.class);
+      portableOptions.setRunner(PortableRunner.class);
+      portableOptions.setJobEndpoint(jobServerHostPort);
+      PortableRunner runner = PortableRunner.fromOptions(portableOptions);
+      PipelineResult result = runner.run(pipeline);
+      assertThat("Pipeline did not succeed.", result.waitUntilFinish(), 
Matchers.is(State.DONE));
+      return result;
+    } finally {
+      try {
+        jobServerDriverClass.getMethod("stop").invoke(jobServerDriver);
+      } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+        LOG.error(
+            String.format(
+                "Provided JobServiceDriver %s does not implement stop().", 
jobServerDriverClass),
+            e);
+      }
+    }
+  }
+}

Reply via email to