Repository: aurora Updated Branches: refs/heads/master 40d91feb7 -> 0e9c0864e
Expose task pruning endpoint in aurora_admin. Useful for scale testing in order to 'clean up' after a test run, but also useful in production if you have a bad actor inflating the size of your task index. Bugs closed: AURORA-1893 Reviewed at https://reviews.apache.org/r/56629/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0e9c0864 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0e9c0864 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0e9c0864 Branch: refs/heads/master Commit: 0e9c0864eb1c8587616b6c10dfe104327b005e94 Parents: 40d91fe Author: David McLaughlin <[email protected]> Authored: Tue Feb 14 13:33:01 2017 -0800 Committer: David McLaughlin <[email protected]> Committed: Tue Feb 14 13:33:01 2017 -0800 ---------------------------------------------------------------------- RELEASE-NOTES.md | 1 + .../thrift/org/apache/aurora/gen/api.thrift | 7 +++ .../thrift/SchedulerThriftInterface.java | 28 +++++++++ src/main/python/apache/aurora/admin/admin.py | 32 ++++++++++ .../python/apache/aurora/client/api/__init__.py | 3 + .../thrift/SchedulerThriftInterfaceTest.java | 63 ++++++++++++++++++++ .../python/apache/aurora/admin/test_admin.py | 44 ++++++++++++++ .../aurora/client/api/test_scheduler_client.py | 6 ++ 8 files changed, 184 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 3e98802..ff382ff 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -4,6 +4,7 @@ ### New/updated: - Add message parameter to `killTasks` RPC. +- Add prune_tasks endpoint to aurora_admin. See aurora_admin prune_tasks -h for usage information. 
0.17.0 ====== http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/api/src/main/thrift/org/apache/aurora/gen/api.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift index 6205c2e..efd4e53 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift @@ -1230,6 +1230,13 @@ service AuroraAdmin extends AuroraSchedulerManager { /** Tell scheduler to trigger an implicit task reconciliation. */ Response triggerImplicitTaskReconciliation() + + /** + * Force prune any (terminal) tasks that match the query. If no statuses are supplied with the + * query, it will default to all terminal task states. If statuses are supplied, they must be + * terminal states. + */ + Response pruneTasks(1: TaskQuery query) } // The name of the header that should be sent to bypass leader redirection in the Scheduler. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java index a211483..059fbb8 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java @@ -36,6 +36,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.common.collect.Range; +import com.google.common.collect.Sets; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.gen.ConfigRewrite; @@ -137,6 +138,7 @@ import static org.apache.aurora.gen.ResponseCode.WARNING; import static org.apache.aurora.scheduler.base.Numbers.convertRanges; import static org.apache.aurora.scheduler.base.Numbers.toRanges; import static org.apache.aurora.scheduler.base.Tasks.ACTIVE_STATES; +import static org.apache.aurora.scheduler.base.Tasks.TERMINAL_STATES; import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA; import static org.apache.aurora.scheduler.thrift.Responses.addMessage; import static org.apache.aurora.scheduler.thrift.Responses.empty; @@ -1115,6 +1117,32 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { } } + @Override + public Response pruneTasks(TaskQuery query) throws TException { + if (query.isSetStatuses() && query.getStatuses().stream().anyMatch(ACTIVE_STATES::contains)) { + return error("Tasks in non-terminal state cannot be pruned."); + } else if (!query.isSetStatuses()) { + query.setStatuses(TERMINAL_STATES); + } + + Iterable<IScheduledTask> tasks = storage.read(storeProvider -> + 
storeProvider.getTaskStore().fetchTasks(Query.arbitrary(query))); + // For some reason fetchTasks ignores the offset/limit options of a TaskQuery. So we have to + // manually apply the limit here. To be fixed in AURORA-1892. + if (query.isSetLimit()) { + tasks = Iterables.limit(tasks, query.getLimit()); + } + + Iterable<String> taskIds = Iterables.transform( + tasks, + task -> task.getAssignedTask().getTaskId()); + + return storage.write(storeProvider -> { + stateManager.deleteTasks(storeProvider, Sets.newHashSet(taskIds)); + return ok(); + }); + } + @ThriftWorkload @Override public Response getJobUpdateSummaries(JobUpdateQuery mutableQuery) throws TException { http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/main/python/apache/aurora/admin/admin.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/admin/admin.py b/src/main/python/apache/aurora/admin/admin.py index 070c348..0bb995c 100644 --- a/src/main/python/apache/aurora/admin/admin.py +++ b/src/main/python/apache/aurora/admin/admin.py @@ -72,6 +72,38 @@ def make_admin_client_with_options(cluster): @app.command +@app.command_option('--states', dest='states', default=None, + help='Only match tasks with given state(s).') +@app.command_option('--role', dest='role', default=None, + help='Only match tasks with given role.') +@app.command_option('--env', dest='environment', default=None, + help='Only match tasks with given environment.') +@app.command_option('--limit', dest='limit', default=None, type=int, + help='Limit the number of total tasks to prune.') +def prune_tasks(args, options): + if len(args) == 0: + die('Must specify at least cluster.') + cluster = args[0] + + t = TaskQuery() + if options.states: + t.statuses = set(map(ScheduleStatus._NAMES_TO_VALUES.get, options.states.split(','))) + if options.role: + t.role = options.role + if options.environment: + t.environment = options.environment + if options.limit: + 
t.limit = options.limit + + api = make_admin_client_with_options(cluster) + rsp = api.prune_tasks(t) + if rsp.responseCode != ResponseCode.OK: + die('Failed to prune tasks: %s' % combine_messages(rsp)) + else: + print("Tasks pruned.") + + +@app.command @app.command_option('--force', dest='force', default=False, action='store_true', help='Force expensive queries to run.') @app.command_option('--shards', dest='shards', default=None, http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/main/python/apache/aurora/client/api/__init__.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py index 1250ccd..a4639db 100644 --- a/src/main/python/apache/aurora/client/api/__init__.py +++ b/src/main/python/apache/aurora/client/api/__init__.py @@ -343,6 +343,9 @@ class AuroraClientAPI(object): def snapshot(self): return self._scheduler_proxy.snapshot() + def prune_tasks(self, query): + return self._scheduler_proxy.pruneTasks(query) + def unsafe_rewrite_config(self, rewrite_request): return self._scheduler_proxy.rewriteConfigs(rewrite_request) http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java index 0cdd982..c36abc8 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java @@ -63,6 +63,7 @@ import org.apache.aurora.gen.Range; import org.apache.aurora.gen.ReadOnlyScheduler; import org.apache.aurora.gen.ResourceAggregate; import org.apache.aurora.gen.Response; +import 
org.apache.aurora.gen.ResponseCode; import org.apache.aurora.gen.ResponseDetail; import org.apache.aurora.gen.Result; import org.apache.aurora.gen.RewriteConfigsRequest; @@ -71,12 +72,14 @@ import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.StartJobUpdateResult; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskConstraint; +import org.apache.aurora.gen.TaskQuery; import org.apache.aurora.gen.ValueConstraint; import org.apache.aurora.gen.apiConstants; import org.apache.aurora.scheduler.TaskIdGenerator; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.configuration.ConfigurationManager; import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException; import org.apache.aurora.scheduler.configuration.SanitizedConfiguration; @@ -782,6 +785,66 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { } @Test + public void testPruneTasksRejectsActiveStates() throws Exception { + control.replay(); + Response rsp = thrift.pruneTasks(new TaskQuery().setStatuses(Tasks.ACTIVE_STATES)); + assertResponse(ResponseCode.ERROR, rsp); + } + + @Test + public void testPruneTasksRejectsMixedStates() throws Exception { + control.replay(); + Response rsp = thrift.pruneTasks(new TaskQuery().setStatuses( + ImmutableSet.of(ScheduleStatus.FINISHED, ScheduleStatus.KILLING))); + assertResponse(ResponseCode.ERROR, rsp); + } + + @Test + public void testPruneTasksAddsDefaultStatuses() throws Exception { + storageUtil.expectTaskFetch( + Query.arbitrary(new TaskQuery().setStatuses(Tasks.TERMINAL_STATES)), + buildScheduledTask()); + stateManager.deleteTasks( + storageUtil.mutableStoreProvider, + ImmutableSet.of(buildScheduledTask().getAssignedTask().getTaskId())); + control.replay(); + + assertOkResponse(thrift.pruneTasks(new 
TaskQuery())); + } + + @Test + public void testPruneTasksRespectsScopedTerminalState() throws Exception { + storageUtil.expectTaskFetch( + Query.arbitrary(new TaskQuery().setStatuses(ImmutableSet.of(ScheduleStatus.FAILED))), + buildScheduledTask()); + stateManager.deleteTasks( + storageUtil.mutableStoreProvider, + ImmutableSet.of(buildScheduledTask().getAssignedTask().getTaskId())); + control.replay(); + + assertOkResponse(thrift.pruneTasks( + new TaskQuery().setStatuses(ImmutableSet.of(ScheduleStatus.FAILED)))); + } + + @Test + public void testPruneTasksAppliesQueryLimit() throws Exception { + TaskQuery query = new TaskQuery().setLimit(3); + storageUtil.expectTaskFetch( + Query.arbitrary(query.setStatuses(Tasks.TERMINAL_STATES)), + buildScheduledTask("a/b/c", "task1"), + buildScheduledTask("a/b/c", "task2"), + buildScheduledTask("a/b/c", "task3"), + buildScheduledTask("a/b/c", "task4"), + buildScheduledTask("a/b/c", "task5")); + stateManager.deleteTasks( + storageUtil.mutableStoreProvider, + ImmutableSet.of("task1", "task2", "task3")); + control.replay(); + + assertOkResponse(thrift.pruneTasks(query)); + } + + @Test public void testSetQuota() throws Exception { ResourceAggregate resourceAggregate = new ResourceAggregate() .setNumCpus(10) http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/test/python/apache/aurora/admin/test_admin.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/admin/test_admin.py b/src/test/python/apache/aurora/admin/test_admin.py index 66abade..ebe89b5 100644 --- a/src/test/python/apache/aurora/admin/test_admin.py +++ b/src/test/python/apache/aurora/admin/test_admin.py @@ -19,6 +19,7 @@ from mock import PropertyMock, call, create_autospec, patch from apache.aurora.admin.admin import ( get_scheduler, increase_quota, + prune_tasks, query, reconcile_tasks, set_quota @@ -45,6 +46,49 @@ from gen.apache.aurora.api.ttypes import ( ) +class 
TestPruneCommand(AuroraClientCommandTest): + @classmethod + def setup_mock_options(cls, states=None, role=None, env=None, limit=None): + mock_options = create_autospec( + spec=['states', 'role', 'environment', 'limit'], + spec_set=False, + instance=True) + + mock_options.role = role + mock_options.states = states + mock_options.environment = env + mock_options.limit = limit + mock_options.bypass_leader_redirect = False + + return mock_options + + @classmethod + def task_query(cls, options): + query = TaskQuery( + role=options.role, + environment=options.environment, + limit=options.limit) + if options.states: + query.statuses = set(map(ScheduleStatus._NAMES_TO_VALUES.get, options.states.split(','))) + return query + + def test_prune(self): + mock_options = self.setup_mock_options( + role='aurora', env='devel', limit=10, states="LOST,FINISHED") + with contextlib.nested( + patch('twitter.common.app.get_options', return_value=mock_options), + patch('apache.aurora.admin.admin.make_admin_client', + return_value=create_autospec(spec=AuroraClientAPI)), + patch('apache.aurora.admin.admin.CLUSTERS', new=self.TEST_CLUSTERS) + ) as (_, mock_make_admin_client, _): + api = mock_make_admin_client.return_value + api.prune_tasks.return_value = Response(responseCode=ResponseCode.OK) + + prune_tasks(['cluster'], mock_options) + + api.prune_tasks.assert_called_once_with(self.task_query(mock_options)) + + class TestQueryCommand(AuroraClientCommandTest): @classmethod http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/test/python/apache/aurora/client/api/test_scheduler_client.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py index fab9798..59c651c 100644 --- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py +++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py @@ -282,6 
+282,12 @@ class TestSchedulerProxyAdminInjection(TestSchedulerProxyInjection): self.mox.ReplayAll() self.make_scheduler_proxy().snapshot() + def test_pruneTasks(self): + t = TaskQuery() + self.mock_thrift_client.pruneTasks(IsA(TaskQuery)).AndReturn(DEFAULT_RESPONSE) + self.mox.ReplayAll() + self.make_scheduler_proxy().pruneTasks(t) + def test_rewriteConfigs(self): self.mock_thrift_client.rewriteConfigs( IsA(RewriteConfigsRequest)).AndReturn(DEFAULT_RESPONSE)
