Add a new Archiving stage to the scheduler, which runs after Parsing.  This 
stage is responsible for copying results to the results server in a drone 
setup, a task currently performed directly by the scheduler, and allows for 
site-specific archiving functionality, replacing the site_parse functionality.  
It does this by running autoserv with a special control file 
(scheduler/archive_results.control.srv), which loads and runs code from the new 
scheduler.archive_results module.  The implementation was mostly 
straightfoward, as the archiving stage is fully analogous to the parser stage.  
I did make a couple of refactorings:
* factored out the parser throttling code into a common superclass that the 
ArchiveResultsTask could share
* added some generic flags to Autoserv to duplicate special-case functionality 
we'd added for the --collect-crashinfo option -- namely, specifying a different 
pidfile name and specifying that autoserv should allow (and even expect) an 
existing results directory.  in the future, i think it'd be more elegant to 
make crashinfo collection run using a special control file (as archiving 
works), rather than a hard-coded command-line option.
* moved call to server_job.init_parser() out of the constructor, since this was 
an easy source of exceptions that wouldn't get logged.

Note I believe some of the functional test changes slipped into my previous 
change there, which is why that looks smaller than you'd expect.

Signed-off-by: Steve Howard <[email protected]>

--- autotest/client/common_lib/host_queue_entry_states.py       2009-12-30 
15:31:05.000000000 -0800
+++ autotest/client/common_lib/host_queue_entry_states.py       2009-12-30 
15:31:05.000000000 -0800
@@ -7,9 +7,9 @@
 
 from autotest_lib.client.common_lib import enum
 
-Status = enum.Enum('Queued', 'Starting', 'Verifying', 'Pending', 'Running',
-                   'Gathering', 'Parsing', 'Aborted', 'Completed',
-                   'Failed', 'Stopped', 'Template', 'Waiting',
+Status = enum.Enum('Queued', 'Starting', 'Verifying', 'Pending', 'Waiting',
+                   'Running', 'Gathering', 'Parsing', 'Archiving', 'Aborted',
+                   'Completed', 'Failed', 'Stopped', 'Template',
                    string_values=True)
 ACTIVE_STATUSES = (Status.STARTING, Status.VERIFYING, Status.PENDING,
                    Status.RUNNING, Status.GATHERING)
--- autotest/frontend/afe/rpc_interface.py      2009-12-30 15:31:05.000000000 
-0800
+++ autotest/frontend/afe/rpc_interface.py      2009-12-30 15:31:05.000000000 
-0800
@@ -836,7 +836,8 @@
                                    "Parsing": "Awaiting parse of final 
results",
                                    "Gathering": "Gathering log files",
                                    "Template": "Template job for recurring 
run",
-                                   "Waiting": "Waiting for scheduler action"}
+                                   "Waiting": "Waiting for scheduler action",
+                                   "Archiving": "Archiving results"}
     return result
 
 
--- /dev/null   2009-12-17 12:29:38.000000000 -0800
+++ autotest/scheduler/archive_results.control.srv      2009-12-30 
15:31:05.000000000 -0800
@@ -0,0 +1,4 @@
+from autotest_lib.scheduler import archive_results
+
+archiver = archive_results.ResultsArchiver()
+archiver.archive_results(job.resultdir)
--- /dev/null   2009-12-17 12:29:38.000000000 -0800
+++ autotest/scheduler/archive_results.py       2009-12-30 15:31:05.000000000 
-0800
@@ -0,0 +1,21 @@
+#!/usr/bin/python
+
+import common, logging
+from autotest_lib.client.common_lib import global_config, utils
+from autotest_lib.scheduler import drone_utility
+
+class BaseResultsArchiver(object):
+    def archive_results(self, path):
+        results_host = global_config.global_config.get_config_value(
+                'SCHEDULER', 'results_host', default=None)
+        if not results_host:
+            return
+
+        logging.info('Archiving %s to %s', path, results_host)
+        utility = drone_utility.DroneUtility()
+        utility.sync_send_file_to(results_host, path, path, can_fail=True)
+
+
+ResultsArchiver = utils.import_site_class(
+        __file__, 'autotest_lib.scheduler.site_archive_results',
+        'SiteResultsArchiver', BaseResultsArchiver)
--- autotest/scheduler/drone_utility.py 2009-12-30 15:31:05.000000000 -0800
+++ autotest/scheduler/drone_utility.py 2009-12-30 15:31:05.000000000 -0800
@@ -322,7 +322,7 @@
                                (hostname, source_path, destination_path))
 
 
-    def _sync_send_file_to(self, hostname, source_path, destination_path,
+    def sync_send_file_to(self, hostname, source_path, destination_path,
                            can_fail):
         host = create_host(hostname)
         try:
@@ -350,7 +350,7 @@
 
     def send_file_to(self, hostname, source_path, destination_path,
                      can_fail=False):
-        self.run_async_command(self._sync_send_file_to,
+        self.run_async_command(self.sync_send_file_to,
                                (hostname, source_path, destination_path,
                                 can_fail))
 
--- autotest/scheduler/monitor_db.py    2009-12-30 15:31:05.000000000 -0800
+++ autotest/scheduler/monitor_db.py    2009-12-30 15:31:05.000000000 -0800
@@ -45,9 +45,10 @@
 _AUTOSERV_PID_FILE = '.autoserv_execute'
 _CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
 _PARSER_PID_FILE = '.parser_execute'
+_ARCHIVER_PID_FILE = '.archiver_execute'
 
 _ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
-                      _PARSER_PID_FILE)
+                      _PARSER_PID_FILE, _ARCHIVER_PID_FILE)
 
 # error message to leave in results dir when an autoserv process disappears
 # mysteriously
@@ -766,7 +767,8 @@
         statuses = (models.HostQueueEntry.Status.STARTING,
                     models.HostQueueEntry.Status.RUNNING,
                     models.HostQueueEntry.Status.GATHERING,
-                    models.HostQueueEntry.Status.PARSING)
+                    models.HostQueueEntry.Status.PARSING,
+                    models.HostQueueEntry.Status.ARCHIVING)
         status_list = ','.join("'%s'" % status for status in statuses)
         queue_entries = HostQueueEntry.fetch(
                 where='status IN (%s)' % status_list)
@@ -812,16 +814,19 @@
             return GatherLogsTask(queue_entries=task_entries)
         if queue_entry.status == models.HostQueueEntry.Status.PARSING:
             return FinalReparseTask(queue_entries=task_entries)
+        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
+            return ArchiveResultsTask(queue_entries=task_entries)
 
         raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
                              'invalid status %s: %s' % (entry.status, entry))
 
 
     def _check_for_duplicate_host_entries(self, task_entries):
-        parsing_status = models.HostQueueEntry.Status.PARSING
+        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
+                             models.HostQueueEntry.Status.ARCHIVING)
         for task_entry in task_entries:
             using_host = (task_entry.host is not None
-                          and task_entry.status != parsing_status)
+                          and task_entry.status not in non_host_statuses)
             if using_host:
                 self._assert_host_has_no_agent(task_entry)
 
@@ -1607,9 +1612,9 @@
             queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
 
 
-    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
-        self._copy_results(queue_entries, use_monitor)
-        self._parse_results(queue_entries)
+    def _archive_results(self, queue_entries):
+        for queue_entry in queue_entries:
+            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
 
 
     def _command_line(self):
@@ -1721,6 +1726,22 @@
                         self._working_directory()))
 
 
+    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
+                                    allowed_host_statuses=None):
+        for entry in queue_entries:
+            if entry.status not in allowed_hqe_statuses:
+                raise SchedulerError('Queue task attempting to start '
+                                     'entry with invalid status %s: %s'
+                                     % (entry.status, entry))
+            invalid_host_status = (
+                    allowed_host_statuses is not None
+                    and entry.host.status not in allowed_host_statuses)
+            if invalid_host_status:
+                raise SchedulerError('Queue task attempting to start on queue '
+                                     'entry with invalid host status %s: %s'
+                                     % (entry.host.status, entry))
+
+
 class TaskWithJobKeyvals(object):
     """AgentTask mixin providing functionality to help with job keyval 
files."""
     _KEYVAL_FILE = 'keyval'
@@ -1844,17 +1865,15 @@
                 source_path=self._working_directory() + '/',
                 destination_path=self.queue_entry.execution_path() + '/')
 
-        self._copy_results([self.queue_entry])
-
-        if not self.queue_entry.job.parse_failed_repair:
-            self.queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
-            return
-
         pidfile_id = _drone_manager.get_pidfile_id_from(
                 self.queue_entry.execution_path(),
                 pidfile_name=_AUTOSERV_PID_FILE)
         _drone_manager.register_pidfile(pidfile_id)
-        self._parse_results([self.queue_entry])
+
+        if self.queue_entry.job.parse_failed_repair:
+            self._parse_results([self.queue_entry])
+        else:
+            self._archive_results([self.queue_entry])
 
 
     def cleanup(self):
@@ -1995,6 +2014,55 @@
                 self.host.set_status(models.Host.Status.READY)
 
 
+class CleanupTask(PreJobTask):
+    # note this can also run post-job, but when it does, it's running 
standalone
+    # against the host (not related to the job), so it's not considered a
+    # PostJobTask
+
+    TASK_TYPE = models.SpecialTask.Task.CLEANUP
+
+
+    def __init__(self, task, recover_run_monitor=None):
+        super(CleanupTask, self).__init__(task, ['--cleanup'])
+        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+    def prolog(self):
+        super(CleanupTask, self).prolog()
+        logging.info("starting cleanup task for host: %s", self.host.hostname)
+        self.host.set_status(models.Host.Status.CLEANING)
+        if self.queue_entry:
+            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
+
+
+    def _finish_epilog(self):
+        if not self.queue_entry or not self.success:
+            return
+
+        do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
+        should_run_verify = (
+                self.queue_entry.job.run_verify
+                and self.host.protection != do_not_verify_protection)
+        if should_run_verify:
+            entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
+            models.SpecialTask.objects.create(
+                    host=models.Host.objects.get(id=self.host.id),
+                    queue_entry=entry,
+                    task=models.SpecialTask.Task.VERIFY)
+        else:
+            self.queue_entry.on_pending()
+
+
+    def epilog(self):
+        super(CleanupTask, self).epilog()
+
+        if self.success:
+            self.host.update_field('dirty', 0)
+            self.host.set_status(models.Host.Status.READY)
+
+        self._finish_epilog()
+
+
 class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
     """
     Common functionality for QueueTask and HostlessQueueTask
@@ -2110,17 +2178,12 @@
 
 
     def prolog(self):
-        for entry in self.queue_entries:
-            if entry.status not in (models.HostQueueEntry.Status.STARTING,
-                                    models.HostQueueEntry.Status.RUNNING):
-                raise SchedulerError('Queue task attempting to start '
-                                     'entry with invalid status %s: %s'
-                                     % (entry.status, entry))
-            if entry.host.status not in (models.Host.Status.PENDING,
-                                         models.Host.Status.RUNNING):
-                raise SchedulerError('Queue task attempting to start on queue '
-                                     'entry with invalid host status %s: %s'
-                                     % (entry.host.status, entry))
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
+                                      models.HostQueueEntry.Status.RUNNING),
+                allowed_host_statuses=(models.Host.Status.PENDING,
+                                       models.Host.Status.RUNNING))
 
         super(QueueTask, self).prolog()
 
@@ -2141,6 +2204,30 @@
             queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
 
 
+class HostlessQueueTask(AbstractQueueTask):
+    def __init__(self, queue_entry):
+        super(HostlessQueueTask, self).__init__([queue_entry])
+        self.queue_entry_ids = [queue_entry.id]
+
+
+    def prolog(self):
+        self.queue_entries[0].update_field('execution_subdir', 'hostless')
+        super(HostlessQueueTask, self).prolog()
+
+
+    def _final_status(self):
+        if self.queue_entries[0].aborted:
+            return models.HostQueueEntry.Status.ABORTED
+        if self.monitor.exit_code() == 0:
+            return models.HostQueueEntry.Status.COMPLETED
+        return models.HostQueueEntry.Status.FAILED
+
+
+    def _finish_task(self):
+        super(HostlessQueueTask, self)._finish_task()
+        self.queue_entries[0].set_status(self._final_status())
+
+
 class PostJobTask(AgentTask):
     def __init__(self, queue_entries, log_file_name):
         super(PostJobTask, self).__init__(log_file_name=log_file_name)
@@ -2213,6 +2300,11 @@
         pass
 
 
+    def _pidfile_label(self):
+        # '.autoserv_execute' -> 'autoserv'
+        return self._pidfile_name()[1:-len('_execute')]
+
+
 class GatherLogsTask(PostJobTask):
     """
     Task responsible for
@@ -2231,8 +2323,10 @@
     def _generate_command(self, results_dir):
         host_list = ','.join(queue_entry.host.hostname
                              for queue_entry in self.queue_entries)
-        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
-                '-r', results_dir]
+        return [_autoserv_path , '-p',
+                '--pidfile-label=%s' % self._pidfile_label(),
+                '--use-existing-results', '--collect-crashinfo',
+                '-m', host_list, '-r', results_dir]
 
 
     @property
@@ -2245,23 +2339,17 @@
 
 
     def prolog(self):
-        for queue_entry in self.queue_entries:
-            if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
-                raise SchedulerError('Gather task attempting to start on '
-                                     'non-gathering entry: %s' % queue_entry)
-            if queue_entry.host.status != models.Host.Status.RUNNING:
-                raise SchedulerError('Gather task attempting to start on queue 
'
-                                     'entry with non-running host status %s: 
%s'
-                                     % (queue_entry.host.status, queue_entry))
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
+                allowed_host_statuses=(models.Host.Status.RUNNING,))
 
         super(GatherLogsTask, self).prolog()
 
 
     def epilog(self):
         super(GatherLogsTask, self).epilog()
-
-        self._copy_and_parse_results(self.queue_entries,
-                                     use_monitor=self._autoserv_monitor)
+        self._parse_results(self.queue_entries)
         self._reboot_hosts()
 
 
@@ -2304,54 +2392,67 @@
             self.finished(True)
 
 
-class CleanupTask(PreJobTask):
-    TASK_TYPE = models.SpecialTask.Task.CLEANUP
+class SelfThrottledPostJobTask(PostJobTask):
+    """
+    Special AgentTask subclass that maintains its own global process limit.
+    """
+    _num_running_processes = 0
 
 
-    def __init__(self, task, recover_run_monitor=None):
-        super(CleanupTask, self).__init__(task, ['--cleanup'])
-        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+    @classmethod
+    def _increment_running_processes(cls):
+        cls._num_running_processes += 1
 
 
-    def prolog(self):
-        super(CleanupTask, self).prolog()
-        logging.info("starting cleanup task for host: %s", self.host.hostname)
-        self.host.set_status(models.Host.Status.CLEANING)
-        if self.queue_entry:
-            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
+    @classmethod
+    def _decrement_running_processes(cls):
+        cls._num_running_processes -= 1
 
 
-    def _finish_epilog(self):
-        if not self.queue_entry or not self.success:
-            return
+    @classmethod
+    def _max_processes(cls):
+        raise NotImplementedError
 
-        do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
-        should_run_verify = (
-                self.queue_entry.job.run_verify
-                and self.host.protection != do_not_verify_protection)
-        if should_run_verify:
-            entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
-            models.SpecialTask.objects.create(
-                    host=models.Host.objects.get(id=self.host.id),
-                    queue_entry=entry,
-                    task=models.SpecialTask.Task.VERIFY)
+
+    @classmethod
+    def _can_run_new_process(cls):
+        return cls._num_running_processes < cls._max_processes()
+
+
+    def _process_started(self):
+        return bool(self.monitor)
+
+
+    def tick(self):
+        # override tick to keep trying to start until the process count goes
+        # down and we can, at which point we revert to default behavior
+        if self._process_started():
+            super(SelfThrottledPostJobTask, self).tick()
         else:
-            self.queue_entry.on_pending()
+            self._try_starting_process()
 
 
-    def epilog(self):
-        super(CleanupTask, self).epilog()
+    def run(self):
+        # override run() to not actually run unless we can
+        self._try_starting_process()
 
-        if self.success:
-            self.host.update_field('dirty', 0)
-            self.host.set_status(models.Host.Status.READY)
 
-        self._finish_epilog()
+    def _try_starting_process(self):
+        if not self._can_run_new_process():
+            return
+
+        # actually run the command
+        super(SelfThrottledPostJobTask, self).run()
+        self._increment_running_processes()
+
 
+    def finished(self, success):
+        super(SelfThrottledPostJobTask, self).finished(success)
+        if self._process_started():
+            self._decrement_running_processes()
 
-class FinalReparseTask(PostJobTask):
-    _num_running_parses = 0
 
+class FinalReparseTask(SelfThrottledPostJobTask):
     def __init__(self, queue_entries):
         super(FinalReparseTask, self).__init__(queue_entries,
                                                log_file_name='.parse.log')
@@ -2360,7 +2461,7 @@
 
 
     def _generate_command(self, results_dir):
-        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
+        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                 results_dir]
 
 
@@ -2373,91 +2474,59 @@
         return _PARSER_PID_FILE
 
 
-    def _parse_started(self):
-        return bool(self.monitor)
-
-
-    @classmethod
-    def _increment_running_parses(cls):
-        cls._num_running_parses += 1
-
-
-    @classmethod
-    def _decrement_running_parses(cls):
-        cls._num_running_parses -= 1
-
-
     @classmethod
-    def _can_run_new_parse(cls):
-        return (cls._num_running_parses <
-                scheduler_config.config.max_parse_processes)
+    def _max_processes(cls):
+        return scheduler_config.config.max_parse_processes
 
 
     def prolog(self):
-        for queue_entry in self.queue_entries:
-            if queue_entry.status != models.HostQueueEntry.Status.PARSING:
-                raise SchedulerError('Parse task attempting to start on '
-                                     'non-parsing entry: %s' % queue_entry)
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
 
         super(FinalReparseTask, self).prolog()
 
 
     def epilog(self):
         super(FinalReparseTask, self).epilog()
-        self._set_all_statuses(self._final_status())
+        self._archive_results(self.queue_entries)
 
 
-    def tick(self):
-        # override tick to keep trying to start until the parse count goes down
-        # and we can, at which point we revert to default behavior
-        if self._parse_started():
-            super(FinalReparseTask, self).tick()
-        else:
-            self._try_starting_parse()
-
-
-    def run(self):
-        # override run() to not actually run unless we can
-        self._try_starting_parse()
-
+class ArchiveResultsTask(SelfThrottledPostJobTask):
+    def __init__(self, queue_entries):
+        super(ArchiveResultsTask, self).__init__(queue_entries,
+                                                 
log_file_name='.archiving.log')
+        # don't use _set_ids, since we don't want to set the host_ids
+        self.queue_entry_ids = [entry.id for entry in queue_entries]
 
-    def _try_starting_parse(self):
-        if not self._can_run_new_parse():
-            return
 
-        # actually run the parse command
-        super(FinalReparseTask, self).run()
-        self._increment_running_parses()
+    def _pidfile_name(self):
+        return _ARCHIVER_PID_FILE
 
 
-    def finished(self, success):
-        super(FinalReparseTask, self).finished(success)
-        if self._parse_started():
-            self._decrement_running_parses()
+    def _generate_command(self, results_dir):
+        return [_autoserv_path , '-p',
+                '--pidfile-label=%s' % self._pidfile_label(), '-r', 
results_dir,
+                '--use-existing-results',
+                os.path.join('..', 'scheduler', 'archive_results.control.srv')]
 
 
-class HostlessQueueTask(AbstractQueueTask):
-    def __init__(self, queue_entry):
-        super(HostlessQueueTask, self).__init__([queue_entry])
-        self.queue_entry_ids = [queue_entry.id]
+    @classmethod
+    def _max_processes(cls):
+        return scheduler_config.config.max_transfer_processes
 
 
     def prolog(self):
-        self.queue_entries[0].update_field('execution_subdir', 'hostless')
-        super(HostlessQueueTask, self).prolog()
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
 
-
-    def _final_status(self):
-        if self.queue_entries[0].aborted:
-            return models.HostQueueEntry.Status.ABORTED
-        if self.monitor.exit_code() == 0:
-            return models.HostQueueEntry.Status.COMPLETED
-        return models.HostQueueEntry.Status.FAILED
+        super(ArchiveResultsTask, self).prolog()
 
 
-    def _finish_task(self):
-        super(HostlessQueueTask, self)._finish_task()
-        self.queue_entries[0].set_status(self._final_status())
+    def epilog(self):
+        super(ArchiveResultsTask, self).epilog()
+        self._set_all_statuses(self._final_status())
 
 
 class DBError(Exception):
@@ -2899,34 +2968,22 @@
 
         self.update_field('status', status)
 
-        if status in (models.HostQueueEntry.Status.QUEUED,
-                      models.HostQueueEntry.Status.PARSING):
-            self.update_field('complete', False)
-            self.update_field('active', False)
-
-        if status in (models.HostQueueEntry.Status.PENDING,
-                      models.HostQueueEntry.Status.RUNNING,
-                      models.HostQueueEntry.Status.VERIFYING,
-                      models.HostQueueEntry.Status.STARTING,
-                      models.HostQueueEntry.Status.GATHERING):
-            self.update_field('complete', False)
-            self.update_field('active', True)
+        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
+        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
+        assert not (active and complete)
+
+        self.update_field('active', active)
+        self.update_field('complete', complete)
 
-        if status in (models.HostQueueEntry.Status.FAILED,
-                      models.HostQueueEntry.Status.COMPLETED,
-                      models.HostQueueEntry.Status.STOPPED,
-                      models.HostQueueEntry.Status.ABORTED):
-            self.update_field('complete', True)
-            self.update_field('active', False)
+        if complete:
             self._on_complete()
+            self._email_on_job_complete()
 
         should_email_status = (status.lower() in _notify_email_statuses or
                                'all' in _notify_email_statuses)
         if should_email_status:
             self._email_on_status(status)
 
-        self._email_on_job_complete()
-
 
     def _on_complete(self):
         self.job.stop_if_necessary()
@@ -3063,7 +3120,7 @@
         assert self.aborted and not self.complete
 
         Status = models.HostQueueEntry.Status
-        if self.status in (Status.GATHERING, Status.PARSING):
+        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
             # do nothing; post-job tasks will finish and then mark this entry
             # with status "Aborted" and take care of the host
             return
--- autotest/scheduler/monitor_db_functional_test.py    2009-12-30 
15:31:05.000000000 -0800
+++ autotest/scheduler/monitor_db_functional_test.py    2009-12-30 
15:31:05.000000000 -0800
@@ -492,6 +492,8 @@
         self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,
                                                exit_status=256)
         self._run_dispatcher() # repair, HQE unaffected
+        self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)
+        self._run_dispatcher()
         return queue_entry
 
 
@@ -537,7 +539,7 @@
         self.mock_drone_manager.finish_process(_PidfileType.PARSE)
         self._run_dispatcher()
 
-        self._check_statuses(queue_entry, HqeStatus.ARCHIVING, 
HostStatus.READY)
+        self._check_entry_status(queue_entry, HqeStatus.ARCHIVING)
         self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)
         self._run_dispatcher()
 
--- autotest/server/autoserv    2009-12-29 14:16:16.000000000 -0800
+++ autotest/server/autoserv    2009-12-30 15:31:05.000000000 -0800
@@ -127,6 +127,7 @@
                                 ssh_user, ssh_port, ssh_pass,
                                 group_name=group_name, tag=execution_tag)
     job.logging.start_logging()
+    job.init_parser()
 
     # perform checks
     job.precheck()
@@ -175,7 +176,7 @@
             results = 'results.' + time.strftime('%Y-%m-%d-%H.%M.%S')
         results  = os.path.abspath(results)
         resultdir_exists = os.path.exists(os.path.join(results, 'control.srv'))
-        if not parser.options.collect_crashinfo and resultdir_exists:
+        if not parser.options.use_existing_results and resultdir_exists:
             error = "Error: results directory already exists: %s\n" % results
             sys.stderr.write(error)
             sys.exit(1)
@@ -194,12 +195,15 @@
     if results:
         logging.info("Results placed in %s" % results)
 
+        # wait until now to perform this check, so it get properly logged
+        if parser.options.use_existing_results and not resultdir_exists:
+            logging.error("No existing results directory found: %s", results)
+            sys.exit(1)
+
+
     if parser.options.write_pidfile:
-        if parser.options.collect_crashinfo:
-            pidfile_label = 'collect_crashinfo'
-        else:
-            pidfile_label = 'autoserv'
-        pid_file_manager = pidfile.PidFileManager(pidfile_label, results)
+        pid_file_manager = pidfile.PidFileManager(parser.options.pidfile_label,
+                                                  results)
         pid_file_manager.open_file()
     else:
         pid_file_manager = None
--- autotest/server/autoserv_parser.py  2009-12-30 15:31:05.000000000 -0800
+++ autotest/server/autoserv_parser.py  2009-12-30 15:31:05.000000000 -0800
@@ -87,7 +87,16 @@
                                'output')
         self.parser.add_option("-p", "--write-pidfile", action="store_true",
                                dest="write_pidfile", default=False,
-                               help="write pidfile (.autoserv_execute)")
+                               help="write pidfile (pidfile name is determined 
"
+                                    "by --pidfile-label")
+        self.parser.add_option("--pidfile-label", action="store",
+                               default="autoserv",
+                               help="Determines filename to use as pidfile (if 
"
+                                    "-p is specified).  Pidfile will be "
+                                    ".<label>_execute.  Default to autoserv.")
+        self.parser.add_option("--use-existing-results", action="store_true",
+                               help="Indicates that autoserv is working with "
+                                    "an existing results directory")
         self.parser.add_option("-a", "--args", dest='args',
                                help="additional args to pass to control file")
         protection_levels = [host_protections.Protection.get_attr_name(s)
--- autotest/server/server_job.py       2009-12-30 15:31:05.000000000 -0800
+++ autotest/server/server_job.py       2009-12-30 15:31:05.000000000 -0800
@@ -135,11 +135,7 @@
             utils.write_keyval(self.resultdir, job_data)
 
         self._parse_job = parse_job
-        if self._parse_job and len(machines) == 1:
-            self._using_parser = True
-            self.init_parser(self.resultdir)
-        else:
-            self._using_parser = False
+        self._using_parser = (self._parse_job and len(machines) == 1)
         self.pkgmgr = packages.PackageManager(
             self.autodir, run_function_dargs={'timeout':600})
         self.num_tests_run = 0
@@ -201,20 +197,22 @@
         subcommand.subcommand.register_join_hook(on_join)
 
 
-    def init_parser(self, resultdir):
+    def init_parser(self):
         """
-        Start the continuous parsing of resultdir. This sets up
+        Start the continuous parsing of self.resultdir. This sets up
         the database connection and inserts the basic job object into
         the database if necessary.
         """
+        if not self._using_parser:
+            return
         # redirect parser debugging to .parse.log
-        parse_log = os.path.join(resultdir, '.parse.log')
+        parse_log = os.path.join(self.resultdir, '.parse.log')
         parse_log = open(parse_log, 'w', 0)
         tko_utils.redirect_parser_debugging(parse_log)
         # create a job model object and set up the db
         self.results_db = tko_db.db(autocommit=True)
         self.parser = status_lib.parser(self._STATUS_VERSION)
-        self.job_model = self.parser.make_job(resultdir)
+        self.job_model = self.parser.make_job(self.resultdir)
         self.parser.start(self.job_model)
         # check if a job already exists in the db and insert it if
         # it does not
@@ -310,7 +308,7 @@
                 self.push_execution_context(machine)
                 os.chdir(self.resultdir)
                 utils.write_keyval(self.resultdir, {"hostname": machine})
-                self.init_parser(self.resultdir)
+                self.init_parser()
                 result = function(machine)
                 self.cleanup_parser()
                 return result
--- autotest/tko/parse.py       2009-12-30 15:31:05.000000000 -0800
+++ autotest/tko/parse.py       2009-12-30 15:31:05.000000000 -0800
@@ -30,8 +30,6 @@
                       action="store")
     parser.add_option("-d", help="Database name", dest="db_name",
                       action="store")
-    parser.add_option("-P", help="Run site post-processing",
-                      dest="site_do_post", action="store_true", default=False)
     parser.add_option("--write-pidfile",
                       help="write pidfile (.parser_execute)",
                       dest="write_pidfile", action="store_true",
@@ -215,10 +213,6 @@
         parse_leaf_path(db, path, level, reparse, mail_on_failure)
 
 
-def _site_post_parse_job_dummy():
-    return {}
-
-
 def main():
     options, args = parse_args()
     results_dir = os.path.abspath(args[0])
@@ -229,10 +223,6 @@
     if options.write_pidfile:
         pid_file_manager.open_file()
 
-    site_post_parse_job = utils.import_site_function(__file__,
-        "autotest_lib.tko.site_parse", "site_post_parse_job",
-        _site_post_parse_job_dummy)
-
     try:
         # build up the list of job dirs to parse
         if options.singledir:
@@ -269,9 +259,6 @@
                 fcntl.flock(lockfile, fcntl.LOCK_UN)
                 lockfile.close()
 
-        if options.site_do_post is True:
-            site_post_parse_job(results_dir)
-
     except:
         pid_file_manager.close_file(1)
         raise
==== (deleted) //depot/google_vendor_src_branch/autotest/tko/site_parse.py ====
_______________________________________________
Autotest mailing list
[email protected]
http://test.kernel.org/cgi-bin/mailman/listinfo/autotest

Reply via email to