This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 45ddff8  Return empty iterator on empty cache
     new 5d33f9c  Merge pull request #11663 from [BEAM-9767]: Fix flaky streaming wordcount
45ddff8 is described below

commit 45ddff8b7bb63f29d8f923475f53d922b6cc1988
Author: Sam Rohde <[email protected]>
AuthorDate: Thu May 7 14:46:22 2020 -0700

    Return empty iterator on empty cache
    
    The BackgroundCachingJob condition checker was failing due to a data race
    where the StreamingCache was created and read from before there was any
    data. This raised an uncaught StopIteration exception and failed the
    condition checker.
    
    Change-Id: Iecf47ec35dbf3c77881f8d59d77a872b41e11d3a
---
 .../runners/interactive/caching/streaming_cache.py            |  8 +++++++-
 .../runners/interactive/caching/streaming_cache_test.py       | 11 +++++++++++
 .../runners/interactive/interactive_runner_test.py            |  2 +-
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index 03ac494..b2204cf 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -295,7 +295,13 @@ class StreamingCache(CacheManager):
 
     reader = StreamingCacheSource(
         self._cache_dir, labels, self._is_cache_complete).read(tail=False)
-    header = next(reader)
+
+    # Return an empty iterator if there is nothing in the file yet. This can
+    # only happen when tail is False.
+    try:
+      header = next(reader)
+    except StopIteration:
+      return iter([]), -1
     return StreamingCache.Reader([header], [reader]).read(), 1
 
   def read_multiple(self, labels):
diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
index c73134e..0d51512 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
@@ -54,6 +54,17 @@ class StreamingCacheTest(unittest.TestCase):
     cache.write([TestStreamFileRecord()], 'my_label')
     self.assertTrue(cache.exists('my_label'))
 
+  def test_empty(self):
+    CACHED_PCOLLECTION_KEY = repr(CacheKey('arbitrary_key', '', '', ''))
+
+    cache = StreamingCache(cache_dir=None)
+    self.assertFalse(cache.exists(CACHED_PCOLLECTION_KEY))
+    cache.write([], CACHED_PCOLLECTION_KEY)
+    reader, _ = cache.read(CACHED_PCOLLECTION_KEY)
+
+    # Assert that an empty reader returns an empty list.
+    self.assertFalse([e for e in reader])
+
   def test_single_reader(self):
     """Tests that we expect to see all the correctly emitted TestStreamPayloads.
     """
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 0d92aa5..32961b7 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -155,7 +155,7 @@ class InteractiveRunnerTest(unittest.TestCase):
   @unittest.skipIf(
       sys.version_info < (3, 5, 3),
       'The tests require at least Python 3.6 to work.')
-  @timeout(30)
+  @timeout(60)
   def test_streaming_wordcount(self):
     class WordExtractingDoFn(beam.DoFn):
       def process(self, element):

Reply via email to