This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tpollard/streamasync
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit f307ac2616802d60dd4e95319f98d50739a549b7
Author: Tom Pollard <[email protected]>
AuthorDate: Thu Sep 26 11:50:15 2019 +0100

    Add notifications for session_start & task_groups
---
 src/buildstream/_scheduler/scheduler.py | 15 +++++++++++++--
 src/buildstream/_stream.py              | 11 +++++++++--
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 1734782..62c2754 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -65,6 +65,8 @@ class NotificationType(FastEnum):
     MESSAGE = "message"
     TASK_ERROR = "task_error"
     EXCEPTION = "exception"
+    START = "start"
+    TASK_GROUPS = "task_groups"
 
 
 # Notification()
@@ -87,8 +89,8 @@ class Notification():
                  element=None,
                  message=None,
                  task_error=None,
-                 for_scheduler=False,
-                 exception=None):
+                 exception=None,
+                 task_groups=None):
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -98,6 +100,7 @@ class Notification():
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
+        self.task_groups = task_groups
 
 
 # Scheduler()
@@ -217,6 +220,14 @@ class Scheduler():
         else:
             status = SchedStatus.SUCCESS
 
+        # Send the state taskgroups if we're running under the subprocess
+        if self._notify_front:
+            # Don't pickle state
+            for group in self._state.task_groups.values():
+                group._state = None
+            notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
+            self._notify_front.put(notification)
+
         return status
 
     # clear_queues()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 9b2fd5c..c0bd110 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1439,7 +1439,10 @@ class Stream():
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
         if self._session_start_callback is not None:
-            self._session_start_callback()
+            if self._notify_front:
+                self._notify_front.put(Notification(NotificationType.START))
+            else:
+                self._session_start_callback()
 
         status = self._scheduler.run(self.queues)
 
@@ -1734,7 +1737,9 @@ class Stream():
         return element_targets, artifact_refs
 
     def _scheduler_notification_handler(self, notification):
-        if notification.notification_type == NotificationType.MESSAGE:
+        if notification.notification_type == NotificationType.TASK_GROUPS:
+            self._state.task_groups = notification.task_groups
+        elif notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:
             self._interrupt_callback()
@@ -1761,6 +1766,8 @@ class Stream():
         elif notification.notification_type == NotificationType.EXCEPTION:
             # Regenerate the exception here, so we don't have to pickle it
             raise SubprocessException(**notification.exception)
+        elif notification.notification_type == NotificationType.START:
+            self._session_start_callback()
         else:
             raise StreamError("Unrecognised notification type received")
 

Reply via email to