Eliminate sequential scan in MemTaskStore.getJobKeys()

If the scheduler is configured to run with the `MemTaskStore`, every hit on the scheduler landing page (`/scheduler`) causes a call to `MemTaskStore.getJobKeys()` through `ReadOnlyScheduler.getRoleSummary()`.
The current implementation of `MemTaskStore.getJobKeys()` is very inefficient: it performs a sequential scan of the task store and maps each task to its job key. In Twitter clusters this method currently takes half a second per call (`mem_storage_get_job_keys`). This patch eliminates both the sequential scan and the per-task job-key mapping by returning an immutable copy of the key set of the existing `job` secondary index. Bugs closed: AURORA-1847 Reviewed at https://reviews.apache.org/r/55217/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5d673167 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5d673167 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5d673167 Branch: refs/heads/master Commit: 5d673167f34e9e50120511868a8f97547ae782db Parents: 48ea310 Author: Mehrdad Nurolahzade <[email protected]> Authored: Wed Jan 11 23:17:34 2017 +0100 Committer: Stephan Erb <[email protected]> Committed: Wed Jan 11 23:21:10 2017 +0100 ---------------------------------------------------------------------- .../aurora/benchmark/TaskStoreBenchmarks.java | 3 +++ .../aurora/scheduler/storage/mem/MemTaskStore.java | 17 ++++++++--------- 2 files changed, 11 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/5d673167/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java index f2f00b9..5b4d2a2 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java @@ -22,6 +22,8 @@ import com.google.inject.Guice; import com.google.inject.util.Modules; import 
org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.common.util.Clock; +import org.apache.aurora.common.util.testing.FakeClock; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.TaskStore; @@ -88,6 +90,7 @@ public class TaskStoreBenchmarks { @Override protected void configure() { bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + bind(Clock.class).toInstance(new FakeClock()); } })) .getInstance(Storage.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/5d673167/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java index fc272dd..d89e715 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java @@ -95,8 +95,8 @@ class MemTaskStore implements TaskStore.Mutable { // slave host. This is deemed acceptable due to the fact that secondary key values are rarely // mutated in practice, and mutated in ways that are not impacted by this behavior. private final Map<String, Task> tasks = Maps.newConcurrentMap(); + private final SecondaryIndex<IJobKey> jobIndex; private final List<SecondaryIndex<?>> secondaryIndices; - // An interner is used here to collapse equivalent TaskConfig instances into canonical instances. // Ideally this would fall out of the object hierarchy (TaskConfig being associated with the job // rather than the task), but we intuit this detail here for performance reasons. 
@@ -110,12 +110,9 @@ class MemTaskStore implements TaskStore.Mutable { StatsProvider statsProvider, @SlowQueryThreshold Amount<Long, Time> slowQueryThreshold) { + jobIndex = new SecondaryIndex<>(Tasks::getJob, QUERY_TO_JOB_KEY, statsProvider, "job"); secondaryIndices = ImmutableList.of( - new SecondaryIndex<>( - Tasks::getJob, - QUERY_TO_JOB_KEY, - statsProvider, - "job"), + jobIndex, new SecondaryIndex<>( Tasks::scheduledToSlaveHost, QUERY_TO_SLAVE_HOST, @@ -156,9 +153,7 @@ class MemTaskStore implements TaskStore.Mutable { @Timed("mem_storage_get_job_keys") @Override public Set<IJobKey> getJobKeys() { - return FluentIterable.from(fetchTasks(Query.unscoped())) - .transform(Tasks::getJob) - .toSet(); + return jobIndex.keySet(); } private final Function<IScheduledTask, Task> toTask = task -> new Task(task, configInterner); @@ -372,6 +367,10 @@ class MemTaskStore implements TaskStore.Mutable { }); } + Set<K> keySet() { + return ImmutableSet.copyOf(index.keySet()); + } + void insert(Iterable<IScheduledTask> tasks) { for (IScheduledTask task : tasks) { insert(task);
