pabloem commented on a change in pull request #11440: [BEAM-9767] Add a timeout
to the TestStream GRPC and fix the Streaming cache timeout
URL: https://github.com/apache/beam/pull/11440#discussion_r410518017
##########
File path: sdks/python/apache_beam/runners/direct/test_stream_impl.py
##########
@@ -270,9 +273,20 @@ def events_from_rpc(endpoint, output_tags, coder):
     event_request = beam_runner_api_pb2.EventsRequest(
         output_ids=[str(tag) for tag in output_tags])
-    event_stream = stub.Events(event_request)
-    for e in event_stream:
-      yield _TestStream.test_stream_payload_to_events(e, coder)
+    event_stream = stub.Events(event_request, timeout=30)
+    try:
+      while True:
+        yield _TestStream.test_stream_payload_to_events(
+            next(event_stream), coder)
+    except StopIteration:
+      return
+    except grpc.RpcError as e:
+      if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
Review comment:
Have you tested this? I see that we're raising `RuntimeError` in
streaming_cache.py, but here we're catching `grpc.RpcError`. Is that
intended?
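
To make the question concrete, here is a minimal sketch of the try/except shape in the hunk above, using only the standard library. `FakeStatusCode`, `FakeRpcError`, `fake_stream`, `events_from_stream` and `collect` are hypothetical stand-ins invented for illustration; they are not part of Beam or grpc. The diff does not show what happens inside the `DEADLINE_EXCEEDED` branch, so ending the generator there is an assumption. The sketch only shows that a handler for the RPC error type does not catch a `RuntimeError` raised while iterating. Whether that matters in practice depends on where the `RuntimeError` is raised: if it is raised on the serving side of the RPC, the client would normally see it surface as a `grpc.RpcError` with a non-OK status rather than as a `RuntimeError`.

```python
import enum


class FakeStatusCode(enum.Enum):
  # Hypothetical stand-in for grpc.StatusCode.
  DEADLINE_EXCEEDED = 'deadline exceeded'
  UNKNOWN = 'unknown'


class FakeRpcError(Exception):
  # Hypothetical stand-in for grpc.RpcError; not a subclass of RuntimeError.
  def __init__(self, code):
    super().__init__(code)
    self._code = code

  def code(self):
    return self._code


def fake_stream(items, error=None):
  # Yields the items, then optionally raises `error` to simulate a failure.
  for item in items:
    yield item
  if error is not None:
    raise error


def events_from_stream(stream):
  # Same try/except shape as the diff. Returning on DEADLINE_EXCEEDED is an
  # assumption; the real branch body is not visible in the hunk.
  try:
    while True:
      yield next(stream)
  except StopIteration:
    return
  except FakeRpcError as e:
    if e.code() == FakeStatusCode.DEADLINE_EXCEEDED:
      return
    raise


def collect(stream):
  # Drains the generator and reports the name of any exception that escapes.
  seen = []
  try:
    for event in events_from_stream(stream):
      seen.append(event)
  except Exception as e:
    return seen, type(e).__name__
  return seen, None
```

With these stand-ins, a stream that ends with `FakeRpcError(FakeStatusCode.DEADLINE_EXCEEDED)` finishes quietly, while one that ends with `RuntimeError` lets the exception escape the generator. That second case is the one the question above is asking about.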