This is an automated email from the ASF dual-hosted git repository.
scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3c5f3af [BEAM-5303] Adding Python VR tests for Java Reference Runner.
new 298c9bc Merge pull request #7768: [BEAM-5303] Adding Python VR tests
for Java Reference Runner.
3c5f3af is described below
commit 3c5f3afc7f8ee447d827a684d4c0fd7db02b6e29
Author: Daniel Oliveira <[email protected]>
AuthorDate: Wed Feb 6 16:01:18 2019 -0800
[BEAM-5303] Adding Python VR tests for Java Reference Runner.
Adding a gradle target and a test class for running ValidatesRunner
tests with the Java Reference Runner. Also had to add a dependency to
base_image_requirements.txt to get it to run properly. Note that the missing
dependency was also causing an error when running Flink's VR tests with Docker.
---
.../portable/job/ReferenceRunnerJobService.java | 37 ++++-
.../portability/java_reference_runner_test.py | 163 +++++++++++++++++++++
sdks/python/build.gradle | 14 ++
sdks/python/container/base_image_requirements.txt | 3 +
4 files changed, 212 insertions(+), 5 deletions(-)
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
index f0a83e1..07517af 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
@@ -43,6 +43,7 @@ import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
@@ -211,11 +212,18 @@ public class ReferenceRunnerJobService extends
JobServiceImplBase implements FnS
public void getState(
GetJobStateRequest request, StreamObserver<GetJobStateResponse>
responseObserver) {
LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
- responseObserver.onNext(
- GetJobStateResponse.newBuilder()
- .setState(jobStates.getOrDefault(request.getJobId(),
Enum.UNRECOGNIZED))
- .build());
- responseObserver.onCompleted();
+ try {
+ responseObserver.onNext(
+ GetJobStateResponse.newBuilder()
+ .setState(jobStates.getOrDefault(request.getJobId(),
Enum.UNRECOGNIZED))
+ .build());
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ String errMessage =
+ String.format("Encountered Unexpected Exception for Invocation %s",
request.getJobId());
+ LOG.error(errMessage, e);
+ responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+ }
}
@Override
@@ -242,6 +250,25 @@ public class ReferenceRunnerJobService extends
JobServiceImplBase implements FnS
}
@Override
+ public void describePipelineOptions(
+ JobApi.DescribePipelineOptionsRequest request,
+ StreamObserver<JobApi.DescribePipelineOptionsResponse> responseObserver)
{
+ LOG.trace("{} {}",
JobApi.DescribePipelineOptionsRequest.class.getSimpleName(), request);
+ try {
+ JobApi.DescribePipelineOptionsResponse response =
+ JobApi.DescribePipelineOptionsResponse.newBuilder()
+ .addAllOptions(
+
PipelineOptionsFactory.describe(PipelineOptionsFactory.getRegisteredOptions()))
+ .build();
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ LOG.error("Error describing pipeline options", e);
+ responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+ }
+ }
+
+ @Override
public void getMessageStream(
JobMessagesRequest request, StreamObserver<JobMessagesResponse>
responseObserver) {
// Not implemented
diff --git
a/sdks/python/apache_beam/runners/portability/java_reference_runner_test.py
b/sdks/python/apache_beam/runners/portability/java_reference_runner_test.py
new file mode 100644
index 0000000..59cc5fb
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/java_reference_runner_test.py
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is an entry point for running validatesRunner tests with the Python
+# SDK and the Java Reference Runner. Executing this file starts up an instance
+# of the Java Reference Runner's job server before executing tests and tears
+# down the job server afterwards.
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+import unittest
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import PortableOptions
+from apache_beam.runners.portability import portable_runner
+from apache_beam.runners.portability import portable_runner_test
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+if __name__ == '__main__':
+ # Run as
+ #
+ # python -m apache_beam.runners.portability.java_reference_runner_test \
+ # --job_server_jar=/path/to/job_server.jar \
+ # --environment_type=docker \
+ # [Test.test_method, ...]
+
+ parser = argparse.ArgumentParser(add_help=True)
+ parser.add_argument('--job_server_jar',
+ help='Job server jar to submit jobs.')
+ parser.add_argument('--environment_type', default='docker',
+ help='Environment type. docker or process')
+ parser.add_argument('--environment_config', help='Environment config.')
+
+ known_args, args = parser.parse_known_args(sys.argv)
+ sys.argv = args
+
+ job_server_jar = known_args.job_server_jar
+ environment_type = known_args.environment_type.lower()
+ environment_config = (
+ known_args.environment_config if known_args.environment_config else None)
+
+ # This is defined here to only be run when we invoke this file explicitly.
+ class JavaReferenceRunnerTest(portable_runner_test.PortableRunnerTest):
+ _use_grpc = True
+ _use_subprocesses = True
+
+ @classmethod
+ def _subprocess_command(cls, port):
+ return [
+ 'java',
+ #'-Dorg.slf4j.simpleLogger.defaultLogLevel=info'
+ '-jar', job_server_jar,
+ '--port', str(port),
+ ]
+
+ @classmethod
+ def get_runner(cls):
+ return portable_runner.PortableRunner()
+
+ def create_options(self):
+ options = super(JavaReferenceRunnerTest, self).create_options()
+ options.view_as(DebugOptions).experiments = ['beam_fn_api']
+ options._all_options['parallelism'] = 1
+ options.view_as(PortableOptions).environment_type = (
+ environment_type.upper())
+ if environment_config:
+ options.view_as(PortableOptions).environment_config =
environment_config
+ return options
+
+ def test_assert_that(self):
+ # We still want to make sure asserts fail, even if the message
+ # isn't right (BEAM-6600).
+ with self.assertRaises(Exception):
+ with self.create_pipeline() as p:
+ assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
+
+ def test_pardo_side_inputs(self):
+ # Skip until Reference Runner supports side inputs.
+ raise unittest.SkipTest("BEAM-2928")
+
+ def test_pardo_windowed_side_inputs(self):
+ # Skip until Reference Runner supports side inputs.
+ raise unittest.SkipTest("BEAM-2928")
+
+ def test_flattened_side_input(self):
+ # Skip until Reference Runner supports side inputs.
+ raise unittest.SkipTest("BEAM-2928")
+
+ def test_gbk_side_input(self):
+ # Skip until Reference Runner supports side inputs.
+ raise unittest.SkipTest("BEAM-2928")
+
+ def test_multimap_side_input(self):
+ # Skip until Reference Runner supports side inputs.
+ raise unittest.SkipTest("BEAM-2928")
+
+ def test_pardo_unfusable_side_inputs(self):
+ # Skip until Reference Runner supports side inputs.
+ raise unittest.SkipTest("BEAM-2928")
+
+ def test_pardo_state_only(self):
+ # Skip until Reference Runner supports state.
+ raise unittest.SkipTest("BEAM-2917")
+
+ def test_pardo_timers(self):
+ # Skip until Reference Runner supports state.
+ raise unittest.SkipTest("BEAM-2917")
+
+ def test_pardo_state_timers(self):
+ # Skip until Reference Runner supports state.
+ raise unittest.SkipTest("BEAM-2917")
+
+ # Can't read host files from within docker, read a "local" file there.
+ def test_read(self):
+ with self.create_pipeline() as p:
+ lines = p | beam.io.ReadFromText('/etc/profile')
+ assert_that(lines, lambda lines: len(lines) > 0)
+
+ def test_large_elements(self):
+ # Skip until Reference Runner supports large elements.
+ raise unittest.SkipTest("BEAM-6622")
+
+ def test_error_message_includes_stage(self):
+ # Skip until Reference Runner provides message support.
+ raise unittest.SkipTest("BEAM-6600")
+
+ def test_error_traceback_includes_user_code(self):
+ # Skip until Reference Runner provides message support.
+ raise unittest.SkipTest("BEAM-6600")
+
+ def test_metrics(self):
+ # Skip until Reference Runner provides metrics support.
+ raise unittest.SkipTest("BEAM-5452")
+
+ def test_non_user_metrics(self):
+ # Skip until Reference Runner provides metrics support.
+ raise unittest.SkipTest("BEAM-5452")
+
+ def test_progress_metrics(self):
+ # Skip until Reference Runner provides metrics support.
+ raise unittest.SkipTest("BEAM-5452")
+
+ # Run the tests.
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index 6155d4f..2ca0119 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -368,6 +368,20 @@ task flinkValidatesRunner() {
dependsOn 'flinkCompatibilityMatrixLoopback'
}
+// Run Python ValidatesRunner tests using the Java ReferenceRunner as a job
server and Docker as
+// the SDK environment.
+task javaReferenceRunnerValidatesRunner() {
+ dependsOn 'setupVirtualenv'
+ dependsOn ':beam-runners-reference-job-server:shadowJar'
+ dependsOn ':beam-sdks-python-container:docker'
+ doLast {
+ exec {
+ executable 'sh'
+ args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e
.[test] && python -m apache_beam.runners.portability.java_reference_runner_test
--job_server_jar=${project(":beam-runners-reference-job-server:").shadowJar.archivePath}
--environment_type=DOCKER"
+ }
+ }
+}
+
task postCommit() {
dependsOn "directRunnerIT"
dependsOn "hdfsIntegrationTest"
diff --git a/sdks/python/container/base_image_requirements.txt
b/sdks/python/container/base_image_requirements.txt
index ef5346e..032a9ca 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -68,3 +68,6 @@ protorpc==0.11.1
python-gflags==3.0.6
setuptools<=39.1.0 # requirement for Tensorflow.
tensorflow==1.11.0
+
+# Packages needed for testing.
+tenacity>=5.0.2
\ No newline at end of file