This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 07bb725bbf4817a4982f3eaeb5a4268bade3e9c7
Author: Tom Pollard <[email protected]>
AuthorDate: Mon Jul 15 11:51:18 2019 +0100

    Support exception handling from the subprocess
---
 src/buildstream/_exceptions.py             |  3 +-
 src/buildstream/_scheduler/jobs/job.py     |  2 +
 src/buildstream/_scheduler/queues/queue.py |  1 +
 src/buildstream/_scheduler/scheduler.py    | 27 +++++++++++++-
 src/buildstream/_stream.py                 | 59 ++++++++++++++++++++++--------
 src/buildstream/testing/runcli.py          |  2 +-
 src/buildstream/utils.py                   |  4 ++
 7 files changed, 80 insertions(+), 18 deletions(-)

diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index d5b87a8..e977660 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -238,7 +238,8 @@ class LoadErrorReason(Enum):
 # interpreting project YAML
 #
 class LoadError(BstError):
-    def __init__(self, message, reason, *, detail=None):
+    def __init__(self, message, reason=None, *, detail=None):
+        # Second parameter needs to be a default arg due to unpickling issue, unpleasant.
         super().__init__(message, detail=detail, domain=ErrorDomain.LOAD, reason=reason)
 
 
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index d80d1a9..9e3ca13 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -488,6 +488,8 @@ class Job():
             # is currently managed in _exceptions.py
             set_last_task_error(envelope.message['domain'],
                                 envelope.message['reason'])
+            self._scheduler.set_last_task_error(envelope.message['domain'],
+                                                envelope.message['reason'])
         elif envelope.message_type is _MessageType.RESULT:
             assert self._result is None
             self._result = envelope.message
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 8c81ce2..3435e3f 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -316,6 +316,7 @@ class Queue():
             # This just allows us stronger testing capability
             #
             set_last_task_error(e.domain, e.reason)
+            self._scheduler.set_last_task_error(e.domain, e.reason)
 
         except Exception:   # pylint: disable=broad-except
 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 2e9c740..6649865 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -53,6 +53,8 @@ class NotificationType(enum.Enum):
     JOB_COMPLETE = "job_complete"
     TICK = "tick"
     EXCEPTION = "exception"
+    TASK_ERROR = "task_error"
+    SCHED_TERMINATE = "sched_terminate"
 
 
 class Notification:
@@ -66,7 +68,9 @@ class Notification:
                  failed_element=False,
                  elapsed_time=None,
                  element=None,
-                 exception=None):
+                 exception=None,
+                 domain=None,
+                 reason=None):
 
         self.notification_type = notification_type
         self.full_name = full_name
@@ -76,6 +80,8 @@ class Notification:
         self.elapsed_time = elapsed_time
         self.element = element
         self.exception = exception
+        self.domain = domain
+        self.reason = reason
 
 
 # Scheduler()
@@ -331,6 +337,12 @@ class Scheduler():
         #
         self._cache_size_scheduled = True
 
+    def set_last_task_error(self, domain, reason):
+        notification = Notification(NotificationType.TASK_ERROR,
+                                    domain=domain,
+                                    reason=reason)
+        self._notify(notification)
+
     #######################################################
     #                  Local Private Methods              #
     #######################################################
@@ -673,3 +685,16 @@ class Scheduler():
         # a new use-case arises.
         #
         raise TypeError("Scheduler objects should not be pickled.")
+
+    def _loop(self):
+        assert self._notification_queue
+        # Check for and process new messages
+        while True:
+            try:
+                notification = self._notification_queue.get_nowait()
+                if notification.notification_type == NotificationType.SCHED_TERMINATE:
+                    print("handling notifications")
+                    self.terminate_jobs()
+            except queue.Empty:
+                notification = None
+                break
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index afbeb20..4de975e 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -35,7 +35,7 @@ import queue
 
 from ._artifact import Artifact
 from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
     SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
@@ -107,6 +107,16 @@ class Stream():
         self._artifacts = self._context.artifactcache
         self._sourcecache = self._context.sourcecache
 
+    @staticmethod
+    def _subprocess_main(func, queue, *args, **kwargs):
+        # Set main process
+        utils._reset_main_pid()
+        try:
+            func(*args, **kwargs)
+        except Exception as e:
+            from ._scheduler.scheduler import Notification, NotificationType
+            queue.put(Notification(NotificationType.EXCEPTION, exception=e))
+
     def run_in_subprocess(self, func, *args, **kwargs):
         print("Args: {}".format([*args]))
         print("Kwargs: {}".format(list(kwargs.items())))
@@ -114,11 +124,16 @@ class Stream():
 
         mp_context = mp.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
-        print("launchinglaunching subprocess:", process_name)
-        self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+        args = list(args)
+        args.insert(0, self._notification_queue)
+        args.insert(0, func)
+        print("launching subprocess:", process_name)
+        self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
+                                              kwargs=kwargs, name=process_name)
+
         self._subprocess.start()
 
-        # TODO connect signal handlers
+        # TODO connect signal handlers with asyncio
         while self._subprocess.exitcode is None:
             # check every given time interval on subprocess state
             self._subprocess.join(0.1)
@@ -126,13 +141,17 @@ class Stream():
             self._loop()
         print("Stopping loop...")
 
-        # try:
-        #     while True:
-        #         notification = self._notification_queue.get_nowait()
-        #         self._scheduler_notification_handler(notification)
-        # except queue.Empty:
-        #     print("Finished processing notifications")
-        #     pass
+        # Set main process back
+        utils._reset_main_pid()
+
+        # Ensure no more notifcations to process
+        try:
+            while True:
+                notification = self._notification_queue.get_nowait()
+                self._scheduler_notification_handler(notification)
+        except queue.Empty:
+            print("Finished processing notifications")
+            pass
 
     # cleanup()
     #
@@ -1086,7 +1105,15 @@ class Stream():
     # Terminate jobs
     #
     def terminate(self):
+        #if self._scheduler.loop:
+            # Scheduler not in subprocess
         self._scheduler.terminate_jobs()
+        #else:
+        #    # Handle calling subprocessed scheduler outside of main process
+        #    assert self._notification_queue
+        #    from ._scheduler.scheduler import Notification, NotificationType
+        #    self._notifiaction_queue.put(Notification(NotificationType.SCHED_TERMINATE))
+        #    self._scheduler.terminate_jobs()
 
     # quit()
     #
@@ -1625,8 +1652,9 @@ class Stream():
                     unique_id = None
                 self._state.fail_task(notification.job_action, notification.full_name, unique_id)
         elif notification.notification_type == NotificationType.EXCEPTION:
-            # TODO
-            pass
+            raise notification.exception
+        elif notification.notification_type == NotificationType.TASK_ERROR:
+            set_last_task_error(notification.domain, notification.reason)
         else:
             raise StreamError("Unreccognised notification type recieved")
 
@@ -1653,8 +1681,9 @@ class Stream():
         while True:
             try:
                 notification = self._notification_queue.get_nowait()
-                print("handling notifications")
-                self._scheduler_notification_handler(notification)
+                if notification.notification_type != NotificationType.SCHED_TERMINATE:
+                    print("handling notifications")
+                    self._scheduler_notification_handler(notification)
             except queue.Empty:
                 notification = None
                 break
diff --git a/src/buildstream/testing/runcli.py b/src/buildstream/testing/runcli.py
index 95bf83e..ad71705 100644
--- a/src/buildstream/testing/runcli.py
+++ b/src/buildstream/testing/runcli.py
@@ -85,7 +85,6 @@ class Result():
         # in the case that the exit code reported is 0 (success).
         #
         if self.exit_code != 0:
-
             # Check if buildstream failed to handle an
             # exception, topevel CLI exit should always
             # be a SystemExit exception.
@@ -149,6 +148,7 @@ class Result():
                     self.exception.domain,
                     self.exception.reason
                 ))
+
         assert self.exit_code == -1, fail_message
         assert self.exc is not None, fail_message
         assert self.exception is not None, fail_message
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index 2c57925..ed1e123 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -711,6 +711,10 @@ def _is_main_process():
     assert _main_pid is not None
     return os.getpid() == _main_pid
 
+def _reset_main_pid():
+    global _main_pid
+    _main_pid = os.getpid()
+
 
 # Recursively remove directories, ignoring file permissions as much as
 # possible.

Reply via email to