Repository: aurora Updated Branches: refs/heads/master 1dc11fb1a -> ef0975655
Return Iterable from TaskStore.fetchTasks to allow for streaming. Reviewed at https://reviews.apache.org/r/33105/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ef097565 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ef097565 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ef097565 Branch: refs/heads/master Commit: ef0975655c04f0c2f3ecb6599d4e4beb9547f091 Parents: 1dc11fb Author: Bill Farner <[email protected]> Authored: Tue Apr 14 14:44:18 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Tue Apr 14 14:44:18 2015 -0700 ---------------------------------------------------------------------- .../aurora/scheduler/async/GcExecutorLauncher.java | 4 ++-- .../org/apache/aurora/scheduler/async/KillRetry.java | 3 ++- .../aurora/scheduler/async/RescheduleCalculator.java | 2 +- .../apache/aurora/scheduler/async/TaskHistoryPruner.java | 7 ++++--- .../aurora/scheduler/cron/quartz/AuroraCronJob.java | 3 ++- .../aurora/scheduler/state/MaintenanceController.java | 5 +++-- .../apache/aurora/scheduler/state/StateManagerImpl.java | 2 +- .../apache/aurora/scheduler/storage/ForwardingStore.java | 3 +-- .../org/apache/aurora/scheduler/storage/Storage.java | 8 +++----- .../org/apache/aurora/scheduler/storage/TaskStore.java | 2 +- .../apache/aurora/scheduler/storage/backup/Recovery.java | 6 +++--- .../scheduler/storage/backup/TemporaryStorage.java | 8 ++++---- .../aurora/scheduler/thrift/ReadOnlySchedulerImpl.java | 8 +++----- .../scheduler/thrift/SchedulerThriftInterface.java | 11 ++++++----- .../apache/aurora/scheduler/async/TaskSchedulerTest.java | 3 ++- 15 files changed, 38 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java index 1da35c0..4d589a3 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java +++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async; import java.util.Collections; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; @@ -180,7 +179,8 @@ public class GcExecutorLauncher implements TaskLauncher { } private TaskInfo makeGcTask(String hostName, SlaveID slaveId) { - Set<IScheduledTask> tasksOnHost = Storage.Util.fetchTasks(storage, Query.slaveScoped(hostName)); + Iterable<IScheduledTask> tasksOnHost = + Storage.Util.fetchTasks(storage, Query.slaveScoped(hostName)); tasksCreated.incrementAndGet(); return makeGcTask( hostName, http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java index 3bb80ec..b125c1c 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java +++ b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java @@ -21,6 +21,7 @@ import java.util.logging.Logger; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; import com.google.common.eventbus.Subscribe; import com.twitter.common.stats.StatsProvider; import com.twitter.common.util.BackoffStrategy; @@ -88,7 +89,7 @@ public class KillRetry implements EventSubscriber { @Override public void run() { Query.Builder query = Query.taskScoped(taskId).byStatus(ScheduleStatus.KILLING); - if (!Storage.Util.fetchTasks(storage, query).isEmpty()) { + if (!Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) { LOG.info("Task " + taskId + " not yet killed, retrying."); // Kill did not yet take effect, try again. http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java index 0cf7fb4..6a0c0a9 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java +++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java @@ -146,7 +146,7 @@ public interface RescheduleCalculator { return Optional.absent(); } - Set<IScheduledTask> res = + Iterable<IScheduledTask> res = Storage.Util.fetchTasks(storage, Query.taskScoped(task.getAncestorId())); return Optional.fromNullable(Iterables.getOnlyElement(res, null)); http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java index 985a319..7b6c063 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java +++ b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java @@ -155,10 +155,11 @@ public class TaskHistoryPruner implements EventSubscriber { executor.submit(new Runnable() { @Override public void run() { - Set<IScheduledTask> inactiveTasks = + Iterable<IScheduledTask> inactiveTasks = Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey)); - int tasksToPrune = inactiveTasks.size() - settings.perJobHistoryGoal; - if (tasksToPrune > 0 && inactiveTasks.size() > settings.perJobHistoryGoal) { + int numInactiveTasks = Iterables.size(inactiveTasks); + int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal; + if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) { Set<String> toPrune = FluentIterable .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks)) .filter(safeToDelete) http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java index 3b5dcf8..df180a4 100644 --- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java +++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java @@ -23,6 +23,7 @@ import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.collect.Iterables; import com.twitter.common.base.Supplier; import com.twitter.common.stats.Stats; import com.twitter.common.util.BackoffHelper; @@ -200,7 +201,7 @@ class AuroraCronJob implements Job { delayedStartBackoff.doUntilSuccess(new Supplier<Boolean>() { @Override public Boolean get() { - if (Storage.Util.fetchTasks(storage, query).isEmpty()) { + if (Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) { LOG.info("Initiating delayed launch of cron " + path); storage.write(new Storage.MutateWork.NoResult.Quiet() { @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java index 09be4dc..a6d7ab7 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java +++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java @@ -24,6 +24,7 @@ import com.google.common.base.Optional; import com.google.common.base.Predicates; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.eventbus.Subscribe; @@ -161,8 +162,8 @@ public interface MaintenanceController { store.getAttributeStore().getHostAttributes(host); if (attributes.isPresent() && attributes.get().getMode() == DRAINING) { Query.Builder builder = Query.slaveScoped(host).active(); - Set<IScheduledTask> activeTasks = store.getTaskStore().fetchTasks(builder); - if (activeTasks.isEmpty()) { + Iterable<IScheduledTask> activeTasks = store.getTaskStore().fetchTasks(builder); + if (Iterables.isEmpty(activeTasks)) { LOG.info(String.format("Moving host %s into DRAINED", host)); setMaintenanceMode(store, ImmutableSet.of(host), DRAINED); } else { http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java index b6a7b4a..2a943cf 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java @@ -126,7 +126,7 @@ public class StateManagerImpl implements StateManager { } }).toSet(); - ImmutableSet<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks( + Iterable<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks( Query.jobScoped(task.getJob()).active()); Set<Integer> existingInstanceIds = http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java index a8e3b14..1a63169 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java @@ -18,7 +18,6 @@ import java.util.Map; import java.util.Set; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; import org.apache.aurora.gen.storage.StoredJobUpdateDetails; import org.apache.aurora.scheduler.base.Query; @@ -105,7 +104,7 @@ public class ForwardingStore implements } @Override - public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder querySupplier) { + public Iterable<IScheduledTask> fetchTasks(Query.Builder querySupplier) { return taskStore.fetchTasks(querySupplier); } http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/storage/Storage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java index 6180a36..972a3c1 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java @@ -20,8 +20,6 @@ import java.lang.annotation.Target; import javax.inject.Qualifier; -import com.google.common.collect.ImmutableSet; - import org.apache.aurora.scheduler.base.Query.Builder; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; @@ -275,10 +273,10 @@ public interface Storage { * @param query Builder of the query to perform. * @return Tasks returned from the query. */ - public static ImmutableSet<IScheduledTask> fetchTasks(Storage storage, final Builder query) { - return storage.read(new Work.Quiet<ImmutableSet<IScheduledTask>>() { + public static Iterable<IScheduledTask> fetchTasks(Storage storage, final Builder query) { + return storage.read(new Work.Quiet<Iterable<IScheduledTask>>() { @Override - public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) { + public Iterable<IScheduledTask> apply(StoreProvider storeProvider) { return storeProvider.getTaskStore().fetchTasks(query); } }); http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java index b76c937..2768e6e 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java @@ -34,7 +34,7 @@ public interface TaskStore { * @param query Builder of the query to identify tasks with. * @return A read-only view of matching tasks. */ - ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query); + Iterable<IScheduledTask> fetchTasks(Query.Builder query); interface Mutable extends TaskStore { http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java index 38764e5..fb0dbae 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java @@ -67,7 +67,7 @@ public interface Recovery { * @return Tasks matching the query. * @throws RecoveryException If a backup is not staged, or could not be queried. */ - Set<IScheduledTask> query(Query.Builder query) throws RecoveryException; + Iterable<IScheduledTask> query(Query.Builder query) throws RecoveryException; /** * Deletes tasks from a staged backup. @@ -163,7 +163,7 @@ public interface Recovery { } @Override - public Set<IScheduledTask> query(Query.Builder query) throws RecoveryException { + public Iterable<IScheduledTask> query(Query.Builder query) throws RecoveryException { return getLoadedRecovery().query(query); } @@ -203,7 +203,7 @@ public interface Recovery { }); } - Set<IScheduledTask> query(final Query.Builder query) { + Iterable<IScheduledTask> query(final Query.Builder query) { return tempStorage.fetchTasks(query); } http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java index 2102adb..586b53b 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java @@ -51,7 +51,7 @@ interface TemporaryStorage { * @param query Query builder for tasks to fetch. * @return Matching tasks. */ - Set<IScheduledTask> fetchTasks(Query.Builder query); + Iterable<IScheduledTask> fetchTasks(Query.Builder query); /** * Creates a snapshot of the contents of the temporary storage. @@ -87,10 +87,10 @@ interface TemporaryStorage { } @Override - public Set<IScheduledTask> fetchTasks(final Query.Builder query) { - return storage.read(new Work.Quiet<Set<IScheduledTask>>() { + public Iterable<IScheduledTask> fetchTasks(final Query.Builder query) { + return storage.read(new Work.Quiet<Iterable<IScheduledTask>>() { @Override - public Set<IScheduledTask> apply(StoreProvider storeProvider) { + public Iterable<IScheduledTask> apply(StoreProvider storeProvider) { return storeProvider.getTaskStore().fetchTasks(query); } }); http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java index 7aef1ca..30e579c 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java @@ -209,11 +209,9 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { public Response getConfigSummary(JobKey job) throws TException { IJobKey jobKey = JobKeys.assertValid(IJobKey.build(job)); - Set<IScheduledTask> activeTasks = - Storage.Util.fetchTasks(storage, Query.jobScoped(jobKey).active()); - - Iterable<IAssignedTask> assignedTasks = - Iterables.transform(activeTasks, Tasks.SCHEDULED_TO_ASSIGNED); + Iterable<IAssignedTask> assignedTasks = Iterables.transform( + Storage.Util.fetchTasks(storage, Query.jobScoped(jobKey).active()), + Tasks.SCHEDULED_TO_ASSIGNED); Map<Integer, ITaskConfig> tasksByInstance = Maps.transformValues( Maps.uniqueIndex(assignedTasks, Tasks.ASSIGNED_TO_INSTANCE_ID), Tasks.ASSIGNED_TO_INFO); http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java index b7d3874..160db12 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java @@ -293,7 +293,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { } private void checkJobExists(StoreProvider store, IJobKey jobKey) throws JobExistsException { - if (!store.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active()).isEmpty() + if (!Iterables.isEmpty(store.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active())) || getCronJob(store, jobKey).isPresent()) { throw new JobExistsException(jobAlreadyExistsMessage(jobKey)); @@ -615,8 +615,9 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { } Query.Builder query = Query.instanceScoped(jobKey, shardIds).active(); - final Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query); - if (matchingTasks.size() != shardIds.size()) { + final Iterable<IScheduledTask> matchingTasks = + storeProvider.getTaskStore().fetchTasks(query); + if (Iterables.size(matchingTasks) != shardIds.size()) { return invalidRequest("Not all requested shards are active."); } @@ -935,12 +936,12 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { ILockKey.build(LockKey.job(jobKey.newBuilder())), Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER)); - ImmutableSet<IScheduledTask> currentTasks = storeProvider.getTaskStore().fetchTasks( + Iterable<IScheduledTask> currentTasks = storeProvider.getTaskStore().fetchTasks( Query.jobScoped(task.getJob()).active()); validateTaskLimits( task, - currentTasks.size() + config.getInstanceIdsSize(), + Iterables.size(currentTasks) + config.getInstanceIdsSize(), quotaManager.checkInstanceAddition(task, config.getInstanceIdsSize(), storeProvider)); storage.write(new NoResult.Quiet() { http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java index 34cbd19..858069e 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java @@ -21,6 +21,7 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.RateLimiter; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; @@ -181,7 +182,7 @@ public class TaskSchedulerTest extends EasyMockTest { @Override protected void execute(MutableStoreProvider storeProvider) { TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore(); - if (taskStore.fetchTasks(Query.taskScoped(Tasks.id(copy))).isEmpty()) { + if (Iterables.isEmpty(taskStore.fetchTasks(Query.taskScoped(Tasks.id(copy))))) { taskStore.saveTasks(ImmutableSet.of(copy)); } }
