Repository: beam Updated Branches: refs/heads/master e066a9d6d -> d81ed2172
Avoid flakiness in data channel for empty streams. As an empty stream is used as the end-of-stream marker, never send an empty payload as data. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ebebfdb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ebebfdb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ebebfdb Branch: refs/heads/master Commit: 4ebebfdb34de3e209c033de15e32cf67ab346d44 Parents: e066a9d Author: Robert Bradshaw <[email protected]> Authored: Wed Jun 7 23:00:43 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Thu Jun 8 10:06:17 2017 -0700 ---------------------------------------------------------------------- .../python/apache_beam/runners/worker/data_plane.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4ebebfdb/sdks/python/apache_beam/runners/worker/data_plane.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 5edd0b4..7365db6 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -167,12 +167,18 @@ class _GrpcDataChannel(DataChannel): yield data def output_stream(self, instruction_id, target): + # TODO: Return an output stream that sends data + # to the Runner once a fixed size buffer is full. + # Currently we buffer all the data before sending + # any messages. def add_to_send_queue(data): - self._to_send.put( - beam_fn_api_pb2.Elements.Data( - instruction_reference=instruction_id, - target=target, - data=data)) + if data: + self._to_send.put( + beam_fn_api_pb2.Elements.Data( + instruction_reference=instruction_id, + target=target, + data=data)) + # End of stream marker. 
self._to_send.put( beam_fn_api_pb2.Elements.Data( instruction_reference=instruction_id,
