Repository: aurora
Updated Branches:
  refs/heads/master e0e90e172 -> b3fa9febf
Switch Thermos to lazy log formatting

This is the first part of a small series of Thermos observer performance improvements. As a first iteration, this switches all logging to use the logger-embedded formatting rather than doing it eagerly up front. This has the advantage that we produce less garbage if debug logging is disabled. (A short sketch of the pattern, and of one caveat around exceptions, follows after the diff.)

Reviewed at https://reviews.apache.org/r/66136/

Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/b3fa9feb
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/b3fa9feb
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/b3fa9feb

Branch: refs/heads/master
Commit: b3fa9febf84df09f8a464a0b0aae14ddc72bba6e
Parents: e0e90e1
Author: Stephan Erb <[email protected]>
Authored: Mon Mar 19 09:10:03 2018 +0000
Committer: Stephan Erb <[email protected]>
Committed: Mon Mar 19 09:10:03 2018 +0000

----------------------------------------------------------------------
 src/main/python/apache/thermos/common/ckpt.py   |  14 +--
 src/main/python/apache/thermos/core/helper.py   |  56 +++++-----
 src/main/python/apache/thermos/core/muxer.py    |  36 +++---
 src/main/python/apache/thermos/core/process.py  |   6 +-
 src/main/python/apache/thermos/core/runner.py   | 112 +++++++++----------
 .../python/apache/thermos/monitoring/disk.py    |   4 +-
 .../python/apache/thermos/monitoring/monitor.py |   4 +-
 .../monitoring/process_collector_psutil.py      |   6 +-
 .../apache/thermos/monitoring/resource.py       |  16 +--
 .../thermos/observer/http/file_browser.py       |   4 +-
 .../thermos/observer/http/http_observer.py      |   2 +-
 .../thermos/observer/http/static_assets.py      |   2 +-
 .../apache/thermos/observer/observed_task.py    |   6 +-
 .../apache/thermos/observer/task_observer.py    |  18 +--
 .../apache/thermos/runner/thermos_runner.py     |  18 +--
 15 files changed, 152 insertions(+), 152 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/common/ckpt.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/ckpt.py b/src/main/python/apache/thermos/common/ckpt.py index e79ec6a..cb254a4 100644 --- a/src/main/python/apache/thermos/common/ckpt.py +++ b/src/main/python/apache/thermos/common/ckpt.py @@ -186,7 +186,7 @@ class CheckpointDispatcher(object): builder.dispatch(state, update, truncate=truncate) return state except cls.Error as e: - log.error('Failed to recover from %s: %s' % (filename, e)) + log.error('Failed to recover from %s: %s', filename, e) def __init__(self): self._task_handlers = [] @@ -325,7 +325,7 @@ class CheckpointDispatcher(object): raise self.ErrorRecoveringState( "Attempting to rebind task with different parameters!") else: - log.debug('Initializing TaskRunner header to %s' % runner_ckpt.runner_header) + log.debug('Initializing TaskRunner header to %s', runner_ckpt.runner_header) state.header = runner_ckpt.runner_header self._run_header_dispatch(runner_ckpt.runner_header) return @@ -343,9 +343,9 @@ class CheckpointDispatcher(object): else: state.statuses = [runner_ckpt.task_status] new_state = runner_ckpt.task_status.state - log.debug('Flipping task state from %s to %s' % ( + log.debug('Flipping task state from %s to %s', TaskState._VALUES_TO_NAMES.get(old_state, '(undefined)'), - TaskState._VALUES_TO_NAMES.get(new_state, '(undefined)'))) + TaskState._VALUES_TO_NAMES.get(new_state, '(undefined)')) self._run_task_dispatch(new_state, runner_ckpt.task_status) return @@ -358,11
+358,11 @@ class CheckpointDispatcher(object): current_run = state.processes[name][-1] if name in state.processes else None if current_run and process_update.seq != current_run.seq + 1: if recovery: - log.debug('Skipping replayed out-of-order update: %s' % process_update) + log.debug('Skipping replayed out-of-order update: %s', process_update) return else: raise self.InvalidSequenceNumber( "Out of order sequence number! %s => %s" % (current_run, process_update)) # One special case for WAITING: Initialize a new target ProcessState. if process_update.state == ProcessState.WAITING: @@ -376,7 +376,7 @@ class CheckpointDispatcher(object): state.processes[name] = [ProcessStatus(seq=current_run.seq)] # Run the process state machine. - log.debug('Running state machine for process=%s/seq=%s' % (name, process_update.seq)) + log.debug('Running state machine for process=%s/seq=%s', name, process_update.seq) if not state.processes or name not in state.processes: raise self.ErrorRecoveringState("Encountered potentially out of order " "process update. Are you sure this is a full checkpoint stream?") http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/core/helper.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/helper.py b/src/main/python/apache/thermos/core/helper.py index 0811e84..a402498 100644 --- a/src/main/python/apache/thermos/core/helper.py +++ b/src/main/python/apache/thermos/core/helper.py @@ -85,8 +85,8 @@ class TaskRunnerHelper(object): process_create_time = process.create_time() if abs(start_time - process_create_time) >= cls.MAX_START_TIME_DRIFT.as_(Time.SECONDS): - log.info("Expected pid %s start time to be %s but it's %s" % ( - process.pid, start_time, process_create_time)) + log.info("Expected pid %s start time to be %s but it's %s", + process.pid, start_time, process_create_time) return False if uid is not None: @@ -104,12 +104,12 @@ class TaskRunnerHelper(object): elif uid == 0: # If the process was launched as root but is now not root, we should # kill this because it could have called `setuid` on itself. - log.info("pid %s appears to be have launched by root but it's uid is now %s" % ( - process.pid, process_uid)) + log.info("pid %s appears to be have launched by root but it's uid is now %s", + process.pid, process_uid) return True else: - log.info("Expected pid %s to be ours but the pid uid is %s and we're %s" % ( - process.pid, process_uid, uid)) + log.info("Expected pid %s to be ours but the pid uid is %s and we're %s", + process.pid, process_uid, uid) return False try: @@ -121,8 +121,8 @@ class TaskRunnerHelper(object): # If the uid was not provided, we must use user -- which is possibly flaky if the # user gets deleted from the system, so process_user will be None and we must # return False.
- log.info("Expected pid %s to be ours but the pid user is %s and we're %s" % ( - process.pid, process_user, user)) + log.info("Expected pid %s to be ours but the pid user is %s and we're %s", + process.pid, process_user, user) return True return False @@ -141,7 +141,7 @@ class TaskRunnerHelper(object): coordinator_pid, pid, tree = None, None, set() if uid is None: - log.debug('Legacy thermos checkpoint stream detected, user = %s' % user) + log.debug('Legacy thermos checkpoint stream detected, user = %s', user) if process_run.coordinator_pid: try: @@ -149,11 +149,11 @@ class TaskRunnerHelper(object): if cls.this_is_really_our_pid(coordinator_process, uid, user, process_run.fork_time): coordinator_pid = process_run.coordinator_pid except psutil.NoSuchProcess: - log.info(' Coordinator %s [pid: %s] completed.' % (process_run.process, - process_run.coordinator_pid)) + log.info(' Coordinator %s [pid: %s] completed.', process_run.process, + process_run.coordinator_pid) except psutil.Error as err: - log.warning(' Error gathering information on pid %s: %s' % (process_run.coordinator_pid, - err)) + log.warning(' Error gathering information on pid %s: %s', process_run.coordinator_pid, + err) if process_run.pid: try: @@ -161,15 +161,15 @@ class TaskRunnerHelper(object): if cls.this_is_really_our_pid(process, uid, user, process_run.start_time): pid = process.pid except psutil.NoSuchProcess: - log.info(' Process %s [pid: %s] completed.' % (process_run.process, process_run.pid)) + log.info(' Process %s [pid: %s] completed.', process_run.process, process_run.pid) except psutil.Error as err: - log.warning(' Error gathering information on pid %s: %s' % (process_run.pid, err)) + log.warning(' Error gathering information on pid %s: %s', process_run.pid, err) else: if pid: try: tree = set(child.pid for child in process.children(recursive=True)) except psutil.Error: - log.warning(' Error gathering information on children of pid %s' % pid) + log.warning(' Error gathering information on children of pid %s', pid) return (coordinator_pid, pid, tree) @@ -192,9 +192,9 @@ class TaskRunnerHelper(object): os.kill(pid, sig) except OSError as e: if e.errno not in (errno.ESRCH, errno.EPERM): - log.error('Unexpected error in os.kill: %s' % e) + log.error('Unexpected error in os.kill: %s', e) except Exception as e: - log.error('Unexpected error in os.kill: %s' % e) + log.error('Unexpected error in os.kill: %s', e) @classmethod def terminate_pid(cls, pid): @@ -240,16 +240,16 @@ class TaskRunnerHelper(object): @classmethod def terminate_process(cls, state, process_name): - log.debug('TaskRunnerHelper.terminate_process(%s)' % process_name) + log.debug('TaskRunnerHelper.terminate_process(%s)', process_name) _, pid, _ = cls._get_process_tuple(state, process_name) if pid: - log.debug(' => SIGTERM pid %s' % pid) + log.debug(' => SIGTERM pid %s', pid) cls.terminate_pid(pid) return bool(pid) @classmethod def kill_process(cls, state, process_name): - log.debug('TaskRunnerHelper.kill_process(%s)' % process_name) + log.debug('TaskRunnerHelper.kill_process(%s)', process_name) coordinator_pgid = cls._get_coordinator_group(state, process_name) coordinator_pid, pid, tree = cls._get_process_tuple(state, process_name) # This is super dangerous. TODO(wickman) Add a heuristic that determines @@ -257,16 +257,16 @@ class TaskRunnerHelper(object): # and 2) those processes have inherited the coordinator checkpoint filehandle # This way we validate that it is in fact the process group we expect. 
if coordinator_pgid: - log.debug(' => SIGKILL coordinator group %s' % coordinator_pgid) + log.debug(' => SIGKILL coordinator group %s', coordinator_pgid) cls.kill_group(coordinator_pgid) if coordinator_pid: - log.debug(' => SIGKILL coordinator %s' % coordinator_pid) + log.debug(' => SIGKILL coordinator %s', coordinator_pid) cls.kill_pid(coordinator_pid) if pid: - log.debug(' => SIGKILL pid %s' % pid) + log.debug(' => SIGKILL pid %s', pid) cls.kill_pid(pid) for child in tree: - log.debug(' => SIGKILL child %s' % child) + log.debug(' => SIGKILL child %s', child) cls.kill_pid(child) return bool(coordinator_pid or pid or tree) @@ -380,11 +380,11 @@ class TaskRunnerHelper(object): if pid == 0: break pids.add(pid) - log.debug('Detected terminated process: pid=%s, status=%s, rusage=%s' % ( - pid, status, rusage)) + log.debug('Detected terminated process: pid=%s, status=%s, rusage=%s', + pid, status, rusage) except OSError as e: if e.errno != errno.ECHILD: - log.warning('Unexpected error when calling waitpid: %s' % e) + log.warning('Unexpected error when calling waitpid: %s', e) break return pids http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/core/muxer.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/muxer.py b/src/main/python/apache/thermos/core/muxer.py index 47e77f7..b095d75 100644 --- a/src/main/python/apache/thermos/core/muxer.py +++ b/src/main/python/apache/thermos/core/muxer.py @@ -37,7 +37,7 @@ class ProcessMuxer(object): fp.close() def register(self, process_name, watermark=0): - log.debug('registering %s' % process_name) + log.debug('registering %s', process_name) if process_name in self._processes: raise self.ProcessExists("Process %s is already registered" % process_name) self._processes[process_name] = None @@ -47,7 +47,7 @@ class ProcessMuxer(object): for process_name, fp in self._processes.items(): if fp is None: process_ckpt = self._pathspec.given(process=process_name).getpath('process_checkpoint') - log.debug('ProcessMuxer binding %s => %s' % (process_name, process_ckpt)) + log.debug('ProcessMuxer binding %s => %s', process_name, process_ckpt) try: self._processes[process_name] = open(process_ckpt, 'r') # noqa except IOError as e: @@ -55,14 +55,14 @@ class ProcessMuxer(object): log.debug(' => bind failed, checkpoint not available yet.') continue else: - log.error("Unexpected inability to open %s! %s" % (process_ckpt, e)) + log.error("Unexpected inability to open %s! %s", process_ckpt, e) except Exception as e: - log.error("Unexpected inability to open %s! %s" % (process_ckpt, e)) + log.error("Unexpected inability to open %s! %s", process_ckpt, e) self._fast_forward_stream(process_name) def _fast_forward_stream(self, process_name): - log.debug('Fast forwarding %s stream to seq=%s' % (process_name, - self._watermarks[process_name])) + log.debug('Fast forwarding %s stream to seq=%s', process_name, + self._watermarks[process_name]) assert self._processes.get(process_name) is not None fp = self._processes[process_name] rr = ThriftRecordReader(fp, RunnerCkpt) @@ -75,25 +75,25 @@ class ProcessMuxer(object): break new_watermark = record.process_status.seq if new_watermark > self._watermarks[process_name]: - log.debug('Over-seeked %s [watermark = %s, high watermark = %s], rewinding.' 
% ( - process_name, new_watermark, self._watermarks[process_name])) + log.debug('Over-seeked %s [watermark = %s, high watermark = %s], rewinding.', + process_name, new_watermark, self._watermarks[process_name]) fp.seek(last_pos) break current_watermark = new_watermark records += 1 if current_watermark < self._watermarks[process_name]: - log.warning('Only able to fast forward to %s@sequence=%s, high watermark is %s' % ( - process_name, current_watermark, self._watermarks[process_name])) + log.warning('Only able to fast forward to %s@sequence=%s, high watermark is %s', + process_name, current_watermark, self._watermarks[process_name]) if records: - log.debug('Fast forwarded %s %s record(s) to seq=%s.' % (process_name, records, - current_watermark)) + log.debug('Fast forwarded %s %s record(s) to seq=%s.', process_name, records, + current_watermark) def unregister(self, process_name): - log.debug('unregistering %s' % process_name) + log.debug('unregistering %s', process_name) if process_name not in self._processes: raise self.ProcessNotFound("No trace of process: %s" % process_name) else: self._watermarks.pop(process_name) fp = self._processes.pop(process_name) @@ -114,7 +114,7 @@ class ProcessMuxer(object): try: os.fstat(fp.fileno()).st_size except OSError: - log.debug('ProcessMuxer could not fstat for process %s' % process) + log.debug('ProcessMuxer could not fstat for process %s', process) return False update = rr.try_read() if update: @@ -139,10 +139,10 @@ class ProcessMuxer(object): try: fstat = os.fstat(handle.fileno()) except OSError: - log.error('Unable to fstat %s!' % handle.name) + log.error('Unable to fstat %s!', handle.name) continue if handle.tell() > fstat.st_size: - log.error('Truncated checkpoint record detected on %s!'
% handle.name) + log.error('Truncated checkpoint record detected on %s!', handle.name) elif handle.tell() < fstat.st_size: rr = ThriftRecordReader(handle, RunnerCkpt) while True: @@ -152,7 +152,7 @@ class ProcessMuxer(object): else: break if len(updates) > 0: - log.debug('select() returning %s updates:' % len(updates)) + log.debug('select() returning %s updates:', len(updates)) for update in updates: - log.debug(' = %s' % update) + log.debug(' = %s', update) return updates http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/core/process.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py index 4a4678f..32631e6 100644 --- a/src/main/python/apache/thermos/core/process.py +++ b/src/main/python/apache/thermos/core/process.py @@ -151,7 +151,7 @@ class ProcessBase(object): raise ValueError('Log backups cannot be less than one.') def _log(self, msg, exc_info=None): - log.debug('[process:%5s=%s]: %s' % (self._pid, self.name(), msg), + log.debug('[process:%5s=%s]: %s', self._pid, self.name(), msg, exc_info=exc_info) def _getpwuid(self): @@ -442,7 +442,7 @@ class Process(ProcessBase): }) wrapped_cmdline = self.wrapped_cmdline(cwd) - log.debug('Wrapped cmdline: %s' % wrapped_cmdline) + log.debug('Wrapped cmdline: %s', wrapped_cmdline) real_thermos_profile_path = os.path.join( os.environ['MESOS_DIRECTORY'], @@ -452,7 +452,7 @@ class Process(ProcessBase): if os.path.exists(real_thermos_profile_path): env.update(BASH_ENV=thermos_profile) - log.debug('ENV is: %s' % env) + log.debug('ENV is: %s', env) subprocess_args = { 'args': wrapped_cmdline, 'close_fds': self.FD_CLOEXEC, http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/core/runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py index 1b63c08..79aa68a 100644 --- a/src/main/python/apache/thermos/core/runner.py +++ b/src/main/python/apache/thermos/core/runner.py @@ -100,20 +100,20 @@ class TaskRunnerProcessHandler(ProcessStateHandler): self._runner = runner def on_waiting(self, process_update): - log.debug('Process on_waiting %s' % process_update) + log.debug('Process on_waiting %s', process_update) self._runner._task_processes[process_update.process] = ( self._runner._task_process_from_process_name( process_update.process, process_update.seq + 1)) self._runner._watcher.register(process_update.process, process_update.seq - 1) def on_forked(self, process_update): - log.debug('Process on_forked %s' % process_update) + log.debug('Process on_forked %s', process_update) task_process = self._runner._task_processes[process_update.process] task_process.rebind(process_update.coordinator_pid, process_update.fork_time) self._runner._plan.set_running(process_update.process) def on_running(self, process_update): - log.debug('Process on_running %s' % process_update) + log.debug('Process on_running %s', process_update) self._runner._plan.set_running(process_update.process) def _cleanup(self, process_update): @@ -121,39 +121,39 @@ class TaskRunnerProcessHandler(ProcessStateHandler): TaskRunnerHelper.kill_process(self._runner.state, process_update.process) def on_success(self, process_update): - log.debug('Process on_success %s' % process_update) - log.info('Process(%s) finished successfully [rc=%s]' % ( - process_update.process, 
process_update.return_code)) + log.debug('Process on_success %s', process_update) + log.info('Process(%s) finished successfully [rc=%s]', + process_update.process, process_update.return_code) self._cleanup(process_update) self._runner._task_processes.pop(process_update.process) self._runner._watcher.unregister(process_update.process) self._runner._plan.add_success(process_update.process) def _on_abnormal(self, process_update): - log.info('Process %s had an abnormal termination' % process_update.process) + log.info('Process %s had an abnormal termination', process_update.process) self._runner._task_processes.pop(process_update.process) self._runner._watcher.unregister(process_update.process) def on_failed(self, process_update): - log.debug('Process on_failed %s' % process_update) - log.info('Process(%s) failed [rc=%s]' % (process_update.process, process_update.return_code)) + log.debug('Process on_failed %s', process_update) + log.info('Process(%s) failed [rc=%s]', process_update.process, process_update.return_code) self._cleanup(process_update) self._on_abnormal(process_update) self._runner._plan.add_failure(process_update.process) if process_update.process in self._runner._plan.failed: - log.info('Process %s reached maximum failures, marking process run failed.' % + log.info('Process %s reached maximum failures, marking process run failed.', process_update.process) else: - log.info('Process %s under maximum failure limit, restarting.' % process_update.process) + log.info('Process %s under maximum failure limit, restarting.', process_update.process) def on_lost(self, process_update): - log.debug('Process on_lost %s' % process_update) + log.debug('Process on_lost %s', process_update) self._cleanup(process_update) self._on_abnormal(process_update) self._runner._plan.lost(process_update.process) def on_killed(self, process_update): - log.debug('Process on_killed %s' % process_update) + log.debug('Process on_killed %s', process_update) self._cleanup(process_update) self._runner._task_processes.pop(process_update.process) self._runner._watcher.unregister(process_update.process) @@ -177,7 +177,7 @@ class TaskRunnerTaskHandler(TaskStateHandler): self._pathspec = self._runner._pathspec def on_active(self, task_update): - log.debug('Task on_active(%s)' % task_update) + log.debug('Task on_active(%s)', task_update) self._runner._plan = self._runner._regular_plan if self._runner._recovery: return @@ -185,12 +185,12 @@ class TaskRunnerTaskHandler(TaskStateHandler): ThermosTaskWrapper(self._runner._task).to_json()) def on_cleaning(self, task_update): - log.debug('Task on_cleaning(%s)' % task_update) + log.debug('Task on_cleaning(%s)', task_update) self._runner._finalization_start = task_update.timestamp_ms / 1000.0 self._runner._terminate_plan(self._runner._regular_plan) def on_finalizing(self, task_update): - log.debug('Task on_finalizing(%s)' % task_update) + log.debug('Task on_finalizing(%s)', task_update) if not self._runner._recovery: self._runner._kill() self._runner._plan = self._runner._finalizing_plan @@ -198,20 +198,20 @@ class TaskRunnerTaskHandler(TaskStateHandler): self._runner._finalization_start = task_update.timestamp_ms / 1000.0 def on_killed(self, task_update): - log.debug('Task on_killed(%s)' % task_update) + log.debug('Task on_killed(%s)', task_update) self._cleanup() def on_success(self, task_update): - log.debug('Task on_success(%s)' % task_update) + log.debug('Task on_success(%s)', task_update) self._cleanup() log.info('Task succeeded.') def on_failed(self, task_update): - 
log.debug('Task on_failed(%s)' % task_update) + log.debug('Task on_failed(%s)', task_update) self._cleanup() def on_lost(self, task_update): - log.debug('Task on_lost(%s)' % task_update) + log.debug('Task on_lost(%s)', task_update) self._cleanup() def _cleanup(self): @@ -235,15 +235,15 @@ class TaskRunnerUniversalHandler(UniversalStateHandler): self._runner._ckpt_write(record) def on_process_transition(self, state, process_update): - log.debug('_on_process_transition: %s' % process_update) + log.debug('_on_process_transition: %s', process_update) self._checkpoint(RunnerCkpt(process_status=process_update)) def on_task_transition(self, state, task_update): - log.debug('_on_task_transition: %s' % task_update) + log.debug('_on_task_transition: %s', task_update) self._checkpoint(RunnerCkpt(task_status=task_update)) def on_initialization(self, header): - log.debug('_on_initialization: %s' % header) + log.debug('_on_initialization: %s', header) ThermosTaskValidator.assert_valid_task(self._runner.task) ThermosTaskValidator.assert_valid_ports(self._runner.task, header.ports) self._checkpoint(RunnerCkpt(runner_header=header)) @@ -413,7 +413,7 @@ class TaskRunner(object): log_dir=checkpoint.header.log_dir, task_id=task_id, portmap=checkpoint.header.ports, hostname=checkpoint.header.hostname) except Exception as e: - log.error('Failed to reconstitute checkpoint in TaskRunner.get: %s' % e, exc_info=True) + log.error('Failed to reconstitute checkpoint in TaskRunner.get: %s', e, exc_info=True) return None def __init__(self, task, checkpoint_root, sandbox, log_dir=None, @@ -577,8 +577,8 @@ class TaskRunner(object): try: yield except Exception as e: - log.error('Caught exception in self.control(): %s' % e) - log.error(' %s' % traceback.format_exc()) + log.error('Caught exception in self.control(): %s', e) + log.error(' %s', traceback.format_exc()) self._ckpt.close() def _resume_task(self): @@ -612,7 +612,7 @@ class TaskRunner(object): with open(ckpt_file, 'r') as fp: ckpt_recover = ThriftRecordReader(fp, RunnerCkpt) for record in ckpt_recover: - log.debug('Replaying runner checkpoint record: %s' % record) + log.debug('Replaying runner checkpoint record: %s', record) self._dispatcher.dispatch(self._state, record, recovery=True) def _replay_process_ckpts(self): @@ -640,7 +640,7 @@ class TaskRunner(object): except KeyError: # This will cause failures downstream, but they will at least be correctly # reflected in the process state. - log.error('Unknown user %s.' 
% self._user) + log.error('Unknown user %s.', self._user) uid = None header = RunnerHeader( @@ -677,8 +677,8 @@ class TaskRunner(object): def _set_process_status(self, process_name, process_state, **kw): if 'sequence_number' in kw: sequence_number = kw.pop('sequence_number') - log.debug('_set_process_status(%s <= %s, seq=%s[force])' % (process_name, - ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number)) + log.debug('_set_process_status(%s <= %s, seq=%s[force])', process_name, + ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number) else: current_run = self._current_process_run(process_name) if not current_run: @@ -686,8 +686,8 @@ class TaskRunner(object): sequence_number = 0 else: sequence_number = current_run.seq + 1 - log.debug('_set_process_status(%s <= %s, seq=%s[auto])' % (process_name, - ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number)) + log.debug('_set_process_status(%s <= %s, seq=%s[auto])', process_name, + ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number) runner_ckpt = RunnerCkpt(process_status=ProcessStatus( process=process_name, state=process_state, seq=sequence_number, **kw)) self._dispatcher.dispatch(self._state, runner_ckpt, self._recovery) @@ -781,8 +781,8 @@ class TaskRunner(object): running = list(plan.running) runnable = list(plan.runnable_at(now)) waiting = list(plan.waiting_at(now)) - log.debug('running:%d runnable:%d waiting:%d complete:%s' % ( - len(running), len(runnable), len(waiting), plan.is_complete())) + log.debug('running:%d runnable:%d waiting:%d complete:%s', + len(running), len(runnable), len(waiting), plan.is_complete()) return len(running + runnable + waiting) == 0 and not plan.is_complete() def is_healthy(self): @@ -791,9 +791,9 @@ class TaskRunner(object): max_failures = self._task.max_failures().get() deadlocked = self.deadlocked() under_failure_limit = max_failures == 0 or len(self._regular_plan.failed) < max_failures - log.debug('max_failures:%d failed:%d under_failure_limit:%s deadlocked:%s ==> health:%s' % ( + log.debug('max_failures:%d failed:%d under_failure_limit:%s deadlocked:%s ==> health:%s', max_failures, len(self._regular_plan.failed), under_failure_limit, deadlocked, - not deadlocked and under_failure_limit)) + not deadlocked and under_failure_limit) return not deadlocked and under_failure_limit def _current_process_run(self, process_name): @@ -822,9 +822,9 @@ class TaskRunner(object): return True if forked_but_never_came_up() or running_but_coordinator_died(): - log.info('Detected a LOST task: %s' % current_run) - log.debug(' forked_but_never_came_up: %s' % forked_but_never_came_up()) - log.debug(' running_but_coordinator_died: %s' % running_but_coordinator_died()) + log.info('Detected a LOST task: %s', current_run) + log.debug(' forked_but_never_came_up: %s', forked_but_never_came_up()) + log.debug(' running_but_coordinator_died: %s', running_but_coordinator_died()) return True return False @@ -833,8 +833,8 @@ class TaskRunner(object): log.debug('Schedule pass:') running = list(plan.running) - log.debug('running: %s' % ' '.join(plan.running)) - log.debug('finished: %s' % ' '.join(plan.finished)) + log.debug('running: %s', ' '.join(plan.running)) + log.debug('finished: %s', ' '.join(plan.finished)) launched = [] for process_name in plan.running: @@ -844,8 +844,8 @@ class TaskRunner(object): now = self._clock.time() runnable = list(plan.runnable_at(now)) waiting = list(plan.waiting_at(now)) - log.debug('runnable: %s' % ' '.join(runnable)) - log.debug('waiting: %s' % ' '.join( + 
log.debug('runnable: %s', ' '.join(runnable)) + log.debug('waiting: %s', ' '.join( '%s[T-%.1fs]' % (process, plan.get_wait(process)) for process in waiting)) def pick_processes(process_list): @@ -862,12 +862,12 @@ class TaskRunner(object): else: self._set_process_status(process_name, ProcessState.WAITING) tp = self._task_processes[process_name] - log.info('Forking Process(%s)' % process_name) + log.info('Forking Process(%s)', process_name) try: tp.start() launched.append(tp) except Process.Error as e: - log.error('Failed to launch process: %s' % e) + log.error('Failed to launch process: %s', e) self._set_process_status(process_name, ProcessState.FAILED) return len(launched) > 0 @@ -948,15 +948,15 @@ class TaskRunner(object): TaskState._VALUES_TO_NAMES.get(self.task_state(), 'UNKNOWN')) self._set_task_status(runner.transition_to()) continue - log.debug('Run loop: Work to be done within %.1fs' % iteration_wait) + log.debug('Run loop: Work to be done within %.1fs', iteration_wait) # step 2: check child process checkpoint streams for updates if not self.collect_updates(iteration_wait): # If we don't collect any updates, at least 'touch' the checkpoint stream # so as to prevent garbage collection. elapsed = self._clock.time() - start if elapsed < iteration_wait: - log.debug('Update collection only took %.1fs, idling %.1fs' % ( - elapsed, iteration_wait - elapsed)) + log.debug('Update collection only took %.1fs, idling %.1fs', + elapsed, iteration_wait - elapsed) self._clock.sleep(iteration_wait - elapsed) log.debug('Run loop: No updates collected, touching checkpoint.') os.utime(self._pathspec.getpath('runner_checkpoint'), None) @@ -969,8 +969,8 @@ class TaskRunner(object): Kill all processes associated with this task and set task/process states as terminal_status (defaults to KILLED) """ - log.debug('Runner issued kill: force:%s, preemption_wait:%s' % ( - force, preemption_wait)) + log.debug('Runner issued kill: force:%s, preemption_wait:%s', + force, preemption_wait) assert terminal_status in (TaskState.KILLED, TaskState.LOST) self._preemption_deadline = self._clock.time() + preemption_wait.as_(Time.SECONDS) with self.control(force): @@ -995,17 +995,17 @@ class TaskRunner(object): coordinator_pid, pid, tree = pid_tuple if TaskRunnerHelper.is_process_terminal(current_run.state): if coordinator_pid or pid or tree: - log.warning('Terminal process (%s) still has running pids:' % process) - log.warning(' coordinator_pid: %s' % coordinator_pid) - log.warning(' pid: %s' % pid) - log.warning(' tree: %s' % tree) + log.warning('Terminal process (%s) still has running pids:', process) + log.warning(' coordinator_pid: %s', coordinator_pid) + log.warning(' pid: %s', pid) + log.warning(' tree: %s', tree) TaskRunnerHelper.kill_process(self.state, process) else: if coordinator_pid or pid or tree: - log.info('Transitioning %s to KILLED' % process) + log.info('Transitioning %s to KILLED', process) self._set_process_status(process, ProcessState.KILLED, stop_time=self._clock.time(), return_code=-1) else: - log.info('Transitioning %s to LOST' % process) + log.info('Transitioning %s to LOST', process) if current_run.state != ProcessState.WAITING: self._set_process_status(process, ProcessState.LOST) http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/monitoring/disk.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/monitoring/disk.py b/src/main/python/apache/thermos/monitoring/disk.py index 52c5d74..986d33a 
100644 --- a/src/main/python/apache/thermos/monitoring/disk.py +++ b/src/main/python/apache/thermos/monitoring/disk.py @@ -40,8 +40,8 @@ class DiskCollectorThread(ExceptionalThread): def run(self): start = time.time() self.value = du(self.path) - log.debug("DiskCollectorThread: finished collection of %s in %.1fms" % ( - self.path, 1000.0 * (time.time() - start))) + log.debug("DiskCollectorThread: finished collection of %s in %.1fms", + self.path, 1000.0 * (time.time() - start)) self.event.set() def finished(self): http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/monitoring/monitor.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/monitoring/monitor.py b/src/main/python/apache/thermos/monitoring/monitor.py index d77703e..3ab1e48 100644 --- a/src/main/python/apache/thermos/monitoring/monitor.py +++ b/src/main/python/apache/thermos/monitoring/monitor.py @@ -80,7 +80,7 @@ class TaskMonitor(object): try: self._dispatcher.dispatch(self._runnerstate, runner_update) except CheckpointDispatcher.InvalidSequenceNumber as e: - log.error('Checkpoint stream is corrupt: %s' % e) + log.error('Checkpoint stream is corrupt: %s', e) break new_ckpt_head = fp.tell() updated = self._ckpt_head != new_ckpt_head @@ -89,7 +89,7 @@ class TaskMonitor(object): except OSError as e: if e.errno == errno.ENOENT: # The log doesn't yet exist, will retry later. - log.warning('Could not read from checkpoint %s' % self._runner_ckpt) + log.warning('Could not read from checkpoint %s', self._runner_ckpt) return False else: raise http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/monitoring/process_collector_psutil.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/monitoring/process_collector_psutil.py b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py index 3000e95..e13d6dd 100644 --- a/src/main/python/apache/thermos/monitoring/process_collector_psutil.py +++ b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py @@ -39,7 +39,7 @@ def process_to_sample(process): threads = process.num_threads() return ProcessSample(rate, user, system, rss, vms, nice, status, threads) except (AccessDenied, NoSuchProcess) as e: - log.debug('Error during process sampling [pid=%s]: %s' % (process.pid, e)) + log.debug('Error during process sampling [pid=%s]: %s', process.pid, e) return ProcessSample.empty() @@ -72,7 +72,7 @@ class ProcessTreeCollector(object): new_samples[self._pid] = parent_sample except (IOError, PsutilError) as e: - log.debug('Error during process sampling: %s' % e) + log.debug('Error during process sampling: %s', e) self._sample = ProcessSample.empty() self._rate = 0.0 @@ -90,7 +90,7 @@ class ProcessTreeCollector(object): new_user_sys = sum(map(attrgetter('user'), new)) + sum(map(attrgetter('system'), new)) old_user_sys = sum(map(attrgetter('user'), old)) + sum(map(attrgetter('system'), old)) self._rate = (new_user_sys - old_user_sys) / (self._stamp - last_stamp) - log.debug("Calculated rate for pid=%s and children: %s" % (self._process.pid, self._rate)) + log.debug("Calculated rate for pid=%s and children: %s", self._process.pid, self._rate) self._sampled_tree = new_samples @property http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/monitoring/resource.py ---------------------------------------------------------------------- diff --git 
a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py index f5e3849..adcdc75 100644 --- a/src/main/python/apache/thermos/monitoring/resource.py +++ b/src/main/python/apache/thermos/monitoring/resource.py @@ -137,7 +137,7 @@ class HistoryProvider(object): history_length = int(history_time.as_(Time.SECONDS) / min_collection_interval) if history_length > self.MAX_HISTORY: raise ValueError("Requested history length too large") - log.debug("Initialising ResourceHistory of length %s" % history_length) + log.debug("Initialising ResourceHistory of length %s", history_length) return ResourceHistory(history_length) @@ -166,7 +166,7 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread): """ self._task_monitor = task_monitor # exposes PIDs, sandbox self._task_id = task_id - log.debug('Initialising resource collection for task %s' % self._task_id) + log.debug('Initialising resource collection for task %s', self._task_id) self._process_collectors = dict() # ProcessStatus => ProcessTreeCollector self._disk_collector_class = disk_collector self._disk_collector = None @@ -225,7 +225,7 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread): """Thread entrypoint. Loop indefinitely, polling collectors at self._collection_interval and collating samples.""" - log.debug('Commencing resource monitoring for task "%s"' % self._task_id) + log.debug('Commencing resource monitoring for task "%s"', self._task_id) next_process_collection = 0 next_disk_collection = 0 @@ -252,7 +252,7 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread): if self._disk_collector: self._disk_collector.sample() else: - log.debug('No sandbox detected yet for %s' % self._task_id) + log.debug('No sandbox detected yet for %s', self._task_id) try: disk_usage = self._disk_collector.value if self._disk_collector else 0 @@ -264,10 +264,10 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread): self._history.add(now, self.FullResourceResult(proc_usage_dict, disk_usage)) except ValueError as err: - log.warning("Error recording resource sample: %s" % err) + log.warning("Error recording resource sample: %s", err) - log.debug("TaskResourceMonitor: finished collection of %s in %.2fs" % ( - self._task_id, (time.time() - now))) + log.debug("TaskResourceMonitor: finished collection of %s in %.2fs", + self._task_id, (time.time() - now)) # Sleep until any of the following conditions are met: # - it's time for the next disk collection @@ -288,4 +288,4 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread): log.warning('Task resource collection is backlogged. 
Consider increasing ' 'process_collection_interval and disk_collection_interval.') - log.debug('Stopping resource monitoring for task "%s"' % self._task_id) + log.debug('Stopping resource monitoring for task "%s"', self._task_id) http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/observer/http/file_browser.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/observer/http/file_browser.py b/src/main/python/apache/thermos/observer/http/file_browser.py index d099665..dc740fb 100644 --- a/src/main/python/apache/thermos/observer/http/file_browser.py +++ b/src/main/python/apache/thermos/observer/http/file_browser.py @@ -42,7 +42,7 @@ def _read_chunk(filename, offset=None, length=None): try: fstat = os.stat(filename) except Exception as e: - log.error('Could not read from %s: %s' % (filename, e)) + log.error('Could not read from %s: %s', filename, e) return {} if offset == -1: @@ -56,7 +56,7 @@ def _read_chunk(filename, offset=None, length=None): try: data = fp.read(length) except IOError as e: - log.error('Failed to read %s: %s' % (filename, e), exc_info=True) + log.error('Failed to read %s: %s', filename, e, exc_info=True) return {} if data: http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/observer/http/http_observer.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/observer/http/http_observer.py b/src/main/python/apache/thermos/observer/http/http_observer.py index 5bfc4f2..c81383c 100644 --- a/src/main/python/apache/thermos/observer/http/http_observer.py +++ b/src/main/python/apache/thermos/observer/http/http_observer.py @@ -134,5 +134,5 @@ class BottleObserver(HttpServer, StaticAssets, TaskObserverFileBrowser, TaskObse } template['process'].update(**all_processes[current_run_number].get('used', {})) template['runs'] = all_processes - log.debug('Rendering template is: %s' % template) + log.debug('Rendering template is: %s', template) return template http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/observer/http/static_assets.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/observer/http/static_assets.py b/src/main/python/apache/thermos/observer/http/static_assets.py index 83adeb3..334c937 100644 --- a/src/main/python/apache/thermos/observer/http/static_assets.py +++ b/src/main/python/apache/thermos/observer/http/static_assets.py @@ -35,7 +35,7 @@ class StaticAssets(object): assets = pkg_resources.resource_listdir(__name__, 'assets') cached_assets = {} for asset in assets: - log.info(' detected asset: %s' % asset) + log.info(' detected asset: %s', asset) cached_assets[asset] = pkg_resources.resource_string( __name__, os.path.join('assets', asset)) self._assets = cached_assets http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/observer/observed_task.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/observer/observed_task.py b/src/main/python/apache/thermos/observer/observed_task.py index 08540e1..cb90c8b 100644 --- a/src/main/python/apache/thermos/observer/observed_task.py +++ b/src/main/python/apache/thermos/observer/observed_task.py @@ -60,11 +60,11 @@ class ObservedTask(AbstractClass): if os.path.exists(path): task = ThermosTaskWrapper.from_file(path) if task is 
None: - log.error('Error reading ThermosTask from %s in observer.' % path) + log.error('Error reading ThermosTask from %s in observer.', path) else: context = self.context(self._task_id) if not context: - log.warning('Task not yet available: %s' % self._task_id) + log.warning('Task not yet available: %s', self._task_id) task = task.task() % Environment(thermos=context) memoized[self._task_id] = task @@ -77,7 +77,7 @@ class ObservedTask(AbstractClass): if mtime is None: mtime = self.safe_mtime(get_path('finished')) if mtime is None: - log.error("Couldn't get mtime for task %s!" % self._task_id) + log.error("Couldn't get mtime for task %s!", self._task_id) return mtime def context(self, task_id): http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/observer/task_observer.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/observer/task_observer.py b/src/main/python/apache/thermos/observer/task_observer.py index 4bb5d23..a6870d4 100644 --- a/src/main/python/apache/thermos/observer/task_observer.py +++ b/src/main/python/apache/thermos/observer/task_observer.py @@ -96,9 +96,9 @@ class TaskObserver(ExceptionalThread, Lockable): ExceptionalThread.start(self) def __on_active(self, root, task_id): - log.debug('on_active(%r, %r)' % (root, task_id)) + log.debug('on_active(%r, %r)', root, task_id) if task_id in self.finished_tasks: - log.error('Found an active task (%s) in finished tasks?' % task_id) + log.error('Found an active task (%s) in finished tasks?', task_id) return task_monitor = TaskMonitor(root, task_id) resource_monitor = TaskResourceMonitor( @@ -115,14 +115,14 @@ class TaskObserver(ExceptionalThread, Lockable): ) def __on_finished(self, root, task_id): - log.debug('on_finished(%r, %r)' % (root, task_id)) + log.debug('on_finished(%r, %r)', root, task_id) active_task = self._active_tasks.pop(task_id, None) if active_task: active_task.resource_monitor.kill() self._finished_tasks[task_id] = FinishedObservedTask(root, task_id) def __on_removed(self, root, task_id): - log.debug('on_removed(%r, %r)' % (root, task_id)) + log.debug('on_removed(%r, %r)', root, task_id) active_task = self._active_tasks.pop(task_id, None) if active_task: active_task.resource_monitor.kill() @@ -139,7 +139,7 @@ class TaskObserver(ExceptionalThread, Lockable): with self.lock: start = time.time() self._detector.refresh() - log.debug("TaskObserver: finished checkpoint refresh in %.2fs" % (time.time() - start)) + log.debug("TaskObserver: finished checkpoint refresh in %.2fs", time.time() - start) @Lockable.sync def process_from_name(self, task_id, process_id): @@ -178,7 +178,7 @@ class TaskObserver(ExceptionalThread, Lockable): }.get(type, None) if tasks is None: - log.error('Unknown task type %s' % type) + log.error('Unknown task type %s', type) return {} return tasks @@ -313,7 +313,7 @@ class TaskObserver(ExceptionalThread, Lockable): resource_sample = self.active_tasks[task_id].resource_monitor.sample()[1] sample = resource_sample.process_sample.to_dict() sample['disk'] = resource_sample.disk_usage - log.debug("Got sample for task %s: %s" % (task_id, sample)) + log.debug("Got sample for task %s: %s", task_id, sample) return sample @Lockable.sync @@ -390,7 +390,7 @@ class TaskObserver(ExceptionalThread, Lockable): task = self.all_tasks[task_id].task if task is None: # TODO(wickman) Can this happen? 
- log.error('Could not find task: %s' % task_id) + log.error('Could not find task: %s', task_id) return {} state = self.raw_state(task_id) @@ -425,7 +425,7 @@ class TaskObserver(ExceptionalThread, Lockable): if task_id not in self.active_tasks: return ProcessSample.empty().to_dict() sample = self.active_tasks[task_id].resource_monitor.sample_by_process(process_name).to_dict() - log.debug('Resource consumption (%s, %s) => %s' % (task_id, process_name, sample)) + log.debug('Resource consumption (%s, %s) => %s', task_id, process_name, sample) return sample @Lockable.sync http://git-wip-us.apache.org/repos/asf/aurora/blob/b3fa9feb/src/main/python/apache/thermos/runner/thermos_runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/runner/thermos_runner.py b/src/main/python/apache/thermos/runner/thermos_runner.py index fa4f0fb..c434000 100644 --- a/src/main/python/apache/thermos/runner/thermos_runner.py +++ b/src/main/python/apache/thermos/runner/thermos_runner.py @@ -176,13 +176,13 @@ def get_task_from_options(opts): def runner_teardown(runner, sig=signal.SIGUSR1, frame=None): """Destroy runner on SIGUSR1 (kill) or SIGUSR2 (lose)""" op = 'kill' if sig == signal.SIGUSR1 else 'lose' - log.info('Thermos runner got signal %s, shutting down.' % sig) + log.info('Thermos runner got signal %s, shutting down.', sig) log.info('Interrupted frame:') if frame: for line in ''.join(traceback.format_stack(frame)).splitlines(): log.info(line) runner.close_ckpt() - log.info('Calling runner.%s()' % op) + log.info('Calling runner.%s()', op) getattr(runner, op)() sys.exit(0) @@ -201,7 +201,7 @@ def proxy_main(args, opts): missing_ports = set(thermos_task.ports()) - set(prebound_ports) if missing_ports: - log.error('ERROR! Unbound ports: %s' % ' '.join(port for port in missing_ports)) + log.error('ERROR! Unbound ports: %s', ' '.join(port for port in missing_ports)) sys.exit(INTERNAL_ERROR) if opts.setuid: @@ -213,7 +213,7 @@ def proxy_main(args, opts): try: pwd.getpwnam(user).pw_uid except KeyError: - log.error('Unknown user: %s' % user) + log.error('Unknown user: %s', user) sys.exit(UNKNOWN_USER) task_runner = TaskRunner( @@ -240,22 +240,22 @@ def proxy_main(args, opts): try: task_runner.run() except TaskRunner.InternalError as err: - log.error('Internal error: %s' % err) + log.error('Internal error: %s', err) sys.exit(INTERNAL_ERROR) except TaskRunner.InvalidTask as err: - log.error('Invalid task: %s' % err) + log.error('Invalid task: %s', err) sys.exit(INVALID_TASK) except TaskRunner.StateError as err: - log.error('Checkpoint error: %s' % err) + log.error('Checkpoint error: %s', err) sys.exit(TERMINAL_TASK) except Process.UnknownUserError as err: - log.error('User ceased to exist: %s' % err) + log.error('User ceased to exist: %s', err) sys.exit(UNKNOWN_USER) except KeyboardInterrupt: log.info('Caught ^C, tearing down runner.') runner_teardown(task_runner) except Exception as e: - log.error('Unknown exception: %s' % e) + log.error('Unknown exception: %s', e) for line in traceback.format_exc().splitlines(): log.error(line) sys.exit(UNKNOWN_ERROR)
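
For reference, the pattern applied throughout the diff is the printf-style lazy formatting built into Python's standard logging module. Thermos logs through twitter.common.log; the sketch below uses a plain stdlib logger on the assumption that the twitter.common `log` proxy delegates to stdlib logging and therefore defers formatting the same way (the logger name and sample values are illustrative only):

----------------------------------------------------------------------
import logging

logging.basicConfig(level=logging.INFO)  # DEBUG records are filtered out
log = logging.getLogger('thermos_demo')  # illustrative logger name

process_name, seq = 'hello_world', 12

# Eager: the '%' interpolation runs before log.debug() is even entered,
# so the message string is built and then immediately discarded whenever
# DEBUG logging is disabled.
log.debug('Running state machine for process=%s/seq=%s' % (process_name, seq))

# Lazy: the format string and its arguments are passed separately.
# logging checks isEnabledFor(DEBUG) first and only interpolates when a
# handler actually emits the record, so disabled debug calls allocate no
# message string -- the "less garbage" the commit message refers to.
log.debug('Running state machine for process=%s/seq=%s', process_name, seq)
----------------------------------------------------------------------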

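A related caveat that is easy to trip over when converting mechanically: exception constructors do not apply printf-style formatting, which is why the InvalidSequenceNumber and ProcessNotFound raises above keep their eager '%' interpolation. A minimal illustration, using a stand-in ValueError rather than the Thermos exception types:

----------------------------------------------------------------------
# Passing format arguments to an exception does NOT interpolate them;
# the constructor simply stores the argument tuple verbatim.
try:
    raise ValueError('Out of order sequence number! %s => %s', 'run-1', 'update-2')
except ValueError as e:
    print(e)  # ('Out of order sequence number! %s => %s', 'run-1', 'update-2')

# Exception messages must therefore keep the eager '%' interpolation.
try:
    raise ValueError('Out of order sequence number! %s => %s' % ('run-1', 'update-2'))
except ValueError as e:
    print(e)  # Out of order sequence number! run-1 => update-2
----------------------------------------------------------------------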