Repository: aurora Updated Branches: refs/heads/master c89fecbcd -> 66a4d5fdd
Add storage API methods for fetching and mutating a task by ID. Reviewed at https://reviews.apache.org/r/42628/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/66a4d5fd Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/66a4d5fd Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/66a4d5fd Branch: refs/heads/master Commit: 66a4d5fdd3ff66facccd094e6c7523b0a2d19860 Parents: c89fecb Author: Bill Farner <[email protected]> Authored: Thu Jan 21 23:04:35 2016 -0800 Committer: Bill Farner <[email protected]> Committed: Thu Jan 21 23:04:35 2016 -0800 ---------------------------------------------------------------------- .../aurora/scheduler/http/StructDump.java | 14 ++-- .../scheduling/RescheduleCalculator.java | 6 +- .../scheduler/state/StateManagerImpl.java | 46 +++++--------- .../aurora/scheduler/storage/Storage.java | 8 ++- .../aurora/scheduler/storage/TaskStore.java | 26 ++++++-- .../scheduler/storage/db/DbTaskStore.java | 57 +++++++++++++---- .../storage/log/WriteAheadStorage.java | 19 ++++-- .../scheduler/storage/mem/MemTaskStore.java | 67 ++++++++++++++------ .../RescheduleCalculatorImplTest.java | 11 ++-- .../scheduler/state/StateManagerImplTest.java | 11 ++-- .../storage/AbstractCronJobStoreTest.java | 5 +- .../storage/AbstractTaskStoreTest.java | 35 +++++----- .../scheduler/storage/log/LogStorageTest.java | 41 ++++++------ .../storage/testing/StorageTestUtil.java | 9 +++ 14 files changed, 213 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/http/StructDump.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java index 4fa5254..f84767a 100644 --- 
a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java +++ b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java @@ -23,15 +23,14 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import com.google.common.base.Optional; -import com.google.common.collect.Iterables; import org.apache.aurora.common.thrift.Util; import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.Work.Quiet; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.thrift.TBase; import static java.util.Objects.requireNonNull; @@ -73,13 +72,10 @@ public class StructDump extends JerseyTemplateServlet { public Response dumpJob( @PathParam("task") final String taskId) { - return dumpEntity("Task " + taskId, storeProvider -> { - // Deep copy the struct to sidestep any subclass trickery inside the storage system. 
- return Optional.fromNullable(Iterables.getOnlyElement( - storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)), - null) - .newBuilder()); - }); + return dumpEntity( + "Task " + taskId, + storeProvider -> + storeProvider.getTaskStore().fetchTask(taskId).transform(IScheduledTask::newBuilder)); } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java index c136d1a..4b0ef81 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java @@ -32,7 +32,6 @@ import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.util.BackoffStrategy; import org.apache.aurora.common.util.Random; import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -147,10 +146,7 @@ public interface RescheduleCalculator { return Optional.absent(); } - Iterable<IScheduledTask> res = - Storage.Util.fetchTasks(storage, Query.taskScoped(task.getAncestorId())); - - return Optional.fromNullable(Iterables.getOnlyElement(res, null)); + return Storage.Util.fetchTask(storage, task.getAncestorId()); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java 
b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java index 720b5e5..e5b2f41 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java @@ -31,7 +31,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; @@ -47,6 +46,7 @@ import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; import org.apache.aurora.scheduler.state.SideEffect.Action; @@ -163,18 +163,16 @@ public class StateManagerImpl implements StateManager { public IAssignedTask assignTask( MutableStoreProvider storeProvider, String taskId, - final String slaveHost, - final SlaveID slaveId, - final Map<String, Integer> assignedPorts) { + String slaveHost, + SlaveID slaveId, + Map<String, Integer> assignedPorts) { checkNotBlank(taskId); checkNotBlank(slaveHost); requireNonNull(slaveId); requireNonNull(assignedPorts); - Query.Builder query = Query.taskScoped(taskId); - - storeProvider.getUnsafeTaskStore().mutateTasks(query, + IScheduledTask mutated = storeProvider.getUnsafeTaskStore().mutateTask(taskId, task -> { ScheduledTask builder = task.newBuilder(); builder.getAssignedTask() @@ -182,7 +180,7 @@ public class StateManagerImpl implements StateManager { .setSlaveHost(slaveHost) .setSlaveId(slaveId.getValue()); return IScheduledTask.build(builder); - }); + }).get(); 
StateChangeResult changeResult = updateTaskAndExternalState( storeProvider.getUnsafeTaskStore(), @@ -195,10 +193,7 @@ public class StateManagerImpl implements StateManager { changeResult == SUCCESS, "Attempt to assign task " + taskId + " to " + slaveHost + " failed"); - return Iterables.getOnlyElement( - Iterables.transform( - storeProvider.getTaskStore().fetchTasks(query), - IScheduledTask::getAssignedTask)); + return mutated.getAssignedTask(); } @VisibleForTesting @@ -219,9 +214,7 @@ public class StateManagerImpl implements StateManager { ScheduleStatus targetState, Optional<String> transitionMessage) { - Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement( - taskStore.fetchTasks(Query.taskScoped(taskId)), - null)); + Optional<IScheduledTask> task = taskStore.fetchTask(taskId); // CAS operation fails if the task does not exist, or the states don't match. if (casState.isPresent() @@ -264,34 +257,32 @@ public class StateManagerImpl implements StateManager { private StateChangeResult updateTaskAndExternalState( TaskStore.Mutable taskStore, String taskId, - // Note: This argument is deliberately non-final, and should not be made final. + // Note: This argument should be used with caution. // This is because using the captured value within the storage operation below is // highly-risky, since it doesn't necessarily represent the value in storage. // As a result, it would be easy to accidentally clobber mutations. Optional<IScheduledTask> task, - final Optional<ScheduleStatus> targetState, - final Optional<String> transitionMessage) { + Optional<ScheduleStatus> targetState, + Optional<String> transitionMessage) { if (task.isPresent()) { Preconditions.checkArgument(taskId.equals(task.get().getAssignedTask().getTaskId())); } - final List<PubsubEvent> events = Lists.newArrayList(); + List<PubsubEvent> events = Lists.newArrayList(); - final TaskStateMachine stateMachine = task.isPresent() + TaskStateMachine stateMachine = task.isPresent() ? 
new TaskStateMachine(task.get()) : new TaskStateMachine(taskId); TransitionResult result = stateMachine.updateState(targetState); - Query.Builder query = Query.taskScoped(taskId); for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) { - Optional<IScheduledTask> upToDateTask = Optional.fromNullable( - Iterables.getOnlyElement(taskStore.fetchTasks(query), null)); + Optional<IScheduledTask> upToDateTask = taskStore.fetchTask(taskId); switch (sideEffect.getAction()) { case INCREMENT_FAILURES: - taskStore.mutateTasks(query, task1 -> IScheduledTask.build( + taskStore.mutateTask(taskId, task1 -> IScheduledTask.build( task1.newBuilder().setFailureCount(task1.getFailureCount() + 1))); break; @@ -300,7 +291,7 @@ public class StateManagerImpl implements StateManager { upToDateTask.isPresent(), "Operation expected task " + taskId + " to be present."); - taskStore.mutateTasks(query, task1 -> { + Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, task1 -> { ScheduledTask mutableTask = task1.newBuilder(); mutableTask.setStatus(targetState.get()); mutableTask.addToTaskEvents(new TaskEvent() @@ -310,10 +301,7 @@ public class StateManagerImpl implements StateManager { .setScheduler(LOCAL_HOST_SUPPLIER.get())); return IScheduledTask.build(mutableTask); }); - events.add( - PubsubEvent.TaskStateChange.transition( - Iterables.getOnlyElement(taskStore.fetchTasks(query)), - stateMachine.getPreviousState())); + events.add(TaskStateChange.transition(mutated.get(), stateMachine.getPreviousState())); break; case RESCHEDULE: http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/Storage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java index 6109158..578bb37 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java +++ 
b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java @@ -20,6 +20,8 @@ import java.lang.annotation.Target; import javax.inject.Qualifier; +import com.google.common.base.Optional; + import org.apache.aurora.scheduler.base.Query.Builder; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; @@ -291,10 +293,14 @@ public interface Storage { * @param query Builder of the query to perform. * @return Tasks returned from the query. */ - public static Iterable<IScheduledTask> fetchTasks(Storage storage, final Builder query) { + public static Iterable<IScheduledTask> fetchTasks(Storage storage, Builder query) { return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query)); } + public static Optional<IScheduledTask> fetchTask(Storage storage, String taskId) { + return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTask(taskId)); + } + public static Iterable<IJobConfiguration> fetchCronJobs(Storage storage) { return storage.read(storeProvider -> storeProvider.getCronJobStore().fetchJobs()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java index 62639c4..4e4f8d2 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java @@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.storage; import java.util.Set; import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableSet; @@ -34,8 +35,16 @@ import static com.google.common.base.CharMatcher.WHITESPACE; public interface TaskStore 
{ /** + * Fetches a task. + * + * @param taskId ID of the task to fetch. + * @return The task, if it exists. + */ + Optional<IScheduledTask> fetchTask(String taskId); + + /** * Fetches a read-only view of tasks matching a query and filters. Intended for use with a - * {@link org.apache.aurora.scheduler.base.Query.Builder}. + * {@link Query.Builder}. * * @param query Builder of the query to identify tasks with. * @return A read-only view of matching tasks. @@ -82,14 +91,23 @@ public interface TaskStore { void deleteTasks(Set<String> taskIds); /** + * Mutates a single task, if present. + * + * @param taskId Unique ID of the task to mutate. + * @param mutator The mutate operation. + * @return The result of the mutate operation, if performed. + */ + Optional<IScheduledTask> mutateTask( + String taskId, + Function<IScheduledTask, IScheduledTask> mutator); + + /** * Offers temporary mutable access to tasks. If a task ID is not found, it will be silently * skipped, and no corresponding task will be returned. - * TODO(wfarner): Consider a non-batch variant of this, since that's a more common use case, - * and it prevents the caller from worrying about a bad query having broad impact. * * @param query Query to match tasks against. * @param mutator The mutate operation. - * @return Immutable copies of only the tasks that were mutated. + * @return Immutable copies <em>of only the tasks that were mutated</em>. 
*/ ImmutableSet<IScheduledTask> mutateTasks( Query.Builder query, http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java index d406134..43fda1d 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java @@ -79,6 +79,14 @@ class DbTaskStore implements TaskStore.Mutable { this.slowQueryThresholdNanos = slowQueryThreshold.as(Time.NANOSECONDS); } + @Timed("db_storage_fetch_task") + @Override + public Optional<IScheduledTask> fetchTask(String taskId) { + requireNonNull(taskId); + return Optional.fromNullable(taskMapper.selectById(taskId)) + .transform(DbScheduledTask::toImmutable); + } + @Timed("db_storage_fetch_tasks") @Override public ImmutableSet<IScheduledTask> fetchTasks(Builder query) { @@ -160,28 +168,50 @@ class DbTaskStore implements TaskStore.Mutable { } } - @Timed("db_storage_mutate_tasks") - @Override - public ImmutableSet<IScheduledTask> mutateTasks( - Builder query, + private Function<IScheduledTask, IScheduledTask> mutateAndSave( Function<IScheduledTask, IScheduledTask> mutator) { - requireNonNull(query); - requireNonNull(mutator); - - ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder(); - for (IScheduledTask original : fetchTasks(query)) { + return original -> { IScheduledTask maybeMutated = mutator.apply(original); + requireNonNull(maybeMutated); if (!original.equals(maybeMutated)) { Preconditions.checkState( Tasks.id(original).equals(Tasks.id(maybeMutated)), "A task's ID may not be mutated."); saveTasks(ImmutableSet.of(maybeMutated)); - mutated.add(maybeMutated); } - } + return maybeMutated; + }; + } + + @Timed("db_storage_mutate_task") + 
@Override + public Optional<IScheduledTask> mutateTask( + String taskId, + Function<IScheduledTask, IScheduledTask> mutator) { - return mutated.build(); + requireNonNull(taskId); + requireNonNull(mutator); + + return fetchTask(taskId).transform(mutateAndSave(mutator)); + } + + @Timed("db_storage_mutate_tasks") + @Override + public ImmutableSet<IScheduledTask> mutateTasks( + Builder query, + Function<IScheduledTask, IScheduledTask> mutator) { + + requireNonNull(query); + requireNonNull(mutator); + + Function<IScheduledTask, IScheduledTask> mutateFunction = mutateAndSave(mutator); + Iterable<Optional<IScheduledTask>> mutations = matches(query) + .transform(original -> { + IScheduledTask mutateResult = mutateFunction.apply(original); + return original.equals(mutateResult) ? Optional.absent() : Optional.of(mutateResult); + }); + return ImmutableSet.copyOf(Optional.presentInstances(mutations)); } @Timed("db_storage_unsafe_modify_in_place") @@ -189,8 +219,7 @@ class DbTaskStore implements TaskStore.Mutable { public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) { checkNotNull(taskId); checkNotNull(taskConfiguration); - Optional<IScheduledTask> task = - Optional.fromNullable(Iterables.getOnlyElement(fetchTasks(Query.taskScoped(taskId)), null)); + Optional<IScheduledTask> task = fetchTask(taskId); if (task.isPresent()) { deleteTasks(ImmutableSet.of(taskId)); ScheduledTask builder = task.get().newBuilder(); http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java index c44ff47..7283531 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java +++ 
b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java @@ -193,12 +193,21 @@ class WriteAheadStorage extends WriteAheadStorageForwarder implements } @Override - public ImmutableSet<IScheduledTask> mutateTasks( - final Query.Builder query, - final Function<IScheduledTask, IScheduledTask> mutator) { + public Optional<IScheduledTask> mutateTask( + String taskId, + Function<IScheduledTask, IScheduledTask> mutator) { + + Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator); + log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus()); + write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder())))); - requireNonNull(query); - requireNonNull(mutator); + return mutated; + } + + @Override + public ImmutableSet<IScheduledTask> mutateTasks( + Query.Builder query, + Function<IScheduledTask, IScheduledTask> mutator) { ImmutableSet<IScheduledTask> mutated = taskStore.mutateTasks(query, mutator); http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java index c55dcc9..8fd024a 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java @@ -124,13 +124,20 @@ class MemTaskStore implements TaskStore.Mutable { taskQueriesAll = statsProvider.makeCounter("task_queries_all"); } + @Timed("mem_storage_fetch_task") + @Override + public Optional<IScheduledTask> fetchTask(String taskId) { + requireNonNull(taskId); + return Optional.fromNullable(tasks.get(taskId)).transform(t -> t.storedTask); + } + @Timed("mem_storage_fetch_tasks") @Override public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query) 
{ requireNonNull(query); long start = System.nanoTime(); - ImmutableSet<IScheduledTask> result = matches(query).transform(TO_SCHEDULED).toSet(); + ImmutableSet<IScheduledTask> result = matches(query).toSet(); long durationNanos = System.nanoTime() - start; boolean infoLevel = durationNanos >= slowQueryThresholdNanos; long time = Amount.of(durationNanos, Time.NANOSECONDS).as(Time.MILLISECONDS); @@ -196,32 +203,50 @@ class MemTaskStore implements TaskStore.Mutable { } } - @Timed("mem_storage_mutate_tasks") - @Override - public ImmutableSet<IScheduledTask> mutateTasks( - Query.Builder query, + private Function<IScheduledTask, IScheduledTask> mutateAndSave( Function<IScheduledTask, IScheduledTask> mutator) { - requireNonNull(query); - requireNonNull(mutator); - - ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder(); - for (Task original : matches(query).toList()) { - IScheduledTask maybeMutated = mutator.apply(original.storedTask); - if (!original.storedTask.equals(maybeMutated)) { + return original -> { + IScheduledTask maybeMutated = mutator.apply(original); + requireNonNull(maybeMutated); + if (!original.equals(maybeMutated)) { Preconditions.checkState( - Tasks.id(original.storedTask).equals(Tasks.id(maybeMutated)), + Tasks.id(original).equals(Tasks.id(maybeMutated)), "A task's ID may not be mutated."); tasks.put(Tasks.id(maybeMutated), toTask.apply(maybeMutated)); for (SecondaryIndex<?> index : secondaryIndices) { - index.replace(original.storedTask, maybeMutated); + index.replace(original, maybeMutated); } - - mutated.add(maybeMutated); } - } + return maybeMutated; + }; + } - return mutated.build(); + @Timed("mem_storage_mutate_task") + @Override + public Optional<IScheduledTask> mutateTask( + String taskId, + Function<IScheduledTask, IScheduledTask> mutator) { + + return fetchTask(taskId).transform(mutateAndSave(mutator)); + } + + @Timed("mem_storage_mutate_tasks") + @Override + public ImmutableSet<IScheduledTask> mutateTasks( + Query.Builder 
query, + Function<IScheduledTask, IScheduledTask> mutator) { + + requireNonNull(query); + requireNonNull(mutator); + + Function<IScheduledTask, IScheduledTask> mutateFunction = mutateAndSave(mutator); + Iterable<Optional<IScheduledTask>> mutations = matches(query) + .transform(original -> { + IScheduledTask mutateResult = mutateFunction.apply(original); + return original.equals(mutateResult) ? Optional.absent() : Optional.of(mutateResult); + }); + return ImmutableSet.copyOf(Optional.presentInstances(mutations)); } @Timed("mem_storage_unsafe_modify_in_place") @@ -259,7 +284,7 @@ class MemTaskStore implements TaskStore.Mutable { .toList(); } - private FluentIterable<Task> matches(Query.Builder query) { + private FluentIterable<IScheduledTask> matches(Query.Builder query) { // Apply the query against the working set. Optional<? extends Iterable<Task>> from = Optional.absent(); if (query.get().isSetTaskIds()) { @@ -284,7 +309,9 @@ class MemTaskStore implements TaskStore.Mutable { } } - return FluentIterable.from(from.get()).filter(queryFilter(query)); + return FluentIterable.from(from.get()) + .filter(queryFilter(query)) + .transform(TO_SCHEDULED); } private static final Function<Task, IScheduledTask> TO_SCHEDULED = task -> task.storedTask; http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java index b380f21..9d21dcd 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java @@ -29,7 +29,6 @@ import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import 
org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -78,7 +77,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest { @Test public void testNoPenaltyDeletedAncestor() { String ancestorId = "a"; - storageUtil.expectTaskFetch(Query.taskScoped(ancestorId)); + storageUtil.expectTaskFetch(ancestorId); control.replay(); @@ -90,7 +89,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest { @Test public void testFlappingTask() { IScheduledTask ancestor = makeFlappyTask("a"); - storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor); + storageUtil.expectTaskFetch(Tasks.id(ancestor), ancestor); long penaltyMs = 1000L; expect(backoff.calculateBackoffMs(0L)).andReturn(penaltyMs); @@ -119,9 +118,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest { long lastPenalty = 0L; for (Map.Entry<IScheduledTask, Long> taskAndPenalty : ancestorsAndPenalties.entrySet()) { - storageUtil.expectTaskFetch( - Query.taskScoped(Tasks.id(taskAndPenalty.getKey())), - taskAndPenalty.getKey()); + storageUtil.expectTaskFetch(Tasks.id(taskAndPenalty.getKey()), taskAndPenalty.getKey()); expect(backoff.calculateBackoffMs(lastPenalty)).andReturn(taskAndPenalty.getValue()); lastPenalty = taskAndPenalty.getValue(); } @@ -137,7 +134,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest { IScheduledTask ancestor = setEvents( makeTask("a", KILLED), ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, KILLING, 300L, KILLED, 400L)); - storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor); + storageUtil.expectTaskFetch(Tasks.id(ancestor), ancestor); control.replay(); 
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java index 6d42689..498da78 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java @@ -202,7 +202,7 @@ public class StateManagerImplTest extends EasyMockTest { .setTask(NON_SERVICE_CONFIG.newBuilder())); assertEquals( ImmutableSet.of(IScheduledTask.build(expected)), - Storage.Util.fetchTasks(storage, Query.taskScoped(taskId))); + Storage.Util.fetchTask(storage, taskId).asSet()); } @Test @@ -343,8 +343,7 @@ public class StateManagerImplTest extends EasyMockTest { assignTask(taskId, HOST_A); changeState(taskId, RUNNING); changeState(taskId, FAILED); - IScheduledTask rescheduledTask = Iterables.getOnlyElement( - Storage.Util.fetchTasks(storage, Query.taskScoped(taskId2))); + IScheduledTask rescheduledTask = Storage.Util.fetchTask(storage, taskId2).get(); assertEquals(taskId, rescheduledTask.getAncestorId()); assertEquals(1, rescheduledTask.getFailureCount()); } @@ -422,8 +421,7 @@ public class StateManagerImplTest extends EasyMockTest { insertTask(task, 0); assignTask(taskId, HOST_A, ImmutableMap.of("one", 80, "two", 81, "three", 82)); - IScheduledTask actual = Iterables.getOnlyElement( - Storage.Util.fetchTasks(storage, Query.taskScoped(taskId))); + IScheduledTask actual = Storage.Util.fetchTask(storage, taskId).get(); assertEquals( requestedPorts, @@ -453,8 +451,7 @@ public class StateManagerImplTest extends EasyMockTest { assignTask(newTaskId, HOST_A, ImmutableMap.of("one", 86)); - IScheduledTask actual = Iterables.getOnlyElement( - Storage.Util.fetchTasks(storage, Query.taskScoped(newTaskId))); + 
IScheduledTask actual = Storage.Util.fetchTask(storage, newTaskId).get(); assertEquals(ImmutableMap.of("one", 86), actual.getAssignedTask().getAssignedPorts()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java index a6bfc7a..22a6b43 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java @@ -26,7 +26,6 @@ import org.apache.aurora.gen.Identity; import org.apache.aurora.gen.JobConfiguration; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; @@ -114,8 +113,8 @@ public abstract class AbstractCronJobStoreTest { saveAcceptedJob(JOB_A); storage.write(storeProvider -> - storeProvider.getUnsafeTaskStore().mutateTasks( - Query.taskScoped(Tasks.id(instance)), + storeProvider.getUnsafeTaskStore().mutateTask( + Tasks.id(instance), task -> IScheduledTask.build(task.newBuilder().setStatus(ScheduleStatus.RUNNING)))); } http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java index 5a9b6c1..1ac41d1 100644 --- 
a/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java @@ -20,10 +20,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import com.google.common.base.Optional; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -101,6 +101,10 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase { }); } + private Optional<IScheduledTask> fetchTask(String taskId) { + return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTask(taskId)); + } + private Iterable<IScheduledTask> fetchTasks(Query.Builder query) { return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query)); } @@ -114,10 +118,12 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase { storeProvider -> storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.copyOf(tasks))); } - private ImmutableSet<IScheduledTask> mutateTasks( - final Query.Builder query, - final TaskMutation mutation) { + private Optional<IScheduledTask> mutateTask(String taskId, TaskMutation mutation) { + return storage.write( + storeProvider -> storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation)); + } + private ImmutableSet<IScheduledTask> mutateTasks(Query.Builder query, TaskMutation mutation) { return storage.write( storeProvider -> storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation)); } @@ -261,9 +267,7 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase { saveTasks(TASK_A, TASK_B, TASK_C, TASK_D); assertQueryResults(Query.statusScoped(RUNNING)); - 
mutateTasks( - Query.taskScoped("a"), - task -> IScheduledTask.build(task.newBuilder().setStatus(RUNNING))); + mutateTask("a", task -> IScheduledTask.build(task.newBuilder().setStatus(RUNNING))); assertQueryResults( Query.statusScoped(RUNNING), @@ -293,10 +297,7 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase { saveTasks(TASK_A); assertTrue(unsafeModifyInPlace(taskId, updated)); - Query.Builder query = Query.taskScoped(taskId); - ITaskConfig stored = - Iterables.getOnlyElement(fetchTasks(query)).getAssignedTask().getTask(); - assertEquals(updated, stored); + assertEquals(updated, fetchTask(taskId).get().getAssignedTask().getTask()); deleteTasks(taskId); assertFalse(unsafeModifyInPlace(taskId, updated)); @@ -413,22 +414,22 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase { assertQueryResults(Query.slaveScoped(HOST_A.getHost())); final IScheduledTask b = setHost(a, HOST_A); - Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)), + Optional<IScheduledTask> result = mutateTask(Tasks.id(a), task -> { assertEquals(a, task); return b; }); - assertEquals(ImmutableSet.of(b), result); + assertEquals(Optional.of(b), result); assertQueryResults(Query.slaveScoped(HOST_A.getHost()), b); // Unrealistic behavior, but proving that the secondary index can handle key mutations. 
final IScheduledTask c = setHost(b, HOST_B); - Set<IScheduledTask> result2 = mutateTasks(Query.taskScoped(Tasks.id(a)), + Optional<IScheduledTask> result2 = mutateTask(Tasks.id(a), task -> { assertEquals(b, task); return c; }); - assertEquals(ImmutableSet.of(c), result2); + assertEquals(Optional.of(c), result2); assertQueryResults(Query.slaveScoped(HOST_B.getHost()), c); deleteTasks(Tasks.id(a)); @@ -444,12 +445,12 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase { assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a); final IScheduledTask b = unsetHost(a); - Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)), + Optional<IScheduledTask> result = mutateTask(Tasks.id(a), task -> { assertEquals(a, task); return b; }); - assertEquals(ImmutableSet.of(b), result); + assertEquals(Optional.of(b), result); assertQueryResults(Query.slaveScoped(HOST_A.getHost())); assertQueryResults(Query.taskScoped(Tasks.id(b)), b); } http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java index 4305270..7382eca 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java @@ -78,7 +78,6 @@ import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.gen.storage.Transaction; import org.apache.aurora.gen.storage.storageConstants; import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.EventSink; @@ -542,22 +541,22 @@ public class LogStorageTest 
extends EasyMockTest { @Test public void testMutateTasks() throws Exception { - Query.Builder query = Query.taskScoped("fred"); + String taskId = "fred"; Function<IScheduledTask, IScheduledTask> mutation = Functions.identity(); - ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.STARTING)); + Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING)); new AbstractMutationFixture() { @Override protected void setupExpectations() throws Exception { storageUtil.expectWrite(); - expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated); + expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated); streamMatcher.expectTransaction( - Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated)))) + Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder())))) .andReturn(null); } @Override protected void performMutations(MutableStoreProvider storeProvider) { - assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation)); + assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation)); } }.run(); } @@ -588,28 +587,28 @@ public class LogStorageTest extends EasyMockTest { @Test public void testNestedTransactions() throws Exception { - Query.Builder query = Query.taskScoped("fred"); + String taskId = "fred"; Function<IScheduledTask, IScheduledTask> mutation = Functions.identity(); - ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.STARTING)); + Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING)); ImmutableSet<String> tasksToRemove = ImmutableSet.of("b"); new AbstractMutationFixture() { @Override protected void setupExpectations() throws Exception { storageUtil.expectWrite(); - expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated); + expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated); 
storageUtil.taskStore.deleteTasks(tasksToRemove); streamMatcher.expectTransaction( - Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))), + Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))), Op.removeTasks(new RemoveTasks(tasksToRemove))) .andReturn(position); } @Override protected void performMutations(MutableStoreProvider storeProvider) { - assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation)); + assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation)); logStorage.write((NoResult.Quiet) innerProvider -> innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove)); @@ -619,10 +618,10 @@ public class LogStorageTest extends EasyMockTest { @Test public void testSaveAndMutateTasks() throws Exception { - Query.Builder query = Query.taskScoped("fred"); + String taskId = "fred"; Function<IScheduledTask, IScheduledTask> mutation = Functions.identity(); Set<IScheduledTask> saved = ImmutableSet.of(task("a", ScheduleStatus.INIT)); - ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.PENDING)); + Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING)); new AbstractMutationFixture() { @Override @@ -631,28 +630,28 @@ public class LogStorageTest extends EasyMockTest { storageUtil.taskStore.saveTasks(saved); // Nested transaction with result. - expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated); + expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated); // Resulting stream operation. 
streamMatcher.expectTransaction(Op.saveTasks( - new SaveTasks(IScheduledTask.toBuildersSet(mutated)))) + new SaveTasks(ImmutableSet.of(mutated.get().newBuilder())))) .andReturn(null); } @Override protected void performMutations(MutableStoreProvider storeProvider) { storeProvider.getUnsafeTaskStore().saveTasks(saved); - assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation)); + assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation)); } }.run(); } @Test public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception { - Query.Builder query = Query.taskScoped("fred"); + String taskId = "fred"; Function<IScheduledTask, IScheduledTask> mutation = Functions.identity(); Set<IScheduledTask> saved = ImmutableSet.of(task("b", ScheduleStatus.INIT)); - ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.PENDING)); + Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING)); new AbstractMutationFixture() { @Override @@ -661,14 +660,14 @@ public class LogStorageTest extends EasyMockTest { storageUtil.taskStore.saveTasks(saved); // Nested transaction with result. - expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated); + expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated); // Resulting stream operation. 
streamMatcher.expectTransaction( Op.saveTasks(new SaveTasks( ImmutableSet.<ScheduledTask>builder() .addAll(IScheduledTask.toBuildersList(saved)) - .addAll(IScheduledTask.toBuildersList(mutated)) + .add(mutated.get().newBuilder()) .build()))) .andReturn(position); } @@ -676,7 +675,7 @@ public class LogStorageTest extends EasyMockTest { @Override protected void performMutations(MutableStoreProvider storeProvider) { storeProvider.getUnsafeTaskStore().saveTasks(saved); - assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation)); + assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation)); } }.run(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java index bf344a4..21d26b3 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.storage.testing; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import org.apache.aurora.common.testing.easymock.EasyMockTest; @@ -114,6 +115,14 @@ public class StorageTestUtil { return expect(taskStore.fetchTasks(query)).andReturn(result); } + public IExpectationSetters<?> expectTaskFetch(String taskId, IScheduledTask result) { + return expect(taskStore.fetchTask(taskId)).andReturn(Optional.of(result)); + } + + public IExpectationSetters<?> expectTaskFetch(String taskId) { + return expect(taskStore.fetchTask(taskId)).andReturn(Optional.absent()); + } + public IExpectationSetters<?> expectTaskFetch(Query.Builder query, IScheduledTask... 
result) { return expectTaskFetch(query, ImmutableSet.<IScheduledTask>builder().add(result).build()); }
