This is an automated email from the ASF dual-hosted git repository. dill0wn pushed a commit to branch dw/taskcmd_filter_age in repository https://gitbox.apache.org/repos/asf/allura.git
commit bbee79b790478de90cc4d262ca2a67429a2a7a03 Author: Dillon Walls <[email protected]> AuthorDate: Fri May 21 20:14:49 2021 +0000 various TaskCommand improvements triggered by expansions to purge command --- Allura/allura/command/taskd.py | 75 +++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 16 deletions(-) diff --git a/Allura/allura/command/taskd.py b/Allura/allura/command/taskd.py index 4d7d624..5f9d747 100644 --- a/Allura/allura/command/taskd.py +++ b/Allura/allura/command/taskd.py @@ -167,12 +167,20 @@ class TaskdCommand(base.Command): class TaskCommand(base.Command): + cmd_default_states = { + 'list': 'ready', + 'count': 'ready', + 'purge': '*' + } + summary = 'Task command' parser = base.Command.standard_parser(verbose=True) - parser.add_option('-s', '--state', dest='state', default='ready', - help='state of processes for "list" subcommand. * means all') + parser.add_option('-s', '--state', dest='state', default=None, + help='state of processes for "list", "count", or "purge" subcommands. * means all. ' + '(Defaults per command: %s)' % + ", ".join(['%s="%s"' % (k, v) for k, v in cmd_default_states.items()])) parser.add_option('-t', '--timeout', dest='timeout', type=int, default=60, - help='timeout (in seconds) for busy tasks') + help='timeout (in seconds) for busy tasks (only applies to "timeout" command)') parser.add_option('--filter-name-prefix', dest='filter_name_prefix', default=None, help='limit to task names starting with this. Example allura.tasks.index_tasks.') parser.add_option('--filter-result-regex', dest='filter_result_regex', default=None, @@ -181,12 +189,13 @@ class TaskCommand(base.Command): help='limit to tasks queued NUM days ago. Example "180"') min_args = 2 max_args = None - usage = '''<ini file> [list|retry|purge|timeout|commit] + usage = '''<ini file> [list|count|retry|purge|timeout|commit] - list: list tasks with given --state value - retry: re-run tasks with error state - purge: remove all "complete" tasks with result_type "forget" (which is the default) - timeout: retry all busy tasks older than --timeout seconds (does not stop existing task) + list: prints tasks matching --state (default: 'ready') and filters + count: counts tasks matching --state (default: 'ready') and filters + retry: re-run tasks with 'error' state. --state has no effect + purge: remove all tasks that match --state ( default: '*') with result_type "forget". + timeout: retry all tasks with state 'busy' and older than --timeout seconds (does not stop existing task). --state has no effect commit: run a solr 'commit' as a background task All subcommands except 'commit' can use the --filter-... options. @@ -197,12 +206,27 @@ class TaskCommand(base.Command): cmd = self.args[1] tab = dict( list=self._list, + count=self._count, retry=self._retry, purge=self._purge, timeout=self._timeout, commit=self._commit) tab[cmd]() + def _get_state_query(self): + state = self.options.state + if not state: + cmd = self.args[1] + state = self.cmd_default_states.get(cmd, 'ready') + + if state == '*': + from allura import model as M + # Providing all possible state values allows us to leverage the mongo index. + # omitting a state field might result in an entire COLLSCAN + state = {'$in': M.MonQTask.states} + + return state + def _add_filters(self, q): if self.options.filter_name_prefix: q['task_name'] = {'$regex': r'^{}.*'.format(re.escape(self.options.filter_name_prefix))} @@ -214,35 +238,52 @@ class TaskCommand(base.Command): print(q) return q + def _print_query(self, cmd, *args): + print('running mongod cmd: %s, %s' % (cmd, args)) + def _list(self): '''List tasks''' from allura import model as M - base.log.info('Listing tasks of state %s', self.options.state) - if self.options.state == '*': - q = dict() - else: - q = dict(state=self.options.state) + state = self._get_state_query() + base.log.info('Listing tasks of state %s', state) + q = dict(state=state) q = self._add_filters(q) + self._print_query('find', q) for t in M.MonQTask.query.find(q): print(t) + def _count(self): + '''Count tasks''' + from allura import model as M + state = self._get_state_query() + base.log.info('Counting tasks of state %s', state) + q = dict(state=state) + q = self._add_filters(q) + self._print_query('find', q) + count = M.MonQTask.query.find(q).count() + print('Task Count %s' % count) + def _retry(self): '''Retry tasks in an error state''' from allura import model as M base.log.info('Retry tasks in error state') q = dict(state='error') q = self._add_filters(q) + update = {'$set': dict(state='ready')} + self._print_query('update', q, update) M.MonQTask.query.update( q, - {'$set': dict(state='ready')}, + update, multi=True) def _purge(self): '''Purge completed tasks''' from allura import model as M base.log.info('Purge complete/forget tasks') - q = dict(state='complete', result_type='forget') + state = self._get_state_query() + q = dict(state=state, result_type='forget') q = self._add_filters(q) + self._print_query('remove', q) M.MonQTask.query.remove(q) def _timeout(self): @@ -256,9 +297,11 @@ class TaskCommand(base.Command): time_start={'$lt': cutoff}, ) q = self._add_filters(q) + update = {'$set': dict(state='ready')} + self._print_query('update', q, update) M.MonQTask.query.update( q, - {'$set': dict(state='ready')}, + update, multi=True) def _commit(self):
