This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 679a37f Changed BeamPython FnAPI state channel to use secure connection when credentials provided new 73674b8 Merge pull request #7549 Secure state channel 679a37f is described below commit 679a37f088313f5eada9d8e2a95c8db9f894d0c6 Author: Craig Chambers <chamb...@google.com> AuthorDate: Thu Jan 17 09:20:45 2019 -0800 Changed BeamPython FnAPI state channel to use secure connection when credentials provided --- .../apache_beam/runners/worker/sdk_worker.py | 26 +++++++++++++--------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index b23cf68..4eb22a8 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -71,7 +71,7 @@ class SdkHarness(object): self._control_channel, WorkerIdInterceptor(self._worker_id)) self._data_channel_factory = data_plane.GrpcClientDataChannelFactory( credentials) - self._state_handler_factory = GrpcStateHandlerFactory() + self._state_handler_factory = GrpcStateHandlerFactory(credentials) self._profiler_factory = profiler_factory self.workers = queue.Queue() # one thread is enough for getting the progress report. @@ -345,10 +345,11 @@ class GrpcStateHandlerFactory(StateHandlerFactory): Caches the created channels by ``state descriptor url``. """ - def __init__(self): + def __init__(self, credentials=None): self._state_handler_cache = {} self._lock = threading.Lock() self._throwing_state_handler = ThrowingStateHandler() + self._credentials = credentials def create_state_handler(self, api_service_descriptor): if not api_service_descriptor: @@ -357,14 +358,19 @@ class GrpcStateHandlerFactory(StateHandlerFactory): if url not in self._state_handler_cache: with self._lock: if url not in self._state_handler_cache: - logging.info('Creating insecure state channel for %s', url) - grpc_channel = GRPCChannelFactory.insecure_channel( - url, - # Options to have no limits (-1) on the size of the messages - # received or sent over the data plane. The actual buffer size is - # controlled in a layer above. - options=[("grpc.max_receive_message_length", -1), - ("grpc.max_send_message_length", -1)]) + # Options to have no limits (-1) on the size of the messages + # received or sent over the data plane. The actual buffer size is + # controlled in a layer above. + options = [('grpc.max_receive_message_length', -1), + ('grpc.max_send_message_length', -1)] + if self._credentials is None: + logging.info('Creating insecure state channel for %s.', url) + grpc_channel = GRPCChannelFactory.insecure_channel( + url, options=options) + else: + logging.info('Creating secure state channel for %s.', url) + grpc_channel = GRPCChannelFactory.secure_channel( + url, self._credentials, options=options) logging.info('State channel established.') # Add workerId to the grpc channel grpc_channel = grpc.intercept_channel(grpc_channel,