This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 4a69c9a  Change TestStreamImpl to a producer/consumer pattern
     new 578694b  Merge pull request #11634 from Change TestStreamImpl to a producer/consumer pattern
4a69c9a is described below

commit 4a69c9a5cdc06b61583c7a2d09eb773fb2f8c240
Author: Sam Rohde <rohde.sam...@gmail.com>
AuthorDate: Thu May 7 14:46:22 2020 -0700

    Change TestStreamImpl to a producer/consumer pattern

    GRPC streaming RPCs are blocking with no non-blocking API. This changes
    the TestStreamImpl from RPC to a producer/consumer design with a timeout
    on reading from the producer queue.

    Change-Id: Ib6b8dad0a22db7cb4c3971e550cef003ef035562
---
 .../apache_beam/runners/direct/test_stream_impl.py | 70 ++++++++++++++++++----
 1 file changed, 58 insertions(+), 12 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/test_stream_impl.py b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
index 318b2a3..6f1516b 100644
--- a/sdks/python/apache_beam/runners/direct/test_stream_impl.py
+++ b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
@@ -30,6 +30,9 @@ from __future__ import print_function

 import itertools
 import logging
+from queue import Empty as EmptyException
+from queue import Queue
+from threading import Thread

 from apache_beam import ParDo
 from apache_beam import coders
@@ -61,6 +64,10 @@ except ImportError:
 _LOGGER = logging.getLogger(__name__)


+class _EndOfStream:
+  pass
+
+
 class _WatermarkController(PTransform):
   """A runner-overridable PTransform Primitive to control the watermark.

@@ -263,8 +270,12 @@ class _TestStream(PTransform):
     return itertools.chain(events)

   @staticmethod
-  def events_from_rpc(endpoint, output_tags, coder):
+  def _stream_events_from_rpc(endpoint, output_tags, coder, channel, is_alive):
     """Yields the events received from the given endpoint.
+
+    This is the producer thread that reads events from the TestStreamService and
+    puts them onto the shared queue. At the end of the stream, an _EndOfStream
+    is placed on the channel to signify a successful end.
     """
     stub_channel = grpc.insecure_channel(endpoint)
     stub = beam_runner_api_pb2_grpc.TestStreamServiceStub(stub_channel)
@@ -273,20 +284,55 @@ class _TestStream(PTransform):
     event_request = beam_runner_api_pb2.EventsRequest(
         output_ids=[str(tag) for tag in output_tags])

-    event_stream = stub.Events(event_request, timeout=30)
-    try:
-      while True:
-        yield _TestStream.test_stream_payload_to_events(
-            next(event_stream), coder)
-    except StopIteration:
-      return
-    except grpc.RpcError as e:
-      if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
+    event_stream = stub.Events(event_request)
+    for e in event_stream:
+      channel.put(_TestStream.test_stream_payload_to_events(e, coder))
+      if not is_alive():
+        return
+    channel.put(_EndOfStream())
+
+  @staticmethod
+  def events_from_rpc(endpoint, output_tags, coder):
+    """Yields the events received from the given endpoint.
+
+    This method starts a new thread that reads from the TestStreamService and
+    puts the events onto a shared queue. This method then yields all elements
+    from the queue. Unfortunately, this is necessary because the GRPC API does
+    not allow for non-blocking calls when utilizing a streaming RPC. It is
+    officially suggested from the docs to use a producer/consumer pattern to
+    handle streaming RPCs. By doing so, this gives this method control over when
+    to cancel reading from the RPC if the server takes too long to respond.
+    """
+    # Shared variable with the producer queue. This shuts down the producer if
+    # the consumer exits early.
+    is_alive = True
+
+    # The shared queue that allows the producer and consumer to communicate.
+    channel = Queue()  # type: Queue[Union[test_stream.Event, _EndOfStream]]
+    event_stream = Thread(
+        target=_TestStream._stream_events_from_rpc,
+        args=(endpoint, output_tags, coder, channel, lambda: is_alive))
+    event_stream.setDaemon(True)
+    event_stream.start()
+
+    # This pumps the shared queue for events until the _EndOfStream sentinel is
+    # reached. If the TestStreamService takes longer than expected, the queue
+    # will timeout and an EmptyException will be raised. This also sets the
+    # shared is_alive sentinel to shut down the producer.
+    while True:
+      try:
+        # Raise an EmptyException if there are no events during the last timeout
+        # period.
+        event = channel.get(timeout=30)
+        if isinstance(event, _EndOfStream):
+          break
+        yield event
+      except EmptyException as e:
         _LOGGER.warning(
             'TestStream timed out waiting for new events from service.'
             ' Stopping pipeline.')
-        return
-      raise e
+        is_alive = False
+        raise e

   @staticmethod
   def test_stream_payload_to_events(payload, coder):