Repository: aurora Updated Branches: refs/heads/master 3380572a9 -> 449a835e9
http://git-wip-us.apache.org/repos/asf/aurora/blob/449a835e/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java index 83b238f..20c9204 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java @@ -19,43 +19,46 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.twitter.common.inject.Bindings; +import com.twitter.common.inject.Bindings.KeyFactory; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import com.twitter.common.util.concurrent.ExecutorServiceShutdown; -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.Constraint; +import org.apache.aurora.gen.Attribute; +import org.apache.aurora.gen.Container; import org.apache.aurora.gen.ExecutorConfig; -import org.apache.aurora.gen.Identity; -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.Metadata; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.MesosContainer; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.gen.TaskConstraint; -import org.apache.aurora.gen.TaskEvent; import org.apache.aurora.gen.TaskQuery; -import org.apache.aurora.gen.ValueConstraint; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation; +import org.apache.aurora.scheduler.storage.db.DbModule; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.testing.StorageEntityUtil; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -63,38 +66,126 @@ import static org.junit.Assert.assertTrue; public class MemTaskStoreTest { - private static final IScheduledTask TASK_A = makeTask("a"); - private static final IScheduledTask TASK_B = makeTask("b"); - private static final IScheduledTask TASK_C = makeTask("c"); - private static final IScheduledTask TASK_D = makeTask("d"); - - private MemTaskStore store; + private static final IHostAttributes HOST_A = IHostAttributes.build( + new HostAttributes( + "hostA", + ImmutableSet.of(new Attribute("zone", ImmutableSet.of("1a")))) + .setSlaveId("slaveIdA") + .setMode(MaintenanceMode.NONE)); + private static final IHostAttributes HOST_B = IHostAttributes.build( + new HostAttributes( + "hostB", + ImmutableSet.of(new Attribute("zone", ImmutableSet.of("1a")))) + .setSlaveId("slaveIdB") + .setMode(MaintenanceMode.NONE)); + private static final IScheduledTask TASK_A = createTask("a"); + private static final IScheduledTask TASK_B = + setContainer(createTask("b"), Container.mesos(new MesosContainer())); + private static final IScheduledTask TASK_C = createTask("c"); + private static final IScheduledTask TASK_D = createTask("d"); + + private Storage storage; @Before public void setUp() { - store = new MemTaskStore(); + Injector injector = Guice.createInjector( + new MemStorageModule(KeyFactory.PLAIN), + DbModule.testModule(Bindings.annotatedKeyFactory(MemStorage.Delegated.class))); + storage = injector.getInstance(Storage.class); + storage.prepare(); + + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + AttributeStore.Mutable attributeStore = storeProvider.getAttributeStore(); + attributeStore.saveHostAttributes(HOST_A); + attributeStore.saveHostAttributes(HOST_B); + } + }); + } + + private Iterable<IScheduledTask> fetchTasks(final Query.Builder query) { + return storage.read(new Storage.Work.Quiet<Iterable<IScheduledTask>>() { + @Override + public Iterable<IScheduledTask> apply(Storage.StoreProvider storeProvider) { + return storeProvider.getTaskStore().fetchTasks(query); + } + }); + } + + private void saveTasks(final IScheduledTask... tasks) { + saveTasks(ImmutableSet.copyOf(tasks)); + } + + private void saveTasks(final Set<IScheduledTask> tasks) { + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.copyOf(tasks)); + } + }); + } + + private ImmutableSet<IScheduledTask> mutateTasks( + final Query.Builder query, + final TaskMutation mutation) { + + return storage.write(new Storage.MutateWork.Quiet<ImmutableSet<IScheduledTask>>() { + @Override + public ImmutableSet<IScheduledTask> apply(Storage.MutableStoreProvider storeProvider) { + return storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation); + } + }); + } + + private boolean unsafeModifyInPlace(final String taskId, final ITaskConfig taskConfiguration) { + return storage.write(new Storage.MutateWork.Quiet<Boolean>() { + @Override + public Boolean apply(Storage.MutableStoreProvider storeProvider) { + return storeProvider.getUnsafeTaskStore().unsafeModifyInPlace(taskId, taskConfiguration); + } + }); + } + + private void deleteTasks(final String... taskIds) { + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + storeProvider.getUnsafeTaskStore().deleteTasks(ImmutableSet.copyOf(taskIds)); + } + }); + } + + private void deleteAllTasks() { + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + storeProvider.getUnsafeTaskStore().deleteAllTasks(); + } + }); } @Test public void testSave() { - StorageEntityUtil.assertFullyPopulated(TASK_A.newBuilder()); - store.saveTasks(ImmutableSet.of(TASK_A, TASK_B)); - assertStoreContents(TASK_A, TASK_B); + IScheduledTask aWithHost = setHost(TASK_A, HOST_A); + StorageEntityUtil.assertFullyPopulated(aWithHost.newBuilder()); - store.saveTasks(ImmutableSet.of(TASK_C, TASK_D)); - assertStoreContents(TASK_A, TASK_B, TASK_C, TASK_D); + saveTasks(aWithHost, TASK_B); + assertStoreContents(aWithHost, TASK_B); + + saveTasks(TASK_C, TASK_D); + assertStoreContents(aWithHost, TASK_B, TASK_C, TASK_D); // Saving the same task should overwrite. - IScheduledTask taskAModified = - IScheduledTask.build(TASK_A.newBuilder().setStatus(RUNNING)); - store.saveTasks(ImmutableSet.of(taskAModified)); + IScheduledTask taskAModified = IScheduledTask.build(aWithHost.newBuilder().setStatus(RUNNING)); + saveTasks(taskAModified); assertStoreContents(taskAModified, TASK_B, TASK_C, TASK_D); } @Test public void testQuery() { assertStoreContents(); - store.saveTasks(ImmutableSet.of(TASK_A, TASK_B, TASK_C, TASK_D)); + saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); assertQueryResults(Query.taskScoped("b"), TASK_B); assertQueryResults(Query.taskScoped("a", "d"), TASK_A, TASK_D); @@ -105,7 +196,7 @@ public class MemTaskStoreTest { Query.unscoped().byStatus(ScheduleStatus.PENDING), TASK_A, TASK_B, TASK_C, TASK_D); assertQueryResults( - Query.instanceScoped(JobKeys.from("role-a", "env-a", "job-a"), 0).active(), TASK_A); + Query.instanceScoped(JobKeys.from("role-a", "env-a", "job-a"), 2).active(), TASK_A); assertQueryResults(Query.jobScoped(JobKeys.from("role-b", "env-b", "job-b")).active(), TASK_B); assertQueryResults(Query.jobScoped(JobKeys.from("role-b", "devel", "job-b")).active()); @@ -122,21 +213,34 @@ public class MemTaskStoreTest { } @Test + public void testQueryMultipleInstances() { + ImmutableSet.Builder<IScheduledTask> tasksBuilder = ImmutableSet.builder(); + for (int i = 0; i < 100; i++) { + ScheduledTask builder = TASK_A.newBuilder(); + builder.getAssignedTask() + .setTaskId("id" + i) + .setInstanceId(i); + tasksBuilder.add(IScheduledTask.build(builder)); + } + Set<IScheduledTask> tasks = tasksBuilder.build(); + saveTasks(tasks); + assertQueryResults(Query.unscoped(), tasks); + } + + @Test public void testQueryBySlaveHost() { - String hostA = "slaveA"; - String hostB = "slaveB"; - final IScheduledTask a = setHost(makeTask("a", "role", "env", "job"), Optional.of(hostA)); - final IScheduledTask b = setHost(makeTask("b", "role", "env", "job"), Optional.of(hostB)); - store.saveTasks(ImmutableSet.of(a, b)); + IScheduledTask a = setHost(makeTask("a", JobKeys.from("role", "env", "job")), HOST_A); + IScheduledTask b = setHost(makeTask("b", JobKeys.from("role", "env", "job")), HOST_B); + saveTasks(a, b); - assertQueryResults(Query.slaveScoped(hostA), a); - assertQueryResults(Query.slaveScoped(hostA, hostB), a, b); + assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a); + assertQueryResults(Query.slaveScoped(HOST_A.getHost(), HOST_B.getHost()), a, b); } @Test public void testQueryByJobKeys() { assertStoreContents(); - store.saveTasks(ImmutableSet.of(TASK_A, TASK_B, TASK_C, TASK_D)); + saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); assertQueryResults( Query.jobScoped(ImmutableSet.of( @@ -147,8 +251,8 @@ public class MemTaskStoreTest { // Conflicting jobs will produce the result from the last added JobKey assertQueryResults( - Query.jobScoped(JobKeys.from("role-a", "env-a", "job-a")) - .byJobKeys(ImmutableSet.of(JobKeys.from("role-b", "env-b", "job-b"))), + Query.jobScoped(JobKeys.from("role-a", "env-a", "job-a")) + .byJobKeys(ImmutableSet.of(JobKeys.from("role-b", "env-b", "job-b"))), TASK_B); // The .byJobKeys will override the previous scoping and OR all of the keys. @@ -170,10 +274,10 @@ public class MemTaskStoreTest { @Test public void testMutate() { - store.saveTasks(ImmutableSet.of(TASK_A, TASK_B, TASK_C, TASK_D)); + saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); assertQueryResults(Query.statusScoped(RUNNING)); - store.mutateTasks(Query.taskScoped("a"), new TaskMutation() { + mutateTasks(Query.taskScoped("a"), new TaskMutation() { @Override public IScheduledTask apply(IScheduledTask task) { return IScheduledTask.build(task.newBuilder().setStatus(RUNNING)); @@ -184,7 +288,7 @@ public class MemTaskStoreTest { Query.statusScoped(RUNNING), IScheduledTask.build(TASK_A.newBuilder().setStatus(RUNNING))); - store.mutateTasks(Query.unscoped(), new TaskMutation() { + mutateTasks(Query.unscoped(), new TaskMutation() { @Override public IScheduledTask apply(IScheduledTask task) { return IScheduledTask.build(task.newBuilder().setStatus(ScheduleStatus.ASSIGNED)); @@ -207,59 +311,59 @@ public class MemTaskStoreTest { .setExecutorConfig(new ExecutorConfig("aurora", "new_config"))); String taskId = Tasks.id(TASK_A); - assertFalse(store.unsafeModifyInPlace(taskId, updated)); + assertFalse(unsafeModifyInPlace(taskId, updated)); - store.saveTasks(ImmutableSet.of(TASK_A)); - assertTrue(store.unsafeModifyInPlace(taskId, updated)); + saveTasks(TASK_A); + assertTrue(unsafeModifyInPlace(taskId, updated)); Query.Builder query = Query.taskScoped(taskId); ITaskConfig stored = - Iterables.getOnlyElement(store.fetchTasks(query)).getAssignedTask().getTask(); + Iterables.getOnlyElement(fetchTasks(query)).getAssignedTask().getTask(); assertEquals(updated, stored); - store.deleteTasks(ImmutableSet.of(taskId)); - assertFalse(store.unsafeModifyInPlace(taskId, updated)); + deleteTasks(taskId); + assertFalse(unsafeModifyInPlace(taskId, updated)); } @Test public void testDelete() { - store.saveTasks(ImmutableSet.of(TASK_A, TASK_B, TASK_C, TASK_D)); - store.deleteTasks(ImmutableSet.of("a")); + saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); + deleteTasks("a"); assertStoreContents(TASK_B, TASK_C, TASK_D); - store.deleteTasks(ImmutableSet.of("c")); + deleteTasks("c"); assertStoreContents(TASK_B, TASK_D); - store.deleteTasks(ImmutableSet.of("b", "d")); + deleteTasks("b", "d"); assertStoreContents(); } @Test public void testDeleteAll() { - store.saveTasks(ImmutableSet.of(TASK_A, TASK_B, TASK_C, TASK_D)); - store.deleteAllTasks(); + saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); + deleteAllTasks(); assertStoreContents(); } @Test public void testConsistentJobIndex() { - final IScheduledTask a = makeTask("a", "jim", "test", "job"); - final IScheduledTask b = makeTask("b", "jim", "test", "job"); - final IScheduledTask c = makeTask("c", "jim", "test", "job2"); - final IScheduledTask d = makeTask("d", "joe", "test", "job"); - final IScheduledTask e = makeTask("e", "jim", "prod", "job"); + final IScheduledTask a = makeTask("a", JobKeys.from("jim", "test", "job")); + final IScheduledTask b = makeTask("b", JobKeys.from("jim", "test", "job")); + final IScheduledTask c = makeTask("c", JobKeys.from("jim", "test", "job2")); + final IScheduledTask d = makeTask("d", JobKeys.from("joe", "test", "job")); + final IScheduledTask e = makeTask("e", JobKeys.from("jim", "prod", "job")); final Query.Builder jimsJob = Query.jobScoped(JobKeys.from("jim", "test", "job")); final Query.Builder jimsJob2 = Query.jobScoped(JobKeys.from("jim", "test", "job2")); final Query.Builder joesJob = Query.jobScoped(JobKeys.from("joe", "test", "job")); - store.saveTasks(ImmutableSet.of(a, b, c, d, e)); + saveTasks(a, b, c, d, e); assertQueryResults(jimsJob, a, b); assertQueryResults(jimsJob2, c); assertQueryResults(joesJob, d); - store.deleteTasks(ImmutableSet.of(Tasks.id(b))); + deleteTasks(Tasks.id(b)); assertQueryResults(jimsJob, a); assertQueryResults(jimsJob2, c); assertQueryResults(joesJob, d); - store.mutateTasks(jimsJob, new TaskMutation() { + mutateTasks(jimsJob, new TaskMutation() { @Override public IScheduledTask apply(IScheduledTask task) { return IScheduledTask.build(task.newBuilder().setStatus(RUNNING)); @@ -270,15 +374,15 @@ public class MemTaskStoreTest { assertQueryResults(jimsJob2, c); assertQueryResults(joesJob, d); - store.deleteTasks(ImmutableSet.of(Tasks.id(d))); + deleteTasks(Tasks.id(d)); assertQueryResults(joesJob); - store.deleteTasks(ImmutableSet.of(Tasks.id(d))); + deleteTasks(Tasks.id(d)); assertQueryResults(jimsJob, aRunning); assertQueryResults(jimsJob2, c); assertQueryResults(joesJob); - store.saveTasks(ImmutableSet.of(b)); + saveTasks(b); assertQueryResults(jimsJob, aRunning, b); assertQueryResults(jimsJob2, c); assertQueryResults(joesJob); @@ -286,13 +390,13 @@ public class MemTaskStoreTest { @Test public void testCanonicalTaskConfigs() { - IScheduledTask a = makeTask("a", "role", "env", "job"); - IScheduledTask b = makeTask("a", "role", "env", "job"); - IScheduledTask c = makeTask("a", "role", "env", "job"); + IScheduledTask a = createTask("a"); + IScheduledTask b = createTask("a"); + IScheduledTask c = createTask("a"); + saveTasks(a, b, c); Set<IScheduledTask> inserted = ImmutableSet.of(a, b, c); - store.saveTasks(inserted); - Set<ITaskConfig> storedConfigs = FluentIterable.from(store.fetchTasks(Query.unscoped())) + Set<ITaskConfig> storedConfigs = FluentIterable.from(fetchTasks(Query.unscoped())) .transform(Tasks.SCHEDULED_TO_INFO) .toSet(); assertEquals( @@ -307,9 +411,19 @@ public class MemTaskStoreTest { identityMap); } - private static IScheduledTask setHost(IScheduledTask task, Optional<String> host) { + private static IScheduledTask setHost(IScheduledTask task, IHostAttributes host) { ScheduledTask builder = task.newBuilder(); - builder.getAssignedTask().setSlaveHost(host.orNull()); + builder.getAssignedTask() + .setSlaveHost(host.getHost()) + .setSlaveId(host.getSlaveId()); + return IScheduledTask.build(builder); + } + + private static IScheduledTask unsetHost(IScheduledTask task) { + ScheduledTask builder = task.newBuilder(); + builder.getAssignedTask() + .setSlaveHost(null) + .setSlaveId(null); return IScheduledTask.build(builder); } @@ -321,14 +435,13 @@ public class MemTaskStoreTest { @Test public void testAddSlaveHost() { - final IScheduledTask a = makeTask("a", "role", "env", "job"); - store.saveTasks(ImmutableSet.of(a)); - String host = "slaveA"; - assertQueryResults(Query.slaveScoped(host)); - - final IScheduledTask b = setHost(a, Optional.of(host)); - Set<IScheduledTask> result = store.mutateTasks(Query.taskScoped(Tasks.id(a)), - new Function<IScheduledTask, IScheduledTask>() { + final IScheduledTask a = createTask("a"); + saveTasks(a); + assertQueryResults(Query.slaveScoped(HOST_A.getHost())); + + final IScheduledTask b = setHost(a, HOST_A); + Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)), + new TaskMutation() { @Override public IScheduledTask apply(IScheduledTask task) { assertEquals(a, task); @@ -336,13 +449,12 @@ public class MemTaskStoreTest { } }); assertEquals(ImmutableSet.of(b), result); - assertQueryResults(Query.slaveScoped(host), b); + assertQueryResults(Query.slaveScoped(HOST_A.getHost()), b); // Unrealistic behavior, but proving that the secondary index can handle key mutations. - String host2 = "slaveA2"; - final IScheduledTask c = setHost(b, Optional.of(host2)); - Set<IScheduledTask> result2 = store.mutateTasks(Query.taskScoped(Tasks.id(a)), - new Function<IScheduledTask, IScheduledTask>() { + final IScheduledTask c = setHost(b, HOST_B); + Set<IScheduledTask> result2 = mutateTasks(Query.taskScoped(Tasks.id(a)), + new TaskMutation() { @Override public IScheduledTask apply(IScheduledTask task) { assertEquals(b, task); @@ -350,24 +462,23 @@ public class MemTaskStoreTest { } }); assertEquals(ImmutableSet.of(c), result2); - assertQueryResults(Query.slaveScoped(host2), c); + assertQueryResults(Query.slaveScoped(HOST_B.getHost()), c); - store.deleteTasks(ImmutableSet.of(Tasks.id(a))); - assertQueryResults(Query.slaveScoped(host)); + deleteTasks(Tasks.id(a)); + assertQueryResults(Query.slaveScoped(HOST_B.getHost())); } @Test public void testUnsetSlaveHost() { // Unrealistic behavior, but proving that the secondary index does not become stale. - String host = "slaveA"; - final IScheduledTask a = setHost(makeTask("a", "role", "env", "job"), Optional.of(host)); - store.saveTasks(ImmutableSet.of(a)); - assertQueryResults(Query.slaveScoped(host), a); + final IScheduledTask a = setHost(createTask("a"), HOST_A); + saveTasks(a); + assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a); - final IScheduledTask b = setHost(a, Optional.<String>absent()); - Set<IScheduledTask> result = store.mutateTasks(Query.taskScoped(Tasks.id(a)), - new Function<IScheduledTask, IScheduledTask>() { + final IScheduledTask b = unsetHost(a); + Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)), + new TaskMutation() { @Override public IScheduledTask apply(IScheduledTask task) { assertEquals(a, task); @@ -375,17 +486,16 @@ public class MemTaskStoreTest { } }); assertEquals(ImmutableSet.of(b), result); - assertQueryResults(Query.slaveScoped(host)); + assertQueryResults(Query.slaveScoped(HOST_A.getHost())); assertQueryResults(Query.taskScoped(Tasks.id(b)), b); } @Test public void testTasksOnSameHost() { - String host = "slaveA"; - final IScheduledTask a = setHost(makeTask("a", "role", "env", "job"), Optional.of(host)); - final IScheduledTask b = setHost(makeTask("b", "role", "env", "job"), Optional.of(host)); - store.saveTasks(ImmutableSet.of(a, b)); - assertQueryResults(Query.slaveScoped(host), a, b); + final IScheduledTask a = setHost(createTask("a"), HOST_A); + final IScheduledTask b = setHost(createTask("b"), HOST_A); + saveTasks(a, b); + assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a, b); } @Test @@ -393,16 +503,16 @@ public class MemTaskStoreTest { // Ensures that saving a task with an existing task ID is effectively the same as a mutate, // and does not result in a duplicate object in the primary or secondary index. - String host = "slaveA"; - final IScheduledTask a = setHost(makeTask("a", "role", "env", "job"), Optional.of(host)); - store.saveTasks(ImmutableSet.of(a)); + final IScheduledTask a = setHost(createTask("a"), HOST_A); + saveTasks(a); final IScheduledTask updated = setConfigData(a, "new config data"); - store.saveTasks(ImmutableSet.of(updated)); + saveTasks(updated); assertQueryResults(Query.taskScoped(Tasks.id(a)), updated); - assertQueryResults(Query.slaveScoped(host), updated); + assertQueryResults(Query.slaveScoped(HOST_A.getHost()), updated); } + @Ignore @Test public void testReadSecondaryIndexMultipleThreads() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(4, @@ -414,10 +524,10 @@ public class MemTaskStoreTest { final int numJobs = 100; for (int j = 0; j < numJobs; j++) { for (int t = 0; t < numTasks; t++) { - builder.add(makeTask("" + j + "-" + t, "role", "env", "name" + j)); + builder.add(makeTask("" + j + "-" + t, JobKeys.from("role", "env", "name" + j))); } } - store.saveTasks(builder.build()); + saveTasks(builder.build()); final CountDownLatch read = new CountDownLatch(numJobs); for (int j = 0; j < numJobs; j++) { @@ -425,15 +535,14 @@ public class MemTaskStoreTest { executor.submit(new Runnable() { @Override public void run() { - assertNotNull(store.fetchTasks(Query.jobScoped( - JobKeys.from("role", "env", "name" + id)))); + assertNotNull(fetchTasks(Query.jobScoped(JobKeys.from("role", "env", "name" + id)))); read.countDown(); } }); executor.submit(new Runnable() { @Override public void run() { - store.saveTasks(ImmutableSet.of(makeTask("TaskNew1" + id))); + saveTasks(createTask("TaskNew1" + id)); } }); } @@ -453,50 +562,20 @@ public class MemTaskStoreTest { } private void assertQueryResults(Query.Builder query, IScheduledTask... tasks) { - assertEquals( - ImmutableSet.<IScheduledTask>builder().add(tasks).build(), - store.fetchTasks(query)); - } - - private static IScheduledTask makeTask(String id, String role, String env, String jobName) { - return IScheduledTask.build(new ScheduledTask() - .setStatus(ScheduleStatus.PENDING) - .setTaskEvents(ImmutableList.of( - new TaskEvent(100L, ScheduleStatus.ASSIGNED) - .setMessage("message") - .setScheduler("scheduler"))) - .setAncestorId("ancestor") - .setAssignedTask(new AssignedTask() - .setInstanceId(0) - .setTaskId(id) - .setSlaveId("slave") - .setSlaveHost("slavehost") - .setAssignedPorts(ImmutableMap.of("http", 1000)) - .setTask(new TaskConfig() - .setJob(new JobKey(role, env, jobName)) - .setJobName(jobName) - .setEnvironment(env) - .setOwner(new Identity(role, role)) - .setIsService(true) - .setNumCpus(1.0) - .setRamMb(1024) - .setDiskMb(1024) - .setPriority(1) - .setMaxTaskFailures(-1) - .setProduction(true) - .setConstraints(ImmutableSet.of( - new Constraint( - "name", - TaskConstraint.value( - new ValueConstraint(false, ImmutableSet.of("value")))))) - .setRequestedPorts(ImmutableSet.of("http")) - .setTaskLinks(ImmutableMap.of("http", "link")) - .setContactEmail("[email protected]") - .setMetadata(ImmutableSet.of(new Metadata("key", "value"))) - .setExecutorConfig(new ExecutorConfig("name", "config"))))); - } - - private static IScheduledTask makeTask(String id) { - return makeTask(id, "role-" + id, "env-" + id, "job-" + id); + assertQueryResults(query, ImmutableSet.copyOf(tasks)); + } + + private void assertQueryResults(Query.Builder query, Set<IScheduledTask> tasks) { + assertEquals(tasks, fetchTasks(query)); + } + + private static IScheduledTask createTask(String id) { + return makeTask(id, JobKeys.from("role-" + id, "env-" + id, "job-" + id)); + } + + private static IScheduledTask setContainer(IScheduledTask task, Container container) { + ScheduledTask builder = task.newBuilder(); + builder.getAssignedTask().getTask().setContainer(container); + return IScheduledTask.build(builder); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/449a835e/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageEntityUtil.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageEntityUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageEntityUtil.java index b26ddd3..0bde4d8 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageEntityUtil.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageEntityUtil.java @@ -19,11 +19,13 @@ import java.util.Collection; import java.util.Map; import java.util.Set; +import com.google.common.base.Defaults; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; import com.google.gson.internal.Primitives; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; /** @@ -49,20 +51,29 @@ public final class StorageEntityUtil { assertFullyPopulated(name + " key", entry.getKey(), ignoredFields); assertFullyPopulated(name + "[" + entry.getKey() + "]", entry.getValue(), ignoredFields); } - } else if (!Primitives.isPrimitive(object.getClass()) - && !Primitives.isWrapperType(object.getClass())) { - + } else if (!(object instanceof String) && !(object instanceof Enum)) { for (Field field : object.getClass().getDeclaredFields()) { if (!Modifier.isStatic(field.getModifiers())) { try { field.setAccessible(true); String fullName = name + "." + field.getName(); - Object value = field.get(object); - if (!ignoredFields.contains(field)) { - assertNotNull(fullName + " is null", value); + Object fieldValue = field.get(object); + boolean mustBeSet = !ignoredFields.contains(field); + if (mustBeSet) { + assertNotNull(fullName + " is null", fieldValue); } - if (value != null) { - assertFullyPopulated(fullName, value, ignoredFields); + if (fieldValue != null) { + if (Primitives.isWrapperType(fieldValue.getClass())) { + // Special-case the mutable hash code field. + if (mustBeSet && !fullName.endsWith("cachedHashCode")) { + assertNotEquals( + "Primitive value must not be default: " + fullName, + Defaults.defaultValue(Primitives.unwrap(fieldValue.getClass())), + fieldValue); + } + } else { + assertFullyPopulated(fullName, fieldValue, ignoredFields); + } } } catch (IllegalAccessException e) { throw Throwables.propagate(e); http://git-wip-us.apache.org/repos/asf/aurora/blob/449a835e/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java index 3f1f72b..1128303 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java @@ -13,16 +13,22 @@ */ package org.apache.aurora.scheduler.updater; +import java.util.List; +import java.util.Objects; + import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import com.twitter.common.util.testing.FakeClock; +import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.junit.Test; @@ -263,4 +269,33 @@ public class InstanceUpdaterTest { f.advanceTime(MIN_RUNNING_TIME); f.evaluateCurrentState(SUCCEEDED); } + + static final class TaskUtil { + private final FakeClock clock; + + TaskUtil(FakeClock clock) { + this.clock = Objects.requireNonNull(clock); + } + + IScheduledTask makeTask(ITaskConfig config, ScheduleStatus status) { + List<TaskEvent> events = Lists.newArrayList(); + if (status != PENDING) { + events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(PENDING)); + } + if (Tasks.isTerminated(status) || status == KILLING) { + events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(ASSIGNED)); + events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(RUNNING)); + } + + events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status)); + + return IScheduledTask.build( + new ScheduledTask() + .setStatus(status) + .setTaskEvents(ImmutableList.copyOf(events)) + .setAssignedTask( + new AssignedTask() + .setTask(config.newBuilder()))); + } + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/449a835e/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java index dd4c940..6be0efa 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java @@ -43,10 +43,7 @@ import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.Clock; import com.twitter.common.util.TruncatedBinaryBackoff; -import org.apache.aurora.gen.ExecutorConfig; -import org.apache.aurora.gen.Identity; import org.apache.aurora.gen.InstanceTaskConfig; -import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.JobUpdate; import org.apache.aurora.gen.JobUpdateAction; import org.apache.aurora.gen.JobUpdateEvent; @@ -67,6 +64,7 @@ import org.apache.aurora.scheduler.async.RescheduleCalculator; import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; @@ -146,8 +144,8 @@ public class JobUpdaterIT extends EasyMockTest { private static final Amount<Long, Time> FLAPPING_THRESHOLD = Amount.of(1L, Time.MILLISECONDS); private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS); private static final ITaskConfig OLD_CONFIG = - ITaskConfig.build(makeTaskConfig().setExecutorConfig(new ExecutorConfig().setName("new"))); - private static final ITaskConfig NEW_CONFIG = ITaskConfig.build(makeTaskConfig()); + setExecutorData(TaskTestUtil.makeConfig(JOB), "olddata"); + private static final ITaskConfig NEW_CONFIG = setExecutorData(OLD_CONFIG, "newdata"); private static final long PULSE_TIMEOUT_MS = 10000; private FakeScheduledExecutor clock; @@ -159,6 +157,12 @@ public class JobUpdaterIT extends EasyMockTest { private StateManager stateManager; private JobUpdateEventSubscriber subscriber; + private static ITaskConfig setExecutorData(ITaskConfig task, String executorData) { + TaskConfig builder = task.newBuilder(); + builder.getExecutorConfig().setData(executorData); + return ITaskConfig.build(builder); + } + @Before public void setUp() { // Avoid console spam due to stats registered multiple times. @@ -1327,15 +1331,4 @@ public class JobUpdaterIT extends EasyMockTest { .setInstances(ImmutableSet.of(new Range(start, end))) .setTask(config.newBuilder())); } - - private static TaskConfig makeTaskConfig() { - return new TaskConfig() - .setJob(new JobKey(JOB.newBuilder())) - .setJobName(JOB.getName()) - .setEnvironment(JOB.getEnvironment()) - .setOwner(new Identity(JOB.getRole(), "user")) - .setIsService(true) - .setExecutorConfig(new ExecutorConfig().setName("old")) - .setNumCpus(1); - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/449a835e/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java b/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java deleted file mode 100644 index 0e67f91..0000000 --- a/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.updater; - -import java.util.List; -import java.util.Objects; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.twitter.common.util.testing.FakeClock; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; - -import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED; -import static org.apache.aurora.gen.ScheduleStatus.KILLING; -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; - -final class TaskUtil { - - private final FakeClock clock; - - TaskUtil(FakeClock clock) { - this.clock = Objects.requireNonNull(clock); - } - - IScheduledTask makeTask(ITaskConfig config, ScheduleStatus status) { - List<TaskEvent> events = Lists.newArrayList(); - if (status != PENDING) { - events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(PENDING)); - } - if (Tasks.isTerminated(status) || status == KILLING) { - events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(ASSIGNED)); - events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(RUNNING)); - } - - events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status)); - - return IScheduledTask.build( - new ScheduledTask() - .setStatus(status) - .setTaskEvents(ImmutableList.copyOf(events)) - .setAssignedTask( - new AssignedTask() - .setTask(config.newBuilder()))); - } -}
