Avoid using beta grpc implementation.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86de9de3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86de9de3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86de9de3

Branch: refs/heads/master
Commit: 86de9de36cd25825dfdb553243d310a64d5d3471
Parents: da531b7
Author: Robert Bradshaw <[email protected]>
Authored: Wed Sep 27 12:29:05 2017 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Thu Sep 28 12:27:43 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/portability/fn_api_runner.py     |  7 ++++---
 .../runners/portability/universal_local_runner.py        |  2 +-
 sdks/python/apache_beam/runners/worker/data_plane.py     |  5 +++--
 .../python/apache_beam/runners/worker/data_plane_test.py |  5 +++--
 .../apache_beam/runners/worker/log_handler_test.py       |  3 ++-
 sdks/python/apache_beam/runners/worker/sdk_worker.py     |  3 ++-
 .../python/apache_beam/runners/worker/sdk_worker_test.py |  6 ++++--
 sdks/python/gen_protos.py                                | 11 ++++++-----
 8 files changed, 25 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 74bae11..21bf61a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -38,6 +38,7 @@ from apache_beam.internal import pickler
 from apache_beam.io import iobase
 from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import pipeline_context
 from apache_beam.runners.portability import maptask_executor_runner
@@ -1063,12 +1064,12 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       self.data_port = self.data_server.add_insecure_port('[::]:0')
 
       self.control_handler = streaming_rpc_handler(
-          beam_fn_api_pb2.BeamFnControlServicer, 'Control')
-      beam_fn_api_pb2.add_BeamFnControlServicer_to_server(
+          beam_fn_api_pb2_grpc.BeamFnControlServicer, 'Control')
+      beam_fn_api_pb2_grpc.add_BeamFnControlServicer_to_server(
           self.control_handler, self.control_server)
 
       self.data_plane_handler = data_plane.GrpcServerDataChannel()
-      beam_fn_api_pb2.add_BeamFnDataServicer_to_server(
+      beam_fn_api_pb2_grpc.add_BeamFnDataServicer_to_server(
           self.data_plane_handler, self.data_server)
 
       logging.info('starting control server on port %s', self.control_port)

http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/portability/universal_local_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index 844b3a8..e3b588c 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -252,7 +252,7 @@ class BeamJob(threading.Thread):
       self.state = beam_job_api_pb2.JobState.CANCELLED
 
 
-class JobServicer(beam_job_api_pb2.JobServiceServicer):
+class JobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
   """Servicer for the Beam Job API.
 
   Manages one or more pipelines, possibly concurrently.

http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/worker/data_plane.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index 737555a..5a511a0 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -31,6 +31,7 @@ import grpc
 
 from apache_beam.coders import coder_impl
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -235,7 +236,7 @@ class GrpcClientDataChannel(_GrpcDataChannel):
 
 
 class GrpcServerDataChannel(
-    beam_fn_api_pb2.BeamFnDataServicer, _GrpcDataChannel):
+    beam_fn_api_pb2_grpc.BeamFnDataServicer, _GrpcDataChannel):
   """A DataChannel wrapping the server side of a BeamFnData connection."""
 
   def Data(self, elements_iterator, context):
@@ -281,7 +282,7 @@ class GrpcClientDataChannelFactory(DataChannelFactory):
           options=[("grpc.max_receive_message_length", -1),
                    ("grpc.max_send_message_length", -1)])
       self._data_channel_cache[url] = GrpcClientDataChannel(
-          beam_fn_api_pb2.BeamFnDataStub(grpc_channel))
+          beam_fn_api_pb2_grpc.BeamFnDataStub(grpc_channel))
     return self._data_channel_cache[url]
 
   def close(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/worker/data_plane_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py
index db7ac0b..07ba8fd 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane_test.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py
@@ -30,6 +30,7 @@ from concurrent import futures
 import grpc
 
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.runners.worker import data_plane
 
 
@@ -62,12 +63,12 @@ class DataChannelTest(unittest.TestCase):
     data_channel_service = data_plane.GrpcServerDataChannel()
 
     server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
-    beam_fn_api_pb2.add_BeamFnDataServicer_to_server(
+    beam_fn_api_pb2_grpc.add_BeamFnDataServicer_to_server(
         data_channel_service, server)
     test_port = server.add_insecure_port('[::]:0')
     server.start()
 
-    data_channel_stub = beam_fn_api_pb2.BeamFnDataStub(
+    data_channel_stub = beam_fn_api_pb2_grpc.BeamFnDataStub(
         grpc.insecure_channel('localhost:%s' % test_port))
     data_channel_client = data_plane.GrpcClientDataChannel(data_channel_stub)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/worker/log_handler_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py
index 7edf667..e2cc194 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler_test.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py
@@ -23,10 +23,11 @@ from concurrent import futures
 import grpc
 
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.runners.worker import log_handler
 
 
-class BeamFnLoggingServicer(beam_fn_api_pb2.BeamFnLoggingServicer):
+class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
 
   def __init__(self):
     self.log_records_received = []

http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 3534e2b..ef33c6f 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -30,6 +30,7 @@ from concurrent import futures
 import grpc
 
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.runners.worker import bundle_processor
 from apache_beam.runners.worker import data_plane
 
@@ -44,7 +45,7 @@ class SdkHarness(object):
     self._progress_thread_pool = futures.ThreadPoolExecutor(max_workers=1)
 
   def run(self):
-    contol_stub = beam_fn_api_pb2.BeamFnControlStub(self._control_channel)
+    contol_stub = beam_fn_api_pb2_grpc.BeamFnControlStub(self._control_channel)
     # TODO(robertwb): Wire up to new state api.
     state_stub = None
     self.worker = SdkWorker(state_stub, self._data_channel_factory)

http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 7ad57cd..2532341 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -28,11 +28,12 @@ from concurrent import futures
 import grpc
 
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.worker import sdk_worker
 
 
-class BeamFnControlServicer(beam_fn_api_pb2.BeamFnControlServicer):
+class BeamFnControlServicer(beam_fn_api_pb2_grpc.BeamFnControlServicer):
 
   def __init__(self, requests, raise_errors=True):
     self.requests = requests
@@ -74,7 +75,8 @@ class SdkWorkerTest(unittest.TestCase):
             process_bundle_descriptor=process_bundle_descriptors))])
 
     server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
-    beam_fn_api_pb2.add_BeamFnControlServicer_to_server(test_controller, server)
+    beam_fn_api_pb2_grpc.add_BeamFnControlServicer_to_server(
+        test_controller, server)
     test_port = server.add_insecure_port("[::]:0")
     server.start()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/86de9de3/sdks/python/gen_protos.py
----------------------------------------------------------------------
diff --git a/sdks/python/gen_protos.py b/sdks/python/gen_protos.py
index 59a98ce..d70158b 100644
--- a/sdks/python/gen_protos.py
+++ b/sdks/python/gen_protos.py
@@ -39,7 +39,7 @@ BEAM_PROTO_PATHS = [
 PYTHON_OUTPUT_PATH = os.path.join('apache_beam', 'portability', 'api')
 
 
-def generate_proto_files():
+def generate_proto_files(force=False):
 
   try:
     import grpc_tools
@@ -54,7 +54,7 @@ def generate_proto_files():
   out_dir = os.path.join(py_sdk_root, PYTHON_OUTPUT_PATH)
   out_files = [path for path in glob.glob(os.path.join(out_dir, '*_pb2.py'))]
 
-  if out_files and not proto_files:
+  if out_files and not proto_files and not force:
     # We have out_files but no protos; assume they're up to date.
     # This is actually the common case (e.g. installation from an sdist).
     logging.info('No proto files; using existing generated files.')
@@ -69,7 +69,7 @@ def generate_proto_files():
           'No proto files found in %s.' % proto_dirs)
 
   # Regenerate iff the proto files are newer.
-  elif not out_files or len(out_files) < len(proto_files) or (
+  elif force or not out_files or len(out_files) < len(proto_files) or (
       min(os.path.getmtime(path) for path in out_files)
       <= max(os.path.getmtime(path) for path in proto_files)):
     try:
@@ -92,7 +92,8 @@ def generate_proto_files():
         ['--proto_path=%s' % builtin_protos] +
         ['--proto_path=%s' % d for d in proto_dirs] +
         ['--python_out=%s' % out_dir] +
-        ['--grpc_python_out=%s' % out_dir] +
+        # TODO(robertwb): Remove the prefix once it's the default.
+        ['--grpc_python_out=grpc_2_0:%s' % out_dir] +
         proto_files)
       ret_code = protoc.main(args)
       if ret_code:
@@ -128,4 +129,4 @@ def _install_grpcio_tools_and_generate_proto_files():
 
 
 if __name__ == '__main__':
-  generate_proto_files()
+  generate_proto_files(force=True)

Reply via email to