Repository: aurora
Updated Branches:
  refs/heads/master 751d65f15 -> a31acbb6c
Fix inconsistency in MemTaskStore secondary indices. Bugs closed: AURORA-1305 Reviewed at https://reviews.apache.org/r/33869/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a31acbb6 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a31acbb6 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a31acbb6 Branch: refs/heads/master Commit: a31acbb6c59db6cf592be03a5e51a77c8bc50549 Parents: 751d65f Author: Bill Farner <[email protected]> Authored: Wed May 6 10:28:51 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Wed May 6 10:28:51 2015 -0700 ---------------------------------------------------------------------- .../scheduler/storage/mem/MemTaskStore.java | 60 ++++++++++++++------ .../storage/AbstractTaskStoreTest.java | 4 +- .../storage/db/DbJobUpdateStoreTest.java | 7 +-- .../storage/mem/InMemTaskStoreTest.java | 27 ++++++++- 4 files changed, 75 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/a31acbb6/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java index 9c76fa5..3a9de60 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java @@ -20,12 +20,16 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import 
com.google.common.base.Predicate; import com.google.common.base.Predicates; +import com.google.common.base.Supplier; import com.google.common.collect.FluentIterable; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; @@ -40,7 +44,7 @@ import com.twitter.common.base.MorePreconditions; import com.twitter.common.inject.TimedInterceptor.Timed; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; -import com.twitter.common.stats.Stats; +import com.twitter.common.stats.StatsProvider; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; @@ -96,23 +100,32 @@ class MemTaskStore implements TaskStore.Mutable { // slave host. This is deemed acceptable due to the fact that secondary key values are rarely // mutated in practice, and mutated in ways that are not impacted by this behavior. private final Map<String, Task> tasks = Maps.newConcurrentMap(); - private final List<SecondaryIndex<?>> secondaryIndices = ImmutableList.of( - new SecondaryIndex<>( - Tasks.SCHEDULED_TO_JOB_KEY, - QUERY_TO_JOB_KEY, - Stats.exportLong("task_queries_by_job")), - new SecondaryIndex<>( - Tasks.SCHEDULED_TO_SLAVE_HOST, - QUERY_TO_SLAVE_HOST, - Stats.exportLong("task_queries_by_host"))); + private final List<SecondaryIndex<?>> secondaryIndices; // An interner is used here to collapse equivalent TaskConfig instances into canonical instances. // Ideally this would fall out of the object hierarchy (TaskConfig being associated with the job // rather than the task), but we intuit this detail here for performance reasons. 
private final Interner<TaskConfig, String> configInterner = new Interner<>(); - private final AtomicLong taskQueriesById = Stats.exportLong("task_queries_by_id"); - private final AtomicLong taskQueriesAll = Stats.exportLong("task_queries_all"); + private final AtomicLong taskQueriesById; + private final AtomicLong taskQueriesAll; + + @Inject + MemTaskStore(StatsProvider statsProvider) { + secondaryIndices = ImmutableList.of( + new SecondaryIndex<>( + Tasks.SCHEDULED_TO_JOB_KEY, + QUERY_TO_JOB_KEY, + statsProvider, + "job"), + new SecondaryIndex<>( + Tasks.SCHEDULED_TO_SLAVE_HOST, + QUERY_TO_SLAVE_HOST, + statsProvider, + "host")); + taskQueriesById = statsProvider.makeCounter("task_queries_by_id"); + taskQueriesAll = statsProvider.makeCounter("task_queries_all"); + } @Timed("mem_storage_fetch_tasks") @Override @@ -347,6 +360,11 @@ class MemTaskStore implements TaskStore.Mutable { } } + @VisibleForTesting + static String getIndexSizeStatName(String name) { + return "task_store_index_" + name + "_items"; + } + /** * A non-unique secondary index on the task store. Maps a custom key type to a set of task IDs. * @@ -364,16 +382,26 @@ class MemTaskStore implements TaskStore.Mutable { * * @param indexer Indexing function. * @param queryExtractor Function to extract the keys relevant to a query. - * @param hitCount Counter for number of times the secondary index applies to a query. + * @param statsProvider Stats system to export metrics to. + * @param name Name to use in stats keys. 
*/ SecondaryIndex( Function<IScheduledTask, K> indexer, Function<Query.Builder, Optional<Set<K>>> queryExtractor, - AtomicLong hitCount) { + StatsProvider statsProvider, + String name) { this.indexer = indexer; this.queryExtractor = queryExtractor; - this.hitCount = hitCount; + this.hitCount = statsProvider.makeCounter("task_queries_by_" + name); + statsProvider.makeGauge( + getIndexSizeStatName(name), + new Supplier<Number>() { + @Override + public Number get() { + return index.size(); + } + }); } void insert(Iterable<IScheduledTask> tasks) { @@ -396,7 +424,7 @@ class MemTaskStore implements TaskStore.Mutable { void remove(IScheduledTask task) { K key = indexer.apply(task); if (key != null) { - index.remove(key, task); + index.remove(key, Tasks.id(task)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/a31acbb6/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java index e3b1340..6a6ff27 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java @@ -72,13 +72,13 @@ public abstract class AbstractTaskStoreTest { ImmutableSet.of(new Attribute("zone", ImmutableSet.of("1a")))) .setSlaveId("slaveIdB") .setMode(MaintenanceMode.NONE)); - private static final IScheduledTask TASK_A = createTask("a"); + protected static final IScheduledTask TASK_A = createTask("a"); private static final IScheduledTask TASK_B = setContainer(createTask("b"), Container.mesos(new MesosContainer())); private static final IScheduledTask TASK_C = createTask("c"); private static final IScheduledTask TASK_D = createTask("d"); - private Storage storage; + protected Storage storage; protected abstract Module 
getStorageModule(); http://git-wip-us.apache.org/repos/asf/aurora/blob/a31acbb6/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java index be57c5e..7d856d0 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStoreTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import com.google.inject.Injector; import org.apache.aurora.gen.InstanceTaskConfig; @@ -276,11 +275,11 @@ public class DbJobUpdateStoreTest { } private <T extends Number> void assertStats(Map<JobUpdateStatus, T> expected) { - Map<String, Long> statValues = Maps.newHashMap(); for (Map.Entry<JobUpdateStatus, T> entry : expected.entrySet()) { - statValues.put(DBJobUpdateStore.statName(entry.getKey()), entry.getValue().longValue()); + assertEquals( + entry.getValue().longValue(), + stats.getLongValue(DBJobUpdateStore.statName(entry.getKey()))); } - assertEquals(statValues , stats.getAllValues()); } @Test http://git-wip-us.apache.org/repos/asf/aurora/blob/a31acbb6/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java index d1f4026..8f139fc 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java +++ 
b/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java @@ -13,25 +13,50 @@ */ package org.apache.aurora.scheduler.storage.mem; +import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; import com.google.inject.Module; import com.google.inject.util.Modules; import com.twitter.common.stats.StatsProvider; +import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.AbstractTaskStoreTest; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.TaskStore; import org.apache.aurora.scheduler.storage.db.DbModule; import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; public class InMemTaskStoreTest extends AbstractTaskStoreTest { + + private FakeStatsProvider statsProvider; + @Override protected Module getStorageModule() { + statsProvider = new FakeStatsProvider(); return Modules.combine( DbModule.testModule(), new AbstractModule() { @Override protected void configure() { - bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + bind(StatsProvider.class).toInstance(statsProvider); } }); } + + @Test + public void testSecondaryIndexConsistency() { + // Test for regression of AURORA-1305. + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore(); + taskStore.saveTasks(ImmutableSet.of(TASK_A)); + taskStore.deleteTasks(Tasks.ids(TASK_A)); + assertEquals(0L, statsProvider.getLongValue(MemTaskStore.getIndexSizeStatName("job"))); + } + }); + } }
