Repository: aurora Updated Branches: refs/heads/master 96c990875 -> ae051f3b9
Moving db migration into LogStorage Reviewed at https://reviews.apache.org/r/46291/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ae051f3b Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ae051f3b Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ae051f3b Branch: refs/heads/master Commit: ae051f3b92797d5c9f328c6c6d42d03ee4077938 Parents: 96c9908 Author: Maxim Khutornenko <[email protected]> Authored: Mon Apr 18 11:24:54 2016 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Mon Apr 18 11:24:54 2016 -0700 ---------------------------------------------------------------------- .../storage/backup/TemporaryStorage.java | 5 +--- .../scheduler/storage/log/LogStorage.java | 27 +++++++++++++++++--- .../storage/log/SnapshotStoreImpl.java | 12 +-------- .../scheduler/storage/log/LogStorageTest.java | 9 +++++-- .../storage/log/SnapshotStoreImplIT.java | 4 +-- 5 files changed, 34 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/ae051f3b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java index d08873c..5c7d92f 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java @@ -76,10 +76,7 @@ interface TemporaryStorage { storage, // Safe to pass false here to default to the non-experimental task store // during restore from backup procedure. - false /** useDbSnapshotForTaskStore */, - // We can just pass an empty lambda for the MigrationManager as migration is a no-op - // when restoring from backup. - () -> { } /** migrationManager */); + false /** useDbSnapshotForTaskStore */); snapshotStore.applySnapshot(snapshot); return new TemporaryStorage() { http://git-wip-us.apache.org/repos/asf/aurora/blob/ae051f3b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java index f586186..74c4688 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.storage.log; import java.io.IOException; +import java.sql.SQLException; import java.util.Date; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; @@ -61,6 +62,7 @@ import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.TaskStore; +import org.apache.aurora.scheduler.storage.db.MigrationManager; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; @@ -190,6 +192,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore private final AttributeStore.Mutable writeBehindAttributeStore; private final JobUpdateStore.Mutable writeBehindJobUpdateStore; private final ReentrantLock writeLock; + private final MigrationManager migrationManager; private StreamManager streamManager; private final WriteAheadStorage writeAheadStorage; @@ -222,7 +225,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore @Volatile AttributeStore.Mutable attributeStore, @Volatile JobUpdateStore.Mutable jobUpdateStore, EventSink eventSink, - ReentrantLock writeLock) { + ReentrantLock writeLock, + MigrationManager migrationManager) { this(logManager, new ScheduledExecutorSchedulingService(shutdownRegistry, settings.getShutdownGracePeriod()), @@ -237,7 +241,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore attributeStore, jobUpdateStore, eventSink, - writeLock); + writeLock, + migrationManager); } @VisibleForTesting @@ -255,7 +260,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore AttributeStore.Mutable attributeStore, JobUpdateStore.Mutable jobUpdateStore, EventSink eventSink, - ReentrantLock writeLock) { + ReentrantLock writeLock, + MigrationManager migrationManager) { this.logManager = requireNonNull(logManager); this.schedulingService = requireNonNull(schedulingService); @@ -275,6 +281,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore this.writeBehindAttributeStore = requireNonNull(attributeStore); this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore); this.writeLock = requireNonNull(writeLock); + this.migrationManager = requireNonNull(migrationManager); TransactionManager transactionManager = new TransactionManager() { @Override public boolean hasActiveTransaction() { @@ -415,6 +422,11 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore // We replay these entries in the forwarded storage system's transactions but not ours - we // do not want to re-record these ops to the log. recover(); + + // Apply any schema/data migrations before 'activating' write-ahead (native log) writes. + migrate(); + + // Activate write-ahead writes now that the store is fully recovered and migrated. recovered = true; // Now that we're recovered we should let any mutations done in initializationLogic append @@ -439,6 +451,15 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore } } + @Timed("scheduler_log_migrate") + void migrate() { + try { + migrationManager.migrate(); + } catch (SQLException e) { + throw new RecoveryFailedException(e); + } + } + private static final class RecoveryFailedException extends SchedulerException { RecoveryFailedException(Throwable cause) { super(cause); http://git-wip-us.apache.org/repos/asf/aurora/blob/ae051f3b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java index b6922e1..97b9e26 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java @@ -56,7 +56,6 @@ import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.Volatile; -import org.apache.aurora.scheduler.storage.db.MigrationManager; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; @@ -148,12 +147,6 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { } catch (SQLException e) { Throwables.propagate(e); } - - try { - migrationManager.migrate(); - } catch (SQLException e) { - Throwables.propagate(e); - } } } }, @@ -356,7 +349,6 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { private final Clock clock; private final Storage storage; private final boolean useDbSnapshotForTaskStore; - private final MigrationManager migrationManager; /** * Identifies if experimental task store is in use. @@ -371,14 +363,12 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { BuildInfo buildInfo, Clock clock, @Volatile Storage storage, - @ExperimentalTaskStore boolean useDbSnapshotForTaskStore, - MigrationManager migrationManager) { + @ExperimentalTaskStore boolean useDbSnapshotForTaskStore) { this.buildInfo = requireNonNull(buildInfo); this.clock = requireNonNull(clock); this.storage = requireNonNull(storage); this.useDbSnapshotForTaskStore = useDbSnapshotForTaskStore; - this.migrationManager = requireNonNull(migrationManager); } @Timed("snapshot_create") http://git-wip-us.apache.org/repos/asf/aurora/blob/ae051f3b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java index bf9479d..a2b1d22 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java @@ -92,6 +92,7 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet; +import org.apache.aurora.scheduler.storage.db.MigrationManager; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; @@ -142,7 +143,7 @@ public class LogStorageTest extends EasyMockTest { private EventSink eventSink; @Before - public void setUp() { + public void setUp() throws Exception { log = createMock(Log.class); deduplicator = createMock(SnapshotDeduplicator.class); @@ -160,6 +161,9 @@ public class LogStorageTest extends EasyMockTest { snapshotStore = createMock(new Clazz<SnapshotStore<Snapshot>>() { }); storageUtil = new StorageTestUtil(this); eventSink = createMock(EventSink.class); + MigrationManager migrationManager = createMock(MigrationManager.class); + migrationManager.migrate(); + expectLastCall().anyTimes(); logStorage = new LogStorage( logManager, @@ -175,7 +179,8 @@ public class LogStorageTest extends EasyMockTest { storageUtil.attributeStore, storageUtil.jobUpdateStore, eventSink, - new ReentrantLock()); + new ReentrantLock(), + migrationManager); stream = createMock(Stream.class); streamMatcher = LogOpMatcher.matcherFor(stream); http://git-wip-us.apache.org/repos/asf/aurora/blob/ae051f3b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java index ff9c1d0..d5918b9 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java @@ -54,7 +54,6 @@ import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.resources.ResourceAggregates; import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.db.MigrationManager; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; @@ -98,8 +97,7 @@ public class SnapshotStoreImplIT { generateBuildInfo(), clock, storage, - dbTaskStore, - createStorageInjector(testModuleWithWorkQueue()).getInstance(MigrationManager.class)); + dbTaskStore); } private static Snapshot makeComparable(Snapshot snapshot) {
