This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fc37a04  Add a timeout to the TestStream GRPC and fix the Streaming cache timeout
     new 7acbff4  Merge pull request #11440 from [BEAM-9767] Add a timeout to the TestStream GRPC and fix the Streaming cache timeout
fc37a04 is described below

commit fc37a04f471438ae85eae05e9249f4f8ac06c650
Author: Sam Rohde <[email protected]>
AuthorDate: Thu Apr 16 11:59:27 2020 -0700

    Add a timeout to the TestStream GRPC and fix the Streaming cache timeout
    
    Change-Id: I33908eab8313a90829a2115029f87b7f2f454f1b
---
 .../apache_beam/runners/direct/test_stream_impl.py   | 20 +++++++++++++++++---
 .../runners/interactive/caching/streaming_cache.py   |  5 +----
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/test_stream_impl.py b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
index 321171e..318b2a3 100644
--- a/sdks/python/apache_beam/runners/direct/test_stream_impl.py
+++ b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
@@ -29,6 +29,7 @@ from __future__ import absolute_import
 from __future__ import print_function
 
 import itertools
+import logging
 
 from apache_beam import ParDo
 from apache_beam import coders
@@ -57,6 +58,8 @@ except ImportError:
       'Exception: grpc was not able to be imported. '
       'Skip importing all grpc related moduels.')
 
+_LOGGER = logging.getLogger(__name__)
+
 
 class _WatermarkController(PTransform):
   """A runner-overridable PTransform Primitive to control the watermark.
@@ -270,9 +273,20 @@ class _TestStream(PTransform):
     event_request = beam_runner_api_pb2.EventsRequest(
         output_ids=[str(tag) for tag in output_tags])
 
-    event_stream = stub.Events(event_request)
-    for e in event_stream:
-      yield _TestStream.test_stream_payload_to_events(e, coder)
+    event_stream = stub.Events(event_request, timeout=30)
+    try:
+      while True:
+        yield _TestStream.test_stream_payload_to_events(
+            next(event_stream), coder)
+    except StopIteration:
+      return
+    except grpc.RpcError as e:
+      if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
+        _LOGGER.warning(
+            'TestStream timed out waiting for new events from service.'
+            ' Stopping pipeline.')
+        return
+      raise e
 
   @staticmethod
   def test_stream_payload_to_events(payload, coder):
diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index ffc1b4a..03ac494 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -167,14 +167,11 @@ class StreamingCacheSource:
   def _wait_until_file_exists(self, timeout_secs=30):
     """Blocks until the file exists for a maximum of timeout_secs.
     """
-    now_secs = time.time()
-    timeout_timestamp_secs = now_secs + timeout_secs
-
     # Wait for up to `timeout_secs` for the file to be available.
     start = time.time()
     while not os.path.exists(self._path):
       time.sleep(1)
-      if time.time() - start > timeout_timestamp_secs:
+      if time.time() - start > timeout_secs:
         from apache_beam.runners.interactive.pipeline_instrument import CacheKey
         pcollection_var = CacheKey.from_str(self._labels[-1]).var
         raise RuntimeError(

Reply via email to