Add a new Archiving stage to the scheduler, which runs after Parsing. This stage is responsible for copying results to the results server in a drone setup, a task currently performed directly by the scheduler, and allows for site-specific archiving functionality, replacing the site_parse functionality. It does this by running autoserv with a special control file (scheduler/archive_results.control.srv), which loads and runs code from the new scheduler.archive_results module. The implementation was mostly straightfoward, as the archiving stage is fully analogous to the parser stage. I did make a couple of refactorings: * factored out the parser throttling code into a common superclass that the ArchiveResultsTask could share * added some generic flags to Autoserv to duplicate special-case functionality we'd added for the --collect-crashinfo option -- namely, specifying a different pidfile name and specifying that autoserv should allow (and even expect) an existing results directory. in the future, i think it'd be more elegant to make crashinfo collection run using a special control file (as archiving works), rather than a hard-coded command-line option. * moved call to server_job.init_parser() out of the constructor, since this was an easy source of exceptions that wouldn't get logged.
Note I believe some of the functional test changes slipped into my previous change there, which is why that looks smaller than you'd expect. Signed-off-by: Steve Howard <[email protected]> --- autotest/client/common_lib/host_queue_entry_states.py 2009-12-30 15:31:05.000000000 -0800 +++ autotest/client/common_lib/host_queue_entry_states.py 2009-12-30 15:31:05.000000000 -0800 @@ -7,9 +7,9 @@ from autotest_lib.client.common_lib import enum -Status = enum.Enum('Queued', 'Starting', 'Verifying', 'Pending', 'Running', - 'Gathering', 'Parsing', 'Aborted', 'Completed', - 'Failed', 'Stopped', 'Template', 'Waiting', +Status = enum.Enum('Queued', 'Starting', 'Verifying', 'Pending', 'Waiting', + 'Running', 'Gathering', 'Parsing', 'Archiving', 'Aborted', + 'Completed', 'Failed', 'Stopped', 'Template', string_values=True) ACTIVE_STATUSES = (Status.STARTING, Status.VERIFYING, Status.PENDING, Status.RUNNING, Status.GATHERING) --- autotest/frontend/afe/rpc_interface.py 2009-12-30 15:31:05.000000000 -0800 +++ autotest/frontend/afe/rpc_interface.py 2009-12-30 15:31:05.000000000 -0800 @@ -836,7 +836,8 @@ "Parsing": "Awaiting parse of final results", "Gathering": "Gathering log files", "Template": "Template job for recurring run", - "Waiting": "Waiting for scheduler action"} + "Waiting": "Waiting for scheduler action", + "Archiving": "Archiving results"} return result --- /dev/null 2009-12-17 12:29:38.000000000 -0800 +++ autotest/scheduler/archive_results.control.srv 2009-12-30 15:31:05.000000000 -0800 @@ -0,0 +1,4 @@ +from autotest_lib.scheduler import archive_results + +archiver = archive_results.ResultsArchiver() +archiver.archive_results(job.resultdir) --- /dev/null 2009-12-17 12:29:38.000000000 -0800 +++ autotest/scheduler/archive_results.py 2009-12-30 15:31:05.000000000 -0800 @@ -0,0 +1,21 @@ +#!/usr/bin/python + +import common, logging +from autotest_lib.client.common_lib import global_config, utils +from autotest_lib.scheduler import drone_utility + +class BaseResultsArchiver(object): + def archive_results(self, path): + results_host = global_config.global_config.get_config_value( + 'SCHEDULER', 'results_host', default=None) + if not results_host: + return + + logging.info('Archiving %s to %s', path, results_host) + utility = drone_utility.DroneUtility() + utility.sync_send_file_to(results_host, path, path, can_fail=True) + + +ResultsArchiver = utils.import_site_class( + __file__, 'autotest_lib.scheduler.site_archive_results', + 'SiteResultsArchiver', BaseResultsArchiver) --- autotest/scheduler/drone_utility.py 2009-12-30 15:31:05.000000000 -0800 +++ autotest/scheduler/drone_utility.py 2009-12-30 15:31:05.000000000 -0800 @@ -322,7 +322,7 @@ (hostname, source_path, destination_path)) - def _sync_send_file_to(self, hostname, source_path, destination_path, + def sync_send_file_to(self, hostname, source_path, destination_path, can_fail): host = create_host(hostname) try: @@ -350,7 +350,7 @@ def send_file_to(self, hostname, source_path, destination_path, can_fail=False): - self.run_async_command(self._sync_send_file_to, + self.run_async_command(self.sync_send_file_to, (hostname, source_path, destination_path, can_fail)) --- autotest/scheduler/monitor_db.py 2009-12-30 15:31:05.000000000 -0800 +++ autotest/scheduler/monitor_db.py 2009-12-30 15:31:05.000000000 -0800 @@ -45,9 +45,10 @@ _AUTOSERV_PID_FILE = '.autoserv_execute' _CRASHINFO_PID_FILE = '.collect_crashinfo_execute' _PARSER_PID_FILE = '.parser_execute' +_ARCHIVER_PID_FILE = '.archiver_execute' _ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE, - _PARSER_PID_FILE) + _PARSER_PID_FILE, _ARCHIVER_PID_FILE) # error message to leave in results dir when an autoserv process disappears # mysteriously @@ -766,7 +767,8 @@ statuses = (models.HostQueueEntry.Status.STARTING, models.HostQueueEntry.Status.RUNNING, models.HostQueueEntry.Status.GATHERING, - models.HostQueueEntry.Status.PARSING) + models.HostQueueEntry.Status.PARSING, + models.HostQueueEntry.Status.ARCHIVING) status_list = ','.join("'%s'" % status for status in statuses) queue_entries = HostQueueEntry.fetch( where='status IN (%s)' % status_list) @@ -812,16 +814,19 @@ return GatherLogsTask(queue_entries=task_entries) if queue_entry.status == models.HostQueueEntry.Status.PARSING: return FinalReparseTask(queue_entries=task_entries) + if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING: + return ArchiveResultsTask(queue_entries=task_entries) raise SchedulerError('_get_agent_task_for_queue_entry got entry with ' 'invalid status %s: %s' % (entry.status, entry)) def _check_for_duplicate_host_entries(self, task_entries): - parsing_status = models.HostQueueEntry.Status.PARSING + non_host_statuses = (models.HostQueueEntry.Status.PARSING, + models.HostQueueEntry.Status.ARCHIVING) for task_entry in task_entries: using_host = (task_entry.host is not None - and task_entry.status != parsing_status) + and task_entry.status not in non_host_statuses) if using_host: self._assert_host_has_no_agent(task_entry) @@ -1607,9 +1612,9 @@ queue_entry.set_status(models.HostQueueEntry.Status.PARSING) - def _copy_and_parse_results(self, queue_entries, use_monitor=None): - self._copy_results(queue_entries, use_monitor) - self._parse_results(queue_entries) + def _archive_results(self, queue_entries): + for queue_entry in queue_entries: + queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING) def _command_line(self): @@ -1721,6 +1726,22 @@ self._working_directory())) + def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses, + allowed_host_statuses=None): + for entry in queue_entries: + if entry.status not in allowed_hqe_statuses: + raise SchedulerError('Queue task attempting to start ' + 'entry with invalid status %s: %s' + % (entry.status, entry)) + invalid_host_status = ( + allowed_host_statuses is not None + and entry.host.status not in allowed_host_statuses) + if invalid_host_status: + raise SchedulerError('Queue task attempting to start on queue ' + 'entry with invalid host status %s: %s' + % (entry.host.status, entry)) + + class TaskWithJobKeyvals(object): """AgentTask mixin providing functionality to help with job keyval files.""" _KEYVAL_FILE = 'keyval' @@ -1844,17 +1865,15 @@ source_path=self._working_directory() + '/', destination_path=self.queue_entry.execution_path() + '/') - self._copy_results([self.queue_entry]) - - if not self.queue_entry.job.parse_failed_repair: - self.queue_entry.set_status(models.HostQueueEntry.Status.FAILED) - return - pidfile_id = _drone_manager.get_pidfile_id_from( self.queue_entry.execution_path(), pidfile_name=_AUTOSERV_PID_FILE) _drone_manager.register_pidfile(pidfile_id) - self._parse_results([self.queue_entry]) + + if self.queue_entry.job.parse_failed_repair: + self._parse_results([self.queue_entry]) + else: + self._archive_results([self.queue_entry]) def cleanup(self): @@ -1995,6 +2014,55 @@ self.host.set_status(models.Host.Status.READY) +class CleanupTask(PreJobTask): + # note this can also run post-job, but when it does, it's running standalone + # against the host (not related to the job), so it's not considered a + # PostJobTask + + TASK_TYPE = models.SpecialTask.Task.CLEANUP + + + def __init__(self, task, recover_run_monitor=None): + super(CleanupTask, self).__init__(task, ['--cleanup']) + self._set_ids(host=self.host, queue_entries=[self.queue_entry]) + + + def prolog(self): + super(CleanupTask, self).prolog() + logging.info("starting cleanup task for host: %s", self.host.hostname) + self.host.set_status(models.Host.Status.CLEANING) + if self.queue_entry: + self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING) + + + def _finish_epilog(self): + if not self.queue_entry or not self.success: + return + + do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY + should_run_verify = ( + self.queue_entry.job.run_verify + and self.host.protection != do_not_verify_protection) + if should_run_verify: + entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id) + models.SpecialTask.objects.create( + host=models.Host.objects.get(id=self.host.id), + queue_entry=entry, + task=models.SpecialTask.Task.VERIFY) + else: + self.queue_entry.on_pending() + + + def epilog(self): + super(CleanupTask, self).epilog() + + if self.success: + self.host.update_field('dirty', 0) + self.host.set_status(models.Host.Status.READY) + + self._finish_epilog() + + class AbstractQueueTask(AgentTask, TaskWithJobKeyvals): """ Common functionality for QueueTask and HostlessQueueTask @@ -2110,17 +2178,12 @@ def prolog(self): - for entry in self.queue_entries: - if entry.status not in (models.HostQueueEntry.Status.STARTING, - models.HostQueueEntry.Status.RUNNING): - raise SchedulerError('Queue task attempting to start ' - 'entry with invalid status %s: %s' - % (entry.status, entry)) - if entry.host.status not in (models.Host.Status.PENDING, - models.Host.Status.RUNNING): - raise SchedulerError('Queue task attempting to start on queue ' - 'entry with invalid host status %s: %s' - % (entry.host.status, entry)) + self._check_queue_entry_statuses( + self.queue_entries, + allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING, + models.HostQueueEntry.Status.RUNNING), + allowed_host_statuses=(models.Host.Status.PENDING, + models.Host.Status.RUNNING)) super(QueueTask, self).prolog() @@ -2141,6 +2204,30 @@ queue_entry.set_status(models.HostQueueEntry.Status.GATHERING) +class HostlessQueueTask(AbstractQueueTask): + def __init__(self, queue_entry): + super(HostlessQueueTask, self).__init__([queue_entry]) + self.queue_entry_ids = [queue_entry.id] + + + def prolog(self): + self.queue_entries[0].update_field('execution_subdir', 'hostless') + super(HostlessQueueTask, self).prolog() + + + def _final_status(self): + if self.queue_entries[0].aborted: + return models.HostQueueEntry.Status.ABORTED + if self.monitor.exit_code() == 0: + return models.HostQueueEntry.Status.COMPLETED + return models.HostQueueEntry.Status.FAILED + + + def _finish_task(self): + super(HostlessQueueTask, self)._finish_task() + self.queue_entries[0].set_status(self._final_status()) + + class PostJobTask(AgentTask): def __init__(self, queue_entries, log_file_name): super(PostJobTask, self).__init__(log_file_name=log_file_name) @@ -2213,6 +2300,11 @@ pass + def _pidfile_label(self): + # '.autoserv_execute' -> 'autoserv' + return self._pidfile_name()[1:-len('_execute')] + + class GatherLogsTask(PostJobTask): """ Task responsible for @@ -2231,8 +2323,10 @@ def _generate_command(self, results_dir): host_list = ','.join(queue_entry.host.hostname for queue_entry in self.queue_entries) - return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list, - '-r', results_dir] + return [_autoserv_path , '-p', + '--pidfile-label=%s' % self._pidfile_label(), + '--use-existing-results', '--collect-crashinfo', + '-m', host_list, '-r', results_dir] @property @@ -2245,23 +2339,17 @@ def prolog(self): - for queue_entry in self.queue_entries: - if queue_entry.status != models.HostQueueEntry.Status.GATHERING: - raise SchedulerError('Gather task attempting to start on ' - 'non-gathering entry: %s' % queue_entry) - if queue_entry.host.status != models.Host.Status.RUNNING: - raise SchedulerError('Gather task attempting to start on queue ' - 'entry with non-running host status %s: %s' - % (queue_entry.host.status, queue_entry)) + self._check_queue_entry_statuses( + self.queue_entries, + allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,), + allowed_host_statuses=(models.Host.Status.RUNNING,)) super(GatherLogsTask, self).prolog() def epilog(self): super(GatherLogsTask, self).epilog() - - self._copy_and_parse_results(self.queue_entries, - use_monitor=self._autoserv_monitor) + self._parse_results(self.queue_entries) self._reboot_hosts() @@ -2304,54 +2392,67 @@ self.finished(True) -class CleanupTask(PreJobTask): - TASK_TYPE = models.SpecialTask.Task.CLEANUP +class SelfThrottledPostJobTask(PostJobTask): + """ + Special AgentTask subclass that maintains its own global process limit. + """ + _num_running_processes = 0 - def __init__(self, task, recover_run_monitor=None): - super(CleanupTask, self).__init__(task, ['--cleanup']) - self._set_ids(host=self.host, queue_entries=[self.queue_entry]) + @classmethod + def _increment_running_processes(cls): + cls._num_running_processes += 1 - def prolog(self): - super(CleanupTask, self).prolog() - logging.info("starting cleanup task for host: %s", self.host.hostname) - self.host.set_status(models.Host.Status.CLEANING) - if self.queue_entry: - self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING) + @classmethod + def _decrement_running_processes(cls): + cls._num_running_processes -= 1 - def _finish_epilog(self): - if not self.queue_entry or not self.success: - return + @classmethod + def _max_processes(cls): + raise NotImplementedError - do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY - should_run_verify = ( - self.queue_entry.job.run_verify - and self.host.protection != do_not_verify_protection) - if should_run_verify: - entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id) - models.SpecialTask.objects.create( - host=models.Host.objects.get(id=self.host.id), - queue_entry=entry, - task=models.SpecialTask.Task.VERIFY) + + @classmethod + def _can_run_new_process(cls): + return cls._num_running_processes < cls._max_processes() + + + def _process_started(self): + return bool(self.monitor) + + + def tick(self): + # override tick to keep trying to start until the process count goes + # down and we can, at which point we revert to default behavior + if self._process_started(): + super(SelfThrottledPostJobTask, self).tick() else: - self.queue_entry.on_pending() + self._try_starting_process() - def epilog(self): - super(CleanupTask, self).epilog() + def run(self): + # override run() to not actually run unless we can + self._try_starting_process() - if self.success: - self.host.update_field('dirty', 0) - self.host.set_status(models.Host.Status.READY) - self._finish_epilog() + def _try_starting_process(self): + if not self._can_run_new_process(): + return + + # actually run the command + super(SelfThrottledPostJobTask, self).run() + self._increment_running_processes() + + def finished(self, success): + super(SelfThrottledPostJobTask, self).finished(success) + if self._process_started(): + self._decrement_running_processes() -class FinalReparseTask(PostJobTask): - _num_running_parses = 0 +class FinalReparseTask(SelfThrottledPostJobTask): def __init__(self, queue_entries): super(FinalReparseTask, self).__init__(queue_entries, log_file_name='.parse.log') @@ -2360,7 +2461,7 @@ def _generate_command(self, results_dir): - return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P', + return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', results_dir] @@ -2373,91 +2474,59 @@ return _PARSER_PID_FILE - def _parse_started(self): - return bool(self.monitor) - - - @classmethod - def _increment_running_parses(cls): - cls._num_running_parses += 1 - - - @classmethod - def _decrement_running_parses(cls): - cls._num_running_parses -= 1 - - @classmethod - def _can_run_new_parse(cls): - return (cls._num_running_parses < - scheduler_config.config.max_parse_processes) + def _max_processes(cls): + return scheduler_config.config.max_parse_processes def prolog(self): - for queue_entry in self.queue_entries: - if queue_entry.status != models.HostQueueEntry.Status.PARSING: - raise SchedulerError('Parse task attempting to start on ' - 'non-parsing entry: %s' % queue_entry) + self._check_queue_entry_statuses( + self.queue_entries, + allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,)) super(FinalReparseTask, self).prolog() def epilog(self): super(FinalReparseTask, self).epilog() - self._set_all_statuses(self._final_status()) + self._archive_results(self.queue_entries) - def tick(self): - # override tick to keep trying to start until the parse count goes down - # and we can, at which point we revert to default behavior - if self._parse_started(): - super(FinalReparseTask, self).tick() - else: - self._try_starting_parse() - - - def run(self): - # override run() to not actually run unless we can - self._try_starting_parse() - +class ArchiveResultsTask(SelfThrottledPostJobTask): + def __init__(self, queue_entries): + super(ArchiveResultsTask, self).__init__(queue_entries, + log_file_name='.archiving.log') + # don't use _set_ids, since we don't want to set the host_ids + self.queue_entry_ids = [entry.id for entry in queue_entries] - def _try_starting_parse(self): - if not self._can_run_new_parse(): - return - # actually run the parse command - super(FinalReparseTask, self).run() - self._increment_running_parses() + def _pidfile_name(self): + return _ARCHIVER_PID_FILE - def finished(self, success): - super(FinalReparseTask, self).finished(success) - if self._parse_started(): - self._decrement_running_parses() + def _generate_command(self, results_dir): + return [_autoserv_path , '-p', + '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir, + '--use-existing-results', + os.path.join('..', 'scheduler', 'archive_results.control.srv')] -class HostlessQueueTask(AbstractQueueTask): - def __init__(self, queue_entry): - super(HostlessQueueTask, self).__init__([queue_entry]) - self.queue_entry_ids = [queue_entry.id] + @classmethod + def _max_processes(cls): + return scheduler_config.config.max_transfer_processes def prolog(self): - self.queue_entries[0].update_field('execution_subdir', 'hostless') - super(HostlessQueueTask, self).prolog() + self._check_queue_entry_statuses( + self.queue_entries, + allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,)) - - def _final_status(self): - if self.queue_entries[0].aborted: - return models.HostQueueEntry.Status.ABORTED - if self.monitor.exit_code() == 0: - return models.HostQueueEntry.Status.COMPLETED - return models.HostQueueEntry.Status.FAILED + super(ArchiveResultsTask, self).prolog() - def _finish_task(self): - super(HostlessQueueTask, self)._finish_task() - self.queue_entries[0].set_status(self._final_status()) + def epilog(self): + super(ArchiveResultsTask, self).epilog() + self._set_all_statuses(self._final_status()) class DBError(Exception): @@ -2899,34 +2968,22 @@ self.update_field('status', status) - if status in (models.HostQueueEntry.Status.QUEUED, - models.HostQueueEntry.Status.PARSING): - self.update_field('complete', False) - self.update_field('active', False) - - if status in (models.HostQueueEntry.Status.PENDING, - models.HostQueueEntry.Status.RUNNING, - models.HostQueueEntry.Status.VERIFYING, - models.HostQueueEntry.Status.STARTING, - models.HostQueueEntry.Status.GATHERING): - self.update_field('complete', False) - self.update_field('active', True) + active = (status in models.HostQueueEntry.ACTIVE_STATUSES) + complete = (status in models.HostQueueEntry.COMPLETE_STATUSES) + assert not (active and complete) + + self.update_field('active', active) + self.update_field('complete', complete) - if status in (models.HostQueueEntry.Status.FAILED, - models.HostQueueEntry.Status.COMPLETED, - models.HostQueueEntry.Status.STOPPED, - models.HostQueueEntry.Status.ABORTED): - self.update_field('complete', True) - self.update_field('active', False) + if complete: self._on_complete() + self._email_on_job_complete() should_email_status = (status.lower() in _notify_email_statuses or 'all' in _notify_email_statuses) if should_email_status: self._email_on_status(status) - self._email_on_job_complete() - def _on_complete(self): self.job.stop_if_necessary() @@ -3063,7 +3120,7 @@ assert self.aborted and not self.complete Status = models.HostQueueEntry.Status - if self.status in (Status.GATHERING, Status.PARSING): + if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING): # do nothing; post-job tasks will finish and then mark this entry # with status "Aborted" and take care of the host return --- autotest/scheduler/monitor_db_functional_test.py 2009-12-30 15:31:05.000000000 -0800 +++ autotest/scheduler/monitor_db_functional_test.py 2009-12-30 15:31:05.000000000 -0800 @@ -492,6 +492,8 @@ self.mock_drone_manager.finish_process(_PidfileType.CLEANUP, exit_status=256) self._run_dispatcher() # repair, HQE unaffected + self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE) + self._run_dispatcher() return queue_entry @@ -537,7 +539,7 @@ self.mock_drone_manager.finish_process(_PidfileType.PARSE) self._run_dispatcher() - self._check_statuses(queue_entry, HqeStatus.ARCHIVING, HostStatus.READY) + self._check_entry_status(queue_entry, HqeStatus.ARCHIVING) self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE) self._run_dispatcher() --- autotest/server/autoserv 2009-12-29 14:16:16.000000000 -0800 +++ autotest/server/autoserv 2009-12-30 15:31:05.000000000 -0800 @@ -127,6 +127,7 @@ ssh_user, ssh_port, ssh_pass, group_name=group_name, tag=execution_tag) job.logging.start_logging() + job.init_parser() # perform checks job.precheck() @@ -175,7 +176,7 @@ results = 'results.' + time.strftime('%Y-%m-%d-%H.%M.%S') results = os.path.abspath(results) resultdir_exists = os.path.exists(os.path.join(results, 'control.srv')) - if not parser.options.collect_crashinfo and resultdir_exists: + if not parser.options.use_existing_results and resultdir_exists: error = "Error: results directory already exists: %s\n" % results sys.stderr.write(error) sys.exit(1) @@ -194,12 +195,15 @@ if results: logging.info("Results placed in %s" % results) + # wait until now to perform this check, so it get properly logged + if parser.options.use_existing_results and not resultdir_exists: + logging.error("No existing results directory found: %s", results) + sys.exit(1) + + if parser.options.write_pidfile: - if parser.options.collect_crashinfo: - pidfile_label = 'collect_crashinfo' - else: - pidfile_label = 'autoserv' - pid_file_manager = pidfile.PidFileManager(pidfile_label, results) + pid_file_manager = pidfile.PidFileManager(parser.options.pidfile_label, + results) pid_file_manager.open_file() else: pid_file_manager = None --- autotest/server/autoserv_parser.py 2009-12-30 15:31:05.000000000 -0800 +++ autotest/server/autoserv_parser.py 2009-12-30 15:31:05.000000000 -0800 @@ -87,7 +87,16 @@ 'output') self.parser.add_option("-p", "--write-pidfile", action="store_true", dest="write_pidfile", default=False, - help="write pidfile (.autoserv_execute)") + help="write pidfile (pidfile name is determined " + "by --pidfile-label") + self.parser.add_option("--pidfile-label", action="store", + default="autoserv", + help="Determines filename to use as pidfile (if " + "-p is specified). Pidfile will be " + ".<label>_execute. Default to autoserv.") + self.parser.add_option("--use-existing-results", action="store_true", + help="Indicates that autoserv is working with " + "an existing results directory") self.parser.add_option("-a", "--args", dest='args', help="additional args to pass to control file") protection_levels = [host_protections.Protection.get_attr_name(s) --- autotest/server/server_job.py 2009-12-30 15:31:05.000000000 -0800 +++ autotest/server/server_job.py 2009-12-30 15:31:05.000000000 -0800 @@ -135,11 +135,7 @@ utils.write_keyval(self.resultdir, job_data) self._parse_job = parse_job - if self._parse_job and len(machines) == 1: - self._using_parser = True - self.init_parser(self.resultdir) - else: - self._using_parser = False + self._using_parser = (self._parse_job and len(machines) == 1) self.pkgmgr = packages.PackageManager( self.autodir, run_function_dargs={'timeout':600}) self.num_tests_run = 0 @@ -201,20 +197,22 @@ subcommand.subcommand.register_join_hook(on_join) - def init_parser(self, resultdir): + def init_parser(self): """ - Start the continuous parsing of resultdir. This sets up + Start the continuous parsing of self.resultdir. This sets up the database connection and inserts the basic job object into the database if necessary. """ + if not self._using_parser: + return # redirect parser debugging to .parse.log - parse_log = os.path.join(resultdir, '.parse.log') + parse_log = os.path.join(self.resultdir, '.parse.log') parse_log = open(parse_log, 'w', 0) tko_utils.redirect_parser_debugging(parse_log) # create a job model object and set up the db self.results_db = tko_db.db(autocommit=True) self.parser = status_lib.parser(self._STATUS_VERSION) - self.job_model = self.parser.make_job(resultdir) + self.job_model = self.parser.make_job(self.resultdir) self.parser.start(self.job_model) # check if a job already exists in the db and insert it if # it does not @@ -310,7 +308,7 @@ self.push_execution_context(machine) os.chdir(self.resultdir) utils.write_keyval(self.resultdir, {"hostname": machine}) - self.init_parser(self.resultdir) + self.init_parser() result = function(machine) self.cleanup_parser() return result --- autotest/tko/parse.py 2009-12-30 15:31:05.000000000 -0800 +++ autotest/tko/parse.py 2009-12-30 15:31:05.000000000 -0800 @@ -30,8 +30,6 @@ action="store") parser.add_option("-d", help="Database name", dest="db_name", action="store") - parser.add_option("-P", help="Run site post-processing", - dest="site_do_post", action="store_true", default=False) parser.add_option("--write-pidfile", help="write pidfile (.parser_execute)", dest="write_pidfile", action="store_true", @@ -215,10 +213,6 @@ parse_leaf_path(db, path, level, reparse, mail_on_failure) -def _site_post_parse_job_dummy(): - return {} - - def main(): options, args = parse_args() results_dir = os.path.abspath(args[0]) @@ -229,10 +223,6 @@ if options.write_pidfile: pid_file_manager.open_file() - site_post_parse_job = utils.import_site_function(__file__, - "autotest_lib.tko.site_parse", "site_post_parse_job", - _site_post_parse_job_dummy) - try: # build up the list of job dirs to parse if options.singledir: @@ -269,9 +259,6 @@ fcntl.flock(lockfile, fcntl.LOCK_UN) lockfile.close() - if options.site_do_post is True: - site_post_parse_job(results_dir) - except: pid_file_manager.close_file(1) raise ==== (deleted) //depot/google_vendor_src_branch/autotest/tko/site_parse.py ==== _______________________________________________ Autotest mailing list [email protected] http://test.kernel.org/cgi-bin/mailman/listinfo/autotest
