Fix flakiness in sideinputs_test
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebeb3fd9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebeb3fd9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebeb3fd9 Branch: refs/heads/tez-runner Commit: ebeb3fd998c1a6a8ce4f9514d54d9e28a6619aa0 Parents: 2df25db Author: Charles Chen <[email protected]> Authored: Mon Nov 13 11:41:47 2017 -0800 Committer: Charles Chen <[email protected]> Committed: Mon Nov 13 11:56:07 2017 -0800 ---------------------------------------------------------------------- .../apache_beam/runners/worker/sideinputs.py | 22 ++++++++++---------- .../runners/worker/sideinputs_test.py | 1 + 2 files changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ebeb3fd9/sdks/python/apache_beam/runners/worker/sideinputs.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py index c91fe95..6c7831d 100644 --- a/sdks/python/apache_beam/runners/worker/sideinputs.py +++ b/sdks/python/apache_beam/runners/worker/sideinputs.py @@ -129,17 +129,17 @@ class PrefetchingSourceSetIterable(object): num_readers_finished = 0 try: while True: - element = self.element_queue.get() - if element is READER_THREAD_IS_DONE_SENTINEL: - num_readers_finished += 1 - if num_readers_finished == self.num_reader_threads: - if self.has_errored: - raise self.reader_exceptions.get() - return - elif self.has_errored: - raise self.reader_exceptions.get() - else: - yield element + try: + element = self.element_queue.get() + if element is READER_THREAD_IS_DONE_SENTINEL: + num_readers_finished += 1 + if num_readers_finished == self.num_reader_threads: + return + else: + yield element + finally: + if self.has_errored: + raise self.reader_exceptions.get() except GeneratorExit: self.has_errored = True raise http://git-wip-us.apache.org/repos/asf/beam/blob/ebeb3fd9/sdks/python/apache_beam/runners/worker/sideinputs_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py index 73d34fb..bb688dd 100644 --- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py +++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py @@ -121,6 +121,7 @@ class PrefetchingSourceIteratorTest(unittest.TestCase): def perpetual_generator(value): while True: yield value + time.sleep(0.1) sources = [ FakeSource(perpetual_generator(1)),
