Refactor the server_job code to use the base_job implementation of
record.

The two main features that the server job code added on top of the
basic job record were the ability to inject client job records into
the server job record, and the ability to inject console/vlm warnings
into the record.

The Autotest injection was implemented by adding a parse classmethod
to the status_log_entry class that does the inverse of the render
method, so that the Autotest class can construct status_log_entries
from the raw text status logs it gets from the client. The server then
re-records the messages, rather than using a low-level method to
bypass the old job.record rendering.

The warning injection was implemented by using the record_hook; to
make it work better it was moved to be called before the entry is
logged, not after.  This required a slightly more involved hook, as it
was useful to be able to call job.record directly from the hook and so
the hook was wrapped in a callable class that prevents recursion.

Signed-off-by: John Admanski <[email protected]>

--- autotest/client/common_lib/base_job.py      2010-06-09 14:46:54.000000000 
-0700
+++ autotest/client/common_lib/base_job.py      2010-06-09 14:46:54.000000000 
-0700
@@ -418,6 +418,10 @@
 class status_log_entry(object):
     """Represents a single status log entry."""
 
+    RENDERED_NONE_VALUE = '----'
+    TIMESTAMP_FIELD = 'timestamp'
+    LOCALTIME_FIELD = 'localtime'
+
     def __init__(self, status_code, subdir, operation, message, fields,
                  timestamp=None):
         """Construct a status.log entry.
@@ -471,9 +475,9 @@
         # build up the timestamp
         if timestamp is None:
             timestamp = int(time.time())
-        self.fields['timestamp'] = str(timestamp)
-        self.fields['localtime'] = time.strftime('%b %d %H:%M:%S',
-                                                 time.localtime(timestamp))
+        self.fields[self.TIMESTAMP_FIELD] = str(timestamp)
+        self.fields[self.LOCALTIME_FIELD] = time.strftime(
+            '%b %d %H:%M:%S', time.localtime(timestamp))
 
 
     def is_start(self):
@@ -498,8 +502,8 @@
         @return: A text string suitable for writing into a status log file.
         """
         # combine all the log line data into a tab-delimited string
-        subdir = self.subdir or '----'
-        operation = self.operation or '----'
+        subdir = self.subdir or self.RENDERED_NONE_VALUE
+        operation = self.operation or self.RENDERED_NONE_VALUE
         extra_fields = ['%s=%s' % field for field in self.fields.iteritems()]
         line_items = [self.status_code, subdir, operation]
         line_items += extra_fields + [self.message]
@@ -511,6 +515,40 @@
         return '\n'.join(all_lines)
 
 
+    @classmethod
+    def parse(cls, line):
+        """Parse a status log entry from a text string.
+
+        This method is the inverse of render; it should always be true that
+        parse(entry.render()) produces a new status_log_entry equivalent to
+        entry.
+
+        @return: A new status_log_entry instance with fields extracted from the
+            given status line. If the line is an extra message line then None
+            is returned.
+        """
+        # extra message lines are always prepended with two spaces
+        if line.startswith('  '):
+            return None
+
+        line = line.lstrip('\t')  # ignore indentation
+        entry_parts = line.split('\t')
+        if len(entry_parts) < 4:
+            raise ValueError('%r is not a valid status line' % line)
+        status_code, subdir, operation = entry_parts[:3]
+        if subdir == cls.RENDERED_NONE_VALUE:
+            subdir = None
+        if operation == cls.RENDERED_NONE_VALUE:
+            operation = None
+        message = entry_parts[-1]
+        fields = dict(part.split('=', 1) for part in entry_parts[3:-1])
+        if cls.TIMESTAMP_FIELD in fields:
+            timestamp = int(fields[cls.TIMESTAMP_FIELD])
+        else:
+            timestamp = None
+        return cls(status_code, subdir, operation, message, fields, timestamp)
+
+
 class status_indenter(object):
     """Abstract interface that a status log indenter should use."""
 
@@ -548,7 +586,7 @@
             self.global_filename attribute.
         @param subdir_filename: An optional filename to initialize the
             self.subdir_filename attribute.
-        @param record_hook: An optional function to be called after an entry
+        @param record_hook: An optional function to be called before an entry
             is logged. The function should expect a single parameter, a
             copy of the status_log_entry object.
         """
@@ -590,11 +628,13 @@
         return self._indent_multiline_text(log_entry.render(), indent)
 
 
-    def record_entry(self, log_entry):
+    def record_entry(self, log_entry, log_in_subdir=True):
         """Record a status_log_entry into the appropriate status log files.
 
         @param log_entry: A status_log_entry instance to be recorded into the
                 status logs.
+        @param log_in_subdir: A boolean that indicates (when true) that subdir
+                logs should be written into the subdirectory status log file.
         """
         # acquire a strong reference for the duration of the method
         job = self._jobref()
@@ -604,9 +644,13 @@
             logging.warning(traceback.format_stack())
             return
 
+        # call the record hook if one was given
+        if self._record_hook:
+            self._record_hook(log_entry)
+
         # figure out where we need to log to
         log_files = [os.path.join(job.resultdir, self.global_filename)]
-        if log_entry.subdir:
+        if log_in_subdir and log_entry.subdir:
             log_files.append(os.path.join(job.resultdir, log_entry.subdir,
                                           self.subdir_filename))
 
@@ -619,10 +663,6 @@
             finally:
                 fileobj.close()
 
-        # call the record hook if one was given
-        if self._record_hook:
-            self._record_hook(log_entry)
-
         # adjust the indentation if this was a START or END entry
         if log_entry.is_start():
             self._indenter.increment()
@@ -959,5 +999,17 @@
         """
         entry = status_log_entry(status_code, subdir, operation, status,
                                  optional_fields)
-        logger = self._get_status_logger()
-        logger.record_entry(entry)
+        self.record_entry(entry)
+
+
+    def record_entry(self, entry, log_in_subdir=True):
+        """Record a job-level status event, using a status_log_entry.
+
+        This is the same as self.record but using an existing status log
+        entry object rather than constructing one for you.
+
+        @param entry: A status_log_entry object
+        @param log_in_subdir: A boolean that indicates (when true) that subdir
+                logs should be written into the subdirectory status log file.
+        """
+        self._get_status_logger().record_entry(entry, log_in_subdir)
--- autotest/client/common_lib/base_job_unittest.py     2010-06-09 
14:46:54.000000000 -0700
+++ autotest/client/common_lib/base_job_unittest.py     2010-06-09 
14:46:54.000000000 -0700
@@ -1024,6 +1024,44 @@
                             custom_fields, 8)
 
 
+    def assertEntryEqual(self, lhs, rhs):
+        self.assertEqual(
+          (lhs.status_code, lhs.subdir, lhs.operation, lhs.fields, 
lhs.message),
+          (rhs.status_code, rhs.subdir, rhs.operation, rhs.fields, 
rhs.message))
+
+
+    def test_base_parse(self):
+        entry = base_job.status_log_entry(
+            'GOOD', None, None, 'message', {'field1': 'x', 'field2': 'y'},
+            timestamp=16)
+        parsed_entry = base_job.status_log_entry.parse(
+            'GOOD\t----\t----\tfield1=x\tfield2=y\ttimestamp=16\tmessage\n')
+        self.assertEntryEqual(entry, parsed_entry)
+
+
+    def test_subdir_parse(self):
+        entry = base_job.status_log_entry(
+            'FAIL', 'sub', None, 'message', {'field1': 'x', 'field2': 'y'},
+            timestamp=32)
+        parsed_entry = base_job.status_log_entry.parse(
+            'FAIL\tsub\t----\tfield1=x\tfield2=y\ttimestamp=32\tmessage\n')
+        self.assertEntryEqual(entry, parsed_entry)
+
+
+    def test_operation_parse(self):
+        entry = base_job.status_log_entry(
+            'ABORT', None, 'myop', 'message', {'field1': 'x', 'field2': 'y'},
+            timestamp=64)
+        parsed_entry = base_job.status_log_entry.parse(
+            'ABORT\t----\tmyop\tfield1=x\tfield2=y\ttimestamp=64\tmessage\n')
+        self.assertEntryEqual(entry, parsed_entry)
+
+
+    def test_extra_lines_parse(self):
+        parsed_entry = base_job.status_log_entry.parse(
+            '  This is a non-status line, line in a traceback\n')
+        self.assertEqual(None, parsed_entry)
+
 
 class test_status_logger(unittest.TestCase):
     def setUp(self):
@@ -1154,6 +1192,19 @@
         self.assertEqual('LINE4\n', open('123/status').read())
 
 
+    def test_writes_no_subdir_when_disabled(self):
+        os.mkdir('sub')
+        self.logger.record_entry(self.make_dummy_entry('LINE1'))
+        self.logger.record_entry(self.make_dummy_entry('LINE2', subdir='sub'))
+        self.logger.record_entry(self.make_dummy_entry(
+            'LINE3', subdir='sub_nowrite'), log_in_subdir=False)
+        self.logger.record_entry(self.make_dummy_entry('LINE4', subdir='sub'))
+
+        self.assertEqual('LINE1\nLINE2\nLINE3\nLINE4\n', open('status').read())
+        self.assertEqual('LINE2\nLINE4\n', open('sub/status').read())
+        self.assert_(not os.path.exists('sub_nowrite/status'))
+
+
     def test_indentation(self):
         self.logger.record_entry(self.make_dummy_entry('LINE1', start=True))
         self.logger.record_entry(self.make_dummy_entry('LINE2'))
--- autotest/server/autotest.py 2010-06-07 12:43:22.000000000 -0700
+++ autotest/server/autotest.py 2010-06-09 14:46:54.000000000 -0700
@@ -3,7 +3,7 @@
 import re, os, sys, traceback, subprocess, time, pickle, glob, tempfile
 import logging, getpass
 from autotest_lib.server import installable_object, prebuild, utils
-from autotest_lib.client.common_lib import log, error, autotemp
+from autotest_lib.client.common_lib import base_job, log, error, autotemp
 from autotest_lib.client.common_lib import global_config, packages
 from autotest_lib.client.common_lib import utils as client_utils
 
@@ -20,7 +20,7 @@
 autoserv_prebuild = get_value('AUTOSERV', 'enable_server_prebuild',
                               type=bool, default=False)
 
-    
+
 class AutodirNotFoundError(Exception):
     """No Autotest installation could be found."""
 
@@ -859,16 +859,7 @@
         self.log_collector = log_collector(host, tag, server_results_dir)
         self.leftover = ""
         self.last_line = ""
-        self.newest_timestamp = float("-inf")
         self.logs = {}
-        self.server_warnings = []
-
-
-    def _update_timestamp(self, line):
-        match = self.extract_timestamp.search(line)
-        if match:
-            self.newest_timestamp = max(self.newest_timestamp,
-                                        int(match.group(1)))
 
 
     def _process_log_dict(self, log_dict):
@@ -892,10 +883,10 @@
         ordering is desired that one can be reconstructed from the
         status log by looking at timestamp lines."""
         log_list = self._process_log_dict(self.logs)
-        for line in log_list:
-            self.job._record_prerendered(line + '\n')
+        for entry in log_list:
+            self.job.record_entry(entry, log_in_subdir=False)
         if log_list:
-            self.last_line = log_list[-1]
+            self.last_line = log_list[-1].render()
 
 
     def _process_quoted_line(self, tag, line):
@@ -904,10 +895,12 @@
         building up in self.logs, and then the newest line. If the
         tag is not blank, then push the line into the logs for handling
         later."""
-        logging.info(line)
+        entry = base_job.status_log_entry.parse(line)
+        if entry is None:
+            return  # the line contains no status lines
         if tag == "":
             self._process_logs()
-            self.job._record_prerendered(line + '\n')
+            self.job.record_entry(entry, log_in_subdir=False)
             self.last_line = line
         else:
             tag_parts = [int(x) for x in tag.split(".")]
@@ -915,7 +908,7 @@
             for part in tag_parts:
                 log_dict = log_dict.setdefault(part, {})
             log_list = log_dict.setdefault("logs", [])
-            log_list.append(line)
+            log_list.append(entry)
 
 
     def _process_info_line(self, line):
@@ -1012,50 +1005,15 @@
                 return
 
 
-    def _format_warnings(self, last_line, warnings):
-        # use the indentation of whatever the last log line was
-        indent = self.extract_indent.match(last_line).group(1)
-        # if the last line starts a new group, add an extra indent
-        if last_line.lstrip('\t').startswith("START\t"):
-            indent += '\t'
-        return [self.job._render_record("WARN", None, None, msg,
-                                        timestamp, indent).rstrip('\n')
-                for timestamp, msg in warnings]
-
-
-    def _process_warnings(self, last_line, log_dict, warnings):
-        if log_dict.keys() in ([], ["logs"]):
-            # there are no sub-jobs, just append the warnings here
-            warnings = self._format_warnings(last_line, warnings)
-            log_list = log_dict.setdefault("logs", [])
-            log_list += warnings
-            for warning in warnings:
-                sys.stdout.write(warning + '\n')
-        else:
-            # there are sub-jobs, so put the warnings in there
-            log_list = log_dict.get("logs", [])
-            if log_list:
-                last_line = log_list[-1]
-            for key in sorted(log_dict.iterkeys()):
-                if key != "logs":
-                    self._process_warnings(last_line,
-                                           log_dict[key],
-                                           warnings)
-
-
     def log_warning(self, msg, warning_type):
         """Injects a WARN message into the current status logging stream."""
         timestamp = int(time.time())
         if self.job.warning_manager.is_valid(timestamp, warning_type):
-            self.server_warnings.append((timestamp, msg))
+            self.job.record('WARN', None, None, {}, msg)
 
 
     def write(self, data):
-        # first check for any new console warnings
-        self.server_warnings = self.job._read_warnings() + self.server_warnings
-        warnings = self.server_warnings
-        warnings.sort()  # sort into timestamp order
-        # now start processing the existng buffer and the new data
+        # now start processing the existing buffer and the new data
         data = self.leftover + data
         lines = data.split('\n')
         processed_lines = 0
@@ -1063,13 +1021,6 @@
             # process all the buffered data except the last line
             # ignore the last line since we may not have all of it yet
             for line in lines[:-1]:
-                self._update_timestamp(line)
-                # output any warnings between now and the next status line
-                old_warnings = [(timestamp, msg) for timestamp, msg in warnings
-                                if timestamp < self.newest_timestamp]
-                self._process_warnings(self.last_line, self.logs, old_warnings)
-                del warnings[:len(old_warnings)]
-                # now process the line itself
                 self._process_line(line)
                 processed_lines += 1
         finally:
@@ -1085,7 +1036,6 @@
         if self.leftover:
             self._process_line(self.leftover)
             self.leftover = ""
-        self._process_warnings(self.last_line, self.logs, self.server_warnings)
         self._process_logs()
         self.flush()
 
--- autotest/server/autotest_unittest.py        2010-06-09 14:46:54.000000000 
-0700
+++ autotest/server/autotest_unittest.py        2010-06-09 14:46:54.000000000 
-0700
@@ -312,38 +312,5 @@
                              '/autotest/dest/:/autotest/fifo3')
 
 
-    def test_client_logger_write_handles_process_line_failures(self):
-        collector = autotest.log_collector.expect_new(self.host, '', '')
-        logger = autotest.client_logger(self.host, '', '')
-        logger.server_warnings = [(x, 'warn%d' % x) for x in xrange(5)]
-        self.god.stub_function(logger, '_process_warnings')
-        self.god.stub_function(logger, '_process_line')
-        def _update_timestamp(line):
-            logger.newest_timestamp += 2
-        class ProcessingException(Exception):
-            pass
-        def _read_warnings():
-            return [(5, 'warn5')]
-        logger._update_timestamp = _update_timestamp
-        logger.newest_timestamp = 0
-        self.host.job._read_warnings = _read_warnings
-        # process line1, newest_timestamp -> 2
-        logger._process_warnings.expect_call(
-                '', {}, [(0, 'warn0'), (1, 'warn1')])
-        logger._process_line.expect_call('line1')
-        # process line2, newest_timestamp -> 4, failure occurs during process
-        logger._process_warnings.expect_call(
-                '', {}, [(2, 'warn2'), (3, 'warn3')])
-        logger._process_line.expect_call('line2').and_raises(
-                ProcessingException('line processing failure'))
-        # when we call write with data we should get an exception
-        self.assertRaises(ProcessingException, logger.write,
-                          'line1\nline2\nline3\nline4')
-        # but, after the exception, the server_warnings and leftover buffers
-        # should contain the unprocessed data, and ONLY the unprocessed data
-        self.assertEqual(logger.server_warnings, [(4, 'warn4'), (5, 'warn5')])
-        self.assertEqual(logger.leftover, 'line2\nline3\nline4')
-
-
 if __name__ == "__main__":
     unittest.main()
--- autotest/server/server_job.py       2010-06-09 14:46:54.000000000 -0700
+++ autotest/server/server_job.py       2010-06-09 14:46:54.000000000 -0700
@@ -48,6 +48,70 @@
     _get_site_job_data_dummy)
 
 
+class status_indenter(base_job.status_indenter):
+    """Provide a simple integer-backed status indenter."""
+    def __init__(self):
+        self._indent = 0
+
+
+    @property
+    def indent(self):
+        return self._indent
+
+
+    def increment(self):
+        self._indent += 1
+
+
+    def decrement(self):
+        self._indent -= 1
+
+
+class server_job_record_hook(object):
+    """The job.record hook for server job. Used to inject WARN messages from
+    the console or vlm whenever new logs are written, and to echo any logs
+    to INFO level logging. Implemented as a class so that it can use state to
+    block recursive calls, so that the hook can call job.record itself to
+    log WARN messages.
+
+    Depends on job._read_warnings and job._logger.
+    """
+    def __init__(self, job):
+        self._job = job
+        self._being_called = False
+
+
+    def __call__(self, entry):
+        """A wrapper around the 'real' record hook, the _hook method, which
+        prevents recursion. This isn't making any effort to be threadsafe,
+        the intent is to outright block infinite recursion via a
+        job.record->_hook->job.record->_hook->job.record... chain."""
+        if self._being_called:
+            return
+        self._being_called = True
+        try:
+            self._hook(self._job, entry)
+        finally:
+            self._being_called = False
+
+
+    @staticmethod
+    def _hook(job, entry):
+        """The core hook, which can safely call job.record."""
+        entries = []
+        # poll all our warning loggers for new warnings
+        for timestamp, msg in job._read_warnings():
+            warning_entry = base_job.status_log_entry(
+                'WARN', None, None, msg, {}, timestamp=timestamp)
+            entries.append(warning_entry)
+            job.record_entry(warning_entry)
+        # echo rendered versions of all the status logs to info
+        entries.append(entry)
+        for entry in entries:
+            rendered_entry = job._logger.render_entry(entry)
+            logging.info(rendered_entry)
+
+
 class base_server_job(base_job.base_job):
     """The server-side concrete implementation of base_job.
 
@@ -152,6 +216,11 @@
         self.bootloader = None
         self.harness = None
 
+        # set up the status logger
+        self._logger = base_job.status_logger(
+            self, status_indenter(), 'status.log', 'status.log',
+            record_hook=server_job_record_hook(self))
+
 
     @classmethod
     def _find_base_directories(cls):
@@ -177,6 +246,11 @@
             return None
 
 
+    def _get_status_logger(self):
+        """Return a reference to the status logger."""
+        return self._logger
+
+
     @staticmethod
     def _load_control_file(path):
         f = open(path)
@@ -622,55 +696,6 @@
             self.sysinfo.boot_loggables.add(loggable)
 
 
-    def record(self, status_code, subdir, operation, status='',
-               optional_fields=None):
-        """
-        Record job-level status
-
-        The intent is to make this file both machine parseable and
-        human readable. That involves a little more complexity, but
-        really isn't all that bad ;-)
-
-        Format is <status code>\t<subdir>\t<operation>\t<status>
-
-        status code: see common_lib.log.is_valid_status()
-                     for valid status definition
-
-        subdir: MUST be a relevant subdirectory in the results,
-        or None, which will be represented as '----'
-
-        operation: description of what you ran (e.g. "dbench", or
-                                        "mkfs -t foobar /dev/sda9")
-
-        status: error message or "completed sucessfully"
-
-        ------------------------------------------------------------
-
-        Initial tabs indicate indent levels for grouping, and is
-        governed by self._record_prefix
-
-        multiline messages have secondary lines prefaced by a double
-        space ('  ')
-
-        Executing this method will trigger the logging of all new
-        warnings to date from the various console loggers.
-        """
-        # poll all our warning loggers for new warnings
-        warnings = self._read_warnings()
-        old_record_prefix = self._record_prefix
-        try:
-            if status_code.startswith("END "):
-                self._record_prefix += "\t"
-            for timestamp, msg in warnings:
-                self._record("WARN", None, None, msg, timestamp)
-        finally:
-            self._record_prefix = old_record_prefix
-
-        # write out the actual status log line
-        self._record(status_code, subdir, operation, status,
-                      optional_fields=optional_fields)
-
-
     def _read_warnings(self):
         """Poll all the warning loggers and extract any new warnings that have
         been logged. If the warnings belong to a category that is currently
@@ -860,74 +885,6 @@
             return []
 
 
-    def _render_record(self, status_code, subdir, operation, status='',
-                       epoch_time=None, record_prefix=None,
-                       optional_fields=None):
-        """
-        Internal Function to generate a record to be written into a
-        status log. For use by server_job.* classes only.
-        """
-        if subdir:
-            if re.match(r'[\n\t]', subdir):
-                raise ValueError('Invalid character in subdir string')
-            substr = subdir
-        else:
-            substr = '----'
-
-        if not log.is_valid_status(status_code):
-            raise ValueError('Invalid status code supplied: %s' % status_code)
-        if not operation:
-            operation = '----'
-        if re.match(r'[\n\t]', operation):
-            raise ValueError('Invalid character in operation string')
-        operation = operation.rstrip()
-        status = status.rstrip()
-        status = re.sub(r"\t", "  ", status)
-        # Ensure any continuation lines are marked so we can
-        # detect them in the status file to ensure it is parsable.
-        status = re.sub(r"\n", "\n" + self._record_prefix + "  ", status)
-
-        if not optional_fields:
-            optional_fields = {}
-
-        # Generate timestamps for inclusion in the logs
-        if epoch_time is None:
-            epoch_time = int(time.time())
-        local_time = time.localtime(epoch_time)
-        optional_fields["timestamp"] = str(epoch_time)
-        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
-                                                     local_time)
-
-        fields = [status_code, substr, operation]
-        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
-        fields.append(status)
-
-        if record_prefix is None:
-            record_prefix = self._record_prefix
-
-        msg = '\t'.join(str(x) for x in fields)
-        return record_prefix + msg + '\n'
-
-
-    def _record_prerendered(self, msg):
-        """
-        Record a pre-rendered msg into the status logs. The only
-        change this makes to the message is to add on the local
-        indentation. Should not be called outside of server_job.*
-        classes. Unlike _record, this does not write the message
-        to standard output.
-        """
-        lines = []
-        status_file = self.get_status_log_path()
-        status_log = open(status_file, 'a')
-        for line in msg.splitlines():
-            line = self._record_prefix + line + '\n'
-            lines.append(line)
-            status_log.write(line)
-        status_log.close()
-        self.__parse_status(lines)
-
-
     def _fill_server_control_namespace(self, namespace, protect=True):
         """
         Prepare a namespace to be used when executing server control files.
@@ -1045,27 +1002,6 @@
         execfile(code_file, namespace, namespace)
 
 
-    def _record(self, status_code, subdir, operation, status='',
-                 epoch_time=None, optional_fields=None):
-        """
-        Actual function for recording a single line into the status
-        logs. Should never be called directly, only by job.record as
-        this would bypass the console monitor logging.
-        """
-
-        msg = self._render_record(status_code, subdir, operation, status,
-                                  epoch_time, optional_fields=optional_fields)
-
-        status_file = self.get_status_log_path()
-        sys.stdout.write(msg)
-        if status_file:
-            open(status_file, "a").write(msg)
-        if subdir:
-            sub_status_file = self.get_status_log_path(subdir)
-            open(sub_status_file, "a").write(msg)
-        self.__parse_status(msg.splitlines())
-
-
     def __parse_status(self, new_lines):
         if not self._using_parser:
             return
_______________________________________________
Autotest mailing list
[email protected]
http://test.kernel.org/cgi-bin/mailman/listinfo/autotest

Reply via email to