This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 45ddff8 Return empty iterator on empty cache
new 5d33f9c Merge pull request #11663 from [BEAM-9767]: Fix flaky
streaming wordcount
45ddff8 is described below
commit 45ddff8b7bb63f29d8f923475f53d922b6cc1988
Author: Sam Rohde <[email protected]>
AuthorDate: Thu May 7 14:46:22 2020 -0700
Return empty iterator on empty cache
The BackgroundCachingJob condition checker was failing due to a data race
in which the StreamingCache was created and read before any data had been
written to it. Reading the empty cache raised an uncaught StopIteration,
which failed the condition checker. StreamingCache now returns an empty
iterator when the cache file has no data yet.
Change-Id: Iecf47ec35dbf3c77881f8d59d77a872b41e11d3a
---
.../runners/interactive/caching/streaming_cache.py | 8 +++++++-
.../runners/interactive/caching/streaming_cache_test.py | 11 +++++++++++
.../runners/interactive/interactive_runner_test.py | 2 +-
3 files changed, 19 insertions(+), 2 deletions(-)
diff --git
a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index 03ac494..b2204cf 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -295,7 +295,13 @@ class StreamingCache(CacheManager):
reader = StreamingCacheSource(
self._cache_dir, labels, self._is_cache_complete).read(tail=False)
- header = next(reader)
+
+ # Return an empty iterator if there is nothing in the file yet. This can
+ # only happen when tail is False.
+ try:
+ header = next(reader)
+ except StopIteration:
+ return iter([]), -1
return StreamingCache.Reader([header], [reader]).read(), 1
def read_multiple(self, labels):
diff --git
a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
index c73134e..0d51512 100644
---
a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
+++
b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
@@ -54,6 +54,17 @@ class StreamingCacheTest(unittest.TestCase):
cache.write([TestStreamFileRecord()], 'my_label')
self.assertTrue(cache.exists('my_label'))
+ def test_empty(self):
+ CACHED_PCOLLECTION_KEY = repr(CacheKey('arbitrary_key', '', '', ''))
+ # Write zero records so the label exists but holds no data, then read it.
+ cache = StreamingCache(cache_dir=None)
+ self.assertFalse(cache.exists(CACHED_PCOLLECTION_KEY))
+ cache.write([], CACHED_PCOLLECTION_KEY)
+ reader, _ = cache.read(CACHED_PCOLLECTION_KEY)
+
+ # Reading an empty cache must yield no elements rather than raising.
+ self.assertFalse([e for e in reader])
+
def test_single_reader(self):
"""Tests that we expect to see all the correctly emitted
TestStreamPayloads.
"""
diff --git
a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 0d92aa5..32961b7 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -155,7 +155,7 @@ class InteractiveRunnerTest(unittest.TestCase):
@unittest.skipIf(
sys.version_info < (3, 5, 3),
'The tests require at least Python 3.6 to work.')
- @timeout(30)
+ @timeout(60)
def test_streaming_wordcount(self):
class WordExtractingDoFn(beam.DoFn):
def process(self, element):