This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch tpollard/temp in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 75e9d023ee424f4fb24e83908e199db891c54906 Author: Tom Pollard <[email protected]> AuthorDate: Thu Oct 24 14:23:55 2019 +0100 Add support for dynamic queue status reporting to frontend State() --- src/buildstream/_scheduler/queues/queue.py | 3 ++ src/buildstream/_scheduler/scheduler.py | 15 ++++----- src/buildstream/_state.py | 50 +++++++++++++++++++++++++----- src/buildstream/_stream.py | 12 ++++++- 4 files changed, 62 insertions(+), 18 deletions(-) diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 79cb162..77a9157 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -86,6 +86,9 @@ class Queue: self._max_retries = scheduler.context.sched_network_retries self._task_group = self._scheduler._state.add_task_group(self.action_name, self.complete_name) + self._scheduler._state.register_task_groups_changed_callback( + self._scheduler._update_task_groups, name=self.action_name + ) # destroy() # diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index fbe2599..ac52b0e 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -109,7 +109,7 @@ class Notification: self.message = message self.task_error = task_error # Tuple of domain & reason self.exception = exception - self.task_groups = task_groups + self.task_groups = task_groups # Tuple of queue name, complete name, task change, & optional element name self.element_totals = element_totals @@ -243,14 +243,6 @@ class Scheduler: else: status = SchedStatus.SUCCESS - # Send the state taskgroups if we're running under the subprocess - if subprocessed: - # Don't pickle state - for group in self._state.task_groups.values(): - group._state = None - notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups) - self._notify_front_queue.put(notification) - return status # clear_queues() @@ -658,6 +650,11 @@ class Scheduler: if self._notify_back_queue: self.loop.remove_reader(self._notify_back_queue._reader.fileno()) + def _update_task_groups(self, name, complete_name, task, full_name=None): + if self._notify_front_queue: + changes = (name, complete_name, task, full_name) + self._notify_front(Notification(NotificationType.TASK_GROUPS, task_groups=changes)) + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py index d85e348..c0c29e9 100644 --- a/src/buildstream/_state.py +++ b/src/buildstream/_state.py @@ -54,8 +54,10 @@ class TaskGroup: # def add_processed_task(self): self.processed_tasks += 1 - for cb in self._state._task_groups_changed_cbs: - cb() + for cb, name in self._state._task_groups_changed_cbs: + # If name matches group, or if name not given call the cb + if name == self.name or name is None: + cb(name, self.complete_name, "processed_tasks") # add_skipped_task() # @@ -65,9 +67,10 @@ class TaskGroup: # def add_skipped_task(self): self.skipped_tasks += 1 - - for cb in self._state._task_groups_changed_cbs: - cb() + for cb, name in self._state._task_groups_changed_cbs: + # If name matches group, or if name not given call the cb + if name == self.name or name is None: + cb(name, self.complete_name, "skipped_tasks") # add_failed_task() # @@ -82,9 +85,10 @@ class TaskGroup: # def add_failed_task(self, full_name): self.failed_tasks.append(full_name) - - for cb in self._state._task_groups_changed_cbs: - cb() + for cb, name in self._state._task_groups_changed_cbs: + # If name matches group, or if name not given call the cb + if name == self.name or name is None: + cb(name, self.complete_name, "failed_tasks", full_name) # State @@ -226,6 +230,36 @@ class State: def unregister_task_failed_callback(self, callback): self._task_failed_cbs.remove(callback) + # register_task_groups_changed_callback() + # + # Registers a callback to be notified when a task group has changed + # + # Args: + # callback (function): The callback to be notified + # name (str): Optional taskgroup related name, e.g. the action_name of a Queue. If None + # given then the callback will be triggered for any task group changing. + # + # Callback Args: + # name (str): The name of the task group, e.g. 'build' + # complete_name (str): The complete name of the task group, e.g. 'built' + # task(str): The full name of the task outcome, processed, skipped or failed. + # element_name (str): Optional if an element task failed, the element name + # + def register_task_groups_changed_callback(self, callback, name=None): + self._task_groups_changed_cbs.append((callback, name)) + + # unregister_task_groups_changed_callback() + # + # Unregisters a callback previously registered by + # register_task_groups_changed_callback() + # + # Args: + # callback (function): The callback to be removed + # name (str): Optional taskgroup related name, e.g. the action_name of a Queue + # + def unregister_task_groups_changed_callback(self, callback, name=None): + self._task_groups_changed_cbs.remove((callback, name)) + ############################################## # Core-facing APIs for driving notifications # ############################################## diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 4ac73e6..789e14a 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1778,7 +1778,17 @@ class Stream: def _notification_handler(self, notification): if notification.notification_type == NotificationType.TASK_GROUPS: - self._state.task_groups = notification.task_groups + queue_name, complete_name, task_event, element_name = notification.task_groups + try: + group = self._state.task_groups[queue_name] + except KeyError: + # Queue not yet mirrored in front process, so create it & add it to status output + group = self._state.add_task_group(queue_name, complete_name) + if element_name is None: + count = getattr(group, task_event) + setattr(group, task_event, count + 1) + else: + getattr(group, task_event).append(element_name) elif notification.notification_type == NotificationType.MESSAGE: self._context.messenger.message(notification.message) elif notification.notification_type == NotificationType.INTERRUPT:
