Repository: aurora Updated Branches: refs/heads/master 2ab406599 -> 363816f07
DbTaskStore perf: add a task store API to list task job keys. Bugs closed: AURORA-1298 Reviewed at https://reviews.apache.org/r/35630/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/363816f0 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/363816f0 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/363816f0 Branch: refs/heads/master Commit: 363816f07336094e74282bc76433f95e8c34a62b Parents: 2ab4065 Author: Bill Farner <[email protected]> Authored: Thu Jun 18 16:37:55 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Thu Jun 18 16:37:55 2015 -0700 ---------------------------------------------------------------------- .../scheduler/storage/ForwardingStore.java | 5 +++ .../aurora/scheduler/storage/TaskStore.java | 8 ++++ .../scheduler/storage/db/DbTaskStore.java | 6 +++ .../aurora/scheduler/storage/db/TaskMapper.java | 8 ++++ .../scheduler/storage/mem/MemTaskStore.java | 8 ++++ .../scheduler/thrift/ReadOnlySchedulerImpl.java | 47 ++++++++++---------- .../aurora/scheduler/storage/db/TaskMapper.xml | 10 +++++ .../storage/AbstractTaskStoreTest.java | 35 +++++++++++++++ .../thrift/ReadOnlySchedulerImplTest.java | 8 +++- 9 files changed, 109 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/363816f0/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java index 5242eba..2be3eb0 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java @@ -109,6 +109,11 @@ public class ForwardingStore implements } @Override + public Set<IJobKey> getJobKeys() { + return taskStore.getJobKeys(); + } + + @Override public Set<ILock> fetchLocks() { return lockStore.fetchLocks(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/363816f0/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java index 2768e6e..cceac8a 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java @@ -19,6 +19,7 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableSet; import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; @@ -36,6 +37,13 @@ public interface TaskStore { */ Iterable<IScheduledTask> fetchTasks(Query.Builder query); + /** + * Fetches all job keys represented in the task store. + * + * @return Job keys of stored tasks. + */ + Set<IJobKey> getJobKeys(); + interface Mutable extends TaskStore { /** http://git-wip-us.apache.org/repos/asf/aurora/blob/363816f0/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java index 76f65da..9b30b01 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java @@ -110,6 +110,12 @@ class DbTaskStore implements TaskStore.Mutable { return result; } + @Timed("db_storage_get_job_keys") + @Override + public ImmutableSet<IJobKey> getJobKeys() { + return IJobKey.setFromBuilders(taskMapper.selectJobKeys()); + } + private static final Function<TaskConfigRow, Long> CONFIG_ID = new Function<TaskConfigRow, Long>() { @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/363816f0/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java index 9903675..8270407 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java @@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.storage.db; import java.util.List; import java.util.Set; +import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.TaskQuery; import org.apache.aurora.scheduler.storage.db.views.AssignedPort; import org.apache.aurora.scheduler.storage.db.views.ScheduledTaskWrapper; @@ -43,6 +44,13 @@ interface TaskMapper { List<ScheduledTaskWrapper> select(TaskQuery query); /** + * Gets job keys of all stored tasks. + * + * @return Job keys. + */ + Set<JobKey> selectJobKeys(); + + /** * Inserts the task events association within an * {@link org.apache.aurora.scheduler.storage.entities.IScheduledTask}. * http://git-wip-us.apache.org/repos/asf/aurora/blob/363816f0/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java index 3a9de60..4b67f6b 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java @@ -144,6 +144,14 @@ class MemTaskStore implements TaskStore.Mutable { return result; } + @Timed("mem_storage_get_job_keys") + @Override + public Set<IJobKey> getJobKeys() { + return FluentIterable.from(fetchTasks(Query.unscoped())) + .transform(Tasks.SCHEDULED_TO_JOB_KEY) + .toSet(); + } + private final Function<IScheduledTask, Task> toTask = new Function<IScheduledTask, Task>() { @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/363816f0/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java index 30e579c..41e144b 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java @@ -227,22 +227,29 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { @Override public Response getRoleSummary() { - Multimap<String, IJobKey> jobsByRole = mapByRole( - Storage.Util.fetchTasks(storage, Query.unscoped()), - Tasks.SCHEDULED_TO_JOB_KEY); - - Multimap<String, IJobKey> cronJobsByRole = mapByRole( - Storage.Util.fetchCronJobs(storage), - JobKeys.FROM_CONFIG); - - Set<RoleSummary> summaries = Sets.newHashSet(); - for (String role : Sets.union(jobsByRole.keySet(), cronJobsByRole.keySet())) { - RoleSummary summary = new RoleSummary(); - summary.setRole(role); - summary.setJobCount(jobsByRole.get(role).size()); - summary.setCronJobCount(cronJobsByRole.get(role).size()); - summaries.add(summary); - } + Multimap<String, IJobKey> jobsByRole = storage.read(new Quiet<Multimap<String, IJobKey>>() { + @Override + public Multimap<String, IJobKey> apply(StoreProvider storeProvider) { + return Multimaps.index(storeProvider.getTaskStore().getJobKeys(), JobKeys.TO_ROLE); + } + }); + + Multimap<String, IJobKey> cronJobsByRole = Multimaps.index( + Iterables.transform(Storage.Util.fetchCronJobs(storage), JobKeys.FROM_CONFIG), + JobKeys.TO_ROLE); + + Set<RoleSummary> summaries = FluentIterable.from( + Sets.union(jobsByRole.keySet(), cronJobsByRole.keySet())) + .transform(new Function<String, RoleSummary>() { + @Override + public RoleSummary apply(String role) { + return new RoleSummary( + role, + jobsByRole.get(role).size(), + cronJobsByRole.get(role).size()); + } + }) + .toSet(); return ok(Result.roleSummaryResult(new RoleSummaryResult(summaries))); } @@ -404,12 +411,4 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { private Multimap<IJobKey, IScheduledTask> getTasks(Query.Builder query) { return Tasks.byJobKey(Storage.Util.fetchTasks(storage, query)); } - - private static <T> Multimap<String, IJobKey> mapByRole( - Iterable<T> tasks, - Function<T, IJobKey> keyExtractor) { - - return HashMultimap.create( - Multimaps.index(Iterables.transform(tasks, keyExtractor), JobKeys.TO_ROLE)); - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/363816f0/src/main/resources/org/apache/aurora/scheduler/storage/db/TaskMapper.xml ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/TaskMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/TaskMapper.xml index c3ab3eb..7c27f37 100644 --- a/src/main/resources/org/apache/aurora/scheduler/storage/db/TaskMapper.xml +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/TaskMapper.xml @@ -163,6 +163,16 @@ </where> </select> + <select id="selectJobKeys" resultMap="org.apache.aurora.scheduler.storage.db.JobKeyMapper.jobKeyMap"> + SELECT DISTINCT + j.role AS role, + j.environment AS environment, + j.name AS name + FROM tasks AS t + INNER JOIN task_configs as c ON c.id = t.task_config_row_id + INNER JOIN job_keys AS j ON j.id = c.job_key_id + </select> + <insert id="insertTaskEvents"> INSERT INTO task_events( task_row_id, http://git-wip-us.apache.org/repos/asf/aurora/blob/363816f0/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java index 0f3a1a0..63a784f 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java @@ -46,6 +46,7 @@ import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.testing.StorageEntityUtil; @@ -508,6 +509,40 @@ public abstract class AbstractTaskStoreTest { assertQueryResults(Query.slaveScoped(HOST_A.getHost()), updated); } + private Set<IJobKey> getJobKeys() { + return storage.read(new Storage.Work.Quiet<Set<IJobKey>>() { + @Override + public Set<IJobKey> apply(Storage.StoreProvider storeProvider) { + return storeProvider.getTaskStore().getJobKeys(); + } + }); + } + + private Set<IJobKey> toJobKeys(IScheduledTask... tasks) { + return FluentIterable.from(ImmutableSet.copyOf(tasks)) + .transform(Tasks.SCHEDULED_TO_JOB_KEY) + .toSet(); + } + + @Test + public void testGetsJobKeys() { + assertEquals(ImmutableSet.<IJobKey>of(), getJobKeys()); + saveTasks(TASK_A); + assertEquals(toJobKeys(TASK_A), getJobKeys()); + saveTasks(TASK_B, TASK_C); + assertEquals(toJobKeys(TASK_A, TASK_B, TASK_C), getJobKeys()); + deleteTasks(Tasks.id(TASK_B)); + assertEquals(toJobKeys(TASK_A, TASK_C), getJobKeys()); + IJobKey multiInstanceJob = JobKeys.from("role", "env", "instances"); + saveTasks( + makeTask("instance1", multiInstanceJob), + makeTask("instance2", multiInstanceJob), + makeTask("instance3", multiInstanceJob)); + assertEquals( + ImmutableSet.builder().addAll(toJobKeys(TASK_A, TASK_C)).add(multiInstanceJob).build(), + getJobKeys()); + } + @Ignore @Test public void testReadSecondaryIndexMultipleThreads() throws Exception { http://git-wip-us.apache.org/repos/asf/aurora/blob/363816f0/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java index a854ed3..4786bea 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java @@ -21,6 +21,7 @@ import javax.annotation.Nullable; import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -58,6 +59,7 @@ import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Query.Builder; import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.configuration.SanitizedConfiguration; import org.apache.aurora.scheduler.cron.CronPredictor; import org.apache.aurora.scheduler.cron.CrontabEntry; @@ -634,8 +636,10 @@ public class ReadOnlySchedulerImplTest extends EasyMockTest { IScheduledTask task4 = IScheduledTask.build(new ScheduledTask() .setAssignedTask(new AssignedTask().setTask(immediateTaskConfigThree))); - storageUtil.expectTaskFetch(Query.unscoped(), task1, task2, task3, task4); - + expect(storageUtil.taskStore.getJobKeys()).andReturn( + FluentIterable.from(ImmutableSet.of(task1, task2, task3, task4)) + .transform(Tasks.SCHEDULED_TO_JOB_KEY) + .toSet()); expect(storageUtil.jobStore.fetchJobs()).andReturn(IJobConfiguration.setFromBuilders(crons)); RoleSummaryResult expectedResult = new RoleSummaryResult();
