This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch tpollard/temp in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit bf1741582db3316e00d980e8b012b5b35d64d635 Author: Tom Pollard <[email protected]> AuthorDate: Fri Sep 27 17:46:46 2019 +0100 Make it more verbose with front & back notifications --- src/buildstream/_scheduler/scheduler.py | 48 +++++++++++------------ src/buildstream/_stream.py | 67 ++++++++++++++++----------------- 2 files changed, 56 insertions(+), 59 deletions(-) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 6b5f306..3476162 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -157,8 +157,8 @@ class Scheduler: self._casd_process = None # handle to the casd process for monitoring purpose # Bidirectional pipe to send notifications back to the Scheduler's owner - self._notify_front = None - self._notify_back = None + self._notify_front_queue = None + self._notify_back_queue = None # Notifier callback to use if not running in a subprocess self._notifier = notifier @@ -190,7 +190,7 @@ class Scheduler: asyncio.set_event_loop(self.loop) # Notify that the loop has been created - self._notify(Notification(NotificationType.RUNNING)) + self._notify_front(Notification(NotificationType.RUNNING)) # Add timeouts self.loop.call_later(1, self._tick) @@ -204,7 +204,7 @@ class Scheduler: _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure) # Add notification handler - if self._notify_back: + if self._notify_back_queue: self.loop.call_later(0.01, self._loop) # Start the profiler @@ -225,7 +225,7 @@ class Scheduler: self.loop = None # Notify that the loop has been reset - self._notify(Notification(NotificationType.RUNNING)) + self._notify_front(Notification(NotificationType.RUNNING)) if failed: status = SchedStatus.ERROR @@ -235,12 +235,12 @@ class Scheduler: status = SchedStatus.SUCCESS # Send the state taskgroups if we're running under the subprocess - if self._notify_front: + if self._notify_front_queue: # Don't pickle state for group in self._state.task_groups.values(): group._state = None notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups) - self._notify_front.put(notification) + self._notify_front_queue.put(notification) return status @@ -279,7 +279,7 @@ class Scheduler: # Notify the frontend that we're terminated as it might be # from an interactive prompt callback or SIGTERM - self._notify(Notification(NotificationType.TERMINATED)) + self._notify_front(Notification(NotificationType.TERMINATED)) self.loop.call_soon(self._terminate_jobs_real) # Block this until we're finished terminating jobs, @@ -342,7 +342,7 @@ class Scheduler: job_status=status, element=element_info, ) - self._notify(notification) + self._notify_front(notification) self._sched() # notify_messenger() @@ -354,7 +354,7 @@ class Scheduler: # handler, as assigned by context's messenger. # def notify_messenger(self, message): - self._notify(Notification(NotificationType.MESSAGE, message=message)) + self._notify_front(Notification(NotificationType.MESSAGE, message=message)) # set_last_task_error() # @@ -368,7 +368,7 @@ class Scheduler: def set_last_task_error(self, domain, reason): task_error = domain, reason notification = Notification(NotificationType.TASK_ERROR, task_error=task_error) - self._notify(notification) + self._notify_front(notification) ####################################################### # Local Private Methods # @@ -407,7 +407,7 @@ class Scheduler: job_action=job.action_name, time=self._state.elapsed_time(start_time=self._starttime), ) - self._notify(notification) + self._notify_front(notification) job.start() # _sched_queue_jobs() @@ -497,7 +497,7 @@ class Scheduler: self._suspendtime = datetime.datetime.now() self.suspended = True # Notify that we're suspended - self._notify(Notification(NotificationType.SUSPENDED)) + self._notify_front(Notification(NotificationType.SUSPENDED)) for job in self._active_jobs: job.suspend() @@ -511,9 +511,9 @@ class Scheduler: job.resume() self.suspended = False # Notify that we're unsuspended - self._notify(Notification(NotificationType.SUSPENDED)) + self._notify_front(Notification(NotificationType.SUSPENDED)) self._starttime += datetime.datetime.now() - self._suspendtime - self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime)) + self._notify_front(Notification(NotificationType.SCHED_START_TIME, time=self._starttime)) self._suspendtime = None # _interrupt_event(): @@ -529,7 +529,7 @@ class Scheduler: return notification = Notification(NotificationType.INTERRUPT) - self._notify(notification) + self._notify_front(notification) # _terminate_event(): # @@ -582,7 +582,7 @@ class Scheduler: # Regular timeout for driving status in the UI def _tick(self): - self._notify(Notification(NotificationType.TICK)) + self._notify_front(Notification(NotificationType.TICK)) self.loop.call_later(1, self._tick) def _failure_retry(self, action_name, unique_id): @@ -597,14 +597,14 @@ class Scheduler: queue._task_group.failed_tasks.remove(element._get_full_name()) queue.enqueue([element]) - def _notify(self, notification): + def _notify_front(self, notification): # Check if we need to call the notifier callback - if self._notify_front: - self._notify_front.put(notification) + if self._notify_front_queue: + self._notify_front_queue.put(notification) else: self._notifier(notification) - def _stream_notification_handler(self, notification): + def _notification_handler(self, notification): if notification.notification_type == NotificationType.TERMINATE: self.terminate_jobs() elif notification.notification_type == NotificationType.QUIT: @@ -621,12 +621,12 @@ class Scheduler: raise ValueError("Unrecognised notification type received") def _loop(self): - assert self._notify_back + assert self._notify_back_queue # Check for and process new messages while True: try: - notification = self._notify_back.get_nowait() - self._stream_notification_handler(notification) + notification = self._notify_back_queue.get_nowait() + self._notification_handler(notification) except queue.Empty: notification = None break diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 841ccde..c8758da 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -106,17 +106,16 @@ class Stream: context.messenger.set_state(self._state) - self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler) + self._scheduler = Scheduler(context, session_start, self._state, self._notification_handler) self._first_non_track_queue = None self._session_start_callback = session_start_callback self._ticker_callback = ticker_callback self._interrupt_callback = interrupt_callback - self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler self._scheduler_running = False self._scheduler_terminated = False self._scheduler_suspended = False - self._notify_front = None - self._notify_back = None + self._notify_front_queue = None + self._notify_back_queue = None # init() # @@ -145,14 +144,14 @@ class Stream: mp_context = mp.get_context(method="fork") process_name = "stream-{}".format(func.__name__) - self._notify_front = mp.Queue() - self._notify_back = mp.Queue() + self._notify_front_queue = mp.Queue() + self._notify_back_queue = mp.Queue() # Tell the scheduler to not use the notifier callback - self._scheduler._notify_front = self._notify_front - self._scheduler._notify_back = self._notify_back + self._scheduler._notify_front_queue = self._notify_front_queue + self._scheduler._notify_back_queue = self._notify_back_queue args = list(args) - args.insert(0, self._notify_front) + args.insert(0, self._notify_front_queue) args.insert(0, func) self._subprocess = mp_context.Process( @@ -172,8 +171,8 @@ class Stream: # Ensure no more notifcations to process try: while True: - notification = self._notify_front.get_nowait() - self._scheduler_notification_handler(notification) + notification = self._notify_front_queue.get_nowait() + self._notification_handler(notification) except queue.Empty: print("Finished processing notifications") pass @@ -184,7 +183,7 @@ class Stream: # def cleanup(self): # Close the notification queue - for q in [self._notify_back, self._notify_front]: + for q in [self._notify_back_queue, self._notify_front_queue]: if q is not None: q.close() # self._notification_queue.cancel_join_thread() @@ -1222,7 +1221,7 @@ class Stream: # def terminate(self): notification = Notification(NotificationType.TERMINATE) - self._notify(notification) + self._notify_back(notification) # quit() # @@ -1232,7 +1231,7 @@ class Stream: # def quit(self): notification = Notification(NotificationType.QUIT) - self._notify(notification) + self._notify_back(notification) # suspend() # @@ -1242,11 +1241,11 @@ class Stream: def suspend(self): # Send the notification to suspend jobs notification = Notification(NotificationType.SUSPEND) - self._notify(notification) + self._notify_back(notification) yield # Unsuspend jobs on context exit notification = Notification(NotificationType.UNSUSPEND) - self._notify(notification) + self._notify_back(notification) ############################################################# # Private Methods # @@ -1435,7 +1434,7 @@ class Stream: # def _failure_retry(self, action_name, unique_id): notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id) - self._notify(notification) + self._notify_back(notification) # _run() # @@ -1449,17 +1448,11 @@ class Stream: self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL)) if self._session_start_callback is not None: - if self._notify_front: - self._notify_front.put(Notification(NotificationType.START)) - else: - self._session_start_callback() + self._notify_front(Notification(NotificationType.START)) # Also send through the session & total elements list lengths for status rendering element_totals = str(len(self.session_elements)), str(len(self.total_elements)) - if self._notify_front: - self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals)) - else: - self.len_session_elements, self.len_total_elements = element_totals + self._notify_front(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals)) status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process()) @@ -1749,7 +1742,7 @@ class Stream: return element_targets, artifact_refs - def _scheduler_notification_handler(self, notification): + def _notification_handler(self, notification): if notification.notification_type == NotificationType.TASK_GROUPS: self._state.task_groups = notification.task_groups elif notification.notification_type == NotificationType.MESSAGE: @@ -1784,23 +1777,27 @@ class Stream: else: raise StreamError("Unrecognised notification type received") - def _notify(self, notification): - # Set that the notifcation is for the scheduler - # notification.for_scheduler = True - if self._notify_back: - self._notify_back.put(notification) + def _notify_back(self, notification): + if self._notify_back_queue: + self._notify_back_queue.put(notification) + else: + self._scheduler._notification_handler(notification) + + def _notify_front(self, notification): + if self._notify_front_queue: + self._notify_front_queue.put(notification) else: - self._scheduler._stream_notification_handler(notification) + self._notification_handler(notification) # The code to be run by the Stream's event loop while delegating # work to a subprocess with the @subprocessed decorator def _loop(self): - assert self._notify_front + assert self._notify_front_queue # Check for and process new messages while True: try: - notification = self._notify_front.get_nowait() - self._scheduler_notification_handler(notification) + notification = self._notify_front_queue.get_nowait() + self._notification_handler(notification) except queue.Empty: notification = None break
