Repository: beam
Updated Branches:
  refs/heads/master fb41b2950 -> e9d746a34


Add logging and test aborting for timeouts.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/768c854f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/768c854f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/768c854f

Branch: refs/heads/master
Commit: 768c854f3723dd747e905aeb0d35024ae44e2cda
Parents: fb41b29
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Tue Nov 21 12:25:18 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Tue Nov 21 12:25:18 2017 -0800

----------------------------------------------------------------------
 .../portability/universal_local_runner.py       |  5 ++-
 .../portability/universal_local_runner_test.py  | 32 ++++++++++++++++++--
 2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/768c854f/sdks/python/apache_beam/runners/portability/universal_local_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index 579983c..b951194 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -178,7 +178,9 @@ class PipelineResult(runner.PipelineResult):
       for message in self._job_service.GetMessageStream(
           beam_job_api_pb2.JobMessagesRequest(job_id=self._job_id)):
         self._messages.append(message)
-    threading.Thread(target=read_messages).start()
+    t = threading.Thread(target=read_messages, name='wait_until_finish_read')
+    t.daemon = True
+    t.start()
 
     for state_response in self._job_service.GetStateStream(
         beam_job_api_pb2.GetJobStateRequest(job_id=self._job_id)):
@@ -244,6 +246,7 @@ class BeamJob(threading.Thread):
         logging.exception("Error running pipeline.")
         traceback.print_exc()
         self.state = beam_job_api_pb2.JobState.FAILED
+        raise
 
   def cancel(self):
     if self.state not in TERMINAL_STATES:

http://git-wip-us.apache.org/repos/asf/beam/blob/768c854f/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
index e1104dc..1fc244b 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
@@ -16,6 +16,11 @@
 #
 
 import logging
+import platform
+import signal
+import sys
+import threading
+import traceback
 import unittest
 
 import apache_beam as beam
@@ -27,9 +32,31 @@ from apache_beam.testing.util import equal_to
 
 class UniversalLocalRunnerTest(fn_api_runner_test.FnApiRunnerTest):
 
+  TIMEOUT_SECS = 30
+
   _use_grpc = False
   _use_subprocesses = False
 
+  def setUp(self):
+    if platform.system() != 'Windows':
+      def handler(signum, frame):
+        msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
+        print '=' * 20, msg, '=' * 20
+        traceback.print_stack(frame)
+        threads_by_id = {th.ident: th for th in threading.enumerate()}
+        for thread_id, stack in sys._current_frames().items():
+          th = threads_by_id.get(thread_id)
+          print
+          print '# Thread:', th or thread_id
+          traceback.print_stack(stack)
+        raise BaseException(msg)
+      signal.signal(signal.SIGALRM, handler)
+      signal.alarm(self.TIMEOUT_SECS)
+
+  def tearDown(self):
+    if platform.system() != 'Windows':
+      signal.alarm(0)
+
   @classmethod
   def get_runner(cls):
     # Don't inherit.
@@ -41,7 +68,8 @@ class UniversalLocalRunnerTest(fn_api_runner_test.FnApiRunnerTest):
 
   @classmethod
   def tearDownClass(cls):
-    cls._runner.cleanup()
+    if hasattr(cls, '_runner'):
+      cls._runner.cleanup()
 
   def create_pipeline(self):
     return beam.Pipeline(self.get_runner())
@@ -56,7 +84,7 @@ class UniversalLocalRunnerTest(fn_api_runner_test.FnApiRunnerTest):
   def test_errors(self):
     # TODO: figure out a way for runner to parse and raise the
     # underlying exception.
-    with self.assertRaises(BaseException):
+    with self.assertRaises(Exception):
       with self.create_pipeline() as p:
         def raise_error(x):
           raise RuntimeError('x')

Reply via email to