Repository: aurora Updated Branches: refs/heads/master 38476abdf -> f2755e1cd
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStoreTest.java new file mode 100644 index 0000000..e8324ee --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStoreTest.java @@ -0,0 +1,24 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.mem; + +import org.apache.aurora.scheduler.storage.AbstractQuotaStoreTest; +import org.apache.aurora.scheduler.storage.Storage; + +public class MemQuotaStoreTest extends AbstractQuotaStoreTest { + @Override + protected Storage createStorage() { + return MemStorageModule.newEmptyStorage(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java new file mode 100644 index 0000000..7a12dfa --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java @@ -0,0 +1,173 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.mem; + +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.aurora.common.testing.TearDownTestCase; +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.Identity; +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.Storage.Work; +import org.apache.aurora.scheduler.storage.Storage.Work.Quiet; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * TODO(William Farner): Wire a mechanism to allow verification of synchronized writers. + */ +public class MemStorageTest extends TearDownTestCase { + + private ExecutorService executor; + private Storage storage; + + @Before + public void setUp() { + executor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("SlowRead-%d").setDaemon(true).build()); + addTearDown(() -> executor.shutdown()); + storage = MemStorageModule.newEmptyStorage(); + } + + @Test + public void testConcurrentReaders() throws Exception { + // Validate that a slow read does not block another read. + + final CountDownLatch slowReadStarted = new CountDownLatch(1); + final CountDownLatch slowReadFinished = new CountDownLatch(1); + + Future<String> future = executor.submit(() -> storage.read((Quiet<String>) storeProvider -> { + slowReadStarted.countDown(); + try { + slowReadFinished.await(); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + return "slowResult"; + })); + + slowReadStarted.await(); + + String fastResult = storage.read((Quiet<String>) storeProvider -> "fastResult"); + assertEquals("fastResult", fastResult); + slowReadFinished.countDown(); + assertEquals("slowResult", future.get()); + } + + private IScheduledTask makeTask(String taskId) { + return IScheduledTask.build(new ScheduledTask().setAssignedTask( + new AssignedTask() + .setTaskId(taskId) + .setTask(new TaskConfig() + .setOwner(new Identity().setUser("owner-" + taskId)) + .setJob(new JobKey() + .setRole("role-" + taskId) + .setEnvironment("env-" + taskId) + .setName("job-" + taskId))))); + } + + private static class CustomException extends RuntimeException { + } + + private <T, E extends RuntimeException> void expectWriteFail(MutateWork<T, E> work) { + try { + storage.write(work); + fail("Expected a CustomException."); + } catch (CustomException e) { + // Expected. + } + } + + private void expectTasks(final String... taskIds) { + storage.read((Work.Quiet<Void>) storeProvider -> { + Query.Builder query = Query.unscoped(); + Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query)) + .transform(t -> t.getAssignedTask().getTaskId()) + .toSet(); + assertEquals(ImmutableSet.<String>builder().add(taskIds).build(), ids); + return null; + }); + } + + @Test + public void testOperations() { + expectWriteFail((MutateWork.NoResult.Quiet) storeProvider -> { + storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b"))); + throw new CustomException(); + }); + expectTasks("a", "b"); + + storage.write((MutateWork.NoResult.Quiet) storeProvider -> { + storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b"))); + }); + expectTasks("a", "b"); + + expectWriteFail((MutateWork.NoResult.Quiet) storeProvider -> { + storeProvider.getUnsafeTaskStore().deleteAllTasks(); + throw new CustomException(); + }); + expectTasks(); + + expectWriteFail((MutateWork.NoResult.Quiet) storeProvider -> { + storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"))); + throw new CustomException(); + }); + expectTasks("a"); + storage.read((Work.Quiet<Void>) storeProvider -> { + assertEquals( + makeTask("a"), + Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks( + Query.taskScoped("a")))); + return null; + }); + + // Nested transaction where inner transaction fails. + expectWriteFail((MutateWork.NoResult.Quiet) storeProvider -> { + storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c"))); + storage.write((MutateWork.NoResult.Quiet) storeProvider1 -> { + storeProvider1.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d"))); + throw new CustomException(); + }); + }); + expectTasks("a", "c", "d"); + + // Nested transaction where outer transaction fails. + expectWriteFail((MutateWork.NoResult.Quiet) storeProvider -> { + storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c"))); + storage.write((MutateWork.NoResult.Quiet) storeProvider12 -> { + storeProvider12.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d"))); + }); + throw new CustomException(); + }); + expectTasks("a", "c", "d"); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java new file mode 100644 index 0000000..9e75c98 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java @@ -0,0 +1,68 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.mem; + +import com.google.common.collect.ImmutableSet; +import com.google.inject.AbstractModule; +import com.google.inject.Module; +import com.google.inject.util.Modules; + +import org.apache.aurora.common.stats.SlidingStats; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.common.util.Clock; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.storage.AbstractTaskStoreTest; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; +import org.apache.aurora.scheduler.storage.TaskStore; +import org.apache.aurora.scheduler.storage.db.InstrumentingInterceptor; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.junit.Test; + +import static org.easymock.EasyMock.createMock; +import static org.junit.Assert.assertEquals; + +public class MemTaskStoreTest extends AbstractTaskStoreTest { + + private FakeStatsProvider statsProvider; + + @Override + protected Module getStorageModule() { + statsProvider = new FakeStatsProvider(); + return Modules.combine( + new MemStorageModule(), + new AbstractModule() { + @Override + protected void configure() { + bind(StatsProvider.class).toInstance(statsProvider); + + // bindings for mybatis interceptor + SlidingStats slidingStats = createMock(SlidingStats.class); + bind(InstrumentingInterceptor.class).toInstance(new InstrumentingInterceptor( + Clock.SYSTEM_CLOCK, s -> slidingStats + )); + } + }); + } + + @Test + public void testSecondaryIndexConsistency() { + storage.write((NoResult.Quiet) storeProvider -> { + // Test for regression of AURORA-1305. + TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore(); + taskStore.saveTasks(ImmutableSet.of(TASK_A)); + taskStore.deleteTasks(Tasks.ids(TASK_A)); + assertEquals(0L, statsProvider.getLongValue(MemTaskStore.getIndexSizeStatName("job"))); + }); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java index 25f34e2..b7df14a 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java @@ -21,7 +21,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -34,7 +33,6 @@ import org.apache.aurora.scheduler.resources.ResourceTestUtil; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.junit.Before; import org.junit.Test; @@ -56,7 +54,7 @@ public class StorageTransactionTest extends TearDownTestCase { executor = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("SlowRead-%d").setDaemon(true).build()); addTearDown(() -> MoreExecutors.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS)); - storage = DbUtil.createStorage(); + storage = MemStorageModule.newEmptyStorage(); } @Test @@ -124,9 +122,9 @@ public class StorageTransactionTest extends TearDownTestCase { } storage.read(storeProvider -> { - // If the previous write was under a transaction then there would be no quota records. - assertEquals(ImmutableMap.of(), - storeProvider.getQuotaStore().fetchQuotas()); + // Since the previous write was not transactional, the quota record remains. + assertEquals(ImmutableSet.of("a"), + storeProvider.getQuotaStore().fetchQuotas().keySet()); return null; }); } @@ -137,7 +135,9 @@ public class StorageTransactionTest extends TearDownTestCase { storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b"))); throw new CustomException(); }); - expectTasks(); + // The in-memory storage is not transactional, so the writes are retained despite the write + // operation failing. + expectTasks("a", "b"); storage.write((NoResult.Quiet) storeProvider -> storeProvider.getUnsafeTaskStore().saveTasks( @@ -148,7 +148,7 @@ public class StorageTransactionTest extends TearDownTestCase { storeProvider.getUnsafeTaskStore().deleteAllTasks(); throw new CustomException(); }); - expectTasks("a", "b"); + expectTasks(); storage.write( (NoResult.Quiet) storeProvider -> storeProvider.getUnsafeTaskStore().deleteAllTasks()); @@ -157,7 +157,7 @@ public class StorageTransactionTest extends TearDownTestCase { storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"))); throw new CustomException(); }); - expectTasks(); + expectTasks("a"); storage.write((NoResult.Quiet) storeProvider -> storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a")))); @@ -170,7 +170,7 @@ public class StorageTransactionTest extends TearDownTestCase { throw new CustomException(); }); }); - expectTasks("a"); + expectTasks("a", "c", "d"); // Nested transaction where outer transaction fails. expectWriteFail(storeProvider -> { @@ -179,6 +179,6 @@ public class StorageTransactionTest extends TearDownTestCase { storeProvider1.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d")))); throw new CustomException(); }); - expectTasks("a"); + expectTasks("a", "c", "d"); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java index 1f30bfa..b2c371c 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java @@ -66,9 +66,9 @@ import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.backup.Recovery; import org.apache.aurora.scheduler.storage.backup.StorageBackup; -import org.apache.aurora.scheduler.storage.db.DbModule; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import org.apache.aurora.scheduler.storage.entities.IServerInfo; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.apache.aurora.scheduler.thrift.aop.AnnotatedAuroraAdmin; import org.apache.aurora.scheduler.thrift.aop.AopModule; import org.apache.mesos.v1.Protos.FrameworkInfo; @@ -105,7 +105,7 @@ public class ThriftIT extends EasyMockTest { bind(CliOptions.class).toInstance(options); install(new LifecycleModule()); install(new StatsModule(options.stats)); - install(DbModule.testModule()); + install(new MemStorageModule()); install(new QuotaModule()); install(new CronModule(options.cron)); install(new TierModule(TaskTestUtil.TIER_CONFIG)); http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java index 0a76f53..4d62bba 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java @@ -82,7 +82,6 @@ import org.apache.aurora.scheduler.state.UUIDGenerator; import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl; import org.apache.aurora.scheduler.storage.JobUpdateStore; import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.db.DbModule; import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig; import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; import org.apache.aurora.scheduler.storage.entities.IJobKey; @@ -94,6 +93,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; import org.apache.aurora.scheduler.storage.entities.ILock; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.apache.aurora.scheduler.updater.JobUpdateController.AuditData; @@ -183,7 +183,7 @@ public class JobUpdaterIT extends EasyMockTest { Injector injector = Guice.createInjector( new UpdaterModule(executor, options), - DbModule.testModuleWithWorkQueue(), + new MemStorageModule(), new AbstractModule() { @Override protected void configure() {
