This is an automated email from the ASF dual-hosted git repository. root pushed a commit to branch testing/local-cache-expiry in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit ebdaf82be82954b1bcaf56f3478ad5929e7286a7 Author: Tristan Maat <[email protected]> AuthorDate: Thu Apr 19 17:11:54 2018 +0100 buildstream/_scheduler/*: Make Jobs abstract and element-independent --- buildstream/_scheduler/jobs/__init__.py | 1 + buildstream/_scheduler/jobs/elementjob.py | 211 +++++++++++++++++++ buildstream/_scheduler/{ => jobs}/job.py | 326 ++++++++++++++++-------------- buildstream/_scheduler/queues/queue.py | 37 +++- buildstream/_scheduler/scheduler.py | 8 +- 5 files changed, 419 insertions(+), 164 deletions(-) diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py new file mode 100644 index 0000000..0030f5c --- /dev/null +++ b/buildstream/_scheduler/jobs/__init__.py @@ -0,0 +1 @@ +from .elementjob import ElementJob diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py new file mode 100644 index 0000000..4d79de4 --- /dev/null +++ b/buildstream/_scheduler/jobs/elementjob.py @@ -0,0 +1,211 @@ +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Author: +# Tristan Daniël Maat <[email protected]> +# +import os +from contextlib import contextmanager + +from ruamel import yaml + +from ..._message import Message, MessageType +from ...plugin import _plugin_lookup +from ... 
import _signals + +from .job import Job + + +# ElementJob() +# +# A job to run an element's commands. When this job is spawned +# `action_cb` will be called, and when it completes `complete_cb` will +# be called. +# +# Args: +# scheduler (Scheduler): The scheduler +# action_name (str): The queue action name +# max_retries (int): The maximum number of retries +# action_cb (callable): The function to execute in the child process +# complete_cb (callable): The function to execute when the job completes +# element (Element): The element to work on +# kwargs: Remaining Job() constructor arguments +# +# Here is the calling signature of the action_cb: +# +# action_cb(element): +# +# This function will be called in the child task +# +# Args: +# element (Element): The element passed to the ElementJob() constructor +# +# Returns: +# (object): Any abstract simple python object, including a string, int, +# bool, list or dict, this must be a simple serializable object. +# +# Here is the calling signature of the complete_cb: +# +# complete_cb(job, element, success, result): +# +# This function will be called when the child task completes +# +# Args: +# job (Job): The job object which completed +# element (Element): The element passed to the ElementJob() constructor +# success (bool): True if the action_cb did not raise an exception +# result (object): The deserialized object returned by the `action_cb`, or None +# if `success` is False +# +class ElementJob(Job): + def __init__(self, *args, element, action_cb, complete_cb, **kwargs): + super().__init__(*args, **kwargs) + self._element = element + self._action_cb = action_cb # The action callable function + self._complete_cb = complete_cb # The complete callable function + + # _child_process() + # + # This will be executed after fork(), and is intended to perform + # the job's task. + # + # Returns: + # (any): A (simple!) object to be returned to the main thread + # as the result.
+ # + def _child_process(self): + return self._action_cb(self._element) + + def _parent_complete(self, success, result): + self._complete_cb(self, self._element, success, self._result) + + # _child_logging_enabled() + # + # Start the log for this job. This function will be given a + # template string for the path to a log file - this will contain + # "{pid}", which should be replaced with the current process' + # PID. (i.e., call something like `logfile.format(pid=os.getpid())`). + # + # Args: + # logfile (str): A template string that points to the logfile + # that should be used - replace {pid} first. + # + # Yields: + # (str) The path to the logfile with {pid} replaced. + # + @contextmanager + def _child_logging_enabled(self, logfile): + self._logfile = logfile.format(pid=os.getpid()) + + with open(self._logfile, 'a') as log: + # Write one last line to the log and flush it to disk + def flush_log(): + + # If the process currently had something happening in the I/O stack + # then trying to reenter the I/O stack will fire a runtime error. 
+ # + # So just try to flush as well as we can at SIGTERM time + try: + # FIXME: Better logging + + log.write('\n\nAction {} for element {} forcefully terminated\n' + .format(self.action_name, self._element.name)) + log.flush() + except RuntimeError: + os.fsync(log.fileno()) + + self._element._set_log_handle(log) + with _signals.terminator(flush_log): + self._print_start_message(self._element, self._logfile) + yield self._logfile + self._element._set_log_handle(None) + self._logfile = None + + # _message(): + # + # Sends a message to the frontend + # + # Args: + # message_type (MessageType): The type of message to send + # message (str): The message + # kwargs: Remaining Message() constructor arguments + # + def _message(self, message_type, message, **kwargs): + args = dict(kwargs) + args['scheduler'] = True + self._scheduler.context.message( + Message(self._element._get_unique_id(), + message_type, + message, + **args)) + + def _print_start_message(self, element, logfile): + self._message(MessageType.START, self.action_name, logfile=logfile) + + # Print the element's environment at the beginning of any element's log file. + # + # This should probably be omitted for non-build tasks but it's harmless here + elt_env = element.get_environment() + env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True) + self._message(MessageType.LOG, + "Build environment for element {}".format(element.name), + detail=env_dump, logfile=logfile) + + # _child_log() + # + # Log a message returned by the frontend's main message handler + # and return it to the main process. + # + # Arguments: + # message (str): The message to log + # + # Returns: + # message (Message): A message object + # + def _child_log(self, message): + # Tag them on the way out the door... 
+ message.action_name = self.action_name + message.task_id = self._element._get_unique_id() + + # Use the plugin for the task for the output, not a plugin + # which might be acting on behalf of the task + plugin = _plugin_lookup(message.task_id) + + with plugin._output_file() as output: + message_text = self._format_frontend_message(message, '[{}]'.format(plugin.name)) + output.write('{}\n'.format(message_text)) + output.flush() + + return message + + # _child_process_data() + # + # Retrieve additional data that should be returned to the parent + # process; for element jobs this is the serialized workspace, if the + # element has one. Note that the job result is retrieved independently. + # + # Values can later be retrieved in Job.child_data. + # + # Returns: + # (dict) A dict which the parent process stores in Job.child_data + # + def _child_process_data(self): + data = {} + + workspace = self._element._get_workspace() + if workspace is not None: + data['workspace'] = workspace.to_dict() + + return data diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/jobs/job.py similarity index 69% rename from buildstream/_scheduler/job.py rename to buildstream/_scheduler/jobs/job.py index 8b9af93..c567b6f 100644 --- a/buildstream/_scheduler/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -26,20 +26,21 @@ import datetime import traceback import asyncio import multiprocessing -from ruamel import yaml +from contextlib import contextmanager + +import psutil # BuildStream toplevel imports -from .._exceptions import BstError, set_last_task_error -from .._message import Message, MessageType, unconditional_messages -from ..plugin import _plugin_lookup -from .. import _signals, utils +from ..._exceptions import ImplError, BstError, set_last_task_error +from ..._message import MessageType, unconditional_messages +from ... 
import _signals, utils # Used to distinguish between status messages and return values class Envelope(): def __init__(self, message_type, message): - self.message_type = message_type - self.message = message + self._message_type = message_type + self._message = message # Process class that doesn't call waitpid on its own. @@ -54,54 +55,26 @@ class Process(multiprocessing.Process): # Job() # # The Job object represents a parallel task, when calling Job.spawn(), -# the given `action_cb` will be called in parallel to the calling process, -# and `complete_cb` will be called with the action result in the calling -# process when the job completes. +# the given `Job._child_process` will be called in parallel to the +# calling process, and `Job._parent_complete` will be called with the +# action result in the calling process when the job completes. # # Args: # scheduler (Scheduler): The scheduler -# element (Element): The element to operate on # action_name (str): The queue action name -# action_cb (callable): The action function -# complete_cb (callable): The function to call when complete +# logfile (str): A template string that points to the logfile +# that should be used - should contain {pid}. # max_retries (int): The maximum number of retries # -# Here is the calling signature of the action_cb: -# -# action_cb(): -# -# This function will be called in the child task -# -# Args: -# element (Element): The element passed to the Job() constructor -# -# Returns: -# (object): Any abstract simple python object, including a string, int, -# bool, list or dict, this must be a simple serializable object. 
-# -# Here is the calling signature of the complete_cb: -# -# complete_cb(): -# -# This function will be called when the child task completes -# -# Args: -# job (Job): The job object which completed -# element (Element): The element passed to the Job() constructor -# success (bool): True if the action_cb did not raise an exception -# result (object): The deserialized object returned by the `action_cb`, or None -# if `success` is False -# class Job(): - def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0): + def __init__(self, scheduler, action_name, logfile, *, max_retries=0): # # Public members # - self.element = element # The element we're processing self.action_name = action_name # The action name for the Queue - self.workspace_dict = None # A serialized Workspace object, after any modifications + self.child_data = None # # Private members @@ -110,13 +83,12 @@ class Job(): self._queue = multiprocessing.Queue() # A message passing queue self._process = None # The Process object self._watcher = None # Child process watcher - self._action_cb = action_cb # The action callable function - self._complete_cb = complete_cb # The complete callable function self._listening = False # Whether the parent is currently listening self._suspended = False # Whether this job is currently suspended self._max_retries = max_retries # Maximum number of automatic retries self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs + self._logfile = logfile # spawn() # @@ -152,8 +124,7 @@ class Job(): # First resume the job if it's suspended self.resume(silent=True) - self._message(self.element, MessageType.STATUS, - "{} terminating".format(self.action_name)) + self._message(MessageType.STATUS, "{} terminating".format(self.action_name)) # Make sure there is no garbage on the queue self._parent_stop_listening() @@ -184,9 +155,15 @@ class Job(): def kill(self): # Force kill - 
self._message(self.element, MessageType.WARN, + self._message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name)) - utils._kill_process_tree(self._process.pid) + + try: + utils._kill_process_tree(self._process.pid) + # This can happen if the process died of its own accord before + # we try to kill it + except psutil.NoSuchProcess: + return # suspend() # @@ -194,7 +171,7 @@ class Job(): # def suspend(self): if not self._suspended: - self._message(self.element, MessageType.STATUS, + self._message(MessageType.STATUS, "{} suspending".format(self.action_name)) try: @@ -219,42 +196,152 @@ class Job(): def resume(self, silent=False): if self._suspended: if not silent: - self._message(self.element, MessageType.STATUS, + self._message(MessageType.STATUS, "{} resuming".format(self.action_name)) os.kill(self._process.pid, signal.SIGCONT) self._suspended = False ####################################################### - # Local Private Methods # + # Abstract Methods # ####################################################### + # _parent_complete() # - # Methods prefixed with the word 'child' take place in the child process + # This will be executed after the job finishes, and is expected to + # pass the result to the main thread. # - # Methods prefixed with the word 'parent' take place in the parent process + # Args: + # success (bool): Whether the job was successful. + # result (any): The result returned by _child_process(). # - # Other methods can be called in both child or parent processes + def _parent_complete(self, success, result): + raise ImplError("Job '{kind}' does not implement _parent_complete()" + .format(kind=type(self).__name__)) + + # _child_process() # - ####################################################### + # This will be executed after fork(), and is intended to perform + # the job's task. + # + # Returns: + # (any): A (simple!) object to be returned to the main thread + # as the result. 
+ # + def _child_process(self): + raise ImplError("Job '{kind}' does not implement _child_process()" + .format(kind=type(self).__name__)) + + # _child_logging_enabled() + # + # Start the log for this job. This function will be given a + # template string for the path to a log file - this will contain + # "{pid}", which should be replaced with the current process' + # PID. (i.e., call something like `logfile.format(pid=os.getpid())`). + # + # Args: + # logfile (str): A template string that points to the logfile + # that should be used - replace {pid} first. + # + # Yields: + # (str) The path to the logfile with {pid} replaced. + # + @contextmanager + def _child_logging_enabled(self, logfile): + raise ImplError("Job '{kind}' does not implement _child_logging_enabled()" + .format(kind=type(self).__name__)) # _message(): # # Sends a message to the frontend # # Args: - # plugin (Plugin): The plugin to send a message for # message_type (MessageType): The type of message to send # message (str): The message # kwargs: Remaining Message() constructor arguments # - def _message(self, plugin, message_type, message, **kwargs): - args = dict(kwargs) - args['scheduler'] = True - self._scheduler.context.message( - Message(plugin._get_unique_id(), - message_type, - message, - **args)) + def _message(self, message_type, message, **kwargs): + raise ImplError("Job '{kind}' does not implement _message()" + .format(kind=type(self).__name__)) + + # _child_process_data() + # + # Abstract method to retrieve additional data that should be + # returned to the parent process. Note that the job result is + # retrieved independently. + # + # Values can later be retrieved in Job.child_data. + # + # Returns: + # (dict) A dict which the parent process stores in Job.child_data + # + def _child_process_data(self): + return {} + + # _child_log() + # + # Log a message returned by the frontend's main message handler + # and return it to the main process.
+ # + # This method is also expected to add process-specific information + # to the message (notably, action_name and task_id). + # + # Arguments: + # message (Message): The message to log + # + # Returns: + # message (Message): A message object + # + def _child_log(self, message): + raise ImplError("Job '{kind}' does not implement _child_log()" + .format(kind=type(self).__name__)) + + ####################################################### + # Local Private Methods # + ####################################################### + # + # Methods prefixed with the word 'child' take place in the child process + # + # Methods prefixed with the word 'parent' take place in the parent process + # + # Other methods can be called in both child or parent processes + # + ####################################################### + + # _format_frontend_message() + # + # Format a message from the frontend for logging purposes. This + # will prepend a time code and add other information to help + # determine what happened. + # + # Args: + # message (Message) - The message to create a text from. + # name (str) - A name for the executing context. + # + # Returns: + # (str) The text to log.
+ # + def _format_frontend_message(self, message, name): + INDENT = " " + EMPTYTIME = "--:--:--" + template = "[{timecode: <8}] {type: <7} {name: <15}: {message}" + + detail = '' + if message.detail is not None: + template += "\n\n{detail}" + detail = message.detail.rstrip('\n') + detail = INDENT + INDENT.join(detail.splitlines(True)) + + timecode = EMPTYTIME + if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): + hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2) + minutes, seconds = divmod(remainder, 60) + timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) + + return template.format(timecode=timecode, + type=message.message_type.upper(), + name=name, + message=message.message, + detail=detail) # _child_action() # @@ -265,7 +352,7 @@ class Job(): # def _child_action(self, queue): - element = self.element + logfile = self._logfile # This avoids some SIGTSTP signals from grandchildren # getting propagated up to the master process @@ -301,35 +388,24 @@ class Job(): # Time, log and and run the action function # with _signals.suspendable(stop_time, resume_time), \ - element._logging_enabled(self.action_name) as filename: - - self._message(element, MessageType.START, self.action_name, logfile=filename) - - # Print the element's environment at the beginning of any element's log file. 
- # - # This should probably be omitted for non-build tasks but it's harmless here - elt_env = element.get_environment() - env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True) - self._message(element, MessageType.LOG, - "Build environment for element {}".format(element.name), - detail=env_dump, logfile=filename) + self._child_logging_enabled(logfile) as filename: try: # Try the task action - result = self._action_cb(element) + result = self._child_process() except BstError as e: elapsed = datetime.datetime.now() - starttime if self._tries <= self._max_retries: - self._message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self._tries), + self._message(MessageType.FAIL, + "Try #{} failed, retrying".format(self._tries), elapsed=elapsed) else: - self._message(element, MessageType.FAIL, str(e), + self._message(MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox) - # Report changes in the workspace, even if there was a handled failure - self._child_send_workspace() + self._queue.put(Envelope('child_data', self._child_process_data())) # Report the exception to the parent (for internal testing purposes) self._child_send_error(e) @@ -343,18 +419,19 @@ class Job(): # elapsed = datetime.datetime.now() - starttime detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) - self._message(element, MessageType.BUG, self.action_name, + + self._message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename) self._child_shutdown(1) else: # No exception occurred in the action - self._child_send_workspace() + self._queue.put(Envelope('child_data', self._child_process_data())) self._child_send_result(result) elapsed = datetime.datetime.now() - starttime - self._message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed, + self._message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) # Shutdown needs to 
stay outside of the above context manager, @@ -398,16 +475,6 @@ class Job(): envelope = Envelope('result', result) self._queue.put(envelope) - # _child_send_workspace() - # - # Sends the serialized workspace through the message queue, if any - # - def _child_send_workspace(self): - workspace = self.element._get_workspace() - if workspace: - envelope = Envelope('workspace', workspace.to_dict()) - self._queue.put(envelope) - # _child_shutdown() # # Shuts down the child process by cleaning up and exiting the process @@ -419,44 +486,6 @@ class Job(): self._queue.close() sys.exit(exit_code) - # _child_log() - # - # Logs a Message to the process's dedicated log file - # - # Args: - # plugin (Plugin): The plugin to log for - # message (Message): The message to log - # - def _child_log(self, plugin, message): - - with plugin._output_file() as output: - INDENT = " " - EMPTYTIME = "--:--:--" - - name = '[' + plugin.name + ']' - - fmt = "[{timecode: <8}] {type: <7} {name: <15}: {message}" - detail = '' - if message.detail is not None: - fmt += "\n\n{detail}" - detail = message.detail.rstrip('\n') - detail = INDENT + INDENT.join(detail.splitlines(True)) - - timecode = EMPTYTIME - if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): - hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 * 60) - minutes, seconds = divmod(remainder, 60) - timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) - - message_text = fmt.format(timecode=timecode, - type=message.message_type.upper(), - name=name, - message=message.message, - detail=detail) - - output.write('{}\n'.format(message_text)) - output.flush() - # _child_message_handler() # # A Context delegate for handling messages, this replaces the @@ -470,16 +499,8 @@ class Job(): # def _child_message_handler(self, message, context): - # Tag them on the way out the door... 
- message.action_name = self.action_name - message.task_id = self.element._get_unique_id() - - # Use the plugin for the task for the output, not a plugin - # which might be acting on behalf of the task - plugin = _plugin_lookup(message.task_id) - # Log first - self._child_log(plugin, message) + message = self._child_log(message) if message.message_type == MessageType.FAIL and self._tries <= self._max_retries: # Job will be retried, display failures as warnings in the frontend @@ -519,7 +540,7 @@ class Job(): self.spawn() return - self._complete_cb(self, self.element, returncode == 0, self._result) + self._parent_complete(returncode == 0, self._result) # _parent_process_envelope() # @@ -536,21 +557,22 @@ class Job(): if not self._listening: return - if envelope.message_type == 'message': + if envelope._message_type == 'message': # Propagate received messages from children # back through the context. - self._scheduler.context.message(envelope.message) - elif envelope.message_type == 'error': + self._scheduler.context.message(envelope._message) + elif envelope._message_type == 'error': # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py - set_last_task_error(envelope.message['domain'], - envelope.message['reason']) - elif envelope.message_type == 'result': + set_last_task_error(envelope._message['domain'], + envelope._message['reason']) + elif envelope._message_type == 'result': assert self._result is None - self._result = envelope.message - elif envelope.message_type == 'workspace': - self.workspace_dict = envelope.message + self._result = envelope._message + elif envelope._message_type == 'child_data': + # If we retry a job, we assign a new value to this + self.child_data = envelope._message else: raise Exception() diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index d0c4828..7f115b4 100644 --- 
a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -19,12 +19,13 @@ # Jürg Billeter <[email protected]> # System imports +import os from collections import deque from enum import Enum import traceback # Local imports -from ..job import Job +from ..jobs import ElementJob # BuildStream toplevel imports from ..._exceptions import BstError, set_last_task_error @@ -237,12 +238,15 @@ class Queue(): self.skipped_elements.append(element) continue + logfile = self._element_log_path(element) self.prepare(element) - job = Job(scheduler, element, self.action_name, - self.process, self._job_done, - max_retries=self._max_retries) - scheduler.job_starting(job) + job = ElementJob(scheduler, self.action_name, + logfile, element=element, + action_cb=self.process, + complete_cb=self._job_done, + max_retries=self._max_retries) + scheduler.job_starting(job, element) job.spawn() self.active_jobs.append(job) @@ -265,12 +269,16 @@ class Queue(): # job (Job): The job which completed # def _update_workspaces(self, element, job): + workspace_dict = None + if job.child_data: + workspace_dict = job.child_data['workspace'] + # Handle any workspace modifications now # - if job.workspace_dict: + if workspace_dict: context = element._get_context() workspaces = context.get_workspaces() - if workspaces.update_workspace(element._get_full_name(), job.workspace_dict): + if workspaces.update_workspace(element._get_full_name(), workspace_dict): try: workspaces.save_config() except BstError as e: @@ -343,7 +351,7 @@ class Queue(): self._scheduler.put_job_token(self.queue_type) # Notify frontend - self._scheduler.job_completed(self, job, success) + self._scheduler.job_completed(self, job, element, success) self._scheduler.sched() @@ -353,3 +361,16 @@ class Queue(): context = element._get_context() message = Message(element._get_unique_id(), message_type, brief, **kwargs) context.message(message) + + def _element_log_path(self, element): + project = 
element._get_project() + context = element._get_context() + + key = element._get_display_key()[1] + action = self.action_name.lower() + logfile = "{key}-{action}.{{pid}}.log".format(key=key, action=action) + + directory = os.path.join(context.logdir, project.name, element.normal_name) + + os.makedirs(directory, exist_ok=True) + return os.path.join(directory, logfile) diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 7bfbc95..a7a3f95 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -297,9 +297,9 @@ class Scheduler(): # Args: # job (Job): The starting Job # - def job_starting(self, job): + def job_starting(self, job, element): if self._job_start_callback: - self._job_start_callback(job.element, job.action_name) + self._job_start_callback(element, job.action_name) # job_completed(): # @@ -310,9 +310,9 @@ class Scheduler(): # job (Job): The completed Job # success (bool): Whether the Job completed with a success status # - def job_completed(self, queue, job, success): + def job_completed(self, queue, job, element, success): if self._job_complete_callback: - self._job_complete_callback(job.element, queue, job.action_name, success) + self._job_complete_callback(element, queue, job.action_name, success) ####################################################### # Local Private Methods #
