This is an automated email from the ASF dual-hosted git repository. not-in-ldap pushed a commit to branch phil/ui-split-refactor in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit b7b445cdb0de0223015c71506d8689135151c966 Author: Phil Dawson <[email protected]> AuthorDate: Thu Jun 13 17:47:04 2019 +0100 WIP: Refactor scheduler-frontend communication --- src/buildstream/_frontend/app.py | 2 + src/buildstream/_scheduler/__init__.py | 2 +- src/buildstream/_scheduler/scheduler.py | 76 +++++++++++++++++++++++++-------- src/buildstream/_stream.py | 25 ++++++++++- 4 files changed, 85 insertions(+), 20 deletions(-) diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 76e3bc7..9f90938 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -607,6 +607,7 @@ class App(): pass return + assert False # Interactive mode for element failures with self._interrupted(): @@ -646,6 +647,7 @@ class App(): # Handle choices which you can come back from # + assert choice != 'shell' # This won't work for now if choice == 'shell': click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True) try: diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py index d2f458f..d689d6e 100644 --- a/src/buildstream/_scheduler/__init__.py +++ b/src/buildstream/_scheduler/__init__.py @@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue from .queues.artifactpushqueue import ArtifactPushQueue from .queues.pullqueue import PullQueue -from .scheduler import Scheduler, SchedStatus +from .scheduler import Scheduler, SchedStatus, Notification, NotificationType from .jobs import ElementJob, JobStatus diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 4f668c6..a9286d4 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -21,6 +21,7 @@ # System imports import os import asyncio +import enum from itertools import chain import signal import datetime @@ -45,6 +46,34 @@ _ACTION_NAME_CLEANUP = 'clean' _ACTION_NAME_CACHE_SIZE = 'size' +@enum.unique +class 
NotificationType(enum.Enum): + INTERRUPT = "interrupt" + JOB_START = "job_start" + JOB_COMPLETE = "job_complete" + TICK = "tick" + + +class Notification: + + def __init__(self, + notification_type, + *, + full_name=None, + job_action=None, + job_status=None, + failed_element=False, + elapsed_time=None, + element=None): + self.notification_type = notification_type + self.full_name = full_name + self.job_action = job_action + self.job_status = job_status + self.failed_element = failed_element + self.elapsed_time = elapsed_time + self.element = element + + # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -69,7 +98,7 @@ _ACTION_NAME_CACHE_SIZE = 'size' class Scheduler(): def __init__(self, context, - start_time, state, + start_time, state, message_handler, interrupt_callback=None, ticker_callback=None, interactive_failure=False): @@ -102,9 +131,17 @@ class Scheduler(): self._cleanup_scheduled = False # Whether we have a cleanup job scheduled self._cleanup_running = None # A running CleanupJob, or None - # Callbacks to report back to the Scheduler owner - self._interrupt_callback = interrupt_callback - self._ticker_callback = ticker_callback + # Callback to send messages to report back to the Scheduler's owner + self.message = message_handler + + # Whether our exclusive jobs, like 'cleanup' are currently already + # waiting or active. + # + # This is just a bit quicker than scanning the wait queue and active + # queue and comparing job action names. 
+ # + self._exclusive_waiting = set() + self._exclusive_active = set() self.resources = Resources(context.sched_builders, context.sched_fetchers, @@ -134,8 +171,7 @@ class Scheduler(): asyncio.set_event_loop(self.loop) # Add timeouts - if self._ticker_callback: - self.loop.call_later(1, self._tick) + self.loop.call_later(1, self._tick) # Handle unix signals while running self._connect_signals() @@ -254,13 +290,19 @@ class Scheduler(): # Remove from the active jobs list self._active_jobs.remove(job) - self._state.remove_task(job.action_name, job.name) if status == JobStatus.FAIL: # If it's an elementjob, we want to compare against the failure messages # and send the Element() instance if interactive failure handling. Note # this may change if the frontend is run in a separate process for pickling element = job._element if (job.element_job and self._interactive_failure) else None - self._state.fail_task(job.action_name, job.name, element_job=job.element_job, element=element) + + message = Notification(NotificationType.JOB_COMPLETE, + full_name=job.name, + job_action=job.action_name, + job_status=status, + failed_element=job.element_job, + element=element) + self.message(message) # Now check for more jobs self._sched() @@ -319,7 +361,11 @@ class Scheduler(): # def _start_job(self, job): self._active_jobs.append(job) - self._state.add_task(job.action_name, job.name, self.elapsed_time()) + message = Notification(NotificationType.JOB_START, + full_name=job.name, + job_action=job.action_name, + elapsed_time=self.elapsed_time()) + self.message(message) job.start() # Callback for the cache size job @@ -538,13 +584,8 @@ class Scheduler(): if self.terminated: return - # Leave this to the frontend to decide, if no - # interrrupt callback was specified, then just terminate. 
- if self._interrupt_callback: - self._interrupt_callback() - else: - # Default without a frontend is just terminate - self.terminate_jobs() + message = Notification(NotificationType.INTERRUPT) + self.message(message) # _terminate_event(): # @@ -603,7 +644,8 @@ class Scheduler(): # Regular timeout for driving status in the UI def _tick(self): - self._ticker_callback() + message = Notification(NotificationType.TICK) + self.message(message) self.loop.call_later(1, self._tick) def __getstate__(self): diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 5f12889..45bb41b 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -36,7 +36,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError from ._message import Message, MessageType from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ - SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue + SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus from ._pipeline import Pipeline, PipelineSelection from ._profile import Topics, PROFILER from ._state import State @@ -85,12 +85,14 @@ class Stream(): context.messenger.set_state(self._state) - self._scheduler = Scheduler(context, session_start, self._state, + self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler, interrupt_callback=interrupt_callback, ticker_callback=ticker_callback, interactive_failure=interactive_failure) self._first_non_track_queue = None self._session_start_callback = session_start_callback + self._ticker_callback = ticker_callback + self._interrupt_callback = interrupt_callback # init() # @@ -1572,6 +1574,25 @@ class Stream(): return element_targets, artifact_refs + def _scheduler_notification_handler(self, notification): + if notification.notification_type == NotificationType.INTERRUPT: + 
self._interrupt_callback() + elif notification.notification_type == NotificationType.TICK: + self._ticker_callback() + elif notification.notification_type == NotificationType.JOB_START: + self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time) + + elif notification.notification_type == NotificationType.JOB_COMPLETE: + self._state.remove_task(notification.job_action, notification.full_name) + if notification.job_status == JobStatus.FAIL: + if notification.failed_element: + unique_id = notification.full_name + else: + unique_id = None + self._state.fail_task(notification.job_action, notification.full_name, unique_id) + else: + raise StreamError("Unreccognised notification type recieved") + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and
