[ 
https://issues.apache.org/jira/browse/BEAM-4176?focusedWorklogId=127546&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127546
 ]

ASF GitHub Bot logged work on BEAM-4176:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Jul/18 22:31
            Start Date: 25/Jul/18 22:31
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #5935: [BEAM-4176] Initial implementation for running portable runner tests
URL: https://github.com/apache/beam/pull/5935
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign pull request is merged, it is reproduced below
for the sake of provenance:

diff --git a/runners/flink/job-server/build.gradle b/runners/flink/job-server/build.gradle
index 552e04c3d74..c76a130d430 100644
--- a/runners/flink/job-server/build.gradle
+++ b/runners/flink/job-server/build.gradle
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import groovy.json.JsonOutput
 
 apply plugin: org.apache.beam.gradle.BeamModulePlugin
 applyJavaNature(
@@ -23,16 +24,32 @@ applyJavaNature(
   },
 )
 
+/*
+ * We need to rely on manually specifying these evaluationDependsOn to ensure that
+ * the following projects are evaluated before we evaluate this project. This is because
+ * we are attempting to reference the "sourceSets.test.output" directly.
+ */
+evaluationDependsOn(":beam-sdks-java-core")
+evaluationDependsOn(":beam-runners-core-java")
+
 description = "Apache Beam :: Runners :: Flink :: Job Server"
 
 apply plugin: "application"
 
 mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver"
 
+configurations {
+  validatesRunner
+}
+
 dependencies {
   compile project(path: ":beam-runners-flink_2.11", configuration: "shadow")
+  validatesRunner project(path: ":beam-runners-flink_2.11", configuration: "shadowTest")
+  validatesRunner project(path: ":beam-sdks-java-core", configuration: "shadowTest")
+  validatesRunner project(path: ":beam-runners-core-java", configuration: "shadowTest")
+  validatesRunner project(path: ":beam-runners-reference-java", configuration: "shadowTest")
   compile project(path: ":beam-sdks-java-extensions-google-cloud-platform-core", configuration: "shadow")
-//  TODO: Enable AWS and HDPS file system.
+//  TODO: Enable AWS and HDFS file system.
 }
 
 // NOTE: runShadow must be used in order to run the job server. The standard run
@@ -45,3 +62,59 @@ runShadow {
   // Enable remote debugging.
   jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
 }
+
+class PortableValidatesRunnerConfig {
+  // Task name for validate runner case.
+  String name
+  // Fully qualified JobServerClass name to use.
+  String jobServerDriver
+  // A string representing the jobServer Configuration.
+  String jobServerConfig
+  // Flag to include tests for streaming or batch.
+  boolean streaming
+}
+
+def createPortableValidatesRunnerTask = {
+  def config = it ? it as PortableValidatesRunnerConfig : new PortableValidatesRunnerConfig()
+  tasks.create(name: config.name, type: Test) {
+    group = "Verification"
+    description = "Validates the PortableRunner with JobServer ${config.jobServerDriver}"
+    systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+            "--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
+            "--jobServerDriver=${config.jobServerDriver}",
+            config.jobServerConfig ? "--jobServerConfig=${config.jobServerConfig}" : "",
+    ])
+    classpath = configurations.validatesRunner
+    testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs, project(":beam-runners-core-java").sourceSets.test.output.classesDirs)
+    maxParallelForks 1
+    if (config.streaming) {
+      useJUnit {
+        includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+        excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+        excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesSchema'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+      }
+    } else {
+      useJUnit {
+        includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+        excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+        excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesSchema'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+      }
+    }
+  }
+}
+
+createPortableValidatesRunnerTask(name: "validatesPortableRunner", jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver", jobServerConfig: "", streaming: false)
+
+task validatesRunner {
+  group = "Verification"
+  description "Validates Portable Flink runner"
+  dependsOn validatesPortableRunner
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index 528a48779c7..8855c3d1be3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -89,7 +89,7 @@ private FlinkJobInvocation(
   }
 
   private PipelineResult runPipeline() throws Exception {
-    MetricsEnvironment.setMetricsSupported(true);
+    MetricsEnvironment.setMetricsSupported(false);
 
     LOG.info("Translating pipeline to Flink program.");
     // Fused pipeline proto.
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 5930e2b1e83..6d6685c5c37 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
+import com.google.common.base.Strings;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -45,9 +46,12 @@
   private final ListeningExecutorService executor;
   private final ServerConfiguration configuration;
   private final ServerFactory serverFactory;
+  private GrpcFnServer<InMemoryJobService> jobServer;
+  private GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingServer;
 
-  private static class ServerConfiguration {
-    @Option(name = "--job-host", required = true, usage = "The job server host string")
+  /** Configuration for the jobServer. */
+  public static class ServerConfiguration {
+    @Option(name = "--job-host", usage = "The job server host string")
     private String host = "";
 
     @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
@@ -57,21 +61,11 @@
     private String flinkMasterUrl = "[auto]";
   }
 
-  public static void main(String[] args) throws IOException {
-    ServerConfiguration configuration = new ServerConfiguration();
-    CmdLineParser parser = new CmdLineParser(configuration);
-    try {
-      parser.parseArgument(args);
-    } catch (CmdLineException e) {
-      LOG.error("Unable to parse command line arguments.", e);
-      printUsage(parser);
-      return;
-    }
+  public static void main(String[] args) throws Exception {
     //TODO: Expose the fileSystem related options.
     // Register standard file systems.
     FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
-    FlinkJobServerDriver driver = fromConfig(configuration);
-    driver.run();
+    fromParams(args).run();
   }
 
   private static void printUsage(CmdLineParser parser) {
@@ -81,6 +75,20 @@ private static void printUsage(CmdLineParser parser) {
     System.err.println();
   }
 
+  public static FlinkJobServerDriver fromParams(String[] args) {
+    ServerConfiguration configuration = new ServerConfiguration();
+    CmdLineParser parser = new CmdLineParser(configuration);
+    try {
+      parser.parseArgument(args);
+    } catch (CmdLineException e) {
+      LOG.error("Unable to parse command line arguments.", e);
+      printUsage(parser);
+      throw new IllegalArgumentException("Unable to parse command line arguments.", e);
+    }
+
+    return fromConfig(configuration);
+  }
+
   public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration) {
     ThreadFactory threadFactory =
        new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
@@ -109,28 +117,64 @@ private FlinkJobServerDriver(
   @Override
   public void run() {
     try {
-      GrpcFnServer<InMemoryJobService> server = createJobServer();
-      server.getServer().awaitTermination();
+      jobServer = createJobServer();
+      jobServer.getServer().awaitTermination();
     } catch (InterruptedException e) {
       LOG.warn("Job server interrupted", e);
     } catch (Exception e) {
       LOG.warn("Exception during job server creation", e);
+    } finally {
+      stop();
+    }
+  }
+
+  public String start() throws IOException {
+    jobServer = createJobServer();
+    return jobServer.getApiServiceDescriptor().getUrl();
+  }
+
+  public void stop() {
+    if (jobServer != null) {
+      try {
+        jobServer.close();
+        LOG.info("JobServer stopped on {}", jobServer.getApiServiceDescriptor().getUrl());
+        jobServer = null;
+      } catch (Exception e) {
+        LOG.error("Error while closing the jobServer.", e);
+      }
+    }
+    if (artifactStagingServer != null) {
+      try {
+        artifactStagingServer.close();
+        LOG.info(
+            "ArtifactStagingServer stopped on {}",
+            artifactStagingServer.getApiServiceDescriptor().getUrl());
+        artifactStagingServer = null;
+      } catch (Exception e) {
+        LOG.error("Error while closing the artifactStagingServer.", e);
+      }
     }
   }
 
   private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
-    Endpoints.ApiServiceDescriptor descriptor =
-        Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build();
-    return GrpcFnServer.create(service, descriptor, serverFactory);
+    GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
+    if (Strings.isNullOrEmpty(configuration.host)) {
+      jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
+    } else {
+      Endpoints.ApiServiceDescriptor descriptor =
+          Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build();
+      jobServiceGrpcFnServer = GrpcFnServer.create(service, descriptor, serverFactory);
+    }
+    LOG.info("JobServer started on {}", jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
+    return jobServiceGrpcFnServer;
   }
 
   private InMemoryJobService createJobService() throws IOException {
-    GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
-        createArtifactStagingService();
+    artifactStagingServer = createArtifactStagingService();
     JobInvoker invoker = createJobInvoker();
     return InMemoryJobService.create(
-        artifactStagingService.getApiServiceDescriptor(),
+        artifactStagingServer.getApiServiceDescriptor(),
         (String session) -> {
           try {
            return BeamFileSystemArtifactStagingService.generateStagingSessionToken(
@@ -145,7 +189,12 @@ private InMemoryJobService createJobService() throws IOException {
   private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStagingService()
       throws IOException {
     BeamFileSystemArtifactStagingService service = new BeamFileSystemArtifactStagingService();
-    return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
+    GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
+        GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
+    LOG.info(
+        "ArtifactStagingService started on {}",
+        artifactStagingService.getApiServiceDescriptor().getUrl());
+    return artifactStagingService;
   }
 
   private JobInvoker createJobInvoker() throws IOException {
diff --git a/runners/reference/java/build.gradle b/runners/reference/java/build.gradle
index 30fe3cdee29..7bbaed09126 100644
--- a/runners/reference/java/build.gradle
+++ b/runners/reference/java/build.gradle
@@ -24,7 +24,12 @@ ext.summary = """A Java implementation of the Beam Model which utilizes the port
 framework to execute user-definied functions."""
 
 
+configurations {
+  validatesRunner
+}
+
 dependencies {
+  compile library.java.hamcrest_library
   shadow project(path: ":beam-model-pipeline", configuration: "shadow")
   shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow")
   shadow project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java
new file mode 100644
index 00000000000..c4bcba11ea5
--- /dev/null
+++ b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference.testing;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/** Options for {@link TestPortableRunner}. */
+public interface TestPortablePipelineOptions extends TestPipelineOptions, PortablePipelineOptions {
+
+  @Required
+  @Description(
+      "Fully qualified class name of TestJobServiceDriver capable of managing the JobService.")
+  Class getJobServerDriver();
+
+  void setJobServerDriver(Class jobServerDriver);
+
+  @Description("String containing comma separated arguments for the JobServer.")
+  @Default.InstanceFactory(DefaultJobServerConfigFactory.class)
+  String[] getJobServerConfig();
+
+  void setJobServerConfig(String... jobServerConfig);
+
+  /** Factory for default config. */
+  class DefaultJobServerConfigFactory implements DefaultValueFactory<String[]> {
+
+    @Override
+    public String[] create(PipelineOptions options) {
+      return new String[0];
+    }
+  }
+
+  /** Register {@link TestPortablePipelineOptions}. */
+  @AutoService(PipelineOptionsRegistrar.class)
+  class TestPortablePipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.of(TestPortablePipelineOptions.class);
+    }
+  }
+}
diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java
new file mode 100644
index 00000000000..d7295f2e934
--- /dev/null
+++ b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.lang.reflect.InvocationTargetException;
+import org.apache.beam.runners.reference.PortableRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.hamcrest.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TestPortableRunner} is a pipeline runner that wraps a {@link PortableRunner} when running
+ * tests against the {@link TestPipeline}.
+ *
+ * <p>This runner requires a JobServerDriver with following methods.
+ *
+ * <ul>
+ *   <li>public static Object fromParams(String... params)
+ *   <li>public String start() // Start JobServer and returns the JobServer host and port.
+ *   <li>public void stop() // Stop the JobServer and free all resources.
+ * </ul>
+ *
+ * @see TestPipeline
+ */
+public class TestPortableRunner extends PipelineRunner<PipelineResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(TestPortableRunner.class);
+  private final PortablePipelineOptions options;
+
+  private TestPortableRunner(PortablePipelineOptions options) {
+    this.options = options;
+  }
+
+  public static TestPortableRunner fromOptions(PipelineOptions options) {
+    return new TestPortableRunner(options.as(PortablePipelineOptions.class));
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    TestPortablePipelineOptions testPortablePipelineOptions =
+        options.as(TestPortablePipelineOptions.class);
+    String jobServerHostPort;
+    Object jobServerDriver;
+    Class<?> jobServerDriverClass = testPortablePipelineOptions.getJobServerDriver();
+    String[] parameters = testPortablePipelineOptions.getJobServerConfig();
+    try {
+      jobServerDriver =
+          InstanceBuilder.ofType(jobServerDriverClass)
+              .fromFactoryMethod("fromParams")
+              .withArg(String[].class, parameters)
+              .build();
+      jobServerHostPort = (String) jobServerDriverClass.getMethod("start").invoke(jobServerDriver);
+    } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
+      throw new IllegalArgumentException(e);
+    }
+
+    try {
+      PortablePipelineOptions portableOptions = options.as(PortablePipelineOptions.class);
+      portableOptions.setRunner(PortableRunner.class);
+      portableOptions.setJobEndpoint(jobServerHostPort);
+      PortableRunner runner = PortableRunner.fromOptions(portableOptions);
+      PipelineResult result = runner.run(pipeline);
+      assertThat("Pipeline did not succeed.", result.waitUntilFinish(), Matchers.is(State.DONE));
+      return result;
+    } finally {
+      try {
+        jobServerDriverClass.getMethod("stop").invoke(jobServerDriver);
+      } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+        LOG.error(
+            String.format(
+                "Provided JobServiceDriver %s does not implement stop().", jobServerDriverClass),
+            e);
+      }
+    }
+  }
+}
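
Note on the driver contract: TestPortableRunner above locates the job-server driver
reflectively (via InstanceBuilder.fromFactoryMethod("fromParams") plus getMethod("start")
and getMethod("stop")), so any class exposing those three methods can be supplied through
--jobServerDriver. A minimal sketch of such a driver, assuming a hypothetical package,
class name, and placeholder endpoint (none of which are part of this PR):

package org.example.testing;

// Hypothetical driver satisfying the reflective contract TestPortableRunner expects.
// The package, class name, and fixed endpoint below are illustrative placeholders only.
public class ExampleJobServerDriver {

  private final String[] params;
  private String endpoint;

  private ExampleJobServerDriver(String[] params) {
    this.params = params;
  }

  // Static factory located via InstanceBuilder.fromFactoryMethod("fromParams").
  public static ExampleJobServerDriver fromParams(String... params) {
    return new ExampleJobServerDriver(params);
  }

  // Invoked reflectively; returns the job service endpoint that TestPortableRunner
  // hands to PortableRunner as the job endpoint.
  public String start() {
    endpoint = "localhost:8099"; // a real driver would start a server and report its address
    return endpoint;
  }

  // Invoked reflectively in a finally block; must release all held resources.
  public void stop() {
    endpoint = null;
  }
}

FlinkJobServerDriver fulfils the same contract through the fromParams/start/stop methods
added in this PR.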


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 127546)
    Time Spent: 5h 40m  (was: 5.5h)

> Java: Portable batch runner passes all ValidatesRunner tests that 
> non-portable runner passes
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-4176
>                 URL: https://issues.apache.org/jira/browse/BEAM-4176
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Priority: Major
>          Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> We need this as a sanity check that runner execution is correct.
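
For context, the ValidatesRunner suites referenced above are ordinary JUnit tests living in
the beam-sdks-java-core and beam-runners-core-java test sources, selected by the
org.apache.beam.sdk.testing.ValidatesRunner category. A minimal sketch of such a test
(illustrative only, not taken from the Beam test suites):

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class ExampleValidatesRunnerTest {

  // TestPipeline reads the runner from the beamTestPipelineOptions system property,
  // which the validatesPortableRunner task above points at TestPortableRunner.
  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  @Category(ValidatesRunner.class)
  public void countsElements() {
    PCollection<Long> count = p.apply(Create.of("a", "b", "c")).apply(Count.globally());
    PAssert.thatSingleton(count).isEqualTo(3L);
    p.run().waitUntilFinish();
  }
}

The validatesPortableRunner task added in this PR runs exactly these category-tagged tests
against TestPortableRunner, backed by FlinkJobServerDriver.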



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
