Repository: aurora Updated Branches: refs/heads/master a91a759d0 -> 26efe5517
Implementing db snapshotting Bugs closed: AURORA-1627 Reviewed at https://reviews.apache.org/r/44471/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/26efe551 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/26efe551 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/26efe551 Branch: refs/heads/master Commit: 26efe5517fc0cb471101fdcb072e5dbf5d20bc56 Parents: a91a759 Author: Maxim Khutornenko <[email protected]> Authored: Tue Mar 8 10:10:45 2016 -0800 Committer: Maxim Khutornenko <[email protected]> Committed: Tue Mar 8 10:10:45 2016 -0800 ---------------------------------------------------------------------- NEWS | 2 + .../thrift/org/apache/aurora/gen/storage.thrift | 3 + config/checkstyle/checkstyle.xml | 3 - .../org/apache/aurora/benchmark/JobUpdates.java | 50 +++- .../aurora/benchmark/SnapshotBenchmarks.java | 69 ++--- .../aurora/benchmark/UpdateStoreBenchmarks.java | 55 +--- .../aurora/scheduler/storage/Storage.java | 13 + .../storage/backup/TemporaryStorage.java | 5 +- .../aurora/scheduler/storage/db/DbModule.java | 4 +- .../aurora/scheduler/storage/db/DbStorage.java | 6 + .../scheduler/storage/log/LogStorageModule.java | 6 + .../storage/log/SnapshotStoreImpl.java | 175 +++++++++++-- .../storage/log/WriteAheadStorage.java | 6 + .../scheduler/storage/backup/RecoveryTest.java | 12 +- .../storage/log/SnapshotStoreImplIT.java | 262 +++++++++++++++++++ .../storage/log/SnapshotStoreImplTest.java | 196 -------------- .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 1 + 17 files changed, 549 insertions(+), 319 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/NEWS ---------------------------------------------------------------------- diff --git a/NEWS b/NEWS index b84a945..7905451 100644 --- a/NEWS +++ b/NEWS @@ -14,6 +14,8 @@ Deprecations and removals: - `TaskConfig.jobName` - 
`TaskQuery.owner` - Task ID strings are no longer prefixed by a timestamp. +- The scheduler's H2 in-memory database now uses MVStore: http://www.h2database.com/html/mvstore.html. + In addition, scheduler thrift snapshots now support full DB dumps for faster restarts. 0.12.0 ------ http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/api/src/main/thrift/org/apache/aurora/gen/storage.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift index 6dc4614..9e4213f 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift @@ -149,6 +149,9 @@ struct Snapshot { 8: set<QuotaConfiguration> quotaConfigurations 9: set<api.Lock> locks 10: set<StoredJobUpdateDetails> jobUpdateDetails + 11: list<string> dbScript + // Indicates whether the experimental DB store for tasks and cron jobs was enabled when the snapshot was cut. + 12: bool experimentalTaskStore } // A message header that calls out the number of expected FrameChunks to follow to form a complete
<module name="NestedForDepth"> <property name="max" value="2"/> </module> - <module name="NestedTryDepth"> - <property name="max" value="1"/> - </module> <module name="NoClone"/> <module name="NoFinalizer"/> <module name="SuperClone"/> http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java index 50044e1..f4f8d00 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java +++ b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java @@ -17,6 +17,7 @@ import java.util.Arrays; import java.util.Set; import java.util.UUID; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -32,19 +33,66 @@ import org.apache.aurora.gen.JobUpdateKey; import org.apache.aurora.gen.JobUpdateSettings; import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.JobUpdateSummary; +import org.apache.aurora.gen.Lock; +import org.apache.aurora.gen.LockKey; import org.apache.aurora.gen.Range; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.apache.aurora.scheduler.storage.entities.ILock; /** * Job update factory. */ final class JobUpdates { + private JobUpdates() { + // Utility class. 
+ } + + /** + * Saves job updates into provided storage. + * + * @param storage {@link Storage} instance. + * @param updates updates to save. + * @return update keys. + */ + static Set<IJobUpdateKey> saveUpdates(Storage storage, Iterable<IJobUpdateDetails> updates) { + ImmutableSet.Builder<IJobUpdateKey> keyBuilder = ImmutableSet.builder(); + storage.write((Storage.MutateWork.NoResult.Quiet) store -> { + JobUpdateStore.Mutable updateStore = store.getJobUpdateStore(); + updateStore.deleteAllUpdatesAndEvents(); + for (IJobUpdateDetails details : updates) { + IJobUpdateKey key = details.getUpdate().getSummary().getKey(); + keyBuilder.add(key); + String lockToken = UUID.randomUUID().toString(); + store.getLockStore().saveLock(ILock.build(new Lock() + .setKey(LockKey.job(key.getJob().newBuilder())) + .setToken(lockToken) + .setUser(Builder.USER) + .setTimestampMs(0L))); + + updateStore.saveJobUpdate(details.getUpdate(), Optional.of(lockToken)); + + for (IJobUpdateEvent updateEvent : details.getUpdateEvents()) { + updateStore.saveJobUpdateEvent(key, updateEvent); + } + + for (IJobInstanceUpdateEvent instanceEvent : details.getInstanceEvents()) { + updateStore.saveJobInstanceUpdateEvent(key, instanceEvent); + } + } + }); + return keyBuilder.build(); + } static final class Builder { - private static final String USER = "user"; + static final String USER = "user"; private int numEvents = 1; private int numInstanceEvents = 5000; private int numInstanceOverrides = 1; http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java index ca484fa..2c56b2e 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java @@ 
-13,32 +13,26 @@ */ package org.apache.aurora.benchmark; -import java.util.Set; -import java.util.UUID; import java.util.concurrent.TimeUnit; import javax.inject.Singleton; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; +import com.google.inject.TypeLiteral; import org.apache.aurora.benchmark.fakes.FakeStatsProvider; import org.apache.aurora.common.inject.Bindings; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.util.Clock; -import org.apache.aurora.gen.Lock; -import org.apache.aurora.gen.LockKey; import org.apache.aurora.gen.storage.Snapshot; -import org.apache.aurora.gen.storage.StoredJobUpdateDetails; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.db.DbModule; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl; +import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.ExperimentalTaskStore; import org.apache.thrift.TException; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -68,6 +62,7 @@ public class SnapshotBenchmarks { public static class RestoreSnapshotWithUpdatesBenchmark { private SnapshotStoreImpl snapshotStore; private Snapshot snapshot; + private Storage storage; @Param({"1", "5", "10"}) private int updateCount; @@ -88,44 +83,34 @@ public class SnapshotBenchmarks { // Return non-guessable result to satisfy "blackhole" requirement. 
return System.currentTimeMillis() % 5 == 0; } - } - - private static SnapshotStoreImpl getSnapshotStore() { - Bindings.KeyFactory keyFactory = Bindings.annotatedKeyFactory(Storage.Volatile.class); - Injector injector = Guice.createInjector( - new AbstractModule() { - @Override - protected void configure() { - bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK); - bind(StatsProvider.class).toInstance(new FakeStatsProvider()); - bind(SnapshotStoreImpl.class).in(Singleton.class); - } - }, - DbModule.testModule(keyFactory, Optional.of(new DbModule.TaskStoreModule(keyFactory)))); - Storage storage = injector.getInstance(Key.get(Storage.class, Storage.Volatile.class)); - storage.prepare(); - return injector.getInstance(SnapshotStoreImpl.class); - } + private SnapshotStoreImpl getSnapshotStore() { + Bindings.KeyFactory keyFactory = Bindings.annotatedKeyFactory(Storage.Volatile.class); + Injector injector = Guice.createInjector( + new AbstractModule() { + @Override + protected void configure() { + bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK); + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + bind(SnapshotStoreImpl.class).in(Singleton.class); + bind(new TypeLiteral<Boolean>() { }).annotatedWith(ExperimentalTaskStore.class) + .toInstance(true); + } + }, + DbModule.testModule(keyFactory, Optional.of(new DbModule.TaskStoreModule(keyFactory)))); - private static Snapshot createSnapshot(int updates, int events, int instanceEvents) { - Set<IJobUpdateDetails> updateDetails = new JobUpdates.Builder() - .setNumEvents(events) - .setNumInstanceEvents(instanceEvents) - .build(updates); + storage = injector.getInstance(Key.get(Storage.class, Storage.Volatile.class)); + storage.prepare(); + return injector.getInstance(SnapshotStoreImpl.class); + } - ImmutableSet.Builder<Lock> lockBuilder = ImmutableSet.builder(); - ImmutableSet.Builder<StoredJobUpdateDetails> detailsBuilder = ImmutableSet.builder(); - for (IJobUpdateDetails details : updateDetails) { - IJobUpdateKey key 
= details.getUpdate().getSummary().getKey(); - String lockToken = UUID.randomUUID().toString(); + private Snapshot createSnapshot(int updates, int events, int instanceEvents) { + JobUpdates.saveUpdates(storage, new JobUpdates.Builder() + .setNumEvents(events) + .setNumInstanceEvents(instanceEvents) + .build(updates)); - lockBuilder.add(new Lock(LockKey.job(key.getJob().newBuilder()), lockToken, "user", 0L)); - detailsBuilder.add(new StoredJobUpdateDetails(details.newBuilder(), lockToken)); + return snapshotStore.createSnapshot(); } - - return new Snapshot() - .setLocks(lockBuilder.build()) - .setJobUpdateDetails(detailsBuilder.build()); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java index 92849d9..e5228ae 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java @@ -14,24 +14,15 @@ package org.apache.aurora.benchmark; import java.util.Set; -import java.util.UUID; import java.util.concurrent.TimeUnit; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import org.apache.aurora.gen.Lock; -import org.apache.aurora.gen.LockKey; -import org.apache.aurora.scheduler.storage.JobUpdateStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.db.DbUtil; -import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; import 
org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.apache.aurora.scheduler.storage.entities.ILock; import org.apache.thrift.TException; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -48,8 +39,6 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; public class UpdateStoreBenchmarks { - private static final String USER = "user"; - @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @@ -70,12 +59,9 @@ public class UpdateStoreBenchmarks { @Setup(Level.Iteration) public void setUpIteration() { - storage.write((NoResult.Quiet) storeProvider -> { - Set<IJobUpdateDetails> updates = - new JobUpdates.Builder().setNumInstanceEvents(instances).build(1); - - keys = saveToStore(updates, storeProvider); - }); + keys = JobUpdates.saveUpdates( + storage, + new JobUpdates.Builder().setNumInstanceEvents(instances).build(1)); } @TearDown(Level.Iteration) @@ -113,12 +99,9 @@ public class UpdateStoreBenchmarks { @Setup(Level.Iteration) public void setUpIteration() { - storage.write((NoResult.Quiet) storeProvider -> { - Set<IJobUpdateDetails> updates = - new JobUpdates.Builder().setNumInstanceOverrides(instanceOverrides).build(1); - - keys = saveToStore(updates, storeProvider); - }); + keys = JobUpdates.saveUpdates( + storage, + new JobUpdates.Builder().setNumInstanceOverrides(instanceOverrides).build(1)); } @TearDown(Level.Iteration) @@ -135,30 +118,4 @@ public class UpdateStoreBenchmarks { Iterables.getOnlyElement(keys)).get()); } } - - private static Set<IJobUpdateKey> saveToStore( - Set<IJobUpdateDetails> updates, - Storage.MutableStoreProvider storeProvider) { - - JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore(); - ImmutableSet.Builder<IJobUpdateKey> keyBuilder = ImmutableSet.builder(); - for (IJobUpdateDetails details : updates) { - IJobUpdateKey key = 
details.getUpdate().getSummary().getKey(); - keyBuilder.add(key); - String lockToken = UUID.randomUUID().toString(); - storeProvider.getLockStore().saveLock( - ILock.build(new Lock(LockKey.job(key.getJob().newBuilder()), lockToken, USER, 0L))); - - updateStore.saveJobUpdate(details.getUpdate(), Optional.of(lockToken)); - - for (IJobUpdateEvent updateEvent : details.getUpdateEvents()) { - updateStore.saveJobUpdateEvent(key, updateEvent); - } - - for (IJobInstanceUpdateEvent instanceEvent : details.getInstanceEvents()) { - updateStore.saveJobInstanceUpdateEvent(key, instanceEvent); - } - } - return keyBuilder.build(); - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/Storage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java index 5124d17..859c964 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java @@ -65,6 +65,19 @@ public interface Storage { QuotaStore.Mutable getQuotaStore(); AttributeStore.Mutable getAttributeStore(); JobUpdateStore.Mutable getJobUpdateStore(); + + /** + * Gets direct, low-level access to the underlying storage. + * <p> + * This grants potentially dangerous direct access to the underlying storage and should + * only be used during storage initialization when unstructured bulk data manipulations + * are required. + * </p> + * + * @param <T> Direct access type. + * @return Direct read/write accessor to the storage.
+ */ + <T> T getUnsafeStoreAccess(); } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java index 46b3d10..5c7d92f 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java @@ -73,7 +73,10 @@ interface TemporaryStorage { final SnapshotStore<Snapshot> snapshotStore = new SnapshotStoreImpl( buildInfo, clock, - storage); + storage, + // Safe to pass false here to default to the non-experimental task store + // during the restore-from-backup procedure. + false /* useDbSnapshotForTaskStore */); snapshotStore.applySnapshot(snapshot); return new TemporaryStorage() { http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java index 6d8fa11..ff663fa 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java @@ -68,7 +68,7 @@ public final class DbModule extends PrivateModule { @CmdLine(name = "use_beta_db_task_store", help = "Whether to use the experimental database-backed task store.") - private static final Arg<Boolean> USE_DB_TASK_STORE = Arg.create(false); + public static final Arg<Boolean> USE_DB_TASK_STORE = Arg.create(false); @CmdLine(name = "slow_query_log_threshold", help = "Log all queries that take at least this long to execute.") @@ -115,8 +115,6
@@ public final class DbModule extends PrivateModule { Map<String, String> args = ImmutableMap.<String, String>builder() .putAll(jdbcUriArgs) - // We always disable the MvStore, as it is in beta as of this writing. - .put("MV_STORE", "false") // READ COMMITTED transaction isolation. More details here // http://www.h2database.com/html/advanced.html?#transaction_isolation .put("LOCK_MODE", "3") http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java index cca92dd..360914e 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java @@ -134,6 +134,12 @@ class DbStorage extends AbstractIdleService implements Storage { public JobUpdateStore.Mutable getJobUpdateStore() { return jobUpdateStore; } + + @Override + @SuppressWarnings("unchecked") + public <T> T getUnsafeStoreAccess() { + return (T) sessionFactory.getConfiguration().getEnvironment().getDataSource(); + } }; this.statsProvider = requireNonNull(statsProvider); } http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java index ed63a74..7dcd1bf 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java @@ -31,8 +31,10 @@ import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage; import 
org.apache.aurora.scheduler.storage.DistributedSnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; +import org.apache.aurora.scheduler.storage.db.DbModule; import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize; import org.apache.aurora.scheduler.storage.log.LogStorage.Settings; +import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.ExperimentalTaskStore; import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl; import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction; @@ -67,6 +69,9 @@ public class LogStorageModule extends PrivateModule { bind(Settings.class) .toInstance(new Settings(SHUTDOWN_GRACE_PERIOD.get(), SNAPSHOT_INTERVAL.get())); + bind(new TypeLiteral<Boolean>() { }).annotatedWith(ExperimentalTaskStore.class) + .toInstance(DbModule.USE_DB_TASK_STORE.get()); + bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class) .toInstance(MAX_LOG_ENTRY_SIZE.get()); bind(LogManager.class).in(Singleton.class); @@ -77,6 +82,7 @@ public class LogStorageModule extends PrivateModule { expose(Storage.class); expose(NonVolatileStorage.class); expose(DistributedSnapshotStore.class); + expose(new TypeLiteral<Boolean>() { }).annotatedWith(ExperimentalTaskStore.class); bind(EntrySerializer.class).to(EntrySerializerImpl.class); // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5 http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java index db90150..6fee251 100644 --- 
a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java @@ -13,13 +13,28 @@ */ package org.apache.aurora.scheduler.storage.log; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Arrays; +import java.util.List; import java.util.Map; import javax.inject.Inject; +import javax.inject.Qualifier; +import javax.sql.DataSource; +import com.google.common.base.Joiner; import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.util.BuildInfo; @@ -40,7 +55,6 @@ import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.Storage.Volatile; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; @@ -64,17 +78,94 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { private static final Logger LOG = LoggerFactory.getLogger(SnapshotStoreImpl.class); - private static final Iterable<SnapshotField> SNAPSHOT_FIELDS = Arrays.asList( + /** + * Number of rows to run in a single batch during dbsnapshot restore. 
+ */ + private static final int DB_BATCH_SIZE = 20; + + private static boolean hasDbSnapshot(Snapshot snapshot) { + return snapshot.isSetDbScript(); + } + + private boolean hasDbTaskStore(Snapshot snapshot) { + return useDbSnapshotForTaskStore + && hasDbSnapshot(snapshot) + && snapshot.isExperimentalTaskStore(); + } + + private final Iterable<SnapshotField> snapshotFields = Arrays.asList( + // Order is critical here. The DB snapshot should always be tried first to ensure + // graceful migration to DBTaskStore. Otherwise, there is a direct risk of losing the cluster. + // The following scenario illustrates how that can happen: + // - Dbsnapshot:ON, DBTaskStore:OFF + // - Scheduler is updated with DBTaskStore:ON, restarts and populates all tasks from snapshot + // - Should the dbsnapshot get applied last, all tables would be dropped and recreated BUT + // since there was no task data stored in dbsnapshot (DBTaskStore was OFF the last time + // a snapshot was cut), all tasks would be erased + // - If the above is not detected before a new snapshot is cut, all tasks will be dropped the + // moment a new snapshot is created + new SnapshotField() { + @Override + public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { + LOG.info("Saving dbsnapshot"); + // Note: we don't use MyBatis mapped statements for performance reasons and to avoid + // mapping/unmapping hassle, since snapshot commands should never be used upstream.
+ try (Connection c = ((DataSource) store.getUnsafeStoreAccess()).getConnection()) { + try (PreparedStatement ps = c.prepareStatement("SCRIPT")) { + try (ResultSet rs = ps.executeQuery()) { + ImmutableList.Builder<String> builder = ImmutableList.builder(); + while (rs.next()) { + String columnValue = rs.getString("SCRIPT"); + builder.add(columnValue + "\n"); + } + snapshot.setDbScript(builder.build()); + } + } + } catch (SQLException e) { + Throwables.propagate(e); + } + } + + @Override + public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + + if (snapshot.isSetDbScript()) { + try (Connection c = ((DataSource) store.getUnsafeStoreAccess()).getConnection()) { + LOG.info("Dropping all tables"); + try (PreparedStatement drop = c.prepareStatement("DROP ALL OBJECTS")) { + drop.executeUpdate(); + } + + LOG.info("Restoring dbsnapshot. Row count: " + snapshot.getDbScript().size()); + // Partition the restore script into manageable size batches to avoid possible OOM + // due to large size DML statement. + List<List<String>> batches = Lists.partition(snapshot.getDbScript(), DB_BATCH_SIZE); + for (List<String> batch : batches) { + try (PreparedStatement restore = c.prepareStatement(Joiner.on("").join(batch))) { + restore.executeUpdate(); + } + } + } catch (SQLException e) { + Throwables.propagate(e); + } + } + } + }, new SnapshotField() { // It's important for locks to be replayed first, since there are relations that expect // references to be valid on insertion. 
@Override - public void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks())); } @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + if (hasDbSnapshot(snapshot)) { + LOG.info("Deferring lock restore to dbsnapshot"); + return; + } + store.getLockStore().deleteLocks(); if (snapshot.isSetLocks()) { @@ -86,38 +177,42 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { }, new SnapshotField() { @Override - public void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot) { + public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { snapshot.setHostAttributes( - IHostAttributes.toBuildersSet(storeProvider.getAttributeStore().getHostAttributes())); + IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes())); } @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + if (hasDbSnapshot(snapshot)) { + LOG.info("Deferring attribute restore to dbsnapshot"); + return; + } + store.getAttributeStore().deleteHostAttributes(); if (snapshot.isSetHostAttributes()) { for (HostAttributes attributes : snapshot.getHostAttributes()) { - // Prior to commit 5cf760b, the store would persist maintenance mode changes for - // unknown hosts. 5cf760b began rejecting these, but the replicated log may still - // contain entries with a null slave ID. 
- if (attributes.isSetSlaveId()) { - store.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes)); - } else { - LOG.info("Dropping host attributes with no slave ID: " + attributes); - } + store.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes)); } } } }, new SnapshotField() { @Override - public void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { snapshot.setTasks( IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped()))); + snapshot.setExperimentalTaskStore(useDbSnapshotForTaskStore); } @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + if (hasDbTaskStore(snapshot)) { + LOG.info("Deferring task restore to dbsnapshot"); + return; + } + store.getUnsafeTaskStore().deleteAllTasks(); if (snapshot.isSetTasks()) { @@ -128,17 +223,23 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { }, new SnapshotField() { @Override - public void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder(); for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) { jobs.add(new StoredCronJob(config.newBuilder())); } snapshot.setCronJobs(jobs.build()); + snapshot.setExperimentalTaskStore(useDbSnapshotForTaskStore); } @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + if (hasDbTaskStore(snapshot)) { + LOG.info("Deferring cron job restore to dbsnapshot"); + return; + } + store.getCronJobStore().deleteJobs(); if (snapshot.isSetCronJobs()) { @@ -151,12 +252,17 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { }, new SnapshotField() { @Override - public void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + public void saveToSnapshot(MutableStoreProvider store, 
Snapshot snapshot) { // SchedulerMetadata is updated outside of the static list of SnapshotFields } @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + if (hasDbSnapshot(snapshot)) { + LOG.info("Deferring metadata restore to dbsnapshot"); + return; + } + if (snapshot.isSetSchedulerMetadata() && snapshot.getSchedulerMetadata().isSetFrameworkId()) { // No delete necessary here since this is a single value. @@ -168,7 +274,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { }, new SnapshotField() { @Override - public void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder(); for (Map.Entry<String, IResourceAggregate> entry : store.getQuotaStore().fetchQuotas().entrySet()) { @@ -181,6 +287,11 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + if (hasDbSnapshot(snapshot)) { + LOG.info("Deferring quota restore to dbsnapshot"); + return; + } + store.getQuotaStore().deleteQuotas(); if (snapshot.isSetQuotaConfigurations()) { @@ -193,12 +304,17 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { }, new SnapshotField() { @Override - public void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { snapshot.setJobUpdateDetails(store.getJobUpdateStore().fetchAllJobUpdateDetails()); } @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + if (hasDbSnapshot(snapshot)) { + LOG.info("Deferring job update restore to dbsnapshot"); + return; + } + JobUpdateStore.Mutable updateStore = store.getJobUpdateStore(); updateStore.deleteAllUpdatesAndEvents(); @@ -233,12 +349,27 @@ public class SnapshotStoreImpl implements 
SnapshotStore<Snapshot> { private final BuildInfo buildInfo; private final Clock clock; private final Storage storage; + private final boolean useDbSnapshotForTaskStore; + + /** + * Identifies if experimental task store is in use. + */ + @Retention(RetentionPolicy.RUNTIME) + @Target({ ElementType.PARAMETER, ElementType.METHOD }) + @Qualifier + public @interface ExperimentalTaskStore { } @Inject - public SnapshotStoreImpl(BuildInfo buildInfo, Clock clock, @Volatile Storage storage) { + public SnapshotStoreImpl( + BuildInfo buildInfo, + Clock clock, + @Volatile Storage storage, + @ExperimentalTaskStore boolean useDbSnapshotForTaskStore) { + this.buildInfo = requireNonNull(buildInfo); this.clock = requireNonNull(clock); this.storage = requireNonNull(storage); + this.useDbSnapshotForTaskStore = useDbSnapshotForTaskStore; } @Timed("snapshot_create") @@ -252,7 +383,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { // Capture timestamp to signify the beginning of a snapshot operation, apply after in case // one of the field closures is mean and tries to apply a timestamp. 
long timestamp = clock.nowMillis(); - for (SnapshotField field : SNAPSHOT_FIELDS) { + for (SnapshotField field : snapshotFields) { field.saveToSnapshot(storeProvider, snapshot); } @@ -274,14 +405,14 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { storage.write((NoResult.Quiet) storeProvider -> { LOG.info("Restoring snapshot."); - for (SnapshotField field : SNAPSHOT_FIELDS) { + for (SnapshotField field : snapshotFields) { field.restoreFromSnapshot(storeProvider, snapshot); } }); } private interface SnapshotField { - void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot); + void saveToSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot); void restoreFromSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot); } http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java index 2f07afb..d0de063 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java @@ -381,4 +381,10 @@ class WriteAheadStorage extends WriteAheadStorageForwarder implements public JobUpdateStore.Mutable getJobUpdateStore() { return this; } + + @Override + public <T> T getUnsafeStoreAccess() { + throw new UnsupportedOperationException( + "Unsupported since casual storage users should never be doing this."); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java 
b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java index 172dd20..a33f6f7 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java @@ -96,7 +96,8 @@ public class RecoveryTest extends EasyMockTest { expect(snapshotStore.createSnapshot()).andReturn(SNAPSHOT1); Capture<MutateWork<Object, Exception>> transaction = createCapture(); expect(primaryStorage.write(capture(transaction))).andReturn(null); - distributedStore.persist(SNAPSHOT1); + Capture<Snapshot> snapshot = createCapture(); + distributedStore.persist(capture(snapshot)); shutDownNow.execute(); control.replay(); @@ -114,6 +115,9 @@ public class RecoveryTest extends EasyMockTest { recovery.query(Query.unscoped())); recovery.commit(); transaction.getValue().apply(storeProvider); + + snapshot.getValue().unsetDbScript(); + assertEquals(SNAPSHOT1, snapshot.getValue()); } @Test @@ -122,7 +126,8 @@ public class RecoveryTest extends EasyMockTest { Snapshot modified = SNAPSHOT1.deepCopy().setTasks(ImmutableSet.of(TASK1.newBuilder())); Capture<MutateWork<Object, Exception>> transaction = createCapture(); expect(primaryStorage.write(capture(transaction))).andReturn(null); - distributedStore.persist(modified); + Capture<Snapshot> snapshot = createCapture(); + distributedStore.persist(capture(snapshot)); shutDownNow.execute(); control.replay(); @@ -140,6 +145,9 @@ public class RecoveryTest extends EasyMockTest { recovery.query(Query.unscoped())); recovery.commit(); transaction.getValue().apply(storeProvider); + + snapshot.getValue().unsetDbScript(); + assertEquals(modified, snapshot.getValue()); } @Test(expected = RecoveryException.class) http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java ---------------------------------------------------------------------- diff --git 
a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java new file mode 100644 index 0000000..6a39d89 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java @@ -0,0 +1,262 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.log; + +import java.util.Map; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.util.testing.FakeBuildInfo; +import org.apache.aurora.common.util.testing.FakeClock; +import org.apache.aurora.gen.Attribute; +import org.apache.aurora.gen.CronCollisionPolicy; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.Identity; +import org.apache.aurora.gen.InstanceTaskConfig; +import org.apache.aurora.gen.JobConfiguration; +import org.apache.aurora.gen.JobInstanceUpdateEvent; +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.JobUpdate; +import org.apache.aurora.gen.JobUpdateAction; +import org.apache.aurora.gen.JobUpdateDetails; +import org.apache.aurora.gen.JobUpdateEvent; +import org.apache.aurora.gen.JobUpdateInstructions; +import org.apache.aurora.gen.JobUpdateKey; +import org.apache.aurora.gen.JobUpdateSettings; +import 
org.apache.aurora.gen.JobUpdateState; +import org.apache.aurora.gen.JobUpdateStatus; +import org.apache.aurora.gen.JobUpdateSummary; +import org.apache.aurora.gen.Lock; +import org.apache.aurora.gen.LockKey; +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.Range; +import org.apache.aurora.gen.storage.QuotaConfiguration; +import org.apache.aurora.gen.storage.SchedulerMetadata; +import org.apache.aurora.gen.storage.Snapshot; +import org.apache.aurora.gen.storage.StoredCronJob; +import org.apache.aurora.gen.storage.StoredJobUpdateDetails; +import org.apache.aurora.scheduler.ResourceAggregates; +import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.apache.aurora.scheduler.storage.entities.ILock; +import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.mem.InMemStoresModule; +import org.junit.Test; + +import static org.apache.aurora.common.inject.Bindings.KeyFactory.PLAIN; +import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo; +import static org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; +import static org.apache.aurora.scheduler.storage.db.DbModule.testModule; +import static org.apache.aurora.scheduler.storage.db.DbUtil.createStorage; +import static 
org.apache.aurora.scheduler.storage.db.DbUtil.createStorageInjector; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SnapshotStoreImplIT { + + private static final long NOW = 10335463456L; + private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "job"); + + private Storage storage; + private SnapshotStore<Snapshot> snapshotStore; + + private void setUpStore(boolean dbTaskStore) { + storage = dbTaskStore + ? createStorage() + : createStorageInjector( + testModule(PLAIN, Optional.of(new InMemStoresModule(PLAIN)))).getInstance(Storage.class); + + FakeClock clock = new FakeClock(); + clock.setNowMillis(NOW); + snapshotStore = new SnapshotStoreImpl( + generateBuildInfo(), + clock, + storage, + dbTaskStore); + } + + private static Snapshot makeComparable(Snapshot snapshot) { + Snapshot copy = snapshot.deepCopy(); + // Ignore DB snapshot. It will be tested by asserting the DB data. 
+ copy.unsetDbScript(); + copy.setExperimentalTaskStore(false); + return copy; + } + + @Test + public void testNoDBTaskStore() { + setUpStore(false); + populateStore(); + + Snapshot snapshot1 = snapshotStore.createSnapshot(); + assertEquals(expected(), makeComparable(snapshot1)); + assertFalse(snapshot1.isExperimentalTaskStore()); + + snapshotStore.applySnapshot(snapshot1); + Snapshot snapshot2 = snapshotStore.createSnapshot(); + assertEquals(expected(), makeComparable(snapshot2)); + assertEquals(makeComparable(snapshot1), makeComparable(snapshot2)); + } + + @Test + public void testMigrateToDBTaskStore() { + setUpStore(false); + populateStore(); + + Snapshot snapshot1 = snapshotStore.createSnapshot(); + assertEquals(expected(), makeComparable(snapshot1)); + assertFalse(snapshot1.isExperimentalTaskStore()); + + setUpStore(true); + snapshotStore.applySnapshot(snapshot1); + Snapshot snapshot2 = snapshotStore.createSnapshot(); + assertTrue(snapshot2.isExperimentalTaskStore()); + assertEquals(expected(), makeComparable(snapshot2)); + assertEquals(makeComparable(snapshot1), makeComparable(snapshot2)); + } + + @Test + public void testMigrateFromDBTaskStore() { + setUpStore(true); + populateStore(); + + Snapshot snapshot1 = snapshotStore.createSnapshot(); + assertEquals(expected(), makeComparable(snapshot1)); + assertTrue(snapshot1.isExperimentalTaskStore()); + + setUpStore(false); + snapshotStore.applySnapshot(snapshot1); + Snapshot snapshot2 = snapshotStore.createSnapshot(); + assertFalse(snapshot2.isExperimentalTaskStore()); + assertEquals(expected(), makeComparable(snapshot2)); + assertEquals(makeComparable(snapshot1), makeComparable(snapshot2)); + } + + @Test + public void testDBTaskStore() { + setUpStore(true); + populateStore(); + + Snapshot snapshot1 = snapshotStore.createSnapshot(); + assertEquals(expected(), makeComparable(snapshot1)); + assertTrue(snapshot1.isExperimentalTaskStore()); + + snapshotStore.applySnapshot(snapshot1); + Snapshot snapshot2 = 
snapshotStore.createSnapshot(); + assertEquals(expected(), makeComparable(snapshot2)); + assertEquals(makeComparable(snapshot1), makeComparable(snapshot2)); + } + + private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", JOB_KEY); + private static final ITaskConfig TASK_CONFIG = TaskTestUtil.makeConfig(JOB_KEY); + private static final IJobConfiguration CRON_JOB = IJobConfiguration.build(new JobConfiguration() + .setKey(new JobKey("owner", "env", "name")) + .setOwner(new Identity("user")) + .setCronSchedule("* * * * *") + .setCronCollisionPolicy(CronCollisionPolicy.KILL_EXISTING) + .setInstanceCount(1) + .setTaskConfig(TASK_CONFIG.newBuilder())); + private static final String ROLE = "role"; + private static final IResourceAggregate QUOTA = ResourceAggregates.LARGE; + private static final IHostAttributes ATTRIBUTES = IHostAttributes.build( + new HostAttributes("host", ImmutableSet.of(new Attribute("attr", ImmutableSet.of("value")))) + .setMode(MaintenanceMode.NONE) + .setSlaveId("slave id")); + private static final String FRAMEWORK_ID = "framework_id"; + private static final Map<String, String> METADATA = ImmutableMap.of( + FakeBuildInfo.DATE, FakeBuildInfo.DATE, + FakeBuildInfo.GIT_REVISION, FakeBuildInfo.GIT_REVISION, + FakeBuildInfo.GIT_TAG, FakeBuildInfo.GIT_TAG); + private static final ILock LOCK = ILock.build(new Lock() + .setKey(LockKey.job(JobKeys.from("testRole", "testEnv", "testJob").newBuilder())) + .setToken("lockId") + .setUser("testUser") + .setTimestampMs(12345L)); + private static final IJobUpdateKey UPDATE_ID = + IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "updateId1")); + private static final IJobUpdateDetails UPDATE = IJobUpdateDetails.build(new JobUpdateDetails() + .setUpdate(new JobUpdate() + .setInstructions(new JobUpdateInstructions() + .setDesiredState(new InstanceTaskConfig() + .setTask(TASK_CONFIG.newBuilder()) + .setInstances(ImmutableSet.of(new Range(0, 7)))) + .setInitialState(ImmutableSet.of( + new 
InstanceTaskConfig() + .setInstances(ImmutableSet.of(new Range(0, 1))) + .setTask(TASK_CONFIG.newBuilder()))) + .setSettings(new JobUpdateSettings() + .setBlockIfNoPulsesAfterMs(500) + .setUpdateGroupSize(1) + .setMaxPerInstanceFailures(1) + .setMaxFailedInstances(1) + .setMinWaitInInstanceRunningMs(200) + .setRollbackOnFailure(true) + .setWaitForBatchCompletion(true) + .setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(0, 0))))) + .setSummary(new JobUpdateSummary() + .setState(new JobUpdateState().setStatus(JobUpdateStatus.ERROR)) + .setUser("user") + .setKey(UPDATE_ID.newBuilder()))) + .setUpdateEvents(ImmutableList.of(new JobUpdateEvent() + .setUser("user") + .setMessage("message") + .setStatus(JobUpdateStatus.ERROR))) + .setInstanceEvents(ImmutableList.of(new JobInstanceUpdateEvent() + .setAction(JobUpdateAction.INSTANCE_UPDATED)))); + + private Snapshot expected() { + return new Snapshot() + .setTimestamp(NOW) + .setTasks(ImmutableSet.of(TASK.newBuilder())) + .setQuotaConfigurations(ImmutableSet.of(new QuotaConfiguration(ROLE, QUOTA.newBuilder()))) + .setHostAttributes(ImmutableSet.of(ATTRIBUTES.newBuilder())) + .setCronJobs(ImmutableSet.of(new StoredCronJob(CRON_JOB.newBuilder()))) + .setSchedulerMetadata(new SchedulerMetadata(FRAMEWORK_ID, METADATA)) + .setLocks(ImmutableSet.of(LOCK.newBuilder())) + .setJobUpdateDetails(ImmutableSet.of( + new StoredJobUpdateDetails(UPDATE.newBuilder(), LOCK.getToken()))); + } + + private void populateStore() { + storage.write((NoResult.Quiet) store -> { + store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(TASK)); + store.getCronJobStore().saveAcceptedJob(CRON_JOB); + store.getQuotaStore().saveQuota(ROLE, QUOTA); + store.getAttributeStore().saveHostAttributes(ATTRIBUTES); + store.getSchedulerStore().saveFrameworkId(FRAMEWORK_ID); + store.getLockStore().saveLock(LOCK); + store.getJobUpdateStore().saveJobUpdate(UPDATE.getUpdate(), Optional.of(LOCK.getToken())); + store.getJobUpdateStore().saveJobUpdateEvent( + 
UPDATE.getUpdate().getSummary().getKey(), + UPDATE.getUpdateEvents().get(0)); + store.getJobUpdateStore().saveJobInstanceUpdateEvent( + UPDATE.getUpdate().getSummary().getKey(), + UPDATE.getInstanceEvents().get(0) + ); + }); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java deleted file mode 100644 index 4407867..0000000 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.storage.log; - -import java.util.Set; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; - -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.common.util.testing.FakeBuildInfo; -import org.apache.aurora.common.util.testing.FakeClock; -import org.apache.aurora.gen.Attribute; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.JobConfiguration; -import org.apache.aurora.gen.JobInstanceUpdateEvent; -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.JobUpdate; -import org.apache.aurora.gen.JobUpdateDetails; -import org.apache.aurora.gen.JobUpdateEvent; -import org.apache.aurora.gen.JobUpdateKey; -import org.apache.aurora.gen.JobUpdateStatus; -import org.apache.aurora.gen.JobUpdateSummary; -import org.apache.aurora.gen.Lock; -import org.apache.aurora.gen.LockKey; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.storage.QuotaConfiguration; -import org.apache.aurora.gen.storage.SchedulerMetadata; -import org.apache.aurora.gen.storage.Snapshot; -import org.apache.aurora.gen.storage.StoredCronJob; -import org.apache.aurora.gen.storage.StoredJobUpdateDetails; -import org.apache.aurora.scheduler.ResourceAggregates; -import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.storage.SnapshotStore; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IJobUpdate; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; -import 
org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.apache.aurora.scheduler.storage.entities.ILock; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; - -public class SnapshotStoreImplTest extends EasyMockTest { - - private static final long NOW = 10335463456L; - private static final JobKey JOB_KEY = JobKeys.from("role", "env", "job").newBuilder(); - - private StorageTestUtil storageUtil; - private SnapshotStore<Snapshot> snapshotStore; - - @Before - public void setUp() { - FakeClock clock = new FakeClock(); - clock.setNowMillis(NOW); - storageUtil = new StorageTestUtil(this); - snapshotStore = new SnapshotStoreImpl( - generateBuildInfo(), - clock, - storageUtil.storage); - } - - private static IJobUpdateKey makeKey(String id) { - return IJobUpdateKey.build(new JobUpdateKey(JOB_KEY, id)); - } - - @Test - public void testCreateAndRestoreNewSnapshot() { - ImmutableSet<IScheduledTask> tasks = ImmutableSet.of( - IScheduledTask.build(new ScheduledTask().setStatus(ScheduleStatus.PENDING))); - - Set<QuotaConfiguration> quotas = - ImmutableSet.of( - new QuotaConfiguration("steve", ResourceAggregates.EMPTY.newBuilder())); - IHostAttributes attribute = IHostAttributes.build( - new HostAttributes("host", ImmutableSet.of(new Attribute("attr", ImmutableSet.of("value")))) - .setSlaveId("slave id")); - // A legacy attribute that has a maintenance mode set, but nothing else. These should be - // dropped. 
- IHostAttributes legacyAttribute = IHostAttributes.build( - new HostAttributes("host", ImmutableSet.of())); - StoredCronJob job = new StoredCronJob( - new JobConfiguration().setKey(new JobKey("owner", "env", "name"))); - String frameworkId = "framework_id"; - ILock lock = ILock.build(new Lock() - .setKey(LockKey.job(JobKeys.from("testRole", "testEnv", "testJob").newBuilder())) - .setToken("lockId") - .setUser("testUser") - .setTimestampMs(12345L)); - SchedulerMetadata metadata = new SchedulerMetadata().setFrameworkId(frameworkId); - metadata.setDetails(Maps.newHashMap()); - metadata.getDetails().put(FakeBuildInfo.DATE, FakeBuildInfo.DATE); - metadata.getDetails().put(FakeBuildInfo.GIT_REVISION, FakeBuildInfo.GIT_REVISION); - metadata.getDetails().put(FakeBuildInfo.GIT_TAG, FakeBuildInfo.GIT_TAG); - IJobUpdateKey updateId1 = makeKey("updateId1"); - IJobUpdateKey updateId2 = makeKey("updateId2"); - IJobUpdateDetails updateDetails1 = IJobUpdateDetails.build(new JobUpdateDetails() - .setUpdate(new JobUpdate().setSummary( - new JobUpdateSummary().setKey(updateId1.newBuilder()))) - .setUpdateEvents(ImmutableList.of(new JobUpdateEvent().setStatus(JobUpdateStatus.ERROR))) - .setInstanceEvents(ImmutableList.of(new JobInstanceUpdateEvent().setTimestampMs(123L)))); - - IJobUpdateDetails updateDetails2 = IJobUpdateDetails.build(new JobUpdateDetails() - .setUpdate(new JobUpdate().setSummary( - new JobUpdateSummary().setKey(updateId2.newBuilder())))); - - storageUtil.expectOperations(); - expect(storageUtil.taskStore.fetchTasks(Query.unscoped())).andReturn(tasks); - expect(storageUtil.quotaStore.fetchQuotas()) - .andReturn(ImmutableMap.of("steve", ResourceAggregates.EMPTY)); - expect(storageUtil.attributeStore.getHostAttributes()) - .andReturn(ImmutableSet.of(attribute, legacyAttribute)); - expect(storageUtil.jobStore.fetchJobs()) - .andReturn(ImmutableSet.of(IJobConfiguration.build(job.getJobConfiguration()))); - 
expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(frameworkId)); - expect(storageUtil.lockStore.fetchLocks()).andReturn(ImmutableSet.of(lock)); - String lockToken = "token"; - expect(storageUtil.jobUpdateStore.fetchAllJobUpdateDetails()) - .andReturn(ImmutableSet.of( - new StoredJobUpdateDetails(updateDetails1.newBuilder(), lockToken), - new StoredJobUpdateDetails(updateDetails2.newBuilder(), null))); - - expectDataWipe(); - storageUtil.taskStore.saveTasks(tasks); - storageUtil.quotaStore.saveQuota("steve", ResourceAggregates.EMPTY); - expect(storageUtil.attributeStore.saveHostAttributes(attribute)).andReturn(true); - storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(job.getJobConfiguration())); - storageUtil.schedulerStore.saveFrameworkId(frameworkId); - storageUtil.lockStore.saveLock(lock); - storageUtil.jobUpdateStore.saveJobUpdate( - updateDetails1.getUpdate(), Optional.fromNullable(lockToken)); - storageUtil.jobUpdateStore.saveJobUpdateEvent( - updateId1, - Iterables.getOnlyElement(updateDetails1.getUpdateEvents())); - storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent( - updateId1, - Iterables.getOnlyElement(updateDetails1.getInstanceEvents())); - - // The saved object for update2 should be backfilled with update key. 
- JobUpdate update2Expected = updateDetails2.getUpdate().newBuilder(); - update2Expected.getSummary().setKey(updateId2.newBuilder()); - storageUtil.jobUpdateStore.saveJobUpdate( - IJobUpdate.build(update2Expected), Optional.absent()); - - control.replay(); - - Snapshot expected = new Snapshot() - .setTimestamp(NOW) - .setTasks(IScheduledTask.toBuildersSet(tasks)) - .setQuotaConfigurations(quotas) - .setHostAttributes(ImmutableSet.of(attribute.newBuilder(), legacyAttribute.newBuilder())) - .setCronJobs(ImmutableSet.of(job)) - .setSchedulerMetadata(metadata) - .setLocks(ImmutableSet.of(lock.newBuilder())) - .setJobUpdateDetails(ImmutableSet.of( - new StoredJobUpdateDetails(updateDetails1.newBuilder(), lockToken), - new StoredJobUpdateDetails(updateDetails2.newBuilder(), null))); - - Snapshot snapshot = snapshotStore.createSnapshot(); - assertEquals(expected, snapshot); - - snapshotStore.applySnapshot(expected); - } - - private void expectDataWipe() { - storageUtil.taskStore.deleteAllTasks(); - storageUtil.quotaStore.deleteQuotas(); - storageUtil.attributeStore.deleteHostAttributes(); - storageUtil.jobStore.deleteJobs(); - storageUtil.lockStore.deleteLocks(); - storageUtil.jobUpdateStore.deleteAllUpdatesAndEvents(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/26efe551/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh index 75130a3..b469f9b 100755 --- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh +++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh @@ -150,6 +150,7 @@ test_update() { assert_update_state $_jobkey 'ROLLING_FORWARD' local _update_id=$(aurora update list $_jobkey --status ROLLING_FORWARD \ | tail -n +2 | awk '{print $2}') + aurora_admin scheduler_snapshot devcluster sudo restart aurora-scheduler assert_update_state $_jobkey 
'ROLLING_FORWARD' aurora update pause $_jobkey --message='hello'
