This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/streamasync
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 5cdf740edea883d302774d6c136d107df6dc4b14
Author: Tom Pollard <[email protected]>
AuthorDate: Fri Sep 27 14:51:53 2019 +0100

    Add len of session/total elements members to Stream
---
 src/buildstream/_frontend/status.py     |  4 ++--
 src/buildstream/_frontend/widget.py     |  4 ++--
 src/buildstream/_scheduler/scheduler.py |  5 ++++-
 src/buildstream/_stream.py              | 19 +++++++++++++++----
 4 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/src/buildstream/_frontend/status.py 
b/src/buildstream/_frontend/status.py
index a204bd9..578298d 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -373,8 +373,8 @@ class _StatusHeader():
         #
         #  ========= 00:00:00 project-name (143/387) =========
         #
-        session = str(len(self._stream.session_elements))
-        total = str(len(self._stream.total_elements))
+        session = self._stream.len_session_elements
+        total = self._stream.len_total_elements
 
         size = 0
         text = ''
diff --git a/src/buildstream/_frontend/widget.py 
b/src/buildstream/_frontend/widget.py
index 181ee7d..d40b87c 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -565,8 +565,8 @@ class LogLine(Widget):
         text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
         values = OrderedDict()
 
-        values['Total'] = 
self.content_profile.fmt(str(len(stream.total_elements)))
-        values['Session'] = 
self.content_profile.fmt(str(len(stream.session_elements)))
+        values['Total'] = self.content_profile.fmt(stream.len_total_elements)
+        values['Session'] = 
self.content_profile.fmt(stream.len_session_elements)
 
         processed_maxlen = 1
         skipped_maxlen = 1
diff --git a/src/buildstream/_scheduler/scheduler.py 
b/src/buildstream/_scheduler/scheduler.py
index 62c2754..bb3fac5 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -67,6 +67,7 @@ class NotificationType(FastEnum):
     EXCEPTION = "exception"
     START = "start"
     TASK_GROUPS = "task_groups"
+    ELEMENT_TOTALS = "element_totals"
 
 
 # Notification()
@@ -90,7 +91,8 @@ class Notification():
                  message=None,
                  task_error=None,
                  exception=None,
-                 task_groups=None):
+                 task_groups=None,
+                 element_totals=None):
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -101,6 +103,7 @@ class Notification():
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
         self.task_groups = task_groups
+        self.element_totals = element_totals
 
 
 # Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c0bd110..d01605e 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -72,6 +72,8 @@ class Stream():
         self.session_elements = []   # List of elements being processed this 
session
         self.total_elements = []     # Total list of elements based on targets
         self.queues = []             # Queue objects
+        self.len_session_elements = None
+        self.len_total_elements = None
 
         #
         # Private members
@@ -82,7 +84,6 @@ class Stream():
         self._project = None
         self._pipeline = None
         self._state = State(session_start)  # Owned by Stream, used by Core to 
set state
-        #self._notification_pipe_front, self._notification_pipe_back = 
mp.Pipe()
         self._subprocess = None
         self._starttime = session_start  # Synchronised with Scheduler's 
relative start time
 
@@ -127,13 +128,13 @@ class Stream():
 
         mp_context = mp.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
-        
+
         self._notify_front = mp.Queue()
         self._notify_back = mp.Queue()
         # Tell the scheduler to not use the notifier callback
         self._scheduler._notify_front = self._notify_front
         self._scheduler._notify_back = self._notify_back
-        
+
         args = list(args)
         args.insert(0, self._notify_front)
         args.insert(0, func)
@@ -1444,6 +1445,14 @@ class Stream():
             else:
                 self._session_start_callback()
 
+        # Also send through the session & total elements list lengths for 
status rendering
+        element_totals = str(len(self.session_elements)), 
str(len(self.total_elements))
+        if self._notify_front:
+            
self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS,
+                                                element_totals=element_totals))
+        else:
+            self.len_session_elements, self.len_total_elements = element_totals
+
         status = self._scheduler.run(self.queues)
 
         if status == SchedStatus.ERROR:
@@ -1768,6 +1777,8 @@ class Stream():
             raise SubprocessException(**notification.exception)
         elif notification.notification_type == NotificationType.START:
             self._session_start_callback()
+        elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
+            self.len_session_elements, self.len_total_elements = 
notification.element_totals
         else:
             raise StreamError("Unrecognised notification type received")
 
@@ -1789,7 +1800,7 @@ class Stream():
             except queue.Empty:
                 notification = None
                 break
-    
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and

Reply via email to