Avoid using the beta gRPC implementation.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86de9de3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86de9de3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86de9de3 Branch: refs/heads/master Commit: 86de9de36cd25825dfdb553243d310a64d5d3471 Parents: da531b7 Author: Robert Bradshaw <[email protected]> Authored: Wed Sep 27 12:29:05 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Thu Sep 28 12:27:43 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/runners/portability/fn_api_runner.py | 7 ++++--- .../runners/portability/universal_local_runner.py | 2 +- sdks/python/apache_beam/runners/worker/data_plane.py | 5 +++-- .../python/apache_beam/runners/worker/data_plane_test.py | 5 +++-- .../apache_beam/runners/worker/log_handler_test.py | 3 ++- sdks/python/apache_beam/runners/worker/sdk_worker.py | 3 ++- .../python/apache_beam/runners/worker/sdk_worker_test.py | 6 ++++-- sdks/python/gen_protos.py | 11 ++++++----- 8 files changed, 25 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/portability/fn_api_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 74bae11..21bf61a 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -38,6 +38,7 @@ from apache_beam.internal import pickler from apache_beam.io import iobase from apache_beam.metrics.execution import MetricsEnvironment from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.portability.api import 
beam_runner_api_pb2 from apache_beam.runners import pipeline_context from apache_beam.runners.portability import maptask_executor_runner @@ -1063,12 +1064,12 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): self.data_port = self.data_server.add_insecure_port('[::]:0') self.control_handler = streaming_rpc_handler( - beam_fn_api_pb2.BeamFnControlServicer, 'Control') - beam_fn_api_pb2.add_BeamFnControlServicer_to_server( + beam_fn_api_pb2_grpc.BeamFnControlServicer, 'Control') + beam_fn_api_pb2_grpc.add_BeamFnControlServicer_to_server( self.control_handler, self.control_server) self.data_plane_handler = data_plane.GrpcServerDataChannel() - beam_fn_api_pb2.add_BeamFnDataServicer_to_server( + beam_fn_api_pb2_grpc.add_BeamFnDataServicer_to_server( self.data_plane_handler, self.data_server) logging.info('starting control server on port %s', self.control_port) http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/portability/universal_local_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py index 844b3a8..e3b588c 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py @@ -252,7 +252,7 @@ class BeamJob(threading.Thread): self.state = beam_job_api_pb2.JobState.CANCELLED -class JobServicer(beam_job_api_pb2.JobServiceServicer): +class JobServicer(beam_job_api_pb2_grpc.JobServiceServicer): """Servicer for the Beam Job API. Manages one or more pipelines, possibly concurrently. 
http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/worker/data_plane.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 737555a..5a511a0 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -31,6 +31,7 @@ import grpc from apache_beam.coders import coder_impl from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc # This module is experimental. No backwards-compatibility guarantees. @@ -235,7 +236,7 @@ class GrpcClientDataChannel(_GrpcDataChannel): class GrpcServerDataChannel( - beam_fn_api_pb2.BeamFnDataServicer, _GrpcDataChannel): + beam_fn_api_pb2_grpc.BeamFnDataServicer, _GrpcDataChannel): """A DataChannel wrapping the server side of a BeamFnData connection.""" def Data(self, elements_iterator, context): @@ -281,7 +282,7 @@ class GrpcClientDataChannelFactory(DataChannelFactory): options=[("grpc.max_receive_message_length", -1), ("grpc.max_send_message_length", -1)]) self._data_channel_cache[url] = GrpcClientDataChannel( - beam_fn_api_pb2.BeamFnDataStub(grpc_channel)) + beam_fn_api_pb2_grpc.BeamFnDataStub(grpc_channel)) return self._data_channel_cache[url] def close(self): http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/worker/data_plane_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py index db7ac0b..07ba8fd 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane_test.py +++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py @@ -30,6 +30,7 @@ from concurrent import futures import grpc from apache_beam.portability.api import 
beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.runners.worker import data_plane @@ -62,12 +63,12 @@ class DataChannelTest(unittest.TestCase): data_channel_service = data_plane.GrpcServerDataChannel() server = grpc.server(futures.ThreadPoolExecutor(max_workers=2)) - beam_fn_api_pb2.add_BeamFnDataServicer_to_server( + beam_fn_api_pb2_grpc.add_BeamFnDataServicer_to_server( data_channel_service, server) test_port = server.add_insecure_port('[::]:0') server.start() - data_channel_stub = beam_fn_api_pb2.BeamFnDataStub( + data_channel_stub = beam_fn_api_pb2_grpc.BeamFnDataStub( grpc.insecure_channel('localhost:%s' % test_port)) data_channel_client = data_plane.GrpcClientDataChannel(data_channel_stub) http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/worker/log_handler_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py index 7edf667..e2cc194 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler_test.py +++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py @@ -23,10 +23,11 @@ from concurrent import futures import grpc from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.runners.worker import log_handler -class BeamFnLoggingServicer(beam_fn_api_pb2.BeamFnLoggingServicer): +class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer): def __init__(self): self.log_records_received = [] http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/worker/sdk_worker.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 3534e2b..ef33c6f 100644 --- 
a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -30,6 +30,7 @@ from concurrent import futures import grpc from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.runners.worker import bundle_processor from apache_beam.runners.worker import data_plane @@ -44,7 +45,7 @@ class SdkHarness(object): self._progress_thread_pool = futures.ThreadPoolExecutor(max_workers=1) def run(self): - contol_stub = beam_fn_api_pb2.BeamFnControlStub(self._control_channel) + contol_stub = beam_fn_api_pb2_grpc.BeamFnControlStub(self._control_channel) # TODO(robertwb): Wire up to new state api. state_stub = None self.worker = SdkWorker(state_stub, self._data_channel_factory) http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/worker/sdk_worker_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py index 7ad57cd..2532341 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py @@ -28,11 +28,12 @@ from concurrent import futures import grpc from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners.worker import sdk_worker -class BeamFnControlServicer(beam_fn_api_pb2.BeamFnControlServicer): +class BeamFnControlServicer(beam_fn_api_pb2_grpc.BeamFnControlServicer): def __init__(self, requests, raise_errors=True): self.requests = requests @@ -74,7 +75,8 @@ class SdkWorkerTest(unittest.TestCase): process_bundle_descriptor=process_bundle_descriptors))]) server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - 
beam_fn_api_pb2.add_BeamFnControlServicer_to_server(test_controller, server) + beam_fn_api_pb2_grpc.add_BeamFnControlServicer_to_server( + test_controller, server) test_port = server.add_insecure_port("[::]:0") server.start() http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/gen_protos.py ---------------------------------------------------------------------- diff --git a/sdks/python/gen_protos.py b/sdks/python/gen_protos.py index 59a98ce..d70158b 100644 --- a/sdks/python/gen_protos.py +++ b/sdks/python/gen_protos.py @@ -39,7 +39,7 @@ BEAM_PROTO_PATHS = [ PYTHON_OUTPUT_PATH = os.path.join('apache_beam', 'portability', 'api') -def generate_proto_files(): +def generate_proto_files(force=False): try: import grpc_tools @@ -54,7 +54,7 @@ def generate_proto_files(): out_dir = os.path.join(py_sdk_root, PYTHON_OUTPUT_PATH) out_files = [path for path in glob.glob(os.path.join(out_dir, '*_pb2.py'))] - if out_files and not proto_files: + if out_files and not proto_files and not force: # We have out_files but no protos; assume they're up to date. # This is actually the common case (e.g. installation from an sdist). logging.info('No proto files; using existing generated files.') @@ -69,7 +69,7 @@ def generate_proto_files(): 'No proto files found in %s.' % proto_dirs) # Regenerate iff the proto files are newer. - elif not out_files or len(out_files) < len(proto_files) or ( + elif force or not out_files or len(out_files) < len(proto_files) or ( min(os.path.getmtime(path) for path in out_files) <= max(os.path.getmtime(path) for path in proto_files)): try: @@ -92,7 +92,8 @@ def generate_proto_files(): ['--proto_path=%s' % builtin_protos] + ['--proto_path=%s' % d for d in proto_dirs] + ['--python_out=%s' % out_dir] + - ['--grpc_python_out=%s' % out_dir] + + # TODO(robertwb): Remove the prefix once it's the default. 
+ ['--grpc_python_out=grpc_2_0:%s' % out_dir] + proto_files) ret_code = protoc.main(args) if ret_code: @@ -128,4 +129,4 @@ def _install_grpcio_tools_and_generate_proto_files(): if __name__ == '__main__': - generate_proto_files() + generate_proto_files(force=True)
