This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/streamasync in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 676b700301204947e57a0a54a18f580c6c2a5b00 Author: Tom Pollard <[email protected]> AuthorDate: Fri Sep 27 17:46:46 2019 +0100 Make it more verbose with front & back notifications --- src/buildstream/_scheduler/scheduler.py | 48 +++++++++++------------ src/buildstream/_stream.py | 68 ++++++++++++++++----------------- 2 files changed, 58 insertions(+), 58 deletions(-) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index bb3fac5..e90efdb 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -153,8 +153,8 @@ class Scheduler(): self._state = state # Bidirectional pipe to send notifications back to the Scheduler's owner - self._notify_front = None - self._notify_back = None + self._notify_front_queue = None + self._notify_back_queue = None # Notifier callback to use if not running in a subprocess self._notifier = notifier @@ -188,7 +188,7 @@ class Scheduler(): asyncio.set_event_loop(self.loop) # Notify that the loop has been created - self._notify(Notification(NotificationType.RUNNING)) + self._notify_front(Notification(NotificationType.RUNNING)) # Add timeouts self.loop.call_later(1, self._tick) @@ -197,7 +197,7 @@ class Scheduler(): self._connect_signals() # Add notification handler - if self._notify_back: + if self._notify_back_queue: self.loop.call_later(0.01, self._loop) # Start the profiler @@ -214,7 +214,7 @@ class Scheduler(): self.loop = None # Notify that the loop has been reset - self._notify(Notification(NotificationType.RUNNING)) + self._notify_front(Notification(NotificationType.RUNNING)) if failed: status = SchedStatus.ERROR @@ -224,12 +224,12 @@ class Scheduler(): status = SchedStatus.SUCCESS # Send the state taskgroups if we're running under the subprocess - if self._notify_front: + if self._notify_front_queue: # Don't pickle state for group in self._state.task_groups.values(): group._state = None notification = Notification(NotificationType.TASK_GROUPS, 
task_groups=self._state.task_groups) - self._notify_front.put(notification) + self._notify_front_queue.put(notification) return status @@ -268,7 +268,7 @@ class Scheduler(): # Notify the frontend that we're terminated as it might be # from an interactive prompt callback or SIGTERM - self._notify(Notification(NotificationType.TERMINATED)) + self._notify_front(Notification(NotificationType.TERMINATED)) self.loop.call_soon(self._terminate_jobs_real) # Block this until we're finished terminating jobs, @@ -329,7 +329,7 @@ class Scheduler(): job_action=job.action_name, job_status=status, element=element_info) - self._notify(notification) + self._notify_front(notification) self._sched() # notify_messenger() @@ -341,7 +341,7 @@ class Scheduler(): # handler, as assigned by context's messenger. # def notify_messenger(self, message): - self._notify(Notification(NotificationType.MESSAGE, message=message)) + self._notify_front(Notification(NotificationType.MESSAGE, message=message)) # set_last_task_error() # @@ -356,7 +356,7 @@ class Scheduler(): task_error = domain, reason notification = Notification(NotificationType.TASK_ERROR, task_error=task_error) - self._notify(notification) + self._notify_front(notification) ####################################################### # Local Private Methods # @@ -375,7 +375,7 @@ class Scheduler(): full_name=job.name, job_action=job.action_name, time=self._state.elapsed_time(start_time=self._starttime)) - self._notify(notification) + self._notify_front(notification) job.start() # _sched_queue_jobs() @@ -460,7 +460,7 @@ class Scheduler(): self._suspendtime = datetime.datetime.now() self.suspended = True # Notify that we're suspended - self._notify(Notification(NotificationType.SUSPENDED)) + self._notify_front(Notification(NotificationType.SUSPENDED)) for job in self._active_jobs: job.suspend() @@ -474,9 +474,9 @@ class Scheduler(): job.resume() self.suspended = False # Notify that we're unsuspended - 
self._notify(Notification(NotificationType.SUSPENDED)) + self._notify_front(Notification(NotificationType.SUSPENDED)) self._starttime += (datetime.datetime.now() - self._suspendtime) - self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime)) + self._notify_front(Notification(NotificationType.SCHED_START_TIME, time=self._starttime)) self._suspendtime = None # _interrupt_event(): @@ -494,7 +494,7 @@ class Scheduler(): return notification = Notification(NotificationType.INTERRUPT) - self._notify(notification) + self._notify_front(notification) # _terminate_event(): # @@ -553,7 +553,7 @@ class Scheduler(): # Regular timeout for driving status in the UI def _tick(self): - self._notify(Notification(NotificationType.TICK)) + self._notify_front(Notification(NotificationType.TICK)) self.loop.call_later(1, self._tick) def _failure_retry(self, action_name, unique_id): @@ -568,14 +568,14 @@ class Scheduler(): queue._task_group.failed_tasks.remove(element._get_full_name()) queue.enqueue([element]) - def _notify(self, notification): + def _notify_front(self, notification): # Check if we need to call the notifier callback - if self._notify_front: - self._notify_front.put(notification) + if self._notify_front_queue: + self._notify_front_queue.put(notification) else: self._notifier(notification) - def _stream_notification_handler(self, notification): + def _notification_handler(self, notification): if notification.notification_type == NotificationType.TERMINATE: self.terminate_jobs() elif notification.notification_type == NotificationType.QUIT: @@ -592,12 +592,12 @@ class Scheduler(): raise ValueError("Unrecognised notification type received") def _loop(self): - assert self._notify_back + assert self._notify_back_queue # Check for and process new messages while True: try: - notification = self._notify_back.get_nowait() - self._stream_notification_handler(notification) + notification = self._notify_back_queue.get_nowait() + 
self._notification_handler(notification) except queue.Empty: notification = None break diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index d01605e..839636c 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -90,18 +90,17 @@ class Stream(): context.messenger.set_state(self._state) # Scheduler may use callback for notification depending on whether it's subprocessed - self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler) + self._scheduler = Scheduler(context, session_start, self._state, self._notification_handler) self._first_non_track_queue = None self._session_start_callback = session_start_callback self._ticker_callback = ticker_callback self._interrupt_callback = interrupt_callback - self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler self._scheduler_running = False self._scheduler_terminated = False self._scheduler_suspended = False - self._notify_front = None - self._notify_back = None + self._notify_front_queue = None + self._notify_back_queue = None # init() # @@ -129,14 +128,14 @@ class Stream(): mp_context = mp.get_context(method='fork') process_name = "stream-{}".format(func.__name__) - self._notify_front = mp.Queue() - self._notify_back = mp.Queue() + self._notify_front_queue = mp.Queue() + self._notify_back_queue = mp.Queue() # Tell the scheduler to not use the notifier callback - self._scheduler._notify_front = self._notify_front - self._scheduler._notify_back = self._notify_back + self._scheduler._notify_front_queue = self._notify_front_queue + self._scheduler._notify_back_queue = self._notify_back_queue args = list(args) - args.insert(0, self._notify_front) + args.insert(0, self._notify_front_queue) args.insert(0, func) self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args, @@ -157,8 +156,8 @@ class Stream(): # Ensure no more notifcations to process try: while True: - 
notification = self._notify_front.get_nowait() - self._scheduler_notification_handler(notification) + notification = self._notify_front_queue.get_nowait() + self._notification_handler(notification) except queue.Empty: pass @@ -169,7 +168,7 @@ class Stream(): # def cleanup(self): # Close the notification queue - for q in [self._notify_back, self._notify_front]: + for q in [self._notify_back_queue, self._notify_front_queue]: if q is not None: q.close #self._notification_queue.cancel_join_thread() @@ -1196,7 +1195,7 @@ class Stream(): # def terminate(self): notification = Notification(NotificationType.TERMINATE) - self._notify(notification) + self._notify_back(notification) # quit() # @@ -1206,7 +1205,7 @@ class Stream(): # def quit(self): notification = Notification(NotificationType.QUIT) - self._notify(notification) + self._notify_back(notification) # suspend() # @@ -1216,11 +1215,11 @@ class Stream(): def suspend(self): # Send the notification to suspend jobs notification = Notification(NotificationType.SUSPEND) - self._notify(notification) + self._notify_back(notification) yield # Unsuspend jobs on context exit notification = Notification(NotificationType.UNSUSPEND) - self._notify(notification) + self._notify_back(notification) ############################################################# # Private Methods # @@ -1426,7 +1425,7 @@ class Stream(): notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id) - self._notify(notification) + self._notify_back(notification) # _run() # @@ -1440,18 +1439,13 @@ class Stream(): self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL)) if self._session_start_callback is not None: - if self._notify_front: - self._notify_front.put(Notification(NotificationType.START)) - else: - self._session_start_callback() + self._notify_front(Notification(NotificationType.START)) # Also send through the session & total elements list lengths for status rendering element_totals = 
str(len(self.session_elements)), str(len(self.total_elements)) - if self._notify_front: - self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS, - element_totals=element_totals)) - else: - self.len_session_elements, self.len_total_elements = element_totals + self._notify_front(Notification(NotificationType.ELEMENT_TOTALS, + element_totals=element_totals)) + status = self._scheduler.run(self.queues) @@ -1745,7 +1739,7 @@ class Stream(): return element_targets, artifact_refs - def _scheduler_notification_handler(self, notification): + def _notification_handler(self, notification): if notification.notification_type == NotificationType.TASK_GROUPS: self._state.task_groups = notification.task_groups elif notification.notification_type == NotificationType.MESSAGE: @@ -1782,21 +1776,27 @@ class Stream(): else: raise StreamError("Unrecognised notification type received") - def _notify(self, notification): - if self._notify_back: - self._notify_back.put(notification) + def _notify_back(self, notification): + if self._notify_back_queue: + self._notify_back_queue.put(notification) + else: + self._scheduler._notification_handler(notification) + + def _notify_front(self, notification): + if self._notify_front_queue: + self._notify_front_queue.put(notification) else: - self._scheduler._stream_notification_handler(notification) + self._notification_handler(notification) # The code to be run by the Stream's event loop while delegating # work to a subprocess with the @subprocessed decorator def _loop(self): - assert self._notify_front + assert self._notify_front_queue # Check for and process new messages while True: try: - notification = self._notify_front.get_nowait() - self._scheduler_notification_handler(notification) + notification = self._notify_front_queue.get_nowait() + self._notification_handler(notification) except queue.Empty: notification = None break
