This is an automated email from the ASF dual-hosted git repository. root pushed a commit to branch testing/local-cache-expiry in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 3b1b98e3a6f057ed0408688075508a118e403ea6 Author: Tristan Maat <[email protected]> AuthorDate: Tue May 8 17:34:38 2018 +0100 Allow aborting jobs without elements --- buildstream/_frontend/app.py | 35 ++++++++++++++++++------------- buildstream/_frontend/status.py | 27 +++++++++++++++++------- buildstream/_scheduler/jobs/elementjob.py | 7 ++++++- buildstream/_scheduler/jobs/job.py | 2 +- buildstream/_scheduler/scheduler.py | 8 ++++--- 5 files changed, 52 insertions(+), 27 deletions(-) diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py index 4675b0e..de910af 100644 --- a/buildstream/_frontend/app.py +++ b/buildstream/_frontend/app.py @@ -492,30 +492,37 @@ class App(): def _tick(self, elapsed): self._maybe_render_status() - def _job_started(self, element, action_name): - self._status.add_job(element, action_name) + def _job_started(self, job): + self._status.add_job(job) self._maybe_render_status() - def _job_completed(self, element, queue, action_name, success): - self._status.remove_job(element, action_name) + def _job_completed(self, job, success): + self._status.remove_job(job) self._maybe_render_status() # Dont attempt to handle a failure if the user has already opted to # terminate if not success and not self.stream.terminated: - # Get the last failure message for additional context - failure = self._fail_messages.get(element._get_unique_id()) + if hasattr(job, 'element'): + element = job.element + queue = job.queue - # XXX This is dangerous, sometimes we get the job completed *before* - # the failure message reaches us ?? - if not failure: - self._status.clear() - click.echo("\n\n\nBUG: Message handling out of sync, " + - "unable to retrieve failure message for element {}\n\n\n\n\n" - .format(element), err=True) + # Get the last failure message for additional context + failure = self._fail_messages.get(element._get_unique_id()) + + # XXX This is dangerous, sometimes we get the job completed *before* + # the failure message reaches us ?? + if not failure: + self._status.clear() + click.echo("\n\n\nBUG: Message handling out of sync, " + + "unable to retrieve failure message for element {}\n\n\n\n\n" + .format(element), err=True) + else: + self._handle_failure(element, queue, failure) else: - self._handle_failure(element, queue, failure) + click.echo("\nTerminating all jobs\n", err=True) + self.stream.terminate() def _handle_failure(self, element, queue, failure): diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py old mode 100644 new mode 100755 index 3f66e00..7a2e719 --- a/buildstream/_frontend/status.py +++ b/buildstream/_frontend/status.py @@ -77,9 +77,9 @@ class Status(): # element (Element): The element of the job to track # action_name (str): The action name for this job # - def add_job(self, element, action_name): + def add_job(self, job): elapsed = self._stream.elapsed_time - job = _StatusJob(self._context, element, action_name, self._content_profile, self._format_profile, elapsed) + job = _StatusJob(self._context, job, self._content_profile, self._format_profile, elapsed) self._jobs.append(job) self._need_alloc = True @@ -91,7 +91,13 @@ class Status(): # element (Element): The element of the job to track # action_name (str): The action name for this job # - def remove_job(self, element, action_name): + def remove_job(self, job): + action_name = job.action_name + if not hasattr(job, 'element'): + element = None + else: + element = job.element + self._jobs = [ job for job in self._jobs if not (job.element is element and @@ -358,15 +364,19 @@ class _StatusHeader(): # # Args: # context (Context): The Context -# element (Element): The element being processed -# action_name (str): The name of the action +# job (Job): The job being processed # content_profile (Profile): Formatting profile for content text # format_profile (Profile): Formatting profile for formatting text # elapsed (datetime): The offset into the session when this job is created # class _StatusJob(): - def __init__(self, context, element, action_name, content_profile, format_profile, elapsed): + def __init__(self, context, job, content_profile, format_profile, elapsed): + action_name = job.action_name + if not hasattr(job, 'element'): + element = None + else: + element = job.element # # Public members @@ -374,6 +384,7 @@ class _StatusJob(): self.element = element # The Element self.action_name = action_name # The action name self.size = None # The number of characters required to render + self.full_name = element._get_full_name() if element else action_name # # Private members @@ -386,7 +397,7 @@ class _StatusJob(): # Calculate the size needed to display self.size = 10 # Size of time code with brackets self.size += len(action_name) - self.size += len(element._get_full_name()) + self.size += len(self.full_name) self.size += 3 # '[' + ':' + ']' # render() @@ -403,7 +414,7 @@ class _StatusJob(): self._format_profile.fmt(']') # Add padding after the display name, before terminating ']' - name = self.element._get_full_name() + (' ' * padding) + name = self.full_name + (' ' * padding) text += self._format_profile.fmt('[') + \ self._content_profile.fmt(self.action_name) + \ self._format_profile.fmt(':') + \ diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py index 4d79de4..36e7c1d 100644 --- a/buildstream/_scheduler/jobs/elementjob.py +++ b/buildstream/_scheduler/jobs/elementjob.py @@ -70,12 +70,17 @@ from .job import Job # if `success` is False # class ElementJob(Job): - def __init__(self, *args, element, action_cb, complete_cb, **kwargs): + def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs): super().__init__(*args, **kwargs) + self.queue = queue self._element = element self._action_cb = action_cb # The action callable function self._complete_cb = complete_cb # The complete callable function + @property + def element(self): + return self._element + # _child_process() # # This will be executed after fork(), and is intended to perform diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index 84be452..cf5bf07 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -551,7 +551,7 @@ class Job(): return self._parent_complete(returncode == 0, self._result) - self._scheduler.job_completed(self) + self._scheduler.job_completed(self, returncode == 0) # _parent_process_envelope() # diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index ffbd656..b715d51 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -350,10 +350,14 @@ class Scheduler(): # job (Job): The completed Job # success (bool): Whether the Job completed with a success status # - def job_completed(self, job): + def job_completed(self, job, success): self.active_jobs.remove(job) self.schedule_queue_jobs() + # Notify frontend + if self._job_complete_callback: + self._job_complete_callback(job, success) + # get_job_token(): # # Used by the Queue object to obtain a token for @@ -486,8 +490,6 @@ class Scheduler(): if not job.terminate_wait(timeout): job.kill() - self.loop.stop() - # Regular timeout for driving status in the UI def _tick(self): elapsed = self.elapsed_time()
