Add RemoveJobUpdates log Op, slim JobUpdateStore API
JobUpdateStore historically had granular APIs in the storage layer to
minimize unnecessary use of 'expensive' database objects. The
in-memory store makes these 'free', so moving business logic out of
the storage layer is now both feasible performance-wise and pragmatic.
This patch also introduces the `RemoveJobUpdates` log `Op`, and
`PruneJobUpdateHistory` is now ignored when the log is replayed. The
scheduler does not write `RemoveJobUpdates` yet; in a future release (and
possibly before, with a feature flag), it will write `RemoveJobUpdates`
to the log.
LogStorage has always had the fundamental expectation that `Op`s are
idempotent. The job update event `Op`s arguably violate this requirement, but
at minimum, explicit removal of updates is necessary for idempotency.
From LogStorage.java:
This design implies that all mutations must be idempotent and free from
constraint and thus replayable over newer operations when recovering
from an old checkpoint.
Reviewed at https://reviews.apache.org/r/63884/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/284f40f5
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/284f40f5
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/284f40f5
Branch: refs/heads/master
Commit: 284f40f5e36c70114e6229fcb93e3b203d2f1120
Parents: 80139da
Author: Bill Farner <[email protected]>
Authored: Tue Nov 28 16:58:47 2017 -0800
Committer: Bill Farner <[email protected]>
Committed: Tue Nov 28 16:58:47 2017 -0800
----------------------------------------------------------------------
.../thrift/org/apache/aurora/gen/storage.thrift | 5 +
.../org/apache/aurora/benchmark/JobUpdates.java | 2 +-
.../aurora/benchmark/UpdateStoreBenchmarks.java | 12 +-
.../pruning/JobUpdateHistoryPruner.java | 94 ++-
.../aurora/scheduler/pruning/PruningModule.java | 11 +-
.../scheduler/pruning/TaskHistoryPruner.java | 8 +-
.../aurora/scheduler/quota/QuotaManager.java | 30 +-
.../scheduler/storage/JobUpdateStore.java | 83 +--
.../scheduler/storage/log/LogStorage.java | 10 +-
.../storage/log/SnapshotStoreImpl.java | 4 +-
.../storage/log/WriteAheadStorage.java | 55 +-
.../storage/mem/MemJobUpdateStore.java | 158 +----
.../scheduler/thrift/ReadOnlySchedulerImpl.java | 21 +-
.../updater/JobUpdateControllerImpl.java | 84 ++-
.../pruning/JobUpdateHistoryPrunerTest.java | 170 +++--
.../pruning/TaskHistoryPrunerTest.java | 4 +-
.../scheduler/quota/QuotaManagerImplTest.java | 131 ++--
.../storage/AbstractJobUpdateStoreTest.java | 711 +++++--------------
.../scheduler/storage/log/LogStorageTest.java | 30 +-
.../storage/log/WriteAheadStorageTest.java | 28 +-
.../thrift/ReadOnlySchedulerImplTest.java | 23 +-
.../aurora/scheduler/updater/JobUpdaterIT.java | 8 +-
22 files changed, 623 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
index c692a5f..2210497 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -58,6 +58,10 @@ struct SaveJobUpdate {
// 2: deleted
}
+struct RemoveJobUpdates {
+ 1: set<api.JobUpdateKey> keys
+}
+
struct StoredJobUpdateDetails {
1: api.JobUpdateDetails details
// 2: deleted
@@ -94,6 +98,7 @@ union Op {
15: SaveJobUpdateEvent saveJobUpdateEvent
16: SaveJobInstanceUpdateEvent saveJobInstanceUpdateEvent
17: PruneJobUpdateHistory pruneJobUpdateHistory
+ 18: RemoveJobUpdates removeJobUpdate
}
// The current schema version ID. This should be incremented each time the
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
index a5d1894..7557301 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
@@ -63,7 +63,7 @@ final class JobUpdates {
ImmutableSet.Builder<IJobUpdateKey> keyBuilder = ImmutableSet.builder();
storage.write((Storage.MutateWork.NoResult.Quiet) store -> {
JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
- updateStore.deleteAllUpdatesAndEvents();
+ updateStore.deleteAllUpdates();
for (IJobUpdateDetails details : updates) {
IJobUpdateKey key = details.getUpdate().getSummary().getKey();
keyBuilder.add(key);
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
----------------------------------------------------------------------
diff --git
a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
index c98c514..e41db20 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
@@ -67,13 +67,13 @@ public class UpdateStoreBenchmarks {
@TearDown(Level.Iteration)
public void tearDownIteration() {
storage.write((NoResult.Quiet) storeProvider -> {
- storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
+ storeProvider.getJobUpdateStore().deleteAllUpdates();
});
}
@Benchmark
public IJobUpdateDetails run() throws TException {
- return storage.read(store ->
store.getJobUpdateStore().fetchJobUpdateDetails(
+ return storage.read(store -> store.getJobUpdateStore().fetchJobUpdate(
Iterables.getOnlyElement(keys)).get());
}
}
@@ -106,13 +106,13 @@ public class UpdateStoreBenchmarks {
@TearDown(Level.Iteration)
public void tearDownIteration() {
storage.write((NoResult.Quiet) storeProvider -> {
- storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
+ storeProvider.getJobUpdateStore().deleteAllUpdates();
});
}
@Benchmark
public IJobUpdateDetails run() throws TException {
- return storage.read(store ->
store.getJobUpdateStore().fetchJobUpdateDetails(
+ return storage.read(store -> store.getJobUpdateStore().fetchJobUpdate(
Iterables.getOnlyElement(keys)).get());
}
}
@@ -145,13 +145,13 @@ public class UpdateStoreBenchmarks {
@TearDown(Level.Iteration)
public void tearDownIteration() {
storage.write((NoResult.Quiet) storeProvider -> {
- storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
+ storeProvider.getJobUpdateStore().deleteAllUpdates();
});
}
@Benchmark
public IJobUpdateDetails run() throws TException {
- return storage.read(store ->
store.getJobUpdateStore().fetchJobUpdateDetails(
+ return storage.read(store -> store.getJobUpdateStore().fetchJobUpdate(
Iterables.getOnlyElement(keys)).get());
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git
a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
index b2768d9..05ada3c 100644
---
a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
+++
b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
@@ -13,39 +13,51 @@
*/
package org.apache.aurora.scheduler.pruning;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
-import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.gen.JobUpdateQuery;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
+import static
org.apache.aurora.scheduler.storage.JobUpdateStore.TERMINAL_STATES;
+
/**
* Prunes per-job update history on a periodic basis.
*/
-class JobUpdateHistoryPruner extends AbstractIdleService {
+class JobUpdateHistoryPruner extends AbstractScheduledService {
private static final Logger LOG =
LoggerFactory.getLogger(JobUpdateHistoryPruner.class);
@VisibleForTesting
static final String JOB_UPDATES_PRUNED = "job_updates_pruned";
private final Clock clock;
- private final ScheduledExecutorService executor;
private final Storage storage;
private final HistoryPrunerSettings settings;
private final AtomicLong prunedUpdatesCount;
@@ -69,38 +81,80 @@ class JobUpdateHistoryPruner extends AbstractIdleService {
@Inject
JobUpdateHistoryPruner(
Clock clock,
- ScheduledExecutorService executor,
Storage storage,
HistoryPrunerSettings settings,
StatsProvider statsProvider) {
this.clock = requireNonNull(clock);
- this.executor = requireNonNull(executor);
this.storage = requireNonNull(storage);
this.settings = requireNonNull(settings);
this.prunedUpdatesCount = statsProvider.makeCounter(JOB_UPDATES_PRUNED);
}
@Override
- protected void startUp() {
- executor.scheduleAtFixedRate(
- () -> storage.write((NoResult.Quiet) storeProvider -> {
- Set<IJobUpdateKey> prunedUpdates =
storeProvider.getJobUpdateStore().pruneHistory(
- settings.maxUpdatesPerJob,
- clock.nowMillis() -
settings.maxHistorySize.as(Time.MILLISECONDS));
-
- prunedUpdatesCount.addAndGet(prunedUpdates.size());
- LOG.info(prunedUpdates.isEmpty()
- ? "No job update history to prune."
- : "Pruned job update history: " +
Joiner.on(",").join(prunedUpdates));
- }),
+ protected Scheduler scheduler() {
+ return Scheduler.newFixedDelaySchedule(
settings.pruneInterval.as(Time.MILLISECONDS),
settings.pruneInterval.as(Time.MILLISECONDS),
TimeUnit.MILLISECONDS);
}
+ @VisibleForTesting
+ void runForTest() {
+ runOneIteration();
+ }
+
+ @Timed("job_update_store_prune_history")
@Override
- protected void shutDown() {
- // Nothing to do - await VM shutdown.
+ protected void runOneIteration() {
+ storage.write((NoResult.Quiet) storeProvider -> {
+
+ List<IJobUpdateSummary> completedUpdates =
storeProvider.getJobUpdateStore()
+ .fetchJobUpdates(IJobUpdateQuery.build(
+ new JobUpdateQuery().setUpdateStatuses(TERMINAL_STATES)))
+ .stream()
+ .map(u -> u.getUpdate().getSummary())
+ .collect(Collectors.toList());
+
+ long cutoff = clock.nowMillis() -
settings.maxHistorySize.as(Time.MILLISECONDS);
+ Predicate<IJobUpdateSummary> expiredFilter =
+ s -> s.getState().getCreatedTimestampMs() < cutoff;
+
+ ImmutableSet.Builder<IJobUpdateKey> pruneBuilder =
ImmutableSet.builder();
+
+ // Gather updates based on time threshold.
+ pruneBuilder.addAll(completedUpdates
+ .stream()
+ .filter(expiredFilter)
+ .map(IJobUpdateSummary::getKey)
+ .collect(Collectors.toList()));
+
+ Multimap<IJobKey, IJobUpdateSummary> updatesByJob = Multimaps.index(
+ // Avoid counting to-be-removed expired updates.
+ completedUpdates.stream().filter(expiredFilter.negate()).iterator(),
+ s -> s.getKey().getJob());
+
+ updatesByJob.asMap().values().forEach(updates -> {
+ if (updates.size() > settings.maxUpdatesPerJob) {
+ Ordering<IJobUpdateSummary> creationOrder = Ordering.natural()
+ .onResultOf(s -> s.getState().getCreatedTimestampMs());
+ pruneBuilder.addAll(creationOrder
+ .leastOf(updates, updates.size() - settings.maxUpdatesPerJob)
+ .stream()
+ .map(IJobUpdateSummary::getKey)
+ .iterator());
+ }
+ });
+
+ Set<IJobUpdateKey> pruned = pruneBuilder.build();
+ if (!pruned.isEmpty()) {
+ storeProvider.getJobUpdateStore().removeJobUpdates(pruned);
+ }
+
+ prunedUpdatesCount.addAndGet(pruned.size());
+ LOG.info(pruned.isEmpty()
+ ? "No job update history to prune."
+ : "Pruned job update history: " + Joiner.on(",").join(pruned));
+ });
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
----------------------------------------------------------------------
diff --git
a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
index 4433b96..0ed22d9 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
@@ -27,7 +27,6 @@ import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.config.types.TimeAmount;
import org.apache.aurora.scheduler.events.PubsubEventModule;
-import
org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,11 +78,11 @@ public class PruningModule extends AbstractModule {
protected void configure() {
// TODO(ksweeney): Create a configuration validator module so this can
be injected.
// TODO(William Farner): Revert this once large task counts is cheap
ala hierarchichal store
- bind(HistoryPrunnerSettings.class).toInstance(new
HistoryPrunnerSettings(
- options.historyPruneThreshold,
- options.historyMinRetentionThreshold,
- options.historyMaxPerJobThreshold
- ));
+ bind(TaskHistoryPruner.HistoryPrunerSettings.class).toInstance(
+ new TaskHistoryPruner.HistoryPrunerSettings(
+ options.historyPruneThreshold,
+ options.historyMinRetentionThreshold,
+ options.historyMaxPerJobThreshold));
bind(TaskHistoryPruner.class).in(Singleton.class);
expose(TaskHistoryPruner.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git
a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index 3cafbc2..9aa51c3 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -65,7 +65,7 @@ public class TaskHistoryPruner implements EventSubscriber {
private final ScheduledExecutorService executor;
private final StateManager stateManager;
private final Clock clock;
- private final HistoryPrunnerSettings settings;
+ private final HistoryPrunerSettings settings;
private final Storage storage;
private final Lifecycle lifecycle;
private final TaskEventBatchWorker batchWorker;
@@ -79,12 +79,12 @@ public class TaskHistoryPruner implements EventSubscriber {
}
};
- static class HistoryPrunnerSettings {
+ static class HistoryPrunerSettings {
private final long pruneThresholdMillis;
private final long minRetentionThresholdMillis;
private final int perJobHistoryGoal;
- HistoryPrunnerSettings(
+ HistoryPrunerSettings(
Amount<Long, Time> inactivePruneThreshold,
Amount<Long, Time> minRetentionThreshold,
int perJobHistoryGoal) {
@@ -100,7 +100,7 @@ public class TaskHistoryPruner implements EventSubscriber {
@AsyncExecutor ScheduledExecutorService executor,
StateManager stateManager,
Clock clock,
- HistoryPrunnerSettings settings,
+ HistoryPrunerSettings settings,
Storage storage,
Lifecycle lifecycle,
TaskEventBatchWorker batchWorker,
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 7f8c66c..64ad12b 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.quota;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import com.google.common.annotations.VisibleForTesting;
@@ -26,7 +27,6 @@ import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.RangeSet;
@@ -38,7 +38,6 @@ import
org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.resources.ResourceType;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -48,7 +47,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
import org.apache.aurora.scheduler.storage.entities.IRange;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -274,8 +272,13 @@ public interface QuotaManager {
.from(storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()))
.transform(IScheduledTask::getAssignedTask);
- Map<IJobKey, IJobUpdateInstructions> updates = Maps.newHashMap(
- fetchActiveJobUpdates(storeProvider.getJobUpdateStore(), role));
+ // Relies on the invariant of at-most-one active update per job.
+ Map<IJobKey, IJobUpdateInstructions> updates =
storeProvider.getJobUpdateStore()
+ .fetchJobUpdates(updateQuery(role))
+ .stream()
+ .collect(Collectors.toMap(
+ u -> u.getUpdate().getSummary().getKey().getJob(),
+ u -> u.getUpdate().getInstructions()));
// Mix in a requested job update (if present) to correctly calculate
consumption.
// This would be an update that is not saved in the store yet (i.e. the
one quota is
@@ -398,20 +401,6 @@ public interface QuotaManager {
};
}
- private static Map<IJobKey, IJobUpdateInstructions> fetchActiveJobUpdates(
- final JobUpdateStore jobUpdateStore,
- String role) {
-
- Function<IJobUpdateSummary, IJobUpdate> fetchUpdate =
- summary -> jobUpdateStore.fetchJobUpdate(summary.getKey()).get();
-
- return Maps.transformValues(
-
FluentIterable.from(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role)))
- .transform(fetchUpdate)
- .uniqueIndex(UPDATE_TO_JOB_KEY),
- IJobUpdate::getInstructions);
- }
-
@VisibleForTesting
static IJobUpdateQuery updateQuery(String role) {
return IJobUpdateQuery.build(new JobUpdateQuery()
@@ -474,9 +463,6 @@ public interface QuotaManager {
return addAll(Iterables.transform(tasks, QUOTA_RESOURCES));
}
- private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =
- input -> input.getSummary().getKey().getJob();
-
private static int getUpdateInstanceCount(Set<IRange> ranges) {
int instanceCount = 0;
for (IRange range : ranges) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
----------------------------------------------------------------------
diff --git
a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
index b3d906b..6b91d97 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
@@ -19,15 +19,14 @@ import java.util.Set;
import com.google.common.base.Optional;
+import org.apache.aurora.gen.JobUpdateQuery;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
@@ -48,21 +47,15 @@ public interface JobUpdateStore {
ERROR
);
- /**
- * Fetches a read-only view of job update summaries.
- *
- * @param query Query to identify job update summaries with.
- * @return A read-only view of job update summaries.
- */
- List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query);
+ IJobUpdateQuery MATCH_ALL = IJobUpdateQuery.build(new JobUpdateQuery());
/**
* Fetches a read-only view of job update details matching the {@code query}.
*
* @param query Query to identify job update details with.
- * @return A read-only list view of job update details matching the query.
+ * @return A read-only view of job update details matching the query.
*/
- List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query);
+ List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query);
/**
* Fetches a read-only view of job update details.
@@ -70,57 +63,12 @@ public interface JobUpdateStore {
* @param key Update identifier.
* @return A read-only view of job update details.
*/
- Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key);
-
- /**
- * Fetches a read-only view of a job update.
- *
- * @param key Update identifier.
- * @return A read-only view of job update.
- */
- Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key);
-
- /**
- * Fetches a read-only view of the instructions for a job update.
- *
- * @param key Update identifier.
- * @return A read-only view of job update instructions.
- */
- Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey
key);
-
- /**
- * Fetches a read-only view of all job update details available in the store.
- * TODO(wfarner): Generate immutable wrappers for storage.thrift structs,
use an immutable object
- * here.
- *
- * @return A read-only view of all job update details.
- */
- Set<IJobUpdateDetails> fetchAllJobUpdateDetails();
- /**
- * Fetches the events that have affected an instance within a job update.
- *
- * @param key Update identifier.
- * @param instanceId Instance to fetch events for.
- * @return Instance events in {@code key} that affected {@code instanceId}.
- */
- List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int
instanceId);
+ Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key);
interface Mutable extends JobUpdateStore {
/**
- * Saves a new job update.
- *
- * <p>
- * Note: This call must be followed by the
- * {@link #saveJobUpdateEvent(IJobUpdateKey, IJobUpdateEvent)} before
fetching a saved update as
- * it does not save the following required fields:
- * <ul>
- * <li>{@link org.apache.aurora.gen.JobUpdateState#status}</li>
- * <li>{@link
org.apache.aurora.gen.JobUpdateState#createdTimestampMs}</li>
- * <li>{@link
org.apache.aurora.gen.JobUpdateState#lastModifiedTimestampMs}</li>
- * </ul>
- * The above fields are auto-populated from the update events and any
attempt to fetch an update
- * without having at least one {@link IJobUpdateEvent} present in the
store will return empty.
+ * Saves a job update.
*
* @param update Update to save.
*/
@@ -143,22 +91,15 @@ public interface JobUpdateStore {
void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent
event);
/**
- * Deletes all updates and update events from the store.
+ * Deletes job updates.
+ *
+ * @param keys Keys of the updates to delete.
*/
- void deleteAllUpdatesAndEvents();
+ void removeJobUpdates(Set<IJobUpdateKey> keys);
/**
- * Prunes (deletes) old completed updates and events from the store.
- * <p>
- * At least {@code perJobRetainCount} last completed updates that
completed less than
- * {@code historyPruneThreshold} ago will be kept for every job.
- *
- * @param perJobRetainCount Number of completed updates to retain per job.
- * @param historyPruneThresholdMs Earliest timestamp in the past to retain
history.
- * Any completed updates created before
this timestamp
- * will be pruned.
- * @return Set of pruned update keys.
+ * Deletes all updates from the store.
*/
- Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long
historyPruneThresholdMs);
+ void deleteAllUpdates();
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git
a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 3ce2c7f..07b4bdb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -365,9 +365,13 @@ public class LogStorage implements NonVolatileStorage,
DistributedSnapshotStore
IJobUpdateKey.build(event.getKey()),
IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
})
- .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op ->
writeBehindJobUpdateStore.pruneHistory(
- op.getPruneJobUpdateHistory().getPerJobRetainCount(),
-
op.getPruneJobUpdateHistory().getHistoryPruneThresholdMs())).build();
+ .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
+ LOG.info("Dropping prune operation. Updates will be pruned later.");
+ })
+ .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
+ writeBehindJobUpdateStore.removeJobUpdates(
+
IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
+ .build();
}
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git
a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 57c483b..5859f80 100644
---
a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++
b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -218,7 +218,7 @@ public class SnapshotStoreImpl implements
SnapshotStore<Snapshot> {
@Override
public void saveToSnapshot(MutableStoreProvider store, Snapshot
snapshot) {
snapshot.setJobUpdateDetails(
- store.getJobUpdateStore().fetchAllJobUpdateDetails().stream()
+
store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
.map(u -> new
StoredJobUpdateDetails().setDetails(u.newBuilder()))
.collect(Collectors.toSet()));
}
@@ -227,7 +227,7 @@ public class SnapshotStoreImpl implements
SnapshotStore<Snapshot> {
public void restoreFromSnapshot(MutableStoreProvider store, Snapshot
snapshot) {
if (snapshot.getJobUpdateDetailsSize() > 0) {
JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
- updateStore.deleteAllUpdatesAndEvents();
+ updateStore.deleteAllUpdates();
for (StoredJobUpdateDetails storedDetails :
snapshot.getJobUpdateDetails()) {
JobUpdateDetails details = storedDetails.getDetails();
updateStore.saveJobUpdate(thriftBackfill.backFillJobUpdate(details.getUpdate()));
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git
a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index 4d051fc..41061f8 100644
---
a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++
b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
import org.apache.aurora.gen.storage.RemoveJob;
import org.apache.aurora.gen.storage.RemoveQuota;
import org.apache.aurora.gen.storage.RemoveTasks;
@@ -52,10 +51,8 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.slf4j.Logger;
@@ -237,21 +234,12 @@ class WriteAheadStorage implements
}
@Override
- public Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long
historyPruneThresholdMs) {
- Set<IJobUpdateKey> prunedUpdates = jobUpdateStore.pruneHistory(
- perJobRetainCount,
- historyPruneThresholdMs);
+ public void removeJobUpdates(Set<IJobUpdateKey> keys) {
+ requireNonNull(keys);
- if (!prunedUpdates.isEmpty()) {
- // Pruned updates will eventually go away from persisted storage when a
new snapshot is cut.
- // So, persisting pruning attempts is not strictly necessary as the
periodic pruner will
- // provide eventual consistency between volatile and persistent storage
upon scheduler
- // restart. By generating an out of band pruning during log replay the
consistency is
- // achieved sooner without potentially exposing pruned but not yet
persisted data.
- write(Op.pruneJobUpdateHistory(
- new PruneJobUpdateHistory(perJobRetainCount,
historyPruneThresholdMs)));
- }
- return prunedUpdates;
+ // Compatibility mode - RemoveJobUpdates is not yet written since older
versions cannot
+ // read it. JobUpdates are only removed implicitly when a snapshot is
taken.
+ jobUpdateStore.removeJobUpdates(keys);
}
@Override
@@ -279,7 +267,7 @@ class WriteAheadStorage implements
}
@Override
- public void deleteAllUpdatesAndEvents() {
+ public void deleteAllUpdates() {
throw new UnsupportedOperationException(
"Unsupported since casual storage users should never be doing this.");
}
@@ -370,37 +358,12 @@ class WriteAheadStorage implements
}
@Override
- public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery
query) {
- return this.jobUpdateStore.fetchJobUpdateSummaries(query);
- }
-
- @Override
- public List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) {
- return this.jobUpdateStore.fetchJobUpdateDetails(query);
+ public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
+ return this.jobUpdateStore.fetchJobUpdates(query);
}
@Override
- public Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key) {
- return this.jobUpdateStore.fetchJobUpdateDetails(key);
- }
-
- @Override
- public Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
+ public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
return this.jobUpdateStore.fetchJobUpdate(key);
}
-
- @Override
- public Optional<IJobUpdateInstructions>
fetchJobUpdateInstructions(IJobUpdateKey key) {
- return this.jobUpdateStore.fetchJobUpdateInstructions(key);
- }
-
- @Override
- public Set<IJobUpdateDetails> fetchAllJobUpdateDetails() {
- return this.jobUpdateStore.fetchAllJobUpdateDetails();
- }
-
- @Override
- public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key,
int instanceId) {
- return this.jobUpdateStore.fetchInstanceEvents(key, instanceId);
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
----------------------------------------------------------------------
diff --git
a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
index 826cee9..f96ec08 100644
---
a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
+++
b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
@@ -14,58 +14,39 @@
package org.apache.aurora.scheduler.storage.mem;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import javax.inject.Inject;
-
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import org.apache.aurora.common.base.MorePreconditions;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.gen.JobInstanceUpdateEvent;
-import org.apache.aurora.gen.JobUpdateAction;
import org.apache.aurora.gen.JobUpdateDetails;
import org.apache.aurora.gen.JobUpdateEvent;
import org.apache.aurora.gen.JobUpdateState;
-import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
import static java.util.Objects.requireNonNull;
-import static org.apache.aurora.scheduler.storage.Util.jobUpdateActionStatName;
-import static org.apache.aurora.scheduler.storage.Util.jobUpdateStatusStatName;
-
public class MemJobUpdateStore implements JobUpdateStore.Mutable {
private static final Ordering<IJobUpdateDetails> REVERSE_LAST_MODIFIED_ORDER
= Ordering.natural()
@@ -73,88 +54,19 @@ public class MemJobUpdateStore implements
JobUpdateStore.Mutable {
.onResultOf(u ->
u.getUpdate().getSummary().getState().getLastModifiedTimestampMs());
private final Map<IJobUpdateKey, IJobUpdateDetails> updates =
Maps.newConcurrentMap();
- private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats;
- private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats;
-
- @Inject
- public MemJobUpdateStore(StatsProvider statsProvider) {
- this.jobUpdateEventStats = CacheBuilder.newBuilder()
- .build(new CacheLoader<JobUpdateStatus, AtomicLong>() {
- @Override
- public AtomicLong load(JobUpdateStatus status) {
- return statsProvider.makeCounter(jobUpdateStatusStatName(status));
- }
- });
- for (JobUpdateStatus status : JobUpdateStatus.values()) {
- jobUpdateEventStats.getUnchecked(status).get();
- }
- this.jobUpdateActionStats = CacheBuilder.newBuilder()
- .build(new CacheLoader<JobUpdateAction, AtomicLong>() {
- @Override
- public AtomicLong load(JobUpdateAction action) {
- return statsProvider.makeCounter(jobUpdateActionStatName(action));
- }
- });
- for (JobUpdateAction action : JobUpdateAction.values()) {
- jobUpdateActionStats.getUnchecked(action).get();
- }
- }
- @Timed("job_update_store_fetch_summaries")
+ @Timed("job_update_store_fetch_details_query")
@Override
- public synchronized List<IJobUpdateSummary>
fetchJobUpdateSummaries(IJobUpdateQuery query) {
- return performQuery(query)
- .map(u -> u.getUpdate().getSummary())
- .collect(Collectors.toList());
- }
-
- @Timed("job_update_store_fetch_details_list")
- @Override
- public synchronized List<IJobUpdateDetails>
fetchJobUpdateDetails(IJobUpdateQuery query) {
+ public synchronized List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery
query) {
return performQuery(query).collect(Collectors.toList());
}
@Timed("job_update_store_fetch_details")
@Override
- public synchronized Optional<IJobUpdateDetails>
fetchJobUpdateDetails(IJobUpdateKey key) {
+ public synchronized Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey
key) {
return Optional.fromNullable(updates.get(key));
}
- @Timed("job_update_store_fetch_update")
- @Override
- public synchronized Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
- return
Optional.fromNullable(updates.get(key)).transform(IJobUpdateDetails::getUpdate);
- }
-
- @Timed("job_update_store_fetch_instructions")
- @Override
- public synchronized Optional<IJobUpdateInstructions>
fetchJobUpdateInstructions(
- IJobUpdateKey key) {
-
- return Optional.fromNullable(updates.get(key))
- .transform(u -> u.getUpdate().getInstructions());
- }
-
- @Timed("job_update_store_fetch_all_details")
- @Override
- public synchronized Set<IJobUpdateDetails> fetchAllJobUpdateDetails() {
- return ImmutableSet.copyOf(updates.values());
- }
-
- @Timed("job_update_store_fetch_instance_events")
- @Override
- public synchronized List<IJobInstanceUpdateEvent> fetchInstanceEvents(
- IJobUpdateKey key,
- int instanceId) {
-
- return java.util.Optional.ofNullable(updates.get(key))
- .map(IJobUpdateDetails::getInstanceEvents)
- .orElse(ImmutableList.of())
- .stream()
- .filter(e -> e.getInstanceId() == instanceId)
- .collect(Collectors.toList());
- }
-
private static void validateInstructions(IJobUpdateInstructions
instructions) {
if (!instructions.isSetDesiredState() &&
instructions.getInitialState().isEmpty()) {
throw new IllegalArgumentException(
@@ -182,10 +94,6 @@ public class MemJobUpdateStore implements
JobUpdateStore.Mutable {
requireNonNull(update);
validateInstructions(update.getInstructions());
- if (updates.containsKey(update.getSummary().getKey())) {
- throw new StorageException("Update already exists: " +
update.getSummary().getKey());
- }
-
JobUpdateDetails mutable = new JobUpdateDetails()
.setUpdate(update.newBuilder())
.setUpdateEvents(ImmutableList.of())
@@ -211,7 +119,6 @@ public class MemJobUpdateStore implements
JobUpdateStore.Mutable {
mutable.setUpdateEvents(EVENT_ORDERING.sortedCopy(mutable.getUpdateEvents()));
mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
updates.put(key, IJobUpdateDetails.build(mutable));
- jobUpdateEventStats.getUnchecked(event.getStatus()).incrementAndGet();
}
private static final Ordering<JobInstanceUpdateEvent>
INSTANCE_EVENT_ORDERING = Ordering.natural()
@@ -233,66 +140,23 @@ public class MemJobUpdateStore implements
JobUpdateStore.Mutable {
mutable.setInstanceEvents(INSTANCE_EVENT_ORDERING.sortedCopy(mutable.getInstanceEvents()));
mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
updates.put(key, IJobUpdateDetails.build(mutable));
- jobUpdateActionStats.getUnchecked(event.getAction()).incrementAndGet();
}
- @Timed("job_update_store_delete_all")
+ @Timed("job_update_store_delete_updates")
@Override
- public synchronized void deleteAllUpdatesAndEvents() {
- updates.clear();
+ public synchronized void removeJobUpdates(Set<IJobUpdateKey> key) {
+ requireNonNull(key);
+ updates.keySet().removeAll(key);
}
- @Timed("job_update_store_prune_history")
+ @Timed("job_update_store_delete_all")
@Override
- public synchronized Set<IJobUpdateKey> pruneHistory(
- int perJobRetainCount,
- long historyPruneThresholdMs) {
-
- Supplier<Stream<IJobUpdateSummary>> completedUpdates = () ->
updates.values().stream()
- .map(u -> u.getUpdate().getSummary())
- .filter(s -> TERMINAL_STATES.contains(s.getState().getStatus()));
-
- Predicate<IJobUpdateSummary> expiredFilter =
- s -> s.getState().getCreatedTimestampMs() < historyPruneThresholdMs;
-
- ImmutableSet.Builder<IJobUpdateKey> pruneBuilder = ImmutableSet.builder();
-
- // Gather updates based on time threshold.
- pruneBuilder.addAll(completedUpdates.get()
- .filter(expiredFilter)
- .map(IJobUpdateSummary::getKey)
- .collect(Collectors.toList()));
-
- Multimap<IJobKey, IJobUpdateSummary> updatesByJob = Multimaps.index(
- // Avoid counting to-be-removed expired updates.
- completedUpdates.get().filter(expiredFilter.negate()).iterator(),
- s -> s.getKey().getJob());
-
- for (Map.Entry<IJobKey, Collection<IJobUpdateSummary>> entry
- : updatesByJob.asMap().entrySet()) {
-
- if (entry.getValue().size() > perJobRetainCount) {
- Ordering<IJobUpdateSummary> creationOrder = Ordering.natural()
- .onResultOf(s -> s.getState().getCreatedTimestampMs());
- pruneBuilder.addAll(creationOrder
- .leastOf(entry.getValue(), entry.getValue().size() -
perJobRetainCount)
- .stream()
- .map(IJobUpdateSummary::getKey)
- .iterator());
- }
- }
-
- Set<IJobUpdateKey> pruned = pruneBuilder.build();
- updates.keySet().removeAll(pruned);
-
- return pruned;
+ public synchronized void deleteAllUpdates() {
+ updates.clear();
}
private static JobUpdateState synthesizeUpdateState(JobUpdateDetails update)
{
- JobUpdateState state = update.getUpdate().getSummary().getState();
- if (state == null) {
- state = new JobUpdateState();
- }
+ JobUpdateState state = new JobUpdateState();
JobUpdateEvent firstEvent = Iterables.getFirst(update.getUpdateEvents(),
null);
if (firstEvent != null) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
----------------------------------------------------------------------
diff --git
a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
index bba1161..9d327e4 100644
---
a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
+++
b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
@@ -19,6 +19,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -308,11 +309,15 @@ class ReadOnlySchedulerImpl implements
ReadOnlyScheduler.Iface {
@Override
public Response getJobUpdateSummaries(JobUpdateQuery mutableQuery) {
IJobUpdateQuery query =
IJobUpdateQuery.build(requireNonNull(mutableQuery));
- return ok(Result.getJobUpdateSummariesResult(
- new GetJobUpdateSummariesResult()
- .setUpdateSummaries(IJobUpdateSummary.toBuildersList(storage.read(
- storeProvider ->
-
storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(query))))));
+
+ List<IJobUpdateSummary> summaries = storage.read(
+ storeProvider -> storeProvider.getJobUpdateStore()
+ .fetchJobUpdates(query)
+ .stream()
+ .map(u ->
u.getUpdate().getSummary()).collect(Collectors.toList()));
+
+ return ok(Result.getJobUpdateSummariesResult(new
GetJobUpdateSummariesResult()
+ .setUpdateSummaries(IJobUpdateSummary.toBuildersList(summaries))));
}
@Override
@@ -324,8 +329,8 @@ class ReadOnlySchedulerImpl implements
ReadOnlyScheduler.Iface {
if (mutableQuery != null) {
IJobUpdateQuery query = IJobUpdateQuery.build(mutableQuery);
- List<IJobUpdateDetails> details = storage.read(storeProvider ->
- storeProvider.getJobUpdateStore().fetchJobUpdateDetails(query));
+ List<IJobUpdateDetails> details =
+ storage.read(storeProvider ->
storeProvider.getJobUpdateStore().fetchJobUpdates(query));
return ok(Result.getJobUpdateDetailsResult(new
GetJobUpdateDetailsResult()
.setDetailsList(IJobUpdateDetails.toBuildersList(details))));
@@ -334,7 +339,7 @@ class ReadOnlySchedulerImpl implements
ReadOnlyScheduler.Iface {
// TODO(zmanji): Remove this code once `mutableKey` is removed in AURORA-1765
IJobUpdateKey key = IJobUpdateKey.build(mutableKey);
Optional<IJobUpdateDetails> details = storage.read(storeProvider ->
- storeProvider.getJobUpdateStore().fetchJobUpdateDetails(key));
+ storeProvider.getJobUpdateStore().fetchJobUpdate(key));
if (details.isPresent()) {
return addMessage(ok(Result.getJobUpdateDetailsResult(
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git
a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index dc8d11c..87b18b4 100644
---
a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++
b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -13,17 +13,23 @@
*/
package org.apache.aurora.scheduler.updater;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -36,6 +42,7 @@ import org.apache.aurora.common.application.Lifecycle;
import org.apache.aurora.common.collections.Pair;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.JobInstanceUpdateEvent;
import org.apache.aurora.gen.JobUpdateAction;
@@ -80,6 +87,8 @@ import static
org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE;
import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError;
import static org.apache.aurora.scheduler.base.Jobs.AWAITING_PULSE_STATES;
import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import static org.apache.aurora.scheduler.storage.Util.jobUpdateActionStatName;
+import static org.apache.aurora.scheduler.storage.Util.jobUpdateStatusStatName;
import static
org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
import static
org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES;
import static
org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_ACTIVE_RESUME_STATE;
@@ -123,6 +132,9 @@ class JobUpdateControllerImpl implements
JobUpdateController {
private final Map<IJobKey, UpdateFactory.Update> updates =
Collections.synchronizedMap(Maps.newHashMap());
+ private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats;
+ private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats;
+
@Inject
JobUpdateControllerImpl(
UpdateFactory updateFactory,
@@ -132,7 +144,8 @@ class JobUpdateControllerImpl implements
JobUpdateController {
UpdateAgentReserver updateAgentReserver,
Clock clock,
Lifecycle lifecycle,
- TaskEventBatchWorker batchWorker) {
+ TaskEventBatchWorker batchWorker,
+ StatsProvider statsProvider) {
this.updateFactory = requireNonNull(updateFactory);
this.storage = requireNonNull(storage);
@@ -143,6 +156,26 @@ class JobUpdateControllerImpl implements
JobUpdateController {
this.batchWorker = requireNonNull(batchWorker);
this.pulseHandler = new PulseHandler(clock);
this.updateAgentReserver = requireNonNull(updateAgentReserver);
+
+ this.jobUpdateEventStats = CacheBuilder.newBuilder()
+ .build(new CacheLoader<JobUpdateStatus, AtomicLong>() {
+ @Override
+ public AtomicLong load(JobUpdateStatus status) {
+ return statsProvider.makeCounter(jobUpdateStatusStatName(status));
+ }
+ });
+ Arrays.stream(JobUpdateStatus.values())
+ .forEach(status -> jobUpdateEventStats.getUnchecked(status).get());
+
+ this.jobUpdateActionStats = CacheBuilder.newBuilder()
+ .build(new CacheLoader<JobUpdateAction, AtomicLong>() {
+ @Override
+ public AtomicLong load(JobUpdateAction action) {
+ return statsProvider.makeCounter(jobUpdateActionStatName(action));
+ }
+ });
+ Arrays.stream(JobUpdateAction.values())
+ .forEach(action -> jobUpdateActionStats.getUnchecked(action).get());
}
@Override
@@ -164,8 +197,8 @@ class JobUpdateControllerImpl implements
JobUpdateController {
throw new IllegalArgumentException("Update instruction is a no-op.");
}
- List<IJobUpdateSummary> activeJobUpdates =
-
storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryActiveByJob(job));
+ List<IJobUpdateDetails> activeJobUpdates =
+
storeProvider.getJobUpdateStore().fetchJobUpdates(queryActiveByJob(job));
if (!activeJobUpdates.isEmpty()) {
if (activeJobUpdates.size() > 1) {
LOG.error("Multiple active updates exist for this job. {}",
activeJobUpdates);
@@ -173,11 +206,11 @@ class JobUpdateControllerImpl implements
JobUpdateController {
String.format("Multiple active updates exist for this job. %s",
activeJobUpdates));
}
- IJobUpdateSummary activeJobUpdate = activeJobUpdates.get(0);
+ IJobUpdateDetails activeUpdate =
activeJobUpdates.stream().findFirst().get();
throw new UpdateInProgressException("An active update already exists
for this job, "
+ "please terminate it before starting another. "
+ "Active updates are those in states " +
Updates.ACTIVE_JOB_UPDATE_STATES,
- activeJobUpdate);
+ activeUpdate.getUpdate().getSummary());
}
LOG.info("Starting update for job " + job);
@@ -202,7 +235,7 @@ class JobUpdateControllerImpl implements
JobUpdateController {
requireNonNull(job);
if (storage.read(p -> !p.getJobUpdateStore()
- .fetchJobUpdateSummaries(queryActiveByJob(job)).isEmpty())) {
+ .fetchJobUpdates(queryActiveByJob(job)).isEmpty())) {
throw new JobUpdatingException("Job is currently updating");
}
@@ -225,14 +258,13 @@ class JobUpdateControllerImpl implements
JobUpdateController {
requireNonNull(auditData);
LOG.info("Attempting to resume update " + key);
storage.write((NoResult<UpdateStateException>) storeProvider -> {
- IJobUpdateDetails details = Iterables.getOnlyElement(
-
storeProvider.getJobUpdateStore().fetchJobUpdateDetails(queryByUpdate(key)),
null);
+ Optional<IJobUpdateDetails> details =
storeProvider.getJobUpdateStore().fetchJobUpdate(key);
- if (details == null) {
+ if (!details.isPresent()) {
throw new UpdateStateException("Update does not exist: " + key);
}
- IJobUpdate update = details.getUpdate();
+ IJobUpdate update = details.get().getUpdate();
IJobUpdateKey key1 = update.getSummary().getKey();
Function<JobUpdateStatus, JobUpdateStatus> stateChange =
isCoordinatedAndPulseExpired(key1, update.getInstructions())
@@ -294,7 +326,7 @@ class JobUpdateControllerImpl implements
JobUpdateController {
public void systemResume() {
storage.write((NoResult.Quiet) storeProvider -> {
for (IJobUpdateDetails details
- :
storeProvider.getJobUpdateStore().fetchJobUpdateDetails(ACTIVE_QUERY)) {
+ : storeProvider.getJobUpdateStore().fetchJobUpdates(ACTIVE_QUERY)) {
IJobUpdateSummary summary = details.getUpdate().getSummary();
IJobUpdateInstructions instructions =
details.getUpdate().getInstructions();
@@ -396,7 +428,7 @@ class JobUpdateControllerImpl implements
JobUpdateController {
}
private IJobUpdateSummary getOnlyMatch(JobUpdateStore store, IJobUpdateQuery
query) {
- return Iterables.getOnlyElement(store.fetchJobUpdateSummaries(query));
+ return
Iterables.getOnlyElement(store.fetchJobUpdates(query)).getUpdate().getSummary();
}
@VisibleForTesting
@@ -423,13 +455,13 @@ class JobUpdateControllerImpl implements
JobUpdateController {
storage.write((NoResult<UpdateStateException>) storeProvider -> {
- IJobUpdateSummary update = Iterables.getOnlyElement(
-
storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryByUpdate(key)),
null);
- if (update == null) {
+ Optional<IJobUpdateDetails> update =
storeProvider.getJobUpdateStore().fetchJobUpdate(key);
+ if (!update.isPresent()) {
throw new UpdateStateException("Update does not exist " + key);
}
- changeUpdateStatus(storeProvider, update,
stateChange.apply(update.getState().getStatus()));
+ IJobUpdateSummary summary = update.get().getUpdate().getSummary();
+ changeUpdateStatus(storeProvider, summary,
stateChange.apply(summary.getState().getStatus()));
});
}
@@ -468,6 +500,7 @@ class JobUpdateControllerImpl implements
JobUpdateController {
updateStore.saveJobUpdateEvent(
key,
IJobUpdateEvent.build(proposedEvent.setTimestampMs(clock.nowMillis()).setStatus(status)));
+ jobUpdateEventStats.getUnchecked(status).incrementAndGet();
}
if (JobUpdateStore.TERMINAL_STATES.contains(status)) {
@@ -487,7 +520,7 @@ class JobUpdateControllerImpl implements
JobUpdateController {
checkState(!updates.containsKey(job), "Updater already exists for %s",
job);
}
- IJobUpdate jobUpdate = updateStore.fetchJobUpdate(key).get();
+ IJobUpdate jobUpdate = updateStore.fetchJobUpdate(key).get().getUpdate();
UpdateFactory.Update update;
try {
update = updateFactory.newUpdate(jobUpdate.getInstructions(), action
== ROLL_FORWARD);
@@ -556,7 +589,8 @@ class JobUpdateControllerImpl implements
JobUpdateController {
JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
- IJobUpdateInstructions instructions =
updateStore.fetchJobUpdateInstructions(key).get();
+ IJobUpdateInstructions instructions = updateStore.fetchJobUpdate(key).get()
+ .getUpdate().getInstructions();
if (isCoordinatedAndPulseExpired(key, instructions)) {
// Move coordinated update into awaiting pulse state.
JobUpdateStatus blockedStatus =
getBlockedState(summary.getState().getStatus());
@@ -578,7 +612,11 @@ class JobUpdateControllerImpl implements
JobUpdateController {
Iterable<InstanceUpdateStatus> statusChanges;
int instanceId = entry.getKey();
- List<IJobInstanceUpdateEvent> savedEvents =
updateStore.fetchInstanceEvents(key, instanceId);
+ List<IJobInstanceUpdateEvent> savedEvents =
updateStore.fetchJobUpdate(key).get()
+ .getInstanceEvents()
+ .stream()
+ .filter(e -> e.getInstanceId() == instanceId)
+ .collect(Collectors.toList());
Set<JobUpdateAction> savedActions =
FluentIterable.from(savedEvents).transform(EVENT_TO_ACTION).toSet();
@@ -609,6 +647,7 @@ class JobUpdateControllerImpl implements
JobUpdateController {
.setTimestampMs(clock.nowMillis())
.setAction(action));
updateStore.saveJobInstanceUpdateEvent(summary.getKey(), event);
+ jobUpdateActionStats.getUnchecked(action).incrementAndGet();
}
}
}
@@ -701,11 +740,6 @@ class JobUpdateControllerImpl implements
JobUpdateController {
JobUpdateAction.INSTANCE_ROLLBACK_FAILED)
.build();
- @VisibleForTesting
- static IJobUpdateQuery queryByUpdate(IJobUpdateKey key) {
- return IJobUpdateQuery.build(new
JobUpdateQuery().setKey(key.newBuilder()));
- }
-
private static JobUpdateEvent newEvent(JobUpdateStatus status) {
return new JobUpdateEvent().setStatus(status);
}
@@ -722,7 +756,7 @@ class JobUpdateControllerImpl implements
JobUpdateController {
String.format(FATAL_ERROR_FORMAT, "Key: " + key + " Instance key: " +
instance),
() -> storage.write((NoResult.Quiet) storeProvider -> {
IJobUpdateSummary summary =
- getOnlyMatch(storeProvider.getJobUpdateStore(),
queryByUpdate(key));
+
storeProvider.getJobUpdateStore().fetchJobUpdate(key).get().getUpdate().getSummary();
JobUpdateStatus status = summary.getState().getStatus();
// Suppress this evaluation if the updater is not currently active.
if (JobUpdateStateMachine.isActive(status)) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git
a/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
b/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
index 74db5ec..a1bf04a 100644
---
a/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
+++
b/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
@@ -13,67 +13,157 @@
*/
package org.apache.aurora.scheduler.pruning;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.util.Clock;
-import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.common.util.testing.FakeClock;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateState;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.JobKeys;
import
org.apache.aurora.scheduler.pruning.JobUpdateHistoryPruner.HistoryPrunerSettings;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.junit.Before;
import org.junit.Test;
-import static
org.apache.aurora.scheduler.pruning.JobUpdateHistoryPruner.JOB_UPDATES_PRUNED;
-import static org.easymock.EasyMock.expect;
+import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
+import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
+import static org.apache.aurora.gen.JobUpdateStatus.FAILED;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
import static org.junit.Assert.assertEquals;
-public class JobUpdateHistoryPrunerTest extends EasyMockTest {
+public class JobUpdateHistoryPrunerTest {
+
+ private Storage storage;
+
+ @Before
+ public void setUp() {
+ storage = MemStorageModule.newEmptyStorage();
+ }
+
@Test
- public void testExecution() throws Exception {
- StorageTestUtil storageUtil = new StorageTestUtil(this);
- storageUtil.expectOperations();
+ public void testPruneHistory() {
+ IJobKey job2 = JobKeys.from("testRole2", "testEnv2", "job2");
+
+ IJobUpdateDetails update1 = makeAndSave(makeKey("u1"), ROLLING_BACK, 123L,
123L);
+ IJobUpdateDetails update2 = makeAndSave(makeKey("u2"), ABORTED, 124L,
124L);
+ IJobUpdateDetails update3 = makeAndSave(makeKey("u3"), ROLLED_BACK, 125L,
125L);
+ IJobUpdateDetails update4 = makeAndSave(makeKey("u4"), FAILED, 126L, 126L);
+ IJobUpdateDetails update5 = makeAndSave(makeKey(job2, "u5"), ERROR, 123L,
123L);
+ IJobUpdateDetails update6 = makeAndSave(makeKey(job2, "u6"), FAILED, 125L,
125L);
+ IJobUpdateDetails update7 = makeAndSave(makeKey(job2, "u7"),
ROLLING_FORWARD, 126L, 126L);
- final FakeStatsProvider statsProvider = new FakeStatsProvider();
+ long pruningThreshold = 120;
- final ScheduledExecutorService executor =
createMock(ScheduledExecutorService.class);
- FakeScheduledExecutor executorClock =
- FakeScheduledExecutor.scheduleAtFixedRateExecutor(executor, 2);
+ // No updates pruned.
+ pruneHistory(3, pruningThreshold);
+ assertRetainedUpdates(update1, update2, update3, update4, update5,
update6, update7);
- Clock mockClock = createMock(Clock.class);
- expect(mockClock.nowMillis()).andReturn(2L).times(2);
+ // 1 update pruned.
+ pruneHistory(2, pruningThreshold);
+ assertRetainedUpdates(update1, update3, update4, update5, update6,
update7);
- expect(storageUtil.jobUpdateStore.pruneHistory(1, 1))
- .andReturn(ImmutableSet.of(
- IJobUpdateKey.build(
- new JobUpdateKey().setJob(new JobKey("role", "env",
"job")).setId("id1"))));
- expect(storageUtil.jobUpdateStore.pruneHistory(1,
1)).andReturn(ImmutableSet.of());
+ // 2 updates pruned.
+ pruneHistory(1, pruningThreshold);
+ assertRetainedUpdates(update1, update4, update6, update7);
- control.replay();
+ // The oldest update is pruned.
+ pruneHistory(1, 126);
+ assertRetainedUpdates(update1, update4, update7);
- executorClock.assertEmpty();
+ // Nothing survives the 0 per job count.
+ pruneHistory(0, pruningThreshold);
+ assertRetainedUpdates(update1, update7);
+ }
+
+ private void pruneHistory(int retainCount, long pruningThresholdMs) {
+ FakeClock clock = new FakeClock();
+ clock.setNowMillis(100 + pruningThresholdMs);
JobUpdateHistoryPruner pruner = new JobUpdateHistoryPruner(
- mockClock,
- executor,
- storageUtil.storage,
+ clock,
+ storage,
new HistoryPrunerSettings(
- Amount.of(1L, Time.MILLISECONDS),
- Amount.of(1L, Time.MILLISECONDS),
- 1),
- statsProvider);
-
- pruner.startAsync().awaitRunning();
-
- assertEquals(0L, statsProvider.getValue(JOB_UPDATES_PRUNED));
- executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
- assertEquals(1L, statsProvider.getValue(JOB_UPDATES_PRUNED));
- executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
- assertEquals(1L, statsProvider.getValue(JOB_UPDATES_PRUNED));
+ Amount.of(1L, Time.DAYS),
+ Amount.of(100L, Time.MILLISECONDS),
+ retainCount),
+ new FakeStatsProvider());
+ pruner.runForTest();
+ }
+
+ private void assertRetainedUpdates(IJobUpdateDetails... updates) {
+ storage.read(store -> {
+ assertEquals(
+ Stream.of(updates).map(u -> u.getUpdate().getSummary().getKey())
+ .collect(Collectors.toSet()),
+
store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
+ .map(u -> u.getUpdate().getSummary().getKey())
+ .collect(Collectors.toSet()));
+ return null;
+ });
+ }
+
+ private static IJobUpdateKey makeKey(String id) {
+ return makeKey(JOB, id);
+ }
+
+ private static IJobUpdateKey makeKey(IJobKey job, String id) {
+ return IJobUpdateKey.build(new
JobUpdateKey().setJob(job.newBuilder()).setId(id));
+ }
+
+ private IJobUpdateDetails makeAndSave(
+ IJobUpdateKey key,
+ JobUpdateStatus status,
+ long createdMs,
+ long lastMs) {
+
+ IJobUpdateDetails update = IJobUpdateDetails.build(new JobUpdateDetails()
+ .setUpdateEvents(ImmutableList.of(
+ new JobUpdateEvent(status, lastMs)
+ .setUser("user")
+ .setMessage("message")
+ ))
+ .setInstanceEvents(ImmutableList.of())
+ .setUpdate(new JobUpdate()
+ .setInstructions(new JobUpdateInstructions()
+ .setDesiredState(new InstanceTaskConfig()
+ .setTask(new TaskConfig())
+ .setInstances(ImmutableSet.of(new Range()))))
+ .setSummary(new JobUpdateSummary()
+ .setKey(key.newBuilder())
+ .setState(new JobUpdateState()
+ .setCreatedTimestampMs(createdMs)
+ .setLastModifiedTimestampMs(lastMs)
+ .setStatus(status)))));
+
+ storage.write((NoResult.Quiet) storeProvider -> {
+ JobUpdateStore.Mutable store = storeProvider.getJobUpdateStore();
+ store.saveJobUpdate(update.getUpdate());
+ update.getUpdateEvents().forEach(event -> store.saveJobUpdateEvent(key,
event));
+ update.getInstanceEvents().forEach(event ->
store.saveJobInstanceUpdateEvent(key, event));
+ });
+ return update;
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git
a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 5e5c518..2c33c13 100644
---
a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++
b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -35,7 +35,7 @@ import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
+import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunerSettings;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -92,7 +92,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
executor,
stateManager,
clock,
- new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
+ new HistoryPrunerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
storageUtil.storage,
new Lifecycle(shutdownCommand),
batchWorker,
http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index 6be4a9c..c1825f6 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -13,8 +13,6 @@
*/
package org.apache.aurora.scheduler.quota;
-import java.util.List;
-
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -25,6 +23,7 @@ import org.apache.aurora.gen.InstanceTaskConfig;
import org.apache.aurora.gen.JobConfiguration;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateDetails;
import org.apache.aurora.gen.JobUpdateInstructions;
import org.apache.aurora.gen.JobUpdateKey;
import org.apache.aurora.gen.JobUpdateSummary;
@@ -44,8 +43,8 @@ import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -480,16 +479,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectTasks(prodTask("foo", 2, 2, 2), prodTask(JOB_NAME, 2, 2, 2)).times(2);
ITaskConfig config = taskConfig(2, 2, 2, true);
- List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(UPDATE_KEY);
- IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
- JobUpdate builder = update.newBuilder();
- builder.getInstructions().unsetDesiredState();
-
- expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
- .andReturn(summaries).times(2);
-
- expect(jobUpdateStore.fetchJobUpdate(UPDATE_KEY))
- .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+ IJobUpdateDetails update = buildJobUpdate(UPDATE_KEY, config, 1, config, 1);
+ JobUpdateDetails builder = update.newBuilder();
+ builder.getUpdate().getInstructions().unsetDesiredState();
+ expect(jobUpdateStore.fetchJobUpdates(updateQuery(config.getJob().getRole())))
+ .andReturn(ImmutableList.of(IJobUpdateDetails.build(builder))).times(2);
expectNoCronJobs().times(2);
@@ -509,16 +503,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectTasks(prodTask("foo", 2, 2, 2), prodTask(JOB_NAME, 2, 2, 2)).times(2);
ITaskConfig config = taskConfig(2, 2, 2, true);
- List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(UPDATE_KEY);
- IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
- JobUpdate builder = update.newBuilder();
- builder.getInstructions().setInitialState(ImmutableSet.of());
-
- expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
- .andReturn(summaries).times(2);
-
- expect(jobUpdateStore.fetchJobUpdate(UPDATE_KEY))
- .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+ IJobUpdateDetails update = buildJobUpdate(UPDATE_KEY, config, 1, config, 1);
+ JobUpdateDetails builder = update.newBuilder();
+ builder.getUpdate().getInstructions().setInitialState(ImmutableSet.of());
+ expect(jobUpdateStore.fetchJobUpdates(updateQuery(config.getJob().getRole())))
+ .andReturn(ImmutableList.of(IJobUpdateDetails.build(builder))).times(2);
expectNoCronJobs().times(2);
@@ -538,16 +527,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectTasks(prodTask("foo", 2, 2, 2), prodTask("bar", 2, 2, 2)).times(2);
ITaskConfig config = taskConfig(2, 2, 2, true);
- List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(UPDATE_KEY);
- IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
- JobUpdate builder = update.newBuilder();
- builder.getInstructions().unsetDesiredState();
-
- expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
- .andReturn(summaries).times(2);
-
- expect(jobUpdateStore.fetchJobUpdate(UPDATE_KEY))
- .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+ IJobUpdateDetails update = buildJobUpdate(UPDATE_KEY, config, 1, config, 1);
+ JobUpdateDetails builder = update.newBuilder();
+ builder.getUpdate().getInstructions().unsetDesiredState();
+ expect(jobUpdateStore.fetchJobUpdates(updateQuery(config.getJob().getRole())))
+ .andReturn(ImmutableList.of(IJobUpdateDetails.build(builder))).times(2);
expectNoCronJobs().times(2);
@@ -571,8 +555,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectNoJobUpdates().times(2);
ITaskConfig config = taskConfig(1, 1, 1, true);
- IJobUpdate update = buildJobUpdate(
- buildJobUpdateSummaries(UPDATE_KEY).get(0),
+ IJobUpdateDetails update = buildJobUpdate(
+ UPDATE_KEY,
taskConfig(2, 2, 2, true),
1,
config,
@@ -582,7 +566,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
control.replay();
- QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+ QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
@@ -596,8 +580,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectNoJobUpdates().times(2);
ITaskConfig config = taskConfig(2, 2, 2, true);
- IJobUpdate update = buildJobUpdate(
- buildJobUpdateSummaries(UPDATE_KEY).get(0),
+ IJobUpdateDetails update = buildJobUpdate(
+ UPDATE_KEY,
config,
1,
config,
@@ -607,7 +591,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
control.replay();
- QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+ QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
new QuotaInfo(bag(6, 6, 6), bag(4, 4, 4), EMPTY, bag(0, 0, 0), EMPTY),
@@ -624,8 +608,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectNoJobUpdates().times(2);
ITaskConfig config = taskConfig(2, 2, 2, true);
- IJobUpdate update = buildJobUpdate(
- buildJobUpdateSummaries(UPDATE_KEY).get(0),
+ IJobUpdateDetails update = buildJobUpdate(
+ UPDATE_KEY,
config,
1,
config,
@@ -635,7 +619,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
control.replay();
- QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+ QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
@@ -645,8 +629,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
@Test
public void testCheckQuotaNewUpdateSkippedForNonProdDesiredState() {
ITaskConfig config = taskConfig(2, 2, 2, false);
- IJobUpdate update = buildJobUpdate(
- buildJobUpdateSummaries(UPDATE_KEY).get(0),
+ IJobUpdateDetails update = buildJobUpdate(
+ UPDATE_KEY,
taskConfig(2, 2, 2, true),
1,
config,
@@ -654,15 +638,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
control.replay();
- QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+ QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
}
@Test
public void testCheckQuotaNewUpdateSkippedForDedicatedDesiredState() {
ITaskConfig config = taskConfig(2, 2, 2, false);
- IJobUpdate update = buildJobUpdate(
- buildJobUpdateSummaries(UPDATE_KEY).get(0),
+ IJobUpdateDetails update = buildJobUpdate(
+ UPDATE_KEY,
prodDedicatedTask("dedicatedJob", 1, 1, 1).getAssignedTask().getTask(),
1,
config,
@@ -670,20 +654,20 @@ public class QuotaManagerImplTest extends EasyMockTest {
control.replay();
- QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+ QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
}
@Test
public void testCheckQuotaNewUpdateSkippedForEmptyDesiredState() {
ITaskConfig config = taskConfig(2, 2, 2, true);
- IJobUpdate update = buildJobUpdate(
- buildJobUpdateSummaries(UPDATE_KEY).get(0),
+ IJobUpdateDetails update = buildJobUpdate(
+ UPDATE_KEY,
config,
1,
config,
1);
- JobUpdate updateBuilder = update.newBuilder();
+ JobUpdate updateBuilder = update.getUpdate().newBuilder();
updateBuilder.getInstructions().unsetDesiredState();
control.replay();
@@ -844,50 +828,39 @@ public class QuotaManagerImplTest extends EasyMockTest {
private void expectJobUpdates(
ITaskConfig initial,
- int intialInstances,
+ int initialInstances,
ITaskConfig desired,
int desiredInstances,
int times) {
IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey(initial.getJob().newBuilder(), "u1"));
- List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(key);
- IJobUpdate update =
- buildJobUpdate(summaries.get(0), initial, intialInstances, desired, desiredInstances);
-
- expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(initial.getJob().getRole())))
- .andReturn(summaries)
+ expect(jobUpdateStore.fetchJobUpdates(updateQuery(initial.getJob().getRole())))
+ .andReturn(ImmutableList.of(
+ buildJobUpdate(key, initial, initialInstances, desired, desiredInstances)))
.times(times);
-
- expect(jobUpdateStore.fetchJobUpdate(key)).andReturn(Optional.of(update)).times(times);
-
- }
-
- private List<IJobUpdateSummary> buildJobUpdateSummaries(IJobUpdateKey key) {
- return ImmutableList.of(IJobUpdateSummary.build(
- new JobUpdateSummary().setKey(key.newBuilder())));
}
- private IJobUpdate buildJobUpdate(
- IJobUpdateSummary summary,
+ private IJobUpdateDetails buildJobUpdate(
+ IJobUpdateKey key,
ITaskConfig initial,
int intialInstances,
ITaskConfig desired,
int desiredInstances) {
- return IJobUpdate.build(new JobUpdate()
- .setSummary(summary.newBuilder())
- .setInstructions(new JobUpdateInstructions()
- .setDesiredState(new InstanceTaskConfig()
- .setTask(desired.newBuilder())
- .setInstances(ImmutableSet.of(new Range(0, desiredInstances - 1))))
- .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
- .setTask(initial.newBuilder())
- .setInstances(ImmutableSet.of(new Range(0, intialInstances - 1)))))));
+ return IJobUpdateDetails.build(new JobUpdateDetails()
+ .setUpdate(new JobUpdate()
+ .setSummary(new JobUpdateSummary().setKey(key.newBuilder()))
+ .setInstructions(new JobUpdateInstructions()
+ .setDesiredState(new InstanceTaskConfig()
+ .setTask(desired.newBuilder())
+ .setInstances(ImmutableSet.of(new Range(0, desiredInstances - 1))))
+ .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
+ .setTask(initial.newBuilder())
+ .setInstances(ImmutableSet.of(new Range(0, intialInstances - 1))))))));
}
private IExpectationSetters<?> expectNoJobUpdates() {
- return expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(ROLE)))
- .andReturn(ImmutableList.of());
+ return expect(jobUpdateStore.fetchJobUpdates(updateQuery(ROLE))).andReturn(ImmutableList.of());
}
private IExpectationSetters<?> expectNoTasks() {