This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a4fb844 Secure GRPC channel for SDK worker (#4984)
a4fb844 is described below
commit a4fb844df82051ef93bb7e2d47967e143eabcc5c
Author: ananvay <[email protected]>
AuthorDate: Wed Apr 4 17:50:04 2018 -0700
Secure GRPC channel for SDK worker (#4984)
---
.../apache_beam/runners/worker/data_plane.py | 25 +++++++++++++++-------
.../apache_beam/runners/worker/sdk_worker.py | 15 ++++++++++---
2 files changed, 29 insertions(+), 11 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py
b/sdks/python/apache_beam/runners/worker/data_plane.py
index 7c79c4c..1ff60aa 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -295,9 +295,13 @@ class GrpcClientDataChannelFactory(DataChannelFactory):
Caches the created channels by ``data descriptor url``.
"""
- def __init__(self):
+ def __init__(self, credentials=None):
self._data_channel_cache = {}
self._lock = threading.Lock()
+ self._credentials = None
+ if credentials is not None:
+ logging.info('Using secure channel creds.')
+ self._credentials = credentials
def create_data_channel(self, remote_grpc_port):
url = remote_grpc_port.api_service_descriptor.url
@@ -305,18 +309,23 @@ class GrpcClientDataChannelFactory(DataChannelFactory):
with self._lock:
if url not in self._data_channel_cache:
logging.info('Creating channel for %s', url)
- grpc_channel = grpc.insecure_channel(
- url,
- # Options to have no limits (-1) on the size of the messages
- # received or sent over the data plane. The actual buffer size is
- # controlled in a layer above.
- options=[("grpc.max_receive_message_length", -1),
- ("grpc.max_send_message_length", -1)])
+ # Options to have no limits (-1) on the size of the messages
+ # received or sent over the data plane. The actual buffer size
+ # is controlled in a layer above.
+ channel_options = [("grpc.max_receive_message_length", -1),
+ ("grpc.max_send_message_length", -1)]
+ grpc_channel = None
+ if self._credentials is None:
+ grpc_channel = grpc.insecure_channel(url, options=channel_options)
+ else:
+ grpc_channel = grpc.secure_channel(
+ url, self._credentials, options=channel_options)
# Add workerId to the grpc channel
grpc_channel = grpc.intercept_channel(grpc_channel,
WorkerIdInterceptor())
self._data_channel_cache[url] = GrpcClientDataChannel(
beam_fn_api_pb2_grpc.BeamFnDataStub(grpc_channel))
+
return self._data_channel_cache[url]
def close(self):
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index c77659b..3b6ed65 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -40,12 +40,21 @@ from apache_beam.runners.worker.worker_id_interceptor
import WorkerIdInterceptor
class SdkHarness(object):
REQUEST_METHOD_PREFIX = '_request_'
- def __init__(self, control_address, worker_count):
+ def __init__(self, control_address, worker_count, credentials=None):
self._worker_count = worker_count
self._worker_index = 0
+ if credentials is None:
+ logging.info('Creating insecure channel.')
+ self._control_channel = grpc.insecure_channel(control_address)
+ else:
+ logging.info('Creating secure channel.')
+ self._control_channel = grpc.secure_channel(control_address, credentials)
+ grpc.channel_ready_future(self._control_channel).result()
+ logging.info('Secure channel established.')
self._control_channel = grpc.intercept_channel(
- grpc.insecure_channel(control_address), WorkerIdInterceptor())
- self._data_channel_factory = data_plane.GrpcClientDataChannelFactory()
+ self._control_channel, WorkerIdInterceptor())
+ self._data_channel_factory = data_plane.GrpcClientDataChannelFactory(
+ credentials)
self.workers = queue.Queue()
# one thread is enough for getting the progress report.
# Assumption:
--
To stop receiving notification emails like this one, please contact
[email protected].