This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fc37a04 Add a timeout to the TestStream GRPC and fix the Streaming
cache timeout
new 7acbff4 Merge pull request #11440 from [BEAM-9767] Add a timeout to
the TestStream GRPC and fix the Streaming cache timeout
fc37a04 is described below
commit fc37a04f471438ae85eae05e9249f4f8ac06c650
Author: Sam Rohde <[email protected]>
AuthorDate: Thu Apr 16 11:59:27 2020 -0700
Add a timeout to the TestStream GRPC and fix the Streaming cache timeout
Change-Id: I33908eab8313a90829a2115029f87b7f2f454f1b
---
.../apache_beam/runners/direct/test_stream_impl.py | 20 +++++++++++++++++---
.../runners/interactive/caching/streaming_cache.py | 5 +----
2 files changed, 18 insertions(+), 7 deletions(-)
diff --git a/sdks/python/apache_beam/runners/direct/test_stream_impl.py b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
index 321171e..318b2a3 100644
--- a/sdks/python/apache_beam/runners/direct/test_stream_impl.py
+++ b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
@@ -29,6 +29,7 @@ from __future__ import absolute_import
from __future__ import print_function
import itertools
+import logging
from apache_beam import ParDo
from apache_beam import coders
@@ -57,6 +58,8 @@ except ImportError:
'Exception: grpc was not able to be imported. '
'Skip importing all grpc related moduels.')
+_LOGGER = logging.getLogger(__name__)
+
class _WatermarkController(PTransform):
"""A runner-overridable PTransform Primitive to control the watermark.
@@ -270,9 +273,20 @@ class _TestStream(PTransform):
event_request = beam_runner_api_pb2.EventsRequest(
output_ids=[str(tag) for tag in output_tags])
- event_stream = stub.Events(event_request)
- for e in event_stream:
- yield _TestStream.test_stream_payload_to_events(e, coder)
+ event_stream = stub.Events(event_request, timeout=30)
+ try:
+ while True:
+ yield _TestStream.test_stream_payload_to_events(
+ next(event_stream), coder)
+ except StopIteration:
+ return
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
+ _LOGGER.warning(
+ 'TestStream timed out waiting for new events from service.'
+ ' Stopping pipeline.')
+ return
+ raise e
@staticmethod
def test_stream_payload_to_events(payload, coder):
diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index ffc1b4a..03ac494 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -167,14 +167,11 @@ class StreamingCacheSource:
def _wait_until_file_exists(self, timeout_secs=30):
"""Blocks until the file exists for a maximum of timeout_secs.
"""
- now_secs = time.time()
- timeout_timestamp_secs = now_secs + timeout_secs
-
# Wait for up to `timeout_secs` for the file to be available.
start = time.time()
while not os.path.exists(self._path):
time.sleep(1)
- if time.time() - start > timeout_timestamp_secs:
+ if time.time() - start > timeout_secs:
from apache_beam.runners.interactive.pipeline_instrument import CacheKey
pcollection_var = CacheKey.from_str(self._labels[-1]).var
raise RuntimeError(