This is an automated email from the ASF dual-hosted git repository.

scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c5f3af  [BEAM-5303] Adding Python VR tests for Java Reference Runner.
     new 298c9bc  Merge pull request #7768: [BEAM-5303] Adding Python VR tests 
for Java Reference Runner.
3c5f3af is described below

commit 3c5f3afc7f8ee447d827a684d4c0fd7db02b6e29
Author: Daniel Oliveira <[email protected]>
AuthorDate: Wed Feb 6 16:01:18 2019 -0800

    [BEAM-5303] Adding Python VR tests for Java Reference Runner.
    
    Adding a gradle target and a test class for running ValidatesRunner
    tests with the Java Reference Runner. Also had to add a dependency to
    base_image_requirements.txt to get it to run properly. Note that this
    was also causing an error when running Flink's VR tests with Docker.
---
 .../portable/job/ReferenceRunnerJobService.java    |  37 ++++-
 .../portability/java_reference_runner_test.py      | 163 +++++++++++++++++++++
 sdks/python/build.gradle                           |  14 ++
 sdks/python/container/base_image_requirements.txt  |   3 +
 4 files changed, 212 insertions(+), 5 deletions(-)

diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
index f0a83e1..07517af 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
@@ -43,6 +43,7 @@ import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.ServerFactory;
 import 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
 import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException;
 import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
@@ -211,11 +212,18 @@ public class ReferenceRunnerJobService extends 
JobServiceImplBase implements FnS
   public void getState(
       GetJobStateRequest request, StreamObserver<GetJobStateResponse> 
responseObserver) {
     LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
-    responseObserver.onNext(
-        GetJobStateResponse.newBuilder()
-            .setState(jobStates.getOrDefault(request.getJobId(), 
Enum.UNRECOGNIZED))
-            .build());
-    responseObserver.onCompleted();
+    try {
+      responseObserver.onNext(
+          GetJobStateResponse.newBuilder()
+              .setState(jobStates.getOrDefault(request.getJobId(), 
Enum.UNRECOGNIZED))
+              .build());
+      responseObserver.onCompleted();
+    } catch (Exception e) {
+      String errMessage =
+          String.format("Encountered Unexpected Exception for Invocation %s", 
request.getJobId());
+      LOG.error(errMessage, e);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
   }
 
   @Override
@@ -242,6 +250,25 @@ public class ReferenceRunnerJobService extends 
JobServiceImplBase implements FnS
   }
 
   @Override
+  public void describePipelineOptions(
+      JobApi.DescribePipelineOptionsRequest request,
+      StreamObserver<JobApi.DescribePipelineOptionsResponse> responseObserver) 
{
+    LOG.trace("{} {}", 
JobApi.DescribePipelineOptionsRequest.class.getSimpleName(), request);
+    try {
+      JobApi.DescribePipelineOptionsResponse response =
+          JobApi.DescribePipelineOptionsResponse.newBuilder()
+              .addAllOptions(
+                  
PipelineOptionsFactory.describe(PipelineOptionsFactory.getRegisteredOptions()))
+              .build();
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    } catch (Exception e) {
+      LOG.error("Error describing pipeline options", e);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
+  }
+
+  @Override
   public void getMessageStream(
       JobMessagesRequest request, StreamObserver<JobMessagesResponse> 
responseObserver) {
     // Not implemented
diff --git 
a/sdks/python/apache_beam/runners/portability/java_reference_runner_test.py 
b/sdks/python/apache_beam/runners/portability/java_reference_runner_test.py
new file mode 100644
index 0000000..59cc5fb
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/java_reference_runner_test.py
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is an entry point for running validatesRunner tests with the Python
+# SDK and the Java Reference Runner. Executing this file starts up an instance
+# of the Java Reference Runner's job server before executing tests and teardown
+# the job server afterwards.
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+import unittest
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import PortableOptions
+from apache_beam.runners.portability import portable_runner
+from apache_beam.runners.portability import portable_runner_test
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+if __name__ == '__main__':
+  # Run as
+  #
+  # python -m apache_beam.runners.portability.java_reference_runner_test \
+  #     --job_server_jar=/path/to/job_server.jar \
+  #     --environment_type=docker \
+  #     [Test.test_method, ...]
+
+  parser = argparse.ArgumentParser(add_help=True)
+  parser.add_argument('--job_server_jar',
+                      help='Job server jar to submit jobs.')
+  parser.add_argument('--environment_type', default='docker',
+                      help='Environment type. docker or process')
+  parser.add_argument('--environment_config', help='Environment config.')
+
+  known_args, args = parser.parse_known_args(sys.argv)
+  sys.argv = args
+
+  job_server_jar = known_args.job_server_jar
+  environment_type = known_args.environment_type.lower()
+  environment_config = (
+      known_args.environment_config if known_args.environment_config else None)
+
+  # This is defined here to only be run when we invoke this file explicitly.
+  class JavaReferenceRunnerTest(portable_runner_test.PortableRunnerTest):
+    _use_grpc = True
+    _use_subprocesses = True
+
+    @classmethod
+    def _subprocess_command(cls, port):
+      return [
+          'java',
+          #'-Dorg.slf4j.simpleLogger.defaultLogLevel=info'
+          '-jar', job_server_jar,
+          '--port', str(port),
+      ]
+
+    @classmethod
+    def get_runner(cls):
+      return portable_runner.PortableRunner()
+
+    def create_options(self):
+      options = super(JavaReferenceRunnerTest, self).create_options()
+      options.view_as(DebugOptions).experiments = ['beam_fn_api']
+      options._all_options['parallelism'] = 1
+      options.view_as(PortableOptions).environment_type = (
+          environment_type.upper())
+      if environment_config:
+        options.view_as(PortableOptions).environment_config = 
environment_config
+      return options
+
+    def test_assert_that(self):
+      # We still want to make sure asserts fail, even if the message
+      # isn't right (BEAM-6600).
+      with self.assertRaises(Exception):
+        with self.create_pipeline() as p:
+          assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
+
+    def test_pardo_side_inputs(self):
+      # Skip until Reference Runner supports side unputs.
+      raise unittest.SkipTest("BEAM-2928")
+
+    def test_pardo_windowed_side_inputs(self):
+      # Skip until Reference Runner supports side unputs.
+      raise unittest.SkipTest("BEAM-2928")
+
+    def test_flattened_side_input(self):
+      # Skip until Reference Runner supports side unputs.
+      raise unittest.SkipTest("BEAM-2928")
+
+    def test_gbk_side_input(self):
+      # Skip until Reference Runner supports side unputs.
+      raise unittest.SkipTest("BEAM-2928")
+
+    def test_multimap_side_input(self):
+      # Skip until Reference Runner supports side unputs.
+      raise unittest.SkipTest("BEAM-2928")
+
+    def test_pardo_unfusable_side_inputs(self):
+      # Skip until Reference Runner supports side unputs.
+      raise unittest.SkipTest("BEAM-2928")
+
+    def test_pardo_state_only(self):
+      # Skip until Reference Runner supports state.
+      raise unittest.SkipTest("BEAM-2917")
+
+    def test_pardo_timers(self):
+      # Skip until Reference Runner supports state.
+      raise unittest.SkipTest("BEAM-2917")
+
+    def test_pardo_state_timers(self):
+      # Skip until Reference Runner supports state.
+      raise unittest.SkipTest("BEAM-2917")
+
+    # Can't read host files from within docker, read a "local" file there.
+    def test_read(self):
+      with self.create_pipeline() as p:
+        lines = p | beam.io.ReadFromText('/etc/profile')
+        assert_that(lines, lambda lines: len(lines) > 0)
+
+    def test_large_elements(self):
+      # Skip until Reference Runner supports large elements.
+      raise unittest.SkipTest("BEAM-6622")
+
+    def test_error_message_includes_stage(self):
+      # Skip until Reference Runner provides message support.
+      raise unittest.SkipTest("BEAM-6600")
+
+    def test_error_traceback_includes_user_code(self):
+      # Skip until Reference Runner provides message support.
+      raise unittest.SkipTest("BEAM-6600")
+
+    def test_metrics(self):
+      # Skip until Reference Runner provides metrics support.
+      raise unittest.SkipTest("BEAM-5452")
+
+    def test_non_user_metrics(self):
+      # Skip until Reference Runner provides metrics support.
+      raise unittest.SkipTest("BEAM-5452")
+
+    def test_progress_metrics(self):
+      # Skip until Reference Runner provides metrics support.
+      raise unittest.SkipTest("BEAM-5452")
+
+  # Run the tests.
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index 6155d4f..2ca0119 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -368,6 +368,20 @@ task flinkValidatesRunner() {
   dependsOn 'flinkCompatibilityMatrixLoopback'
 }
 
+// Run Python ValidatesRunner tests using the Java ReferenceRunner as a job 
server and Docker as
+// the SDK environment.
+task javaReferenceRunnerValidatesRunner() {
+  dependsOn 'setupVirtualenv'
+  dependsOn ':beam-runners-reference-job-server:shadowJar'
+  dependsOn ':beam-sdks-python-container:docker'
+  doLast {
+    exec {
+      executable 'sh'
+      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e 
.[test] && python -m apache_beam.runners.portability.java_reference_runner_test 
--job_server_jar=${project(":beam-runners-reference-job-server:").shadowJar.archivePath}
 --environment_type=DOCKER"
+    }
+  }
+}
+
 task postCommit() {
   dependsOn "directRunnerIT"
   dependsOn "hdfsIntegrationTest"
diff --git a/sdks/python/container/base_image_requirements.txt 
b/sdks/python/container/base_image_requirements.txt
index ef5346e..032a9ca 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -68,3 +68,6 @@ protorpc==0.11.1
 python-gflags==3.0.6
 setuptools<=39.1.0 # requirement for Tensorflow.
 tensorflow==1.11.0
+
+# Packages needed for testing.
+tenacity>=5.0.2
\ No newline at end of file

Reply via email to