Repository: aurora Updated Branches: refs/heads/master 94fe6c9df -> 297c0eb48
Extract an abstract base test for TaskStore implementations. Reviewed at https://reviews.apache.org/r/33600/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/297c0eb4 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/297c0eb4 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/297c0eb4 Branch: refs/heads/master Commit: 297c0eb485efe8ebe2ee82a0616049fdf85a7718 Parents: 94fe6c9 Author: Bill Farner <[email protected]> Authored: Mon Apr 27 16:00:34 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Mon Apr 27 16:00:34 2015 -0700 ---------------------------------------------------------------------- .../aurora/scheduler/storage/db/DbStorage.java | 2 + .../storage/mem/InMemStoresModule.java | 2 +- .../storage/AbstractTaskStoreTest.java | 574 +++++++++++++++++++ .../storage/mem/InMemTaskStoreTest.java | 28 + .../scheduler/storage/mem/MemTaskStoreTest.java | 573 ------------------ 5 files changed, 605 insertions(+), 574 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/297c0eb4/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java index fbdbb05..1a6c3f2 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java @@ -71,6 +71,8 @@ class DbStorage extends AbstractIdleService implements Storage { this.sessionFactory = requireNonNull(sessionFactory); this.enumValueMapper = requireNonNull(enumValueMapper); + requireNonNull(cronJobStore); + requireNonNull(taskStore); requireNonNull(schedulerStore); requireNonNull(attributeStore); requireNonNull(lockStore); http://git-wip-us.apache.org/repos/asf/aurora/blob/297c0eb4/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java index 88fd006..21f7d4d 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java @@ -25,7 +25,7 @@ import org.apache.aurora.scheduler.storage.TaskStore; import static java.util.Objects.requireNonNull; /** - * Binding module for an in-memory storage system. + * Binding module for in-memory stores. * <p> * NOTE: These stores are being phased out in favor of database-backed stores. */ http://git-wip-us.apache.org/repos/asf/aurora/blob/297c0eb4/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java new file mode 100644 index 0000000..e3b1340 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java @@ -0,0 +1,574 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.Guice; +import com.google.inject.Module; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.concurrent.ExecutorServiceShutdown; + +import org.apache.aurora.gen.Attribute; +import org.apache.aurora.gen.Container; +import org.apache.aurora.gen.ExecutorConfig; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.MesosContainer; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskQuery; +import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.testing.StorageEntityUtil; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public abstract class AbstractTaskStoreTest { + private static final IHostAttributes HOST_A = IHostAttributes.build( + new HostAttributes( + "hostA", + ImmutableSet.of(new Attribute("zone", ImmutableSet.of("1a")))) + .setSlaveId("slaveIdA") + .setMode(MaintenanceMode.NONE)); + private static final IHostAttributes HOST_B = IHostAttributes.build( + new HostAttributes( + "hostB", + ImmutableSet.of(new Attribute("zone", ImmutableSet.of("1a")))) + .setSlaveId("slaveIdB") + .setMode(MaintenanceMode.NONE)); + private static final IScheduledTask TASK_A = createTask("a"); + private static final IScheduledTask TASK_B = + setContainer(createTask("b"), Container.mesos(new MesosContainer())); + private static final IScheduledTask TASK_C = createTask("c"); + private static final IScheduledTask TASK_D = createTask("d"); + + private Storage storage; + + protected abstract Module getStorageModule(); + + @Before + public void baseSetUp() { + storage = Guice.createInjector(getStorageModule()).getInstance(Storage.class); + storage.prepare(); + + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + AttributeStore.Mutable attributeStore = storeProvider.getAttributeStore(); + attributeStore.saveHostAttributes(HOST_A); + attributeStore.saveHostAttributes(HOST_B); + } + }); + } + + private Iterable<IScheduledTask> fetchTasks(final Query.Builder query) { + return storage.read(new Storage.Work.Quiet<Iterable<IScheduledTask>>() { + @Override + public Iterable<IScheduledTask> apply(Storage.StoreProvider storeProvider) { + return storeProvider.getTaskStore().fetchTasks(query); + } + }); + } + + private void saveTasks(final IScheduledTask... tasks) { + saveTasks(ImmutableSet.copyOf(tasks)); + } + + private void saveTasks(final Set<IScheduledTask> tasks) { + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.copyOf(tasks)); + } + }); + } + + private ImmutableSet<IScheduledTask> mutateTasks( + final Query.Builder query, + final TaskMutation mutation) { + + return storage.write(new Storage.MutateWork.Quiet<ImmutableSet<IScheduledTask>>() { + @Override + public ImmutableSet<IScheduledTask> apply(Storage.MutableStoreProvider storeProvider) { + return storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation); + } + }); + } + + private boolean unsafeModifyInPlace(final String taskId, final ITaskConfig taskConfiguration) { + return storage.write(new Storage.MutateWork.Quiet<Boolean>() { + @Override + public Boolean apply(Storage.MutableStoreProvider storeProvider) { + return storeProvider.getUnsafeTaskStore().unsafeModifyInPlace(taskId, taskConfiguration); + } + }); + } + + private void deleteTasks(final String... taskIds) { + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + storeProvider.getUnsafeTaskStore().deleteTasks(ImmutableSet.copyOf(taskIds)); + } + }); + } + + private void deleteAllTasks() { + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + storeProvider.getUnsafeTaskStore().deleteAllTasks(); + } + }); + } + + @Test + public void testSave() { + IScheduledTask aWithHost = setHost(TASK_A, HOST_A); + StorageEntityUtil.assertFullyPopulated(aWithHost.newBuilder()); + + saveTasks(aWithHost, TASK_B); + assertStoreContents(aWithHost, TASK_B); + + saveTasks(TASK_C, TASK_D); + assertStoreContents(aWithHost, TASK_B, TASK_C, TASK_D); + + // Saving the same task should overwrite. + IScheduledTask taskAModified = IScheduledTask.build(aWithHost.newBuilder().setStatus(RUNNING)); + saveTasks(taskAModified); + assertStoreContents(taskAModified, TASK_B, TASK_C, TASK_D); + } + + @Test + public void testQuery() { + assertStoreContents(); + saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); + + assertQueryResults(Query.taskScoped("b"), TASK_B); + assertQueryResults(Query.taskScoped("a", "d"), TASK_A, TASK_D); + assertQueryResults(Query.roleScoped("role-c"), TASK_C); + assertQueryResults(Query.envScoped("role-c", "env-c"), TASK_C); + assertQueryResults(Query.envScoped("role-c", "devel")); + assertQueryResults( + Query.unscoped().byStatus(ScheduleStatus.PENDING), + TASK_A, TASK_B, TASK_C, TASK_D); + assertQueryResults( + Query.instanceScoped(JobKeys.from("role-a", "env-a", "job-a"), 2).active(), TASK_A); + assertQueryResults(Query.jobScoped(JobKeys.from("role-b", "env-b", "job-b")).active(), TASK_B); + assertQueryResults(Query.jobScoped(JobKeys.from("role-b", "devel", "job-b")).active()); + + // Explicitly call out the current differing behaviors for types of empty query conditions. + // Specifically - null task IDs and empty task IDs are different than other 'IN' conditions.. + assertQueryResults(new TaskQuery().setTaskIds(null), TASK_A, TASK_B, TASK_C, TASK_D); + assertQueryResults(new TaskQuery().setTaskIds(ImmutableSet.<String>of())); + assertQueryResults( + new TaskQuery().setInstanceIds(ImmutableSet.<Integer>of()), + TASK_A, TASK_B, TASK_C, TASK_D); + assertQueryResults( + new TaskQuery().setStatuses(ImmutableSet.<ScheduleStatus>of()), + TASK_A, TASK_B, TASK_C, TASK_D); + } + + @Test + public void testQueryMultipleInstances() { + ImmutableSet.Builder<IScheduledTask> tasksBuilder = ImmutableSet.builder(); + for (int i = 0; i < 100; i++) { + ScheduledTask builder = TASK_A.newBuilder(); + builder.getAssignedTask() + .setTaskId("id" + i) + .setInstanceId(i); + tasksBuilder.add(IScheduledTask.build(builder)); + } + Set<IScheduledTask> tasks = tasksBuilder.build(); + saveTasks(tasks); + assertQueryResults(Query.unscoped(), tasks); + } + + @Test + public void testQueryBySlaveHost() { + IScheduledTask a = setHost(makeTask("a", JobKeys.from("role", "env", "job")), HOST_A); + IScheduledTask b = setHost(makeTask("b", JobKeys.from("role", "env", "job")), HOST_B); + saveTasks(a, b); + + assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a); + assertQueryResults(Query.slaveScoped(HOST_A.getHost(), HOST_B.getHost()), a, b); + } + + @Test + public void testQueryByJobKeys() { + assertStoreContents(); + saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); + + assertQueryResults( + Query.jobScoped(ImmutableSet.of( + JobKeys.from("role-a", "env-a", "job-a"), + JobKeys.from("role-b", "env-b", "job-b"), + JobKeys.from("role-c", "env-c", "job-c"))), + TASK_A, TASK_B, TASK_C); + + // Conflicting jobs will produce the result from the last added JobKey + assertQueryResults( + Query.jobScoped(JobKeys.from("role-a", "env-a", "job-a")) + .byJobKeys(ImmutableSet.of(JobKeys.from("role-b", "env-b", "job-b"))), + TASK_B); + + // The .byJobKeys will override the previous scoping and OR all of the keys. + assertQueryResults( + Query.jobScoped(JobKeys.from("role-a", "env-a", "job-a")) + .byJobKeys(ImmutableSet.of( + JobKeys.from("role-b", "env-b", "job-b"), + JobKeys.from("role-a", "env-a", "job-a"))), + TASK_A, TASK_B); + + // Combination of individual field and jobKeys is allowed. + assertQueryResults( + Query.roleScoped("role-b") + .byJobKeys(ImmutableSet.of( + JobKeys.from("role-b", "env-b", "job-b"), + JobKeys.from("role-a", "env-a", "job-a"))), + TASK_B); + } + + @Test + public void testMutate() { + saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); + assertQueryResults(Query.statusScoped(RUNNING)); + + mutateTasks(Query.taskScoped("a"), new TaskMutation() { + @Override + public IScheduledTask apply(IScheduledTask task) { + return IScheduledTask.build(task.newBuilder().setStatus(RUNNING)); + } + }); + + assertQueryResults( + Query.statusScoped(RUNNING), + IScheduledTask.build(TASK_A.newBuilder().setStatus(RUNNING))); + + mutateTasks(Query.unscoped(), new TaskMutation() { + @Override + public IScheduledTask apply(IScheduledTask task) { + return IScheduledTask.build(task.newBuilder().setStatus(ScheduleStatus.ASSIGNED)); + } + }); + + assertStoreContents( + IScheduledTask.build(TASK_A.newBuilder().setStatus(ScheduleStatus.ASSIGNED)), + IScheduledTask.build(TASK_B.newBuilder().setStatus(ScheduleStatus.ASSIGNED)), + IScheduledTask.build(TASK_C.newBuilder().setStatus(ScheduleStatus.ASSIGNED)), + IScheduledTask.build(TASK_D.newBuilder().setStatus(ScheduleStatus.ASSIGNED))); + } + + @Test + public void testUnsafeModifyInPlace() { + ITaskConfig updated = ITaskConfig.build( + TASK_A.getAssignedTask() + .getTask() + .newBuilder() + .setExecutorConfig(new ExecutorConfig("aurora", "new_config"))); + + String taskId = Tasks.id(TASK_A); + assertFalse(unsafeModifyInPlace(taskId, updated)); + + saveTasks(TASK_A); + assertTrue(unsafeModifyInPlace(taskId, updated)); + Query.Builder query = Query.taskScoped(taskId); + ITaskConfig stored = + Iterables.getOnlyElement(fetchTasks(query)).getAssignedTask().getTask(); + assertEquals(updated, stored); + + deleteTasks(taskId); + assertFalse(unsafeModifyInPlace(taskId, updated)); + } + + @Test + public void testDelete() { + saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); + deleteTasks("a"); + assertStoreContents(TASK_B, TASK_C, TASK_D); + deleteTasks("c"); + assertStoreContents(TASK_B, TASK_D); + deleteTasks("b", "d"); + assertStoreContents(); + } + + @Test + public void testDeleteAll() { + saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); + deleteAllTasks(); + assertStoreContents(); + } + + @Test + public void testConsistentJobIndex() { + final IScheduledTask a = makeTask("a", JobKeys.from("jim", "test", "job")); + final IScheduledTask b = makeTask("b", JobKeys.from("jim", "test", "job")); + final IScheduledTask c = makeTask("c", JobKeys.from("jim", "test", "job2")); + final IScheduledTask d = makeTask("d", JobKeys.from("joe", "test", "job")); + final IScheduledTask e = makeTask("e", JobKeys.from("jim", "prod", "job")); + final Query.Builder jimsJob = Query.jobScoped(JobKeys.from("jim", "test", "job")); + final Query.Builder jimsJob2 = Query.jobScoped(JobKeys.from("jim", "test", "job2")); + final Query.Builder joesJob = Query.jobScoped(JobKeys.from("joe", "test", "job")); + + saveTasks(a, b, c, d, e); + assertQueryResults(jimsJob, a, b); + assertQueryResults(jimsJob2, c); + assertQueryResults(joesJob, d); + + deleteTasks(Tasks.id(b)); + assertQueryResults(jimsJob, a); + assertQueryResults(jimsJob2, c); + assertQueryResults(joesJob, d); + + mutateTasks(jimsJob, new TaskMutation() { + @Override + public IScheduledTask apply(IScheduledTask task) { + return IScheduledTask.build(task.newBuilder().setStatus(RUNNING)); + } + }); + IScheduledTask aRunning = IScheduledTask.build(a.newBuilder().setStatus(RUNNING)); + assertQueryResults(jimsJob, aRunning); + assertQueryResults(jimsJob2, c); + assertQueryResults(joesJob, d); + + deleteTasks(Tasks.id(d)); + assertQueryResults(joesJob); + + deleteTasks(Tasks.id(d)); + assertQueryResults(jimsJob, aRunning); + assertQueryResults(jimsJob2, c); + assertQueryResults(joesJob); + + saveTasks(b); + assertQueryResults(jimsJob, aRunning, b); + assertQueryResults(jimsJob2, c); + assertQueryResults(joesJob); + } + + @Test + public void testCanonicalTaskConfigs() { + IScheduledTask a = createTask("a"); + IScheduledTask b = createTask("a"); + IScheduledTask c = createTask("a"); + saveTasks(a, b, c); + Set<IScheduledTask> inserted = ImmutableSet.of(a, b, c); + + Set<ITaskConfig> storedConfigs = FluentIterable.from(fetchTasks(Query.unscoped())) + .transform(Tasks.SCHEDULED_TO_INFO) + .toSet(); + assertEquals( + FluentIterable.from(inserted).transform(Tasks.SCHEDULED_TO_INFO).toSet(), + storedConfigs); + Map<ITaskConfig, ITaskConfig> identityMap = Maps.newIdentityHashMap(); + for (ITaskConfig stored : storedConfigs) { + identityMap.put(stored, stored); + } + assertEquals( + ImmutableMap.of(Tasks.SCHEDULED_TO_INFO.apply(a), Tasks.SCHEDULED_TO_INFO.apply(a)), + identityMap); + } + + private static IScheduledTask setHost(IScheduledTask task, IHostAttributes host) { + ScheduledTask builder = task.newBuilder(); + builder.getAssignedTask() + .setSlaveHost(host.getHost()) + .setSlaveId(host.getSlaveId()); + return IScheduledTask.build(builder); + } + + private static IScheduledTask unsetHost(IScheduledTask task) { + ScheduledTask builder = task.newBuilder(); + builder.getAssignedTask() + .setSlaveHost(null) + .setSlaveId(null); + return IScheduledTask.build(builder); + } + + private static IScheduledTask setConfigData(IScheduledTask task, String configData) { + ScheduledTask builder = task.newBuilder(); + builder.getAssignedTask().getTask().getExecutorConfig().setData(configData); + return IScheduledTask.build(builder); + } + + @Test + public void testAddSlaveHost() { + final IScheduledTask a = createTask("a"); + saveTasks(a); + assertQueryResults(Query.slaveScoped(HOST_A.getHost())); + + final IScheduledTask b = setHost(a, HOST_A); + Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)), + new TaskMutation() { + @Override + public IScheduledTask apply(IScheduledTask task) { + assertEquals(a, task); + return b; + } + }); + assertEquals(ImmutableSet.of(b), result); + assertQueryResults(Query.slaveScoped(HOST_A.getHost()), b); + + // Unrealistic behavior, but proving that the secondary index can handle key mutations. + final IScheduledTask c = setHost(b, HOST_B); + Set<IScheduledTask> result2 = mutateTasks(Query.taskScoped(Tasks.id(a)), + new TaskMutation() { + @Override + public IScheduledTask apply(IScheduledTask task) { + assertEquals(b, task); + return c; + } + }); + assertEquals(ImmutableSet.of(c), result2); + assertQueryResults(Query.slaveScoped(HOST_B.getHost()), c); + + deleteTasks(Tasks.id(a)); + assertQueryResults(Query.slaveScoped(HOST_B.getHost())); + } + + @Test + public void testUnsetSlaveHost() { + // Unrealistic behavior, but proving that the secondary index does not become stale. + + final IScheduledTask a = setHost(createTask("a"), HOST_A); + saveTasks(a); + assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a); + + final IScheduledTask b = unsetHost(a); + Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)), + new TaskMutation() { + @Override + public IScheduledTask apply(IScheduledTask task) { + assertEquals(a, task); + return b; + } + }); + assertEquals(ImmutableSet.of(b), result); + assertQueryResults(Query.slaveScoped(HOST_A.getHost())); + assertQueryResults(Query.taskScoped(Tasks.id(b)), b); + } + + @Test + public void testTasksOnSameHost() { + final IScheduledTask a = setHost(createTask("a"), HOST_A); + final IScheduledTask b = setHost(createTask("b"), HOST_A); + saveTasks(a, b); + assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a, b); + } + + @Test + public void testSaveOverwrites() { + // Ensures that saving a task with an existing task ID is effectively the same as a mutate, + // and does not result in a duplicate object in the primary or secondary index. + + final IScheduledTask a = setHost(createTask("a"), HOST_A); + saveTasks(a); + + final IScheduledTask updated = setConfigData(a, "new config data"); + saveTasks(updated); + assertQueryResults(Query.taskScoped(Tasks.id(a)), updated); + assertQueryResults(Query.slaveScoped(HOST_A.getHost()), updated); + } + + @Ignore + @Test + public void testReadSecondaryIndexMultipleThreads() throws Exception { + ExecutorService executor = Executors.newFixedThreadPool(4, + new ThreadFactoryBuilder().setNameFormat("SlowRead-%d").setDaemon(true).build()); + + try { + ImmutableSet.Builder<IScheduledTask> builder = ImmutableSet.builder(); + final int numTasks = 100; + final int numJobs = 100; + for (int j = 0; j < numJobs; j++) { + for (int t = 0; t < numTasks; t++) { + builder.add(makeTask("" + j + "-" + t, JobKeys.from("role", "env", "name" + j))); + } + } + saveTasks(builder.build()); + + final CountDownLatch read = new CountDownLatch(numJobs); + for (int j = 0; j < numJobs; j++) { + final int id = j; + executor.submit(new Runnable() { + @Override + public void run() { + assertNotNull(fetchTasks(Query.jobScoped(JobKeys.from("role", "env", "name" + id)))); + read.countDown(); + } + }); + executor.submit(new Runnable() { + @Override + public void run() { + saveTasks(createTask("TaskNew1" + id)); + } + }); + } + + read.await(); + } finally { + new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute(); + } + } + + private void assertStoreContents(IScheduledTask... tasks) { + assertQueryResults(Query.unscoped(), tasks); + } + + private void assertQueryResults(TaskQuery query, IScheduledTask... tasks) { + assertQueryResults(Query.arbitrary(query), tasks); + } + + private void assertQueryResults(Query.Builder query, IScheduledTask... tasks) { + assertQueryResults(query, ImmutableSet.copyOf(tasks)); + } + + private void assertQueryResults(Query.Builder query, Set<IScheduledTask> tasks) { + assertEquals(tasks, fetchTasks(query)); + } + + private static IScheduledTask createTask(String id) { + return makeTask(id, JobKeys.from("role-" + id, "env-" + id, "job-" + id)); + } + + private static IScheduledTask setContainer(IScheduledTask task, Container container) { + ScheduledTask builder = task.newBuilder(); + builder.getAssignedTask().getTask().setContainer(container); + return IScheduledTask.build(builder); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/297c0eb4/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java new file mode 100644 index 0000000..2014b73 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java @@ -0,0 +1,28 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.mem; + +import com.google.inject.Module; +import com.google.inject.util.Modules; +import com.twitter.common.inject.Bindings.KeyFactory; + +import org.apache.aurora.scheduler.storage.AbstractTaskStoreTest; +import org.apache.aurora.scheduler.storage.db.DbModule; + +public class InMemTaskStoreTest extends AbstractTaskStoreTest { + @Override + protected Module getStorageModule() { + return Modules.combine(DbModule.testModule(KeyFactory.PLAIN)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/297c0eb4/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java deleted file mode 100644 index 688a02f..0000000 --- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java +++ /dev/null @@ -1,573 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.mem; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.util.concurrent.ExecutorServiceShutdown; - -import org.apache.aurora.gen.Attribute; -import org.apache.aurora.gen.Container; -import org.apache.aurora.gen.ExecutorConfig; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.MesosContainer; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskQuery; -import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.storage.AttributeStore; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation; -import org.apache.aurora.scheduler.storage.db.DbUtil; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.aurora.scheduler.storage.testing.StorageEntityUtil; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -public class MemTaskStoreTest { - - private static final IHostAttributes HOST_A = IHostAttributes.build( - new HostAttributes( - "hostA", - ImmutableSet.of(new Attribute("zone", ImmutableSet.of("1a")))) - .setSlaveId("slaveIdA") - .setMode(MaintenanceMode.NONE)); - private static final IHostAttributes HOST_B = IHostAttributes.build( - new HostAttributes( - "hostB", - ImmutableSet.of(new Attribute("zone", ImmutableSet.of("1a")))) - .setSlaveId("slaveIdB") - .setMode(MaintenanceMode.NONE)); - private static final IScheduledTask TASK_A = createTask("a"); - private static final IScheduledTask TASK_B = - setContainer(createTask("b"), Container.mesos(new MesosContainer())); - private static final IScheduledTask TASK_C = createTask("c"); - private static final IScheduledTask TASK_D = createTask("d"); - - private Storage storage; - - @Before - public void setUp() { - storage = DbUtil.createStorage(); - - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - protected void execute(Storage.MutableStoreProvider storeProvider) { - AttributeStore.Mutable attributeStore = storeProvider.getAttributeStore(); - attributeStore.saveHostAttributes(HOST_A); - attributeStore.saveHostAttributes(HOST_B); - } - }); - } - - private Iterable<IScheduledTask> fetchTasks(final Query.Builder query) { - return storage.read(new Storage.Work.Quiet<Iterable<IScheduledTask>>() { - @Override - public Iterable<IScheduledTask> apply(Storage.StoreProvider storeProvider) { - return storeProvider.getTaskStore().fetchTasks(query); - } - }); - } - - private void saveTasks(final IScheduledTask... tasks) { - saveTasks(ImmutableSet.copyOf(tasks)); - } - - private void saveTasks(final Set<IScheduledTask> tasks) { - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - protected void execute(Storage.MutableStoreProvider storeProvider) { - storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.copyOf(tasks)); - } - }); - } - - private ImmutableSet<IScheduledTask> mutateTasks( - final Query.Builder query, - final TaskMutation mutation) { - - return storage.write(new Storage.MutateWork.Quiet<ImmutableSet<IScheduledTask>>() { - @Override - public ImmutableSet<IScheduledTask> apply(Storage.MutableStoreProvider storeProvider) { - return storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation); - } - }); - } - - private boolean unsafeModifyInPlace(final String taskId, final ITaskConfig taskConfiguration) { - return storage.write(new Storage.MutateWork.Quiet<Boolean>() { - @Override - public Boolean apply(Storage.MutableStoreProvider storeProvider) { - return storeProvider.getUnsafeTaskStore().unsafeModifyInPlace(taskId, taskConfiguration); - } - }); - } - - private void deleteTasks(final String... taskIds) { - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - protected void execute(Storage.MutableStoreProvider storeProvider) { - storeProvider.getUnsafeTaskStore().deleteTasks(ImmutableSet.copyOf(taskIds)); - } - }); - } - - private void deleteAllTasks() { - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - protected void execute(Storage.MutableStoreProvider storeProvider) { - storeProvider.getUnsafeTaskStore().deleteAllTasks(); - } - }); - } - - @Test - public void testSave() { - IScheduledTask aWithHost = setHost(TASK_A, HOST_A); - StorageEntityUtil.assertFullyPopulated(aWithHost.newBuilder()); - - saveTasks(aWithHost, TASK_B); - assertStoreContents(aWithHost, TASK_B); - - saveTasks(TASK_C, TASK_D); - assertStoreContents(aWithHost, TASK_B, TASK_C, TASK_D); - - // Saving the same task should overwrite. - IScheduledTask taskAModified = IScheduledTask.build(aWithHost.newBuilder().setStatus(RUNNING)); - saveTasks(taskAModified); - assertStoreContents(taskAModified, TASK_B, TASK_C, TASK_D); - } - - @Test - public void testQuery() { - assertStoreContents(); - saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); - - assertQueryResults(Query.taskScoped("b"), TASK_B); - assertQueryResults(Query.taskScoped("a", "d"), TASK_A, TASK_D); - assertQueryResults(Query.roleScoped("role-c"), TASK_C); - assertQueryResults(Query.envScoped("role-c", "env-c"), TASK_C); - assertQueryResults(Query.envScoped("role-c", "devel")); - assertQueryResults( - Query.unscoped().byStatus(ScheduleStatus.PENDING), - TASK_A, TASK_B, TASK_C, TASK_D); - assertQueryResults( - Query.instanceScoped(JobKeys.from("role-a", "env-a", "job-a"), 2).active(), TASK_A); - assertQueryResults(Query.jobScoped(JobKeys.from("role-b", "env-b", "job-b")).active(), TASK_B); - assertQueryResults(Query.jobScoped(JobKeys.from("role-b", "devel", "job-b")).active()); - - // Explicitly call out the current differing behaviors for types of empty query conditions. - // Specifically - null task IDs and empty task IDs are different than other 'IN' conditions.. - assertQueryResults(new TaskQuery().setTaskIds(null), TASK_A, TASK_B, TASK_C, TASK_D); - assertQueryResults(new TaskQuery().setTaskIds(ImmutableSet.<String>of())); - assertQueryResults( - new TaskQuery().setInstanceIds(ImmutableSet.<Integer>of()), - TASK_A, TASK_B, TASK_C, TASK_D); - assertQueryResults( - new TaskQuery().setStatuses(ImmutableSet.<ScheduleStatus>of()), - TASK_A, TASK_B, TASK_C, TASK_D); - } - - @Test - public void testQueryMultipleInstances() { - ImmutableSet.Builder<IScheduledTask> tasksBuilder = ImmutableSet.builder(); - for (int i = 0; i < 100; i++) { - ScheduledTask builder = TASK_A.newBuilder(); - builder.getAssignedTask() - .setTaskId("id" + i) - .setInstanceId(i); - tasksBuilder.add(IScheduledTask.build(builder)); - } - Set<IScheduledTask> tasks = tasksBuilder.build(); - saveTasks(tasks); - assertQueryResults(Query.unscoped(), tasks); - } - - @Test - public void testQueryBySlaveHost() { - IScheduledTask a = setHost(makeTask("a", JobKeys.from("role", "env", "job")), HOST_A); - IScheduledTask b = setHost(makeTask("b", JobKeys.from("role", "env", "job")), HOST_B); - saveTasks(a, b); - - assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a); - assertQueryResults(Query.slaveScoped(HOST_A.getHost(), HOST_B.getHost()), a, b); - } - - @Test - public void testQueryByJobKeys() { - assertStoreContents(); - saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); - - assertQueryResults( - Query.jobScoped(ImmutableSet.of( - JobKeys.from("role-a", "env-a", "job-a"), - JobKeys.from("role-b", "env-b", "job-b"), - JobKeys.from("role-c", "env-c", "job-c"))), - TASK_A, TASK_B, TASK_C); - - // Conflicting jobs will produce the result from the last added JobKey - assertQueryResults( - Query.jobScoped(JobKeys.from("role-a", "env-a", "job-a")) - .byJobKeys(ImmutableSet.of(JobKeys.from("role-b", "env-b", "job-b"))), - TASK_B); - - // The .byJobKeys will override the previous scoping and OR all of the keys. - assertQueryResults( - Query.jobScoped(JobKeys.from("role-a", "env-a", "job-a")) - .byJobKeys(ImmutableSet.of( - JobKeys.from("role-b", "env-b", "job-b"), - JobKeys.from("role-a", "env-a", "job-a"))), - TASK_A, TASK_B); - - // Combination of individual field and jobKeys is allowed. - assertQueryResults( - Query.roleScoped("role-b") - .byJobKeys(ImmutableSet.of( - JobKeys.from("role-b", "env-b", "job-b"), - JobKeys.from("role-a", "env-a", "job-a"))), - TASK_B); - } - - @Test - public void testMutate() { - saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); - assertQueryResults(Query.statusScoped(RUNNING)); - - mutateTasks(Query.taskScoped("a"), new TaskMutation() { - @Override - public IScheduledTask apply(IScheduledTask task) { - return IScheduledTask.build(task.newBuilder().setStatus(RUNNING)); - } - }); - - assertQueryResults( - Query.statusScoped(RUNNING), - IScheduledTask.build(TASK_A.newBuilder().setStatus(RUNNING))); - - mutateTasks(Query.unscoped(), new TaskMutation() { - @Override - public IScheduledTask apply(IScheduledTask task) { - return IScheduledTask.build(task.newBuilder().setStatus(ScheduleStatus.ASSIGNED)); - } - }); - - assertStoreContents( - IScheduledTask.build(TASK_A.newBuilder().setStatus(ScheduleStatus.ASSIGNED)), - IScheduledTask.build(TASK_B.newBuilder().setStatus(ScheduleStatus.ASSIGNED)), - IScheduledTask.build(TASK_C.newBuilder().setStatus(ScheduleStatus.ASSIGNED)), - IScheduledTask.build(TASK_D.newBuilder().setStatus(ScheduleStatus.ASSIGNED))); - } - - @Test - public void testUnsafeModifyInPlace() { - ITaskConfig updated = ITaskConfig.build( - TASK_A.getAssignedTask() - .getTask() - .newBuilder() - .setExecutorConfig(new ExecutorConfig("aurora", "new_config"))); - - String taskId = Tasks.id(TASK_A); - assertFalse(unsafeModifyInPlace(taskId, updated)); - - saveTasks(TASK_A); - assertTrue(unsafeModifyInPlace(taskId, updated)); - Query.Builder query = Query.taskScoped(taskId); - ITaskConfig stored = - Iterables.getOnlyElement(fetchTasks(query)).getAssignedTask().getTask(); - assertEquals(updated, stored); - - deleteTasks(taskId); - assertFalse(unsafeModifyInPlace(taskId, updated)); - } - - @Test - public void testDelete() { - saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); - deleteTasks("a"); - assertStoreContents(TASK_B, TASK_C, TASK_D); - deleteTasks("c"); - assertStoreContents(TASK_B, TASK_D); - deleteTasks("b", "d"); - assertStoreContents(); - } - - @Test - public void testDeleteAll() { - saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); - deleteAllTasks(); - assertStoreContents(); - } - - @Test - public void testConsistentJobIndex() { - final IScheduledTask a = makeTask("a", JobKeys.from("jim", "test", "job")); - final IScheduledTask b = makeTask("b", JobKeys.from("jim", "test", "job")); - final IScheduledTask c = makeTask("c", JobKeys.from("jim", "test", "job2")); - final IScheduledTask d = makeTask("d", JobKeys.from("joe", "test", "job")); - final IScheduledTask e = makeTask("e", JobKeys.from("jim", "prod", "job")); - final Query.Builder jimsJob = Query.jobScoped(JobKeys.from("jim", "test", "job")); - final Query.Builder jimsJob2 = Query.jobScoped(JobKeys.from("jim", "test", "job2")); - final Query.Builder joesJob = Query.jobScoped(JobKeys.from("joe", "test", "job")); - - saveTasks(a, b, c, d, e); - assertQueryResults(jimsJob, a, b); - assertQueryResults(jimsJob2, c); - assertQueryResults(joesJob, d); - - deleteTasks(Tasks.id(b)); - assertQueryResults(jimsJob, a); - assertQueryResults(jimsJob2, c); - assertQueryResults(joesJob, d); - - mutateTasks(jimsJob, new TaskMutation() { - @Override - public IScheduledTask apply(IScheduledTask task) { - return IScheduledTask.build(task.newBuilder().setStatus(RUNNING)); - } - }); - IScheduledTask aRunning = IScheduledTask.build(a.newBuilder().setStatus(RUNNING)); - assertQueryResults(jimsJob, aRunning); - assertQueryResults(jimsJob2, c); - assertQueryResults(joesJob, d); - - deleteTasks(Tasks.id(d)); - assertQueryResults(joesJob); - - deleteTasks(Tasks.id(d)); - assertQueryResults(jimsJob, aRunning); - assertQueryResults(jimsJob2, c); - assertQueryResults(joesJob); - - saveTasks(b); - assertQueryResults(jimsJob, aRunning, b); - assertQueryResults(jimsJob2, c); - assertQueryResults(joesJob); - } - - @Test - public void testCanonicalTaskConfigs() { - IScheduledTask a = createTask("a"); - IScheduledTask b = createTask("a"); - IScheduledTask c = createTask("a"); - saveTasks(a, b, c); - Set<IScheduledTask> inserted = ImmutableSet.of(a, b, c); - - Set<ITaskConfig> storedConfigs = FluentIterable.from(fetchTasks(Query.unscoped())) - .transform(Tasks.SCHEDULED_TO_INFO) - .toSet(); - assertEquals( - FluentIterable.from(inserted).transform(Tasks.SCHEDULED_TO_INFO).toSet(), - storedConfigs); - Map<ITaskConfig, ITaskConfig> identityMap = Maps.newIdentityHashMap(); - for (ITaskConfig stored : storedConfigs) { - identityMap.put(stored, stored); - } - assertEquals( - ImmutableMap.of(Tasks.SCHEDULED_TO_INFO.apply(a), Tasks.SCHEDULED_TO_INFO.apply(a)), - identityMap); - } - - private static IScheduledTask setHost(IScheduledTask task, IHostAttributes host) { - ScheduledTask builder = task.newBuilder(); - builder.getAssignedTask() - .setSlaveHost(host.getHost()) - .setSlaveId(host.getSlaveId()); - return IScheduledTask.build(builder); - } - - private static IScheduledTask unsetHost(IScheduledTask task) { - ScheduledTask builder = task.newBuilder(); - builder.getAssignedTask() - .setSlaveHost(null) - .setSlaveId(null); - return IScheduledTask.build(builder); - } - - private static IScheduledTask setConfigData(IScheduledTask task, String configData) { - ScheduledTask builder = task.newBuilder(); - builder.getAssignedTask().getTask().getExecutorConfig().setData(configData); - return IScheduledTask.build(builder); - } - - @Test - public void testAddSlaveHost() { - final IScheduledTask a = createTask("a"); - saveTasks(a); - assertQueryResults(Query.slaveScoped(HOST_A.getHost())); - - final IScheduledTask b = setHost(a, HOST_A); - Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)), - new TaskMutation() { - @Override - public IScheduledTask apply(IScheduledTask task) { - assertEquals(a, task); - return b; - } - }); - assertEquals(ImmutableSet.of(b), result); - assertQueryResults(Query.slaveScoped(HOST_A.getHost()), b); - - // Unrealistic behavior, but proving that the secondary index can handle key mutations. - final IScheduledTask c = setHost(b, HOST_B); - Set<IScheduledTask> result2 = mutateTasks(Query.taskScoped(Tasks.id(a)), - new TaskMutation() { - @Override - public IScheduledTask apply(IScheduledTask task) { - assertEquals(b, task); - return c; - } - }); - assertEquals(ImmutableSet.of(c), result2); - assertQueryResults(Query.slaveScoped(HOST_B.getHost()), c); - - deleteTasks(Tasks.id(a)); - assertQueryResults(Query.slaveScoped(HOST_B.getHost())); - } - - @Test - public void testUnsetSlaveHost() { - // Unrealistic behavior, but proving that the secondary index does not become stale. - - final IScheduledTask a = setHost(createTask("a"), HOST_A); - saveTasks(a); - assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a); - - final IScheduledTask b = unsetHost(a); - Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)), - new TaskMutation() { - @Override - public IScheduledTask apply(IScheduledTask task) { - assertEquals(a, task); - return b; - } - }); - assertEquals(ImmutableSet.of(b), result); - assertQueryResults(Query.slaveScoped(HOST_A.getHost())); - assertQueryResults(Query.taskScoped(Tasks.id(b)), b); - } - - @Test - public void testTasksOnSameHost() { - final IScheduledTask a = setHost(createTask("a"), HOST_A); - final IScheduledTask b = setHost(createTask("b"), HOST_A); - saveTasks(a, b); - assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a, b); - } - - @Test - public void testSaveOverwrites() { - // Ensures that saving a task with an existing task ID is effectively the same as a mutate, - // and does not result in a duplicate object in the primary or secondary index. - - final IScheduledTask a = setHost(createTask("a"), HOST_A); - saveTasks(a); - - final IScheduledTask updated = setConfigData(a, "new config data"); - saveTasks(updated); - assertQueryResults(Query.taskScoped(Tasks.id(a)), updated); - assertQueryResults(Query.slaveScoped(HOST_A.getHost()), updated); - } - - @Ignore - @Test - public void testReadSecondaryIndexMultipleThreads() throws Exception { - ExecutorService executor = Executors.newFixedThreadPool(4, - new ThreadFactoryBuilder().setNameFormat("SlowRead-%d").setDaemon(true).build()); - - try { - ImmutableSet.Builder<IScheduledTask> builder = ImmutableSet.builder(); - final int numTasks = 100; - final int numJobs = 100; - for (int j = 0; j < numJobs; j++) { - for (int t = 0; t < numTasks; t++) { - builder.add(makeTask("" + j + "-" + t, JobKeys.from("role", "env", "name" + j))); - } - } - saveTasks(builder.build()); - - final CountDownLatch read = new CountDownLatch(numJobs); - for (int j = 0; j < numJobs; j++) { - final int id = j; - executor.submit(new Runnable() { - @Override - public void run() { - assertNotNull(fetchTasks(Query.jobScoped(JobKeys.from("role", "env", "name" + id)))); - read.countDown(); - } - }); - executor.submit(new Runnable() { - @Override - public void run() { - saveTasks(createTask("TaskNew1" + id)); - } - }); - } - - read.await(); - } finally { - new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute(); - } - } - - private void assertStoreContents(IScheduledTask... tasks) { - assertQueryResults(Query.unscoped(), tasks); - } - - private void assertQueryResults(TaskQuery query, IScheduledTask... tasks) { - assertQueryResults(Query.arbitrary(query), tasks); - } - - private void assertQueryResults(Query.Builder query, IScheduledTask... tasks) { - assertQueryResults(query, ImmutableSet.copyOf(tasks)); - } - - private void assertQueryResults(Query.Builder query, Set<IScheduledTask> tasks) { - assertEquals(tasks, fetchTasks(query)); - } - - private static IScheduledTask createTask(String id) { - return makeTask(id, JobKeys.from("role-" + id, "env-" + id, "job-" + id)); - } - - private static IScheduledTask setContainer(IScheduledTask task, Container container) { - ScheduledTask builder = task.newBuilder(); - builder.getAssignedTask().getTask().setContainer(container); - return IScheduledTask.build(builder); - } -}
