This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch bschubert/no-multiprocessing-full in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 349f97d53817a992a5bc744b29a9012f9f2b7b77 Author: Benjamin Schubert <[email protected]> AuthorDate: Sat Jul 4 12:13:24 2020 +0000 _messenger.py: Make `timed_suspendable` public and use it in job.py This reduces the amount of code duplication --- src/buildstream/_messenger.py | 60 +++++++++++++++++----------------- src/buildstream/_scheduler/jobs/job.py | 23 +++---------- 2 files changed, 35 insertions(+), 48 deletions(-) diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index 222b05d..84bea6a 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -161,7 +161,7 @@ class Messenger: # @contextmanager def timed_activity(self, activity_name, *, element_name=None, detail=None, silent_nested=False): - with self._timed_suspendable() as timedata: + with self.timed_suspendable() as timedata: try: # Push activity depth for status messages message = Message(MessageType.START, activity_name, detail=detail, element_name=element_name) @@ -205,7 +205,7 @@ class Messenger: if not full_name: full_name = activity_name - with self._timed_suspendable() as timedata: + with self.timed_suspendable() as timedata: try: message = Message(MessageType.START, activity_name, element_name=element_name) self.message(message) @@ -326,6 +326,34 @@ class Messenger: def get_log_filename(self): return self._locals.log_filename + # timed_suspendable() + # + # A contextmanager that allows an activity to be suspended and can + # adjust for clock drift caused by suspending + # + # Yields: + # TimeData: An object that contains the time the activity started + # + @contextmanager + def timed_suspendable(self): + # Note: timedata needs to be in a namedtuple so that values can be + # yielded that will change + timedata = _TimeData(start_time=datetime.datetime.now()) + stopped_time = None + + def stop_time(): + nonlocal stopped_time + stopped_time = datetime.datetime.now() + + def resume_time(): + nonlocal timedata + nonlocal stopped_time + sleep_time = datetime.datetime.now() - stopped_time + timedata.start_time += sleep_time + + with _signals.suspendable(stop_time, resume_time): + yield timedata + # _record_message() # # Records the message if recording is enabled @@ -388,31 +416,3 @@ class Messenger: if self._render_status_cb and now >= self._next_render: self._render_status_cb() self._next_render = now + _RENDER_INTERVAL - - # _timed_suspendable() - # - # A contextmanager that allows an activity to be suspended and can - # adjust for clock drift caused by suspending - # - # Yields: - # TimeData: An object that contains the time the activity started - # - @contextmanager - def _timed_suspendable(self): - # Note: timedata needs to be in a namedtuple so that values can be - # yielded that will change - timedata = _TimeData(start_time=datetime.datetime.now()) - stopped_time = None - - def stop_time(): - nonlocal stopped_time - stopped_time = datetime.datetime.now() - - def resume_time(): - nonlocal timedata - nonlocal stopped_time - sleep_time = datetime.datetime.now() - stopped_time - timedata.start_time += sleep_time - - with _signals.suspendable(stop_time, resume_time): - yield timedata diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index a4ace41..03a6b61 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -30,7 +30,6 @@ import traceback from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ...types import FastEnum -from ... import _signals # Return code values shutdown of job handling child processes @@ -603,21 +602,9 @@ class ChildJob: self._pipe_w = pipe_w self._messenger.set_message_handler(self._child_message_handler) - starttime = datetime.datetime.now() - stopped_time = None - - def stop_time(): - nonlocal stopped_time - stopped_time = datetime.datetime.now() - - def resume_time(): - nonlocal stopped_time - nonlocal starttime - starttime += datetime.datetime.now() - stopped_time - # Time, log and and run the action function # - with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages( + with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages( self._logfile, self._logdir ) as filename: self.message(MessageType.START, self.action_name, logfile=filename) @@ -626,13 +613,13 @@ class ChildJob: # Try the task action result = self.child_process() # pylint: disable=assignment-from-no-return except SkipJob as e: - elapsed = datetime.datetime.now() - starttime + elapsed = datetime.datetime.now() - timeinfo.start_time self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) # Alert parent of skip by return code return _ReturnCode.SKIPPED except BstError as e: - elapsed = datetime.datetime.now() - starttime + elapsed = datetime.datetime.now() - timeinfo.start_time retry_flag = e.temporary if retry_flag and (self._tries <= self._max_retries): @@ -662,7 +649,7 @@ class ChildJob: # send the traceback and formatted exception back to the frontend # and print it to the log file. # - elapsed = datetime.datetime.now() - starttime + elapsed = datetime.datetime.now() - timeinfo.start_time detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename) @@ -674,7 +661,7 @@ class ChildJob: self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) self._child_send_result(result) - elapsed = datetime.datetime.now() - starttime + elapsed = datetime.datetime.now() - timeinfo.start_time self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) # Shutdown needs to stay outside of the above context manager,
