Repository: beam
Updated Branches:
  refs/heads/master fb41b2950 -> e9d746a34
Add logging and test aborting for timeouts.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/768c854f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/768c854f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/768c854f

Branch: refs/heads/master
Commit: 768c854f3723dd747e905aeb0d35024ae44e2cda
Parents: fb41b29
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Tue Nov 21 12:25:18 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Tue Nov 21 12:25:18 2017 -0800

----------------------------------------------------------------------
 .../portability/universal_local_runner.py       |  5 ++-
 .../portability/universal_local_runner_test.py  | 32 ++++++++++++++++++--
 2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/768c854f/sdks/python/apache_beam/runners/portability/universal_local_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index 579983c..b951194 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -178,7 +178,9 @@ class PipelineResult(runner.PipelineResult):
       for message in self._job_service.GetMessageStream(
           beam_job_api_pb2.JobMessagesRequest(job_id=self._job_id)):
         self._messages.append(message)
-    threading.Thread(target=read_messages).start()
+    t = threading.Thread(target=read_messages, name='wait_until_finish_read')
+    t.daemon = True
+    t.start()
 
     for state_response in self._job_service.GetStateStream(
         beam_job_api_pb2.GetJobStateRequest(job_id=self._job_id)):
@@ -244,6 +246,7 @@ class BeamJob(threading.Thread):
       logging.exception("Error running pipeline.")
       traceback.print_exc()
       self.state = beam_job_api_pb2.JobState.FAILED
+      raise
 
   def cancel(self):
     if self.state not in TERMINAL_STATES:

http://git-wip-us.apache.org/repos/asf/beam/blob/768c854f/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
index e1104dc..1fc244b 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
@@ -16,6 +16,11 @@
 #
 
 import logging
+import platform
+import signal
+import sys
+import threading
+import traceback
 import unittest
 
 import apache_beam as beam
@@ -27,9 +32,31 @@ from apache_beam.testing.util import equal_to
 
 class UniversalLocalRunnerTest(fn_api_runner_test.FnApiRunnerTest):
 
+  TIMEOUT_SECS = 30
+
   _use_grpc = False
   _use_subprocesses = False
 
+  def setUp(self):
+    if platform.system() != 'Windows':
+      def handler(signum, frame):
+        msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
+        print '=' * 20, msg, '=' * 20
+        traceback.print_stack(frame)
+        threads_by_id = {th.ident: th for th in threading.enumerate()}
+        for thread_id, stack in sys._current_frames().items():
+          th = threads_by_id.get(thread_id)
+          print
+          print '# Thread:', th or thread_id
+          traceback.print_stack(stack)
+        raise BaseException(msg)
+      signal.signal(signal.SIGALRM, handler)
+      signal.alarm(self.TIMEOUT_SECS)
+
+  def tearDown(self):
+    if platform.system() != 'Windows':
+      signal.alarm(0)
+
   @classmethod
   def get_runner(cls):
     # Don't inherit.
@@ -41,7 +68,8 @@ class UniversalLocalRunnerTest(fn_api_runner_test.FnApiRunnerTest):
 
   @classmethod
   def tearDownClass(cls):
-    cls._runner.cleanup()
+    if hasattr(cls, '_runner'):
+      cls._runner.cleanup()
 
   def create_pipeline(self):
     return beam.Pipeline(self.get_runner())
@@ -56,7 +84,7 @@ class UniversalLocalRunnerTest(fn_api_runner_test.FnApiRunnerTest):
   def test_errors(self):
     # TODO: figure out a way for runner to parse and raise the
     # underlying exception.
-    with self.assertRaises(BaseException):
+    with self.assertRaises(Exception):
       with self.create_pipeline() as p:
         def raise_error(x):
           raise RuntimeError('x')