Refactor the server_job code to use the base_job implementation of record. The two main features that the server job code added on top of the basic job record were the ability to inject client job records into the server job record, and the ability to inject console/vlm warnings into the record.
The client job record injection was implemented by adding a parse classmethod to the status_log_entry class that does the inverse of the render method, so that the Autotest class can construct status_log_entries from the raw text status logs it gets from the client. The server then re-records the messages, rather than using a low-level method to bypass the old job.record rendering. The warning injection was implemented by using the record_hook; to make it work better, the hook call was moved to run before the entry is logged, not after. This required a slightly more involved hook: it was useful to be able to call job.record directly from the hook, so the hook was wrapped in a callable class that prevents recursion. Signed-off-by: John Admanski <[email protected]> --- autotest/client/common_lib/base_job.py 2010-06-09 14:46:54.000000000 -0700 +++ autotest/client/common_lib/base_job.py 2010-06-09 14:46:54.000000000 -0700 @@ -418,6 +418,10 @@ class status_log_entry(object): """Represents a single status log entry.""" + RENDERED_NONE_VALUE = '----' + TIMESTAMP_FIELD = 'timestamp' + LOCALTIME_FIELD = 'localtime' + def __init__(self, status_code, subdir, operation, message, fields, timestamp=None): """Construct a status.log entry. @@ -471,9 +475,9 @@ # build up the timestamp if timestamp is None: timestamp = int(time.time()) - self.fields['timestamp'] = str(timestamp) - self.fields['localtime'] = time.strftime('%b %d %H:%M:%S', - time.localtime(timestamp)) + self.fields[self.TIMESTAMP_FIELD] = str(timestamp) + self.fields[self.LOCALTIME_FIELD] = time.strftime( + '%b %d %H:%M:%S', time.localtime(timestamp)) def is_start(self): @@ -498,8 +502,8 @@ @return: A text string suitable for writing into a status log file. 
""" # combine all the log line data into a tab-delimited string - subdir = self.subdir or '----' - operation = self.operation or '----' + subdir = self.subdir or self.RENDERED_NONE_VALUE + operation = self.operation or self.RENDERED_NONE_VALUE extra_fields = ['%s=%s' % field for field in self.fields.iteritems()] line_items = [self.status_code, subdir, operation] line_items += extra_fields + [self.message] @@ -511,6 +515,40 @@ return '\n'.join(all_lines) + @classmethod + def parse(cls, line): + """Parse a status log entry from a text string. + + This method is the inverse of render; it should always be true that + parse(entry.render()) produces a new status_log_entry equivalent to + entry. + + @return: A new status_log_entry instance with fields extracted from the + given status line. If the line is an extra message line then None + is returned. + """ + # extra message lines are always prepended with two spaces + if line.startswith(' '): + return None + + line = line.lstrip('\t') # ignore indentation + entry_parts = line.split('\t') + if len(entry_parts) < 4: + raise ValueError('%r is not a valid status line' % line) + status_code, subdir, operation = entry_parts[:3] + if subdir == cls.RENDERED_NONE_VALUE: + subdir = None + if operation == cls.RENDERED_NONE_VALUE: + operation = None + message = entry_parts[-1] + fields = dict(part.split('=', 1) for part in entry_parts[3:-1]) + if cls.TIMESTAMP_FIELD in fields: + timestamp = int(fields[cls.TIMESTAMP_FIELD]) + else: + timestamp = None + return cls(status_code, subdir, operation, message, fields, timestamp) + + class status_indenter(object): """Abstract interface that a status log indenter should use.""" @@ -548,7 +586,7 @@ self.global_filename attribute. @param subdir_filename: An optional filename to initialize the self.subdir_filename attribute. - @param record_hook: An optional function to be called after an entry + @param record_hook: An optional function to be called before an entry is logged. 
The function should expect a single parameter, a copy of the status_log_entry object. """ @@ -590,11 +628,13 @@ return self._indent_multiline_text(log_entry.render(), indent) - def record_entry(self, log_entry): + def record_entry(self, log_entry, log_in_subdir=True): """Record a status_log_entry into the appropriate status log files. @param log_entry: A status_log_entry instance to be recorded into the status logs. + @param log_in_subdir: A boolean that indicates (when true) that subdir + logs should be written into the subdirectory status log file. """ # acquire a strong reference for the duration of the method job = self._jobref() @@ -604,9 +644,13 @@ logging.warning(traceback.format_stack()) return + # call the record hook if one was given + if self._record_hook: + self._record_hook(log_entry) + # figure out where we need to log to log_files = [os.path.join(job.resultdir, self.global_filename)] - if log_entry.subdir: + if log_in_subdir and log_entry.subdir: log_files.append(os.path.join(job.resultdir, log_entry.subdir, self.subdir_filename)) @@ -619,10 +663,6 @@ finally: fileobj.close() - # call the record hook if one was given - if self._record_hook: - self._record_hook(log_entry) - # adjust the indentation if this was a START or END entry if log_entry.is_start(): self._indenter.increment() @@ -959,5 +999,17 @@ """ entry = status_log_entry(status_code, subdir, operation, status, optional_fields) - logger = self._get_status_logger() - logger.record_entry(entry) + self.record_entry(entry) + + + def record_entry(self, entry, log_in_subdir=True): + """Record a job-level status event, using a status_log_entry. + + This is the same as self.record but using an existing status log + entry object rather than constructing one for you. + + @param entry: A status_log_entry object + @param log_in_subdir: A boolean that indicates (when true) that subdir + logs should be written into the subdirectory status log file. 
+ """ + self._get_status_logger().record_entry(entry, log_in_subdir) --- autotest/client/common_lib/base_job_unittest.py 2010-06-09 14:46:54.000000000 -0700 +++ autotest/client/common_lib/base_job_unittest.py 2010-06-09 14:46:54.000000000 -0700 @@ -1024,6 +1024,44 @@ custom_fields, 8) + def assertEntryEqual(self, lhs, rhs): + self.assertEqual( + (lhs.status_code, lhs.subdir, lhs.operation, lhs.fields, lhs.message), + (rhs.status_code, rhs.subdir, rhs.operation, rhs.fields, rhs.message)) + + + def test_base_parse(self): + entry = base_job.status_log_entry( + 'GOOD', None, None, 'message', {'field1': 'x', 'field2': 'y'}, + timestamp=16) + parsed_entry = base_job.status_log_entry.parse( + 'GOOD\t----\t----\tfield1=x\tfield2=y\ttimestamp=16\tmessage\n') + self.assertEntryEqual(entry, parsed_entry) + + + def test_subdir_parse(self): + entry = base_job.status_log_entry( + 'FAIL', 'sub', None, 'message', {'field1': 'x', 'field2': 'y'}, + timestamp=32) + parsed_entry = base_job.status_log_entry.parse( + 'FAIL\tsub\t----\tfield1=x\tfield2=y\ttimestamp=32\tmessage\n') + self.assertEntryEqual(entry, parsed_entry) + + + def test_operation_parse(self): + entry = base_job.status_log_entry( + 'ABORT', None, 'myop', 'message', {'field1': 'x', 'field2': 'y'}, + timestamp=64) + parsed_entry = base_job.status_log_entry.parse( + 'ABORT\t----\tmyop\tfield1=x\tfield2=y\ttimestamp=64\tmessage\n') + self.assertEntryEqual(entry, parsed_entry) + + + def test_extra_lines_parse(self): + parsed_entry = base_job.status_log_entry.parse( + ' This is a non-status line, line in a traceback\n') + self.assertEqual(None, parsed_entry) + class test_status_logger(unittest.TestCase): def setUp(self): @@ -1154,6 +1192,19 @@ self.assertEqual('LINE4\n', open('123/status').read()) + def test_writes_no_subdir_when_disabled(self): + os.mkdir('sub') + self.logger.record_entry(self.make_dummy_entry('LINE1')) + self.logger.record_entry(self.make_dummy_entry('LINE2', subdir='sub')) + 
self.logger.record_entry(self.make_dummy_entry( + 'LINE3', subdir='sub_nowrite'), log_in_subdir=False) + self.logger.record_entry(self.make_dummy_entry('LINE4', subdir='sub')) + + self.assertEqual('LINE1\nLINE2\nLINE3\nLINE4\n', open('status').read()) + self.assertEqual('LINE2\nLINE4\n', open('sub/status').read()) + self.assert_(not os.path.exists('sub_nowrite/status')) + + def test_indentation(self): self.logger.record_entry(self.make_dummy_entry('LINE1', start=True)) self.logger.record_entry(self.make_dummy_entry('LINE2')) --- autotest/server/autotest.py 2010-06-07 12:43:22.000000000 -0700 +++ autotest/server/autotest.py 2010-06-09 14:46:54.000000000 -0700 @@ -3,7 +3,7 @@ import re, os, sys, traceback, subprocess, time, pickle, glob, tempfile import logging, getpass from autotest_lib.server import installable_object, prebuild, utils -from autotest_lib.client.common_lib import log, error, autotemp +from autotest_lib.client.common_lib import base_job, log, error, autotemp from autotest_lib.client.common_lib import global_config, packages from autotest_lib.client.common_lib import utils as client_utils @@ -20,7 +20,7 @@ autoserv_prebuild = get_value('AUTOSERV', 'enable_server_prebuild', type=bool, default=False) - + class AutodirNotFoundError(Exception): """No Autotest installation could be found.""" @@ -859,16 +859,7 @@ self.log_collector = log_collector(host, tag, server_results_dir) self.leftover = "" self.last_line = "" - self.newest_timestamp = float("-inf") self.logs = {} - self.server_warnings = [] - - - def _update_timestamp(self, line): - match = self.extract_timestamp.search(line) - if match: - self.newest_timestamp = max(self.newest_timestamp, - int(match.group(1))) def _process_log_dict(self, log_dict): @@ -892,10 +883,10 @@ ordering is desired that one can be reconstructed from the status log by looking at timestamp lines.""" log_list = self._process_log_dict(self.logs) - for line in log_list: - self.job._record_prerendered(line + '\n') + for entry in 
log_list: + self.job.record_entry(entry, log_in_subdir=False) if log_list: - self.last_line = log_list[-1] + self.last_line = log_list[-1].render() def _process_quoted_line(self, tag, line): @@ -904,10 +895,12 @@ building up in self.logs, and then the newest line. If the tag is not blank, then push the line into the logs for handling later.""" - logging.info(line) + entry = base_job.status_log_entry.parse(line) + if entry is None: + return # the line contains no status lines if tag == "": self._process_logs() - self.job._record_prerendered(line + '\n') + self.job.record_entry(entry, log_in_subdir=False) self.last_line = line else: tag_parts = [int(x) for x in tag.split(".")] @@ -915,7 +908,7 @@ for part in tag_parts: log_dict = log_dict.setdefault(part, {}) log_list = log_dict.setdefault("logs", []) - log_list.append(line) + log_list.append(entry) def _process_info_line(self, line): @@ -1012,50 +1005,15 @@ return - def _format_warnings(self, last_line, warnings): - # use the indentation of whatever the last log line was - indent = self.extract_indent.match(last_line).group(1) - # if the last line starts a new group, add an extra indent - if last_line.lstrip('\t').startswith("START\t"): - indent += '\t' - return [self.job._render_record("WARN", None, None, msg, - timestamp, indent).rstrip('\n') - for timestamp, msg in warnings] - - - def _process_warnings(self, last_line, log_dict, warnings): - if log_dict.keys() in ([], ["logs"]): - # there are no sub-jobs, just append the warnings here - warnings = self._format_warnings(last_line, warnings) - log_list = log_dict.setdefault("logs", []) - log_list += warnings - for warning in warnings: - sys.stdout.write(warning + '\n') - else: - # there are sub-jobs, so put the warnings in there - log_list = log_dict.get("logs", []) - if log_list: - last_line = log_list[-1] - for key in sorted(log_dict.iterkeys()): - if key != "logs": - self._process_warnings(last_line, - log_dict[key], - warnings) - - def log_warning(self, msg, 
warning_type): """Injects a WARN message into the current status logging stream.""" timestamp = int(time.time()) if self.job.warning_manager.is_valid(timestamp, warning_type): - self.server_warnings.append((timestamp, msg)) + self.job.record('WARN', None, None, msg) def write(self, data): - # first check for any new console warnings - self.server_warnings = self.job._read_warnings() + self.server_warnings - warnings = self.server_warnings - warnings.sort() # sort into timestamp order - # now start processing the existng buffer and the new data + # now start processing the existing buffer and the new data data = self.leftover + data lines = data.split('\n') processed_lines = 0 @@ -1063,13 +1021,6 @@ # process all the buffered data except the last line # ignore the last line since we may not have all of it yet for line in lines[:-1]: - self._update_timestamp(line) - # output any warnings between now and the next status line - old_warnings = [(timestamp, msg) for timestamp, msg in warnings - if timestamp < self.newest_timestamp] - self._process_warnings(self.last_line, self.logs, old_warnings) - del warnings[:len(old_warnings)] - # now process the line itself self._process_line(line) processed_lines += 1 finally: @@ -1085,7 +1036,6 @@ if self.leftover: self._process_line(self.leftover) self.leftover = "" - self._process_warnings(self.last_line, self.logs, self.server_warnings) self._process_logs() self.flush() --- autotest/server/autotest_unittest.py 2010-06-09 14:46:54.000000000 -0700 +++ autotest/server/autotest_unittest.py 2010-06-09 14:46:54.000000000 -0700 @@ -312,38 +312,5 @@ '/autotest/dest/:/autotest/fifo3') - def test_client_logger_write_handles_process_line_failures(self): - collector = autotest.log_collector.expect_new(self.host, '', '') - logger = autotest.client_logger(self.host, '', '') - logger.server_warnings = [(x, 'warn%d' % x) for x in xrange(5)] - self.god.stub_function(logger, '_process_warnings') - self.god.stub_function(logger, 
'_process_line') - def _update_timestamp(line): - logger.newest_timestamp += 2 - class ProcessingException(Exception): - pass - def _read_warnings(): - return [(5, 'warn5')] - logger._update_timestamp = _update_timestamp - logger.newest_timestamp = 0 - self.host.job._read_warnings = _read_warnings - # process line1, newest_timestamp -> 2 - logger._process_warnings.expect_call( - '', {}, [(0, 'warn0'), (1, 'warn1')]) - logger._process_line.expect_call('line1') - # process line2, newest_timestamp -> 4, failure occurs during process - logger._process_warnings.expect_call( - '', {}, [(2, 'warn2'), (3, 'warn3')]) - logger._process_line.expect_call('line2').and_raises( - ProcessingException('line processing failure')) - # when we call write with data we should get an exception - self.assertRaises(ProcessingException, logger.write, - 'line1\nline2\nline3\nline4') - # but, after the exception, the server_warnings and leftover buffers - # should contain the unprocessed data, and ONLY the unprocessed data - self.assertEqual(logger.server_warnings, [(4, 'warn4'), (5, 'warn5')]) - self.assertEqual(logger.leftover, 'line2\nline3\nline4') - - if __name__ == "__main__": unittest.main() --- autotest/server/server_job.py 2010-06-09 14:46:54.000000000 -0700 +++ autotest/server/server_job.py 2010-06-09 14:46:54.000000000 -0700 @@ -48,6 +48,70 @@ _get_site_job_data_dummy) +class status_indenter(base_job.status_indenter): + """Provide a simple integer-backed status indenter.""" + def __init__(self): + self._indent = 0 + + + @property + def indent(self): + return self._indent + + + def increment(self): + self._indent += 1 + + + def decrement(self): + self._indent -= 1 + + +class server_job_record_hook(object): + """The job.record hook for server job. Used to inject WARN messages from + the console or vlm whenever new logs are written, and to echo any logs + to INFO level logging. 
Implemented as a class so that it can use state to + block recursive calls, so that the hook can call job.record itself to + log WARN messages. + + Depends on job._read_warnings and job._logger. + """ + def __init__(self, job): + self._job = job + self._being_called = False + + + def __call__(self, entry): + """A wrapper around the 'real' record hook, the _hook method, which + prevents recursion. This isn't making any effort to be threadsafe, + the intent is to outright block infinite recursion via a + job.record->_hook->job.record->_hook->job.record... chain.""" + if self._being_called: + return + self._being_called = True + try: + self._hook(self._job, entry) + finally: + self._being_called = False + + + @staticmethod + def _hook(job, entry): + """The core hook, which can safely call job.record.""" + entries = [] + # poll all our warning loggers for new warnings + for timestamp, msg in job._read_warnings(): + warning_entry = base_job.status_log_entry( + 'WARN', None, None, msg, {}, timestamp=timestamp) + entries.append(warning_entry) + job.record_entry(warning_entry) + # echo rendered versions of all the status logs to info + entries.append(entry) + for entry in entries: + rendered_entry = job._logger.render_entry(entry) + logging.info(rendered_entry) + + class base_server_job(base_job.base_job): """The server-side concrete implementation of base_job. 
@@ -152,6 +216,11 @@ self.bootloader = None self.harness = None + # set up the status logger + self._logger = base_job.status_logger( + self, status_indenter(), 'status.log', 'status.log', + record_hook=server_job_record_hook(self)) + @classmethod def _find_base_directories(cls): @@ -177,6 +246,11 @@ return None + def _get_status_logger(self): + """Return a reference to the status logger.""" + return self._logger + + @staticmethod def _load_control_file(path): f = open(path) @@ -622,55 +696,6 @@ self.sysinfo.boot_loggables.add(loggable) - def record(self, status_code, subdir, operation, status='', - optional_fields=None): - """ - Record job-level status - - The intent is to make this file both machine parseable and - human readable. That involves a little more complexity, but - really isn't all that bad ;-) - - Format is <status code>\t<subdir>\t<operation>\t<status> - - status code: see common_lib.log.is_valid_status() - for valid status definition - - subdir: MUST be a relevant subdirectory in the results, - or None, which will be represented as '----' - - operation: description of what you ran (e.g. "dbench", or - "mkfs -t foobar /dev/sda9") - - status: error message or "completed sucessfully" - - ------------------------------------------------------------ - - Initial tabs indicate indent levels for grouping, and is - governed by self._record_prefix - - multiline messages have secondary lines prefaced by a double - space (' ') - - Executing this method will trigger the logging of all new - warnings to date from the various console loggers. 
- """ - # poll all our warning loggers for new warnings - warnings = self._read_warnings() - old_record_prefix = self._record_prefix - try: - if status_code.startswith("END "): - self._record_prefix += "\t" - for timestamp, msg in warnings: - self._record("WARN", None, None, msg, timestamp) - finally: - self._record_prefix = old_record_prefix - - # write out the actual status log line - self._record(status_code, subdir, operation, status, - optional_fields=optional_fields) - - def _read_warnings(self): """Poll all the warning loggers and extract any new warnings that have been logged. If the warnings belong to a category that is currently @@ -860,74 +885,6 @@ return [] - def _render_record(self, status_code, subdir, operation, status='', - epoch_time=None, record_prefix=None, - optional_fields=None): - """ - Internal Function to generate a record to be written into a - status log. For use by server_job.* classes only. - """ - if subdir: - if re.match(r'[\n\t]', subdir): - raise ValueError('Invalid character in subdir string') - substr = subdir - else: - substr = '----' - - if not log.is_valid_status(status_code): - raise ValueError('Invalid status code supplied: %s' % status_code) - if not operation: - operation = '----' - if re.match(r'[\n\t]', operation): - raise ValueError('Invalid character in operation string') - operation = operation.rstrip() - status = status.rstrip() - status = re.sub(r"\t", " ", status) - # Ensure any continuation lines are marked so we can - # detect them in the status file to ensure it is parsable. 
- status = re.sub(r"\n", "\n" + self._record_prefix + " ", status) - - if not optional_fields: - optional_fields = {} - - # Generate timestamps for inclusion in the logs - if epoch_time is None: - epoch_time = int(time.time()) - local_time = time.localtime(epoch_time) - optional_fields["timestamp"] = str(epoch_time) - optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", - local_time) - - fields = [status_code, substr, operation] - fields += ["%s=%s" % x for x in optional_fields.iteritems()] - fields.append(status) - - if record_prefix is None: - record_prefix = self._record_prefix - - msg = '\t'.join(str(x) for x in fields) - return record_prefix + msg + '\n' - - - def _record_prerendered(self, msg): - """ - Record a pre-rendered msg into the status logs. The only - change this makes to the message is to add on the local - indentation. Should not be called outside of server_job.* - classes. Unlike _record, this does not write the message - to standard output. - """ - lines = [] - status_file = self.get_status_log_path() - status_log = open(status_file, 'a') - for line in msg.splitlines(): - line = self._record_prefix + line + '\n' - lines.append(line) - status_log.write(line) - status_log.close() - self.__parse_status(lines) - - def _fill_server_control_namespace(self, namespace, protect=True): """ Prepare a namespace to be used when executing server control files. @@ -1045,27 +1002,6 @@ execfile(code_file, namespace, namespace) - def _record(self, status_code, subdir, operation, status='', - epoch_time=None, optional_fields=None): - """ - Actual function for recording a single line into the status - logs. Should never be called directly, only by job.record as - this would bypass the console monitor logging. 
- """ - - msg = self._render_record(status_code, subdir, operation, status, - epoch_time, optional_fields=optional_fields) - - status_file = self.get_status_log_path() - sys.stdout.write(msg) - if status_file: - open(status_file, "a").write(msg) - if subdir: - sub_status_file = self.get_status_log_path(subdir) - open(sub_status_file, "a").write(msg) - self.__parse_status(msg.splitlines()) - - def __parse_status(self, new_lines): if not self._using_parser: return _______________________________________________ Autotest mailing list [email protected] http://test.kernel.org/cgi-bin/mailman/listinfo/autotest
