kennknowles commented on a change in pull request #11792:
URL: https://github.com/apache/beam/pull/11792#discussion_r429029786



##########
File path: runners/portability/java/build.gradle
##########
@@ -1,3 +1,13 @@
+import groovy.json.JsonOutput
+
+import java.nio.file.FileSystems

Review comment:
       TODO(me): remove these imports
   
   I first went the "normal" route of using these imports to watch for the 
pid file, but that was verbose and had race conditions, so there was no point. 
Now it just checks for the file and sleeps.

##########
File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactRetrievalService.java
##########
@@ -39,7 +39,7 @@
 public class ArtifactRetrievalService
     extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase 
implements FnService {
 
-  public static final int DEFAULT_BUFFER_SIZE = 4 << 20; // 4 MB
+  public static final int DEFAULT_BUFFER_SIZE = 2 << 20; // 2 MB

Review comment:
       @robertwb @lukecwik the first thing I hit while putting this together 
was exceeding the message size limit.

##########
File path: runners/portability/java/build.gradle
##########
@@ -31,9 +45,123 @@ dependencies {
   compile project(path: ":sdks:java:harness", configuration: "shadow")
   compile library.java.vendored_grpc_1_26_0
   compile library.java.slf4j_api
+
   testCompile project(path: ":runners:core-construction-java", configuration: 
"testRuntime")
   testCompile library.java.hamcrest_core
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_jdk14
+
+  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
+  validatesRunner project(path: ":runners:core-java", configuration: 
"testRuntime")
+  validatesRunner project(path: project.path, configuration: "testRuntime")
+}
+
+
+project.evaluationDependsOn(":sdks:java:core")
+project.evaluationDependsOn(":runners:core-java")
+
+ext.virtualenvDir = "${project.buildDir}/virtualenv"
+ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
+ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? 
project.property("localJobServicePortFile") : 
"${project.buildDir}/local_job_service_port"
+ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"
+
+ext.pythonSdkDir = "${project.rootDir}/sdks/python"
+
+void execInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { 
arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  exec {
+    workingDir pythonSdkDir
+    commandLine "sh", "-c", shellCommand
+  }
 }
+
+void execBackgroundInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { 
arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  println "execBackgroundInVirtualEnv: ${shellCommand}"
+  ProcessBuilder pb = new 
ProcessBuilder().redirectErrorStream(true).directory(new 
File(pythonSdkDir)).command(["sh", "-c", shellCommand])
+  Process proc = pb.start();
+
+  // redirectIO does not work for connecting to groovy/gradle stdout
+  BufferedReader reader = new BufferedReader(new 
InputStreamReader(proc.getInputStream()));
+  String line
+  while ((line = reader.readLine()) != null) {
+    println line
+  }
+  proc.waitFor();
+}
+
+task virtualenv {
+  doLast {
+    exec {
+      commandLine "virtualenv", virtualenvDir, "--python=python3"
+    }
+    execInVirtualenv "pip", "install", "--retries", "10", "--upgrade", 
"tox==3.11.1", "--requirement", 
"${project.rootDir}/sdks/python/build-requirements.txt"
+    execInVirtualenv "python", "setup.py", "build", "--build-base=${buildDir}"
+    execInVirtualenv "pip", "install", "-e", "."
+  }
+}
+
+task startLocalJobService {
+  dependsOn virtualenv
+
+  doLast {
+    execBackgroundInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--background",
+        "--stdout_file=${localJobServiceStdoutFile}",
+        "--pid_file=${localJobServicePidFile}",
+        "--port_file=${localJobServicePortFile}"

Review comment:
       It would be nicer for the port to be an output of this task that the 
other task reads in, but that plumbing seems unnatural based on the Gradle 
docs I could find.

##########
File path: runners/portability/java/build.gradle
##########
@@ -31,9 +45,123 @@ dependencies {
   compile project(path: ":sdks:java:harness", configuration: "shadow")
   compile library.java.vendored_grpc_1_26_0
   compile library.java.slf4j_api
+
   testCompile project(path: ":runners:core-construction-java", configuration: 
"testRuntime")
   testCompile library.java.hamcrest_core
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_jdk14
+
+  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
+  validatesRunner project(path: ":runners:core-java", configuration: 
"testRuntime")
+  validatesRunner project(path: project.path, configuration: "testRuntime")
+}
+
+
+project.evaluationDependsOn(":sdks:java:core")
+project.evaluationDependsOn(":runners:core-java")
+
+ext.virtualenvDir = "${project.buildDir}/virtualenv"
+ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
+ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? 
project.property("localJobServicePortFile") : 
"${project.buildDir}/local_job_service_port"
+ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"
+
+ext.pythonSdkDir = "${project.rootDir}/sdks/python"
+
+void execInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { 
arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  exec {
+    workingDir pythonSdkDir
+    commandLine "sh", "-c", shellCommand
+  }
 }
+
+void execBackgroundInVirtualenv(String... args) {
+  String shellCommand = ". ${virtualenvDir}/bin/activate && " + args.collect { 
arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
+  println "execBackgroundInVirtualEnv: ${shellCommand}"
+  ProcessBuilder pb = new 
ProcessBuilder().redirectErrorStream(true).directory(new 
File(pythonSdkDir)).command(["sh", "-c", shellCommand])
+  Process proc = pb.start();
+
+  // redirectIO does not work for connecting to groovy/gradle stdout
+  BufferedReader reader = new BufferedReader(new 
InputStreamReader(proc.getInputStream()));
+  String line
+  while ((line = reader.readLine()) != null) {
+    println line
+  }
+  proc.waitFor();
+}
+
+task virtualenv {
+  doLast {
+    exec {
+      commandLine "virtualenv", virtualenvDir, "--python=python3"
+    }
+    execInVirtualenv "pip", "install", "--retries", "10", "--upgrade", 
"tox==3.11.1", "--requirement", 
"${project.rootDir}/sdks/python/build-requirements.txt"
+    execInVirtualenv "python", "setup.py", "build", "--build-base=${buildDir}"
+    execInVirtualenv "pip", "install", "-e", "."
+  }
+}
+
+task startLocalJobService {
+  dependsOn virtualenv
+
+  doLast {
+    execBackgroundInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--background",
+        "--stdout_file=${localJobServiceStdoutFile}",
+        "--pid_file=${localJobServicePidFile}",
+        "--port_file=${localJobServicePortFile}"
+
+    File pidFile = new File(localJobServicePidFile)
+    int totalSleep = 0
+    while (!pidFile.exists()) {
+      sleep(500)
+      totalSleep += 500
+      if (totalSleep > 5000) {
+        throw new RuntimeException("Local job service pid file never showed 
up");
+      }
+    }
+  }
+}
+
+task stopLocalJobService {
+  doLast {
+    execInVirtualenv "python",
+        "-m", "apache_beam.runners.portability.local_job_service_main",
+        "--stop",
+        "--pid_file=${localJobServicePidFile}"
+  }
+}
+
+startLocalJobService.finalizedBy stopLocalJobService
+
+/**
+ * Runs Java ValidatesRunner tests against the Universal Local Runner (ULR) 
aka local_job_service_main
+ * with subprocess SDK harness environments.
+ */
+task ulrValidatesRunnerTests(type: Test) {
+  dependsOn ":sdks:java:container:docker"
+
+  if (!project.hasProperty("localJobServicePortFile")) {
+    dependsOn startLocalJobService
+  }
+
+  group = "Verification"
+  description "PortableRunner Java subprocess ValidatesRunner suite"
+  classpath = configurations.validatesRunner
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+      "--runner=TestUniversalRunner",
+      "--localJobServicePortFile=${localJobServicePortFile}"
+  ])
+  testClassesDirs = 
files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
+  useJUnit {
+    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+  }
+  filter {
+    includeTestsMatching 'ImpulseTest'

Review comment:
       TODO(me): remove this once we get past the sanity checking phase

##########
File path: 
runners/portability/java/src/main/java/org/apache/beam/runners/portability/testing/TestUniversalRunner.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.portability.testing;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import org.apache.beam.runners.portability.PortableRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.hamcrest.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} a {@link Pipeline} against a {@code JobService}. 
*/
+public class TestUniversalRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestUniversalRunner.class);
+
+  private final PipelineOptions options;
+
+  private TestUniversalRunner(PipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static TestUniversalRunner fromOptions(PipelineOptions options) {

Review comment:
       I had to add this because `TestPortableRunner` couples the "check that 
the job succeeds" logic with a bunch of other things related to launching an 
existing Java runner as a portable runner, which are not relevant to actual 
portable runner services.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to