Recover snapshots via the Op stream

This cleans up the various interfaces around persisting and recovering from `Snapshot`s. Most importantly, `LogPersistence` no longer bypasses the `recover()` `Op` stream to apply snapshots. As a result, it should be straightforward to build a migration utility that clones `LogPersistence` state into another `Persistence` implementation.
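As a rough illustration of that migration idea, the sketch below is written only against the `Persistence` API in this change (`recover()` yielding `Edit`s, `persist()` accepting an `Op` stream). The `PersistenceMigrator` name, and the choice to skip `deleteAll` edits on the assumption that the target starts empty, are illustrative assumptions and not part of this commit.

import java.util.stream.Stream;

import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.scheduler.storage.durability.Persistence;
import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException;

// Hypothetical utility, not included in this commit.
final class PersistenceMigrator {

  private PersistenceMigrator() {
    // Utility class.
  }

  /**
   * Clones the contents of one persistence implementation into another by replaying the
   * recovered op stream. Assumes the target is empty and ready to accept writes.
   */
  static void migrate(Persistence source, Persistence target) throws PersistenceException {
    try (Stream<Edit> edits = source.recover()) {
      // deleteAll edits carry no op; only concrete ops are forwarded to the target.
      target.persist(edits.filter(edit -> !edit.isDeleteAll()).map(Edit::getOp));
    }
  }
}

In the scenario described above, `LogPersistence` would serve as the source and the new `Persistence` implementation as the target.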
Reviewed at https://reviews.apache.org/r/64286/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5f79f7ca Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5f79f7ca Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5f79f7ca Branch: refs/heads/master Commit: 5f79f7ca7c62f053f66a9ea925cebb78a644ce54 Parents: 4489dc3 Author: Bill Farner <[email protected]> Authored: Wed Dec 13 20:37:57 2017 -0800 Committer: Bill Farner <[email protected]> Committed: Wed Dec 13 20:37:57 2017 -0800 ---------------------------------------------------------------------- .../aurora/benchmark/SnapshotBenchmarks.java | 4 +- .../storage/DistributedSnapshotStore.java | 39 -- .../aurora/scheduler/storage/SnapshotStore.java | 25 +- .../aurora/scheduler/storage/Snapshotter.java | 43 +++ .../scheduler/storage/backup/BackupModule.java | 14 +- .../scheduler/storage/backup/Recovery.java | 10 +- .../scheduler/storage/backup/StorageBackup.java | 27 +- .../storage/backup/TemporaryStorage.java | 21 +- .../storage/durability/DurableStorage.java | 155 +------- .../scheduler/storage/durability/Loader.java | 150 ++++++++ .../storage/durability/Persistence.java | 56 ++- .../storage/durability/WriteAheadStorage.java | 368 ------------------- .../storage/durability/WriteRecorder.java | 368 +++++++++++++++++++ .../scheduler/storage/log/LogPersistence.java | 206 ++--------- .../scheduler/storage/log/LogStorageModule.java | 86 ++--- .../scheduler/storage/log/SnapshotService.java | 121 ++++++ .../storage/log/SnapshotStoreImpl.java | 236 ++++++------ .../thrift/SchedulerThriftInterface.java | 6 +- .../scheduler/app/local/LocalSchedulerMain.java | 4 +- .../scheduler/config/CommandLineTest.java | 2 - .../scheduler/storage/backup/RecoveryTest.java | 22 +- .../storage/backup/StorageBackupTest.java | 45 ++- .../storage/durability/DurableStorageTest.java | 53 +-- .../durability/WriteAheadStorageTest.java | 166 --------- .../storage/durability/WriteRecorderTest.java | 166 +++++++++ .../storage/log/LogPersistenceTest.java | 134 +++++++ .../storage/log/NonVolatileStorageTest.java | 10 +- .../storage/log/SnapshotServiceTest.java | 174 +++++++++ .../storage/log/SnapshotStoreImplIT.java | 41 +-- .../storage/testing/StorageTestUtil.java | 11 +- .../thrift/SchedulerThriftInterfaceTest.java | 6 +- .../aurora/scheduler/thrift/ThriftIT.java | 4 +- 32 files changed, 1578 insertions(+), 1195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java index 755582d..4f99f80 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java @@ -75,7 +75,7 @@ public class SnapshotBenchmarks { @Benchmark public boolean run() throws TException { - snapshotStore.applySnapshot(snapshot); + snapshotStore.asStream(snapshot); // Return non-guessable result to satisfy "blackhole" requirement. 
return System.currentTimeMillis() % 5 == 0; } @@ -103,7 +103,7 @@ public class SnapshotBenchmarks { .setNumInstanceEvents(instanceEvents) .build(updates)); - return snapshotStore.createSnapshot(); + return storage.write(snapshotStore::from); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java deleted file mode 100644 index 0c6a955..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage; - -import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; -import org.apache.aurora.gen.storage.Snapshot; -import org.apache.aurora.scheduler.storage.Storage.StorageException; - -/** - * A distributed snapshot store that supports persisting globally-visible snapshots. - */ -public interface DistributedSnapshotStore { - - /** - * Clean up the underlying storage by optimizing internal data structures. Does not change - * externally-visible state but might not run concurrently with write operations. - */ - void snapshot() throws StorageException; - - /** - * Identical to {@link #snapshot()}, using a custom {@link Snapshot} rather than an - * internally-generated one based on the current state. - * - * @param snapshot Snapshot to write. - * @throws CodingException If the snapshot could not be serialized. - */ - void snapshotWith(Snapshot snapshot) throws CodingException; -} http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java index 6b5e5dd..ab109ab 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java @@ -13,24 +13,27 @@ */ package org.apache.aurora.scheduler.storage; +import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; +import org.apache.aurora.gen.storage.Snapshot; +import org.apache.aurora.scheduler.storage.Storage.StorageException; + /** - * Storage mechanism that is able to create complete snapshots of the local storage system state - * and apply these to restore local storage from a snapshotted baseline. + * A storage component that applies full-state snapshots. */ -public interface SnapshotStore<T> { +public interface SnapshotStore { /** - * Creates a consistent snapshot of the local storage system. 
- * - * @return A blob that can be used to recover local storage via {@link #applySnapshot(Object)}. + * Clean up the underlying storage by optimizing internal data structures. Does not change + * externally-visible state but might not run concurrently with write operations. */ - T createSnapshot(); + void snapshot() throws StorageException; /** - * Applies a snapshot blob to the local storage system, wiping out all existing data and - * resetting with the contents of the snapshot. + * Identical to {@link #snapshot()}, using a custom {@link Snapshot} rather than an + * internally-generated one based on the current state. * - * @param snapshot A snapshot blob created by {@link #createSnapshot()}. + * @param snapshot Snapshot to write. + * @throws CodingException If the snapshot could not be serialized. */ - void applySnapshot(T snapshot); + void snapshotWith(Snapshot snapshot) throws CodingException; } http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java b/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java new file mode 100644 index 0000000..0966faf --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java @@ -0,0 +1,43 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage; + +import java.util.stream.Stream; + +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.gen.storage.Snapshot; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; + +/** + * Logic to convert storage contents into a snapshot, and a snapshot into a stream of storage + * operations. + */ +public interface Snapshotter { + + /** + * Creates a snapshot from the contents of storage. + * + * @param stores stores to create a snapshot from. + * @return A snapshot that can be used to recover storage. + */ + Snapshot from(StoreProvider stores); + + /** + * Converts a snapshot into an equivalent linear stream of storage operations. + * + * @param snapshot A snapshot created by {@link #from(StoreProvider)}. + * @return a stream of operations representing the contents of the snapshot. 
+ */ + Stream<Op> asStream(Snapshot snapshot); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java index 7eaae89..4397c1e 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java @@ -32,7 +32,7 @@ import org.apache.aurora.common.quantity.Time; import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.config.types.TimeAmount; -import org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.Snapshotter; import org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryImpl; import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl; import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl.BackupConfig; @@ -66,9 +66,9 @@ public class BackupModule extends PrivateModule { } private final Options options; - private final Class<? extends SnapshotStore<Snapshot>> snapshotStore; + private final Class<? extends Snapshotter> snapshotStore; - public BackupModule(Options options, Class<? extends SnapshotStore<Snapshot>> snapshotStore) { + public BackupModule(Options options, Class<? extends Snapshotter> snapshotStore) { this.options = options; this.snapshotStore = snapshotStore; } @@ -78,13 +78,13 @@ public class BackupModule extends PrivateModule { Executor executor = AsyncUtil.singleThreadLoggingScheduledExecutor("StorageBackup-%d", LOG); bind(Executor.class).toInstance(executor); - TypeLiteral<SnapshotStore<Snapshot>> type = new TypeLiteral<SnapshotStore<Snapshot>>() { }; - bind(type).annotatedWith(StorageBackupImpl.SnapshotDelegate.class).to(snapshotStore); + bind(Snapshotter.class).annotatedWith(StorageBackupImpl.SnapshotDelegate.class) + .to(snapshotStore); - bind(type).to(StorageBackupImpl.class); + bind(Snapshotter.class).to(StorageBackupImpl.class); bind(StorageBackup.class).to(StorageBackupImpl.class); bind(StorageBackupImpl.class).in(Singleton.class); - expose(type); + expose(Snapshotter.class); expose(StorageBackup.class); bind(new TypeLiteral<Function<Snapshot, TemporaryStorage>>() { }) http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java index 3a62f02..79899a0 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java @@ -31,7 +31,7 @@ import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; import org.apache.aurora.common.base.Command; import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; +import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import 
org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -110,7 +110,7 @@ public interface Recovery { private final Function<Snapshot, TemporaryStorage> tempStorageFactory; private final AtomicReference<PendingRecovery> recovery; private final Storage primaryStorage; - private final DistributedSnapshotStore distributedStore; + private final SnapshotStore snapshotStore; private final Command shutDownNow; @Inject @@ -118,14 +118,14 @@ public interface Recovery { File backupDir, Function<Snapshot, TemporaryStorage> tempStorageFactory, Storage primaryStorage, - DistributedSnapshotStore distributedStore, + SnapshotStore snapshotStore, Command shutDownNow) { this.backupDir = requireNonNull(backupDir); this.tempStorageFactory = requireNonNull(tempStorageFactory); this.recovery = Atomics.newReference(); this.primaryStorage = requireNonNull(primaryStorage); - this.distributedStore = requireNonNull(distributedStore); + this.snapshotStore = requireNonNull(snapshotStore); this.shutDownNow = requireNonNull(shutDownNow); } @@ -197,7 +197,7 @@ public interface Recovery { void commit() { primaryStorage.write((NoResult.Quiet) storeProvider -> { try { - distributedStore.snapshotWith(tempStorage.toSnapshot()); + snapshotStore.snapshotWith(tempStorage.toSnapshot()); shutDownNow.execute(); } catch (CodingException e) { throw new IllegalStateException("Failed to encode snapshot.", e); http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java index 2d61678..1675893 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; import javax.inject.Inject; import javax.inject.Qualifier; @@ -42,8 +43,11 @@ import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.Stats; import org.apache.aurora.common.util.Clock; +import org.apache.aurora.gen.storage.Op; import org.apache.aurora.gen.storage.Snapshot; -import org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.Snapshotter; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; @@ -69,7 +73,7 @@ public interface StorageBackup { */ void backupNow(); - class StorageBackupImpl implements StorageBackup, SnapshotStore<Snapshot> { + class StorageBackupImpl implements StorageBackup, Snapshotter { private static final Logger LOG = LoggerFactory.getLogger(StorageBackupImpl.class); private static final String FILE_PREFIX = "scheduler-backup-"; @@ -93,13 +97,14 @@ public interface StorageBackup { } /** - * Binding annotation that the underlying {@link SnapshotStore} must be bound with. + * Binding annotation that the underlying {@link Snapshotter} must be bound with. 
*/ @Qualifier @Target({FIELD, PARAMETER, METHOD}) @Retention(RUNTIME) @interface SnapshotDelegate { } - private final SnapshotStore<Snapshot> delegate; + private final Storage storage; + private final Snapshotter delegate; private final Clock clock; private final long backupIntervalMs; private volatile long lastBackupMs; @@ -120,11 +125,13 @@ public interface StorageBackup { @Inject StorageBackupImpl( - @SnapshotDelegate SnapshotStore<Snapshot> delegate, + Storage storage, + @SnapshotDelegate Snapshotter delegate, Clock clock, BackupConfig config, Executor executor) { + this.storage = requireNonNull(storage); this.delegate = requireNonNull(delegate); this.clock = requireNonNull(clock); this.config = requireNonNull(config); @@ -135,8 +142,8 @@ public interface StorageBackup { } @Override - public Snapshot createSnapshot() { - final Snapshot snapshot = delegate.createSnapshot(); + public Snapshot from(StoreProvider stores) { + Snapshot snapshot = delegate.from(stores); if (clock.nowMillis() >= (lastBackupMs + backupIntervalMs)) { executor.execute(() -> save(snapshot)); } @@ -145,7 +152,7 @@ public interface StorageBackup { @Override public void backupNow() { - save(delegate.createSnapshot()); + save(storage.write(delegate::from)); } @VisibleForTesting @@ -210,8 +217,8 @@ public interface StorageBackup { static final Function<File, String> FILE_NAME = File::getName; @Override - public void applySnapshot(Snapshot snapshot) { - delegate.applySnapshot(snapshot); + public Stream<Op> asStream(Snapshot snapshot) { + return delegate.asStream(snapshot); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java index 18296b0..0305d9d 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java @@ -24,9 +24,11 @@ import org.apache.aurora.common.util.testing.FakeClock; import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.Snapshotter; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; +import org.apache.aurora.scheduler.storage.durability.Loader; +import org.apache.aurora.scheduler.storage.durability.Persistence.Edit; import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl; @@ -78,16 +80,15 @@ interface TemporaryStorage { @Override public TemporaryStorage apply(Snapshot snapshot) { - final Storage storage = MemStorageModule.newEmptyStorage(); - final BuildInfo buildInfo = generateBuildInfo(); + Storage storage = MemStorageModule.newEmptyStorage(); + BuildInfo buildInfo = generateBuildInfo(); FakeClock clock = new FakeClock(); clock.setNowMillis(snapshot.getTimestamp()); - final SnapshotStore<Snapshot> snapshotStore = new SnapshotStoreImpl( - buildInfo, - clock, - storage, - thriftBackfill); - snapshotStore.applySnapshot(snapshot); + Snapshotter snapshotter = new 
SnapshotStoreImpl(buildInfo, clock); + + storage.write((NoResult.Quiet) stores -> { + Loader.load(stores, thriftBackfill, snapshotter.asStream(snapshot).map(Edit::op)); + }); return new TemporaryStorage() { @Override @@ -107,7 +108,7 @@ interface TemporaryStorage { @Override public Snapshot toSnapshot() { - return snapshotStore.createSnapshot(); + return storage.write(snapshotter::from); } }; } http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java index 6a7c0ad..f1fdc27 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java @@ -14,23 +14,13 @@ package org.apache.aurora.scheduler.storage.durability; import java.util.List; -import java.util.Map; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; import javax.inject.Inject; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; - import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.stats.SlidingStats; -import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.storage.Op; -import org.apache.aurora.gen.storage.SaveCronJob; -import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; -import org.apache.aurora.gen.storage.SaveJobUpdateEvent; -import org.apache.aurora.gen.storage.SaveQuota; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.storage.AttributeStore; @@ -43,12 +33,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.TaskStore; import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; @@ -101,32 +85,17 @@ public class DurableStorage implements NonVolatileStorage { void log(Op op); } - private static final Logger LOG = LoggerFactory.getLogger(DurableStorage.class); - private final Persistence persistence; private final Storage writeBehindStorage; - private final SchedulerStore.Mutable writeBehindSchedulerStore; - private final CronJobStore.Mutable writeBehindJobStore; - private final TaskStore.Mutable writeBehindTaskStore; - private final QuotaStore.Mutable writeBehindQuotaStore; - private final AttributeStore.Mutable writeBehindAttributeStore; - private final JobUpdateStore.Mutable writeBehindJobUpdateStore; private final ReentrantLock writeLock; private final ThriftBackfill thriftBackfill; - private final WriteAheadStorage writeAheadStorage; + private final WriteRecorder writeRecorder; - // TODO(wfarner): It should be possible to 
remove this flag now, since all call stacks when - // recovering are controlled at this layer (they're all calls to Mutable store implementations). - // The more involved change is changing SnapshotStore to accept a Mutable store provider to - // avoid a call to Storage.write() when we replay a Snapshot. - private boolean recovered = false; private TransactionRecorder transaction = null; private final SlidingStats writerWaitStats = new SlidingStats("storage_write_lock_wait", "ns"); - private final Map<Op._Fields, Consumer<Op>> transactionReplayActions; - @Inject DurableStorage( Persistence persistence, @@ -147,12 +116,6 @@ public class DurableStorage implements NonVolatileStorage { // we write directly to the writeBehind stores since we are replaying what's already persisted. // After that, all writes must succeed in Persistence before they may be considered successful. this.writeBehindStorage = requireNonNull(delegateStorage); - this.writeBehindSchedulerStore = requireNonNull(schedulerStore); - this.writeBehindJobStore = requireNonNull(jobStore); - this.writeBehindTaskStore = requireNonNull(taskStore); - this.writeBehindQuotaStore = requireNonNull(quotaStore); - this.writeBehindAttributeStore = requireNonNull(attributeStore); - this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore); this.writeLock = requireNonNull(writeLock); this.thriftBackfill = requireNonNull(thriftBackfill); TransactionManager transactionManager = new TransactionManager() { @@ -166,7 +129,7 @@ public class DurableStorage implements NonVolatileStorage { transaction.add(op); } }; - this.writeAheadStorage = new WriteAheadStorage( + this.writeRecorder = new WriteRecorder( transactionManager, schedulerStore, jobStore, @@ -174,81 +137,8 @@ public class DurableStorage implements NonVolatileStorage { quotaStore, attributeStore, jobUpdateStore, - LoggerFactory.getLogger(WriteAheadStorage.class), + LoggerFactory.getLogger(WriteRecorder.class), eventSink); - - this.transactionReplayActions = buildTransactionReplayActions(); - } - - @VisibleForTesting - final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() { - return ImmutableMap.<Op._Fields, Consumer<Op>>builder() - .put( - Op._Fields.SAVE_FRAMEWORK_ID, - op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId())) - .put(Op._Fields.SAVE_CRON_JOB, op -> { - SaveCronJob cronJob = op.getSaveCronJob(); - writeBehindJobStore.saveAcceptedJob( - thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig())); - }) - .put( - Op._Fields.REMOVE_JOB, - op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()))) - .put( - Op._Fields.SAVE_TASKS, - op -> writeBehindTaskStore.saveTasks( - thriftBackfill.backfillTasks(op.getSaveTasks().getTasks()))) - .put( - Op._Fields.REMOVE_TASKS, - op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds())) - .put(Op._Fields.SAVE_QUOTA, op -> { - SaveQuota saveQuota = op.getSaveQuota(); - writeBehindQuotaStore.saveQuota( - saveQuota.getRole(), - ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota())); - }) - .put( - Op._Fields.REMOVE_QUOTA, - op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole())) - .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> { - HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes(); - // Prior to commit 5cf760b, the store would persist maintenance mode changes for - // unknown hosts. 5cf760b began rejecting these, but the storage may still - // contain entries with a null slave ID. 
- if (attributes.isSetSlaveId()) { - writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes)); - } else { - LOG.info("Dropping host attributes with no agent ID: " + attributes); - } - }) - .put( - Op._Fields.SAVE_LOCK, // TODO(jly): Deprecated, remove in 0.21. See AURORA-1959. - op -> { /* no-op */ }) - .put( - Op._Fields.REMOVE_LOCK, // TODO(jly): Deprecated, remove in 0.21. See AURORA-1959. - op -> { /* no-op */ }) - .put(Op._Fields.SAVE_JOB_UPDATE, op -> - writeBehindJobUpdateStore.saveJobUpdate( - thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate()))) - .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> { - SaveJobUpdateEvent event = op.getSaveJobUpdateEvent(); - writeBehindJobUpdateStore.saveJobUpdateEvent( - IJobUpdateKey.build(event.getKey()), - IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent())); - }) - .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> { - SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent(); - writeBehindJobUpdateStore.saveJobInstanceUpdateEvent( - IJobUpdateKey.build(event.getKey()), - IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent())); - }) - .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> { - LOG.info("Dropping prune operation. Updates will be pruned later."); - }) - .put(Op._Fields.REMOVE_JOB_UPDATE, op -> - writeBehindJobUpdateStore.removeJobUpdates( - IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys()))) - .build(); } @Override @@ -260,18 +150,18 @@ public class DurableStorage implements NonVolatileStorage { @Override @Timed("scheduler_storage_start") - public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) { - write((NoResult.Quiet) unused -> { - // Must have the underlying storage started so we can query it. - // We replay these entries in the forwarded storage system's transactions but not ours - we - // do not want to re-record these ops. - recover(); - recovered = true; + public void start(final MutateWork.NoResult.Quiet initializationLogic) { + writeLock.lock(); + try { + // We recover directly into the forwarded system to avoid persisting replayed operations. + writeBehindStorage.write((NoResult.Quiet) this::recover); // Now that we're recovered we should persist any mutations done in initializationLogic, so // run it in one of our transactions. write(initializationLogic); - }); + } finally { + writeLock.unlock(); + } } @Override @@ -280,9 +170,9 @@ public class DurableStorage implements NonVolatileStorage { } @Timed("scheduler_storage_recover") - void recover() throws RecoveryFailedException { + void recover(MutableStoreProvider stores) throws RecoveryFailedException { try { - persistence.recover().forEach(DurableStorage.this::replayOp); + Loader.load(stores, thriftBackfill, persistence.recover()); } catch (PersistenceException e) { throw new RecoveryFailedException(e); } @@ -294,28 +184,19 @@ public class DurableStorage implements NonVolatileStorage { } } - private void replayOp(Op op) { - Op._Fields opField = op.getSetField(); - if (!transactionReplayActions.containsKey(opField)) { - throw new IllegalStateException("Unknown transaction op: " + opField); - } - - transactionReplayActions.get(opField).accept(op); - } - private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work) throws StorageException, E { // The transaction has already been set up so we just need to delegate with our store provider // so any mutations may be persisted. 
if (transaction != null) { - return work.apply(writeAheadStorage); + return work.apply(writeRecorder); } transaction = new TransactionRecorder(); try { return writeBehindStorage.write(unused -> { - T result = work.apply(writeAheadStorage); + T result = work.apply(writeRecorder); List<Op> ops = transaction.getOps(); if (!ops.isEmpty()) { try { @@ -337,12 +218,6 @@ public class DurableStorage implements NonVolatileStorage { writeLock.lock(); try { writerWaitStats.accumulate(System.nanoTime() - waitStart); - // We don't want to persist when recovering, we just want to update the underlying - // store - so pass mutations straight through to the underlying storage. - if (!recovered) { - return writeBehindStorage.write(work); - } - return doInTransaction(work); } finally { writeLock.unlock(); http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java new file mode 100644 index 0000000..10864f1 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java @@ -0,0 +1,150 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.durability; + +import java.util.stream.Stream; + +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; +import org.apache.aurora.gen.storage.SaveJobUpdateEvent; +import org.apache.aurora.gen.storage.SaveQuota; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.durability.Persistence.Edit; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class Loader { + + private static final Logger LOG = LoggerFactory.getLogger(Loader.class); + + private Loader() { + // Utility class. + } + + /** + * Loads a sequence of storage operations into the provided stores, applying backfills. + * + * @param stores Stores to populate. + * @param backfill Backfill mechanism to use. + * @param edits Edits to apply. 
+ */ + public static void load( + MutableStoreProvider stores, + ThriftBackfill backfill, + Stream<Edit> edits) { + + edits.forEach(edit -> load(stores, backfill, edit)); + } + + private static void load(MutableStoreProvider stores, ThriftBackfill backfill, Edit edit) { + if (edit.isDeleteAll()) { + LOG.info("Resetting storage"); + stores.getCronJobStore().deleteJobs(); + stores.getUnsafeTaskStore().deleteAllTasks(); + stores.getQuotaStore().deleteQuotas(); + stores.getAttributeStore().deleteHostAttributes(); + stores.getJobUpdateStore().deleteAllUpdates(); + return; + } + + Op op = edit.getOp(); + switch (op.getSetField()) { + case SAVE_FRAMEWORK_ID: + stores.getSchedulerStore().saveFrameworkId(op.getSaveFrameworkId().getId()); + break; + + case SAVE_CRON_JOB: + stores.getCronJobStore().saveAcceptedJob( + backfill.backfillJobConfiguration(op.getSaveCronJob().getJobConfig())); + break; + + case REMOVE_JOB: + stores.getCronJobStore().removeJob(IJobKey.build(op.getRemoveJob().getJobKey())); + break; + + case REMOVE_LOCK: + case SAVE_LOCK: + // TODO(jly): Deprecated, remove in 0.21. See AURORA-1959. + break; + + case SAVE_TASKS: + stores.getUnsafeTaskStore().saveTasks(backfill.backfillTasks(op.getSaveTasks().getTasks())); + break; + + case REMOVE_TASKS: + stores.getUnsafeTaskStore().deleteTasks(op.getRemoveTasks().getTaskIds()); + break; + + case SAVE_QUOTA: + SaveQuota saveQuota = op.getSaveQuota(); + stores.getQuotaStore().saveQuota( + saveQuota.getRole(), + ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota())); + break; + + case REMOVE_QUOTA: + stores.getQuotaStore().removeQuota(op.getRemoveQuota().getRole()); + break; + + case SAVE_HOST_ATTRIBUTES: + HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes(); + // Prior to commit 5cf760b, the store would persist maintenance mode changes for + // unknown hosts. 5cf760b began rejecting these, but the storage may still + // contain entries with a null slave ID. + if (attributes.isSetSlaveId()) { + stores.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes)); + } else { + LOG.info("Dropping host attributes with no agent ID: " + attributes); + } + break; + + case SAVE_JOB_UPDATE: + stores.getJobUpdateStore().saveJobUpdate( + backfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate())); + break; + + case SAVE_JOB_UPDATE_EVENT: + SaveJobUpdateEvent jobEvent = op.getSaveJobUpdateEvent(); + stores.getJobUpdateStore().saveJobUpdateEvent( + IJobUpdateKey.build(jobEvent.getKey()), + IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent())); + break; + + case SAVE_JOB_INSTANCE_UPDATE_EVENT: + SaveJobInstanceUpdateEvent instanceEvent = op.getSaveJobInstanceUpdateEvent(); + stores.getJobUpdateStore().saveJobInstanceUpdateEvent( + IJobUpdateKey.build(instanceEvent.getKey()), + IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent())); + break; + + case PRUNE_JOB_UPDATE_HISTORY: + LOG.info("Dropping prune operation. 
Updates will be pruned later."); + break; + + case REMOVE_JOB_UPDATE: + stores.getJobUpdateStore().removeJobUpdates( + IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())); + break; + + default: + throw new IllegalArgumentException("Unrecognized op type " + op.getSetField()); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java index 9eb862c..4476d90 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java @@ -13,10 +13,15 @@ */ package org.apache.aurora.scheduler.storage.durability; +import java.util.Objects; import java.util.stream.Stream; +import javax.annotation.Nullable; + import org.apache.aurora.gen.storage.Op; +import static java.util.Objects.requireNonNull; + /** * Persistence layer for storage operations. */ @@ -31,10 +36,10 @@ public interface Persistence { /** * Recovers previously-persisted records. * - * @return All persisted records. + * @return All edits to apply. * @throws PersistenceException If recovery failed. */ - Stream<Op> recover() throws PersistenceException; + Stream<Edit> recover() throws PersistenceException; /** * Saves new records. No records may be considered durably saved until this method returns @@ -46,6 +51,53 @@ public interface Persistence { void persist(Stream<Op> records) throws PersistenceException; /** + * An edit to apply when recovering from persistence. + */ + class Edit { + @Nullable private final Op op; + + private Edit(@Nullable Op op) { + this.op = op; + } + + public static Edit op(Op op) { + return new Edit(requireNonNull(op)); + } + + public static Edit deleteAll() { + return new Edit(null); + } + + public boolean isDeleteAll() { + return op == null; + } + + public Op getOp() { + return requireNonNull(op); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Edit)) { + return false; + } + + Edit other = (Edit) obj; + return Objects.equals(op, other.op); + } + + @Override + public int hashCode() { + return Objects.hashCode(op); + } + + @Override + public String toString() { + return Objects.toString(op); + } + } + + /** * Thrown when a persistence operation fails. */ class PersistenceException extends Exception { http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java deleted file mode 100644 index 667db06..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java +++ /dev/null @@ -1,368 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.durability; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.gen.storage.Op; -import org.apache.aurora.gen.storage.RemoveJob; -import org.apache.aurora.gen.storage.RemoveQuota; -import org.apache.aurora.gen.storage.RemoveTasks; -import org.apache.aurora.gen.storage.SaveCronJob; -import org.apache.aurora.gen.storage.SaveFrameworkId; -import org.apache.aurora.gen.storage.SaveHostAttributes; -import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; -import org.apache.aurora.gen.storage.SaveJobUpdate; -import org.apache.aurora.gen.storage.SaveJobUpdateEvent; -import org.apache.aurora.gen.storage.SaveQuota; -import org.apache.aurora.gen.storage.SaveTasks; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.events.EventSink; -import org.apache.aurora.scheduler.events.PubsubEvent; -import org.apache.aurora.scheduler.storage.AttributeStore; -import org.apache.aurora.scheduler.storage.CronJobStore; -import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.QuotaStore; -import org.apache.aurora.scheduler.storage.SchedulerStore; -import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.TaskStore; -import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.aurora.scheduler.storage.entities.IJobUpdate; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; -import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.slf4j.Logger; - -import static java.util.Objects.requireNonNull; - -/** - * Mutable stores implementation that translates all operations to {@link Op}s (which are passed - * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable - * stores. 
- */ -public class WriteAheadStorage implements - MutableStoreProvider, - SchedulerStore.Mutable, - CronJobStore.Mutable, - TaskStore.Mutable, - QuotaStore.Mutable, - AttributeStore.Mutable, - JobUpdateStore.Mutable { - - private final TransactionManager transactionManager; - private final SchedulerStore.Mutable schedulerStore; - private final CronJobStore.Mutable jobStore; - private final TaskStore.Mutable taskStore; - private final QuotaStore.Mutable quotaStore; - private final AttributeStore.Mutable attributeStore; - private final JobUpdateStore.Mutable jobUpdateStore; - private final Logger log; - private final EventSink eventSink; - - /** - * Creates a new write-ahead storage that delegates to the providing default stores. - * - * @param transactionManager External controller for transaction operations. - * @param schedulerStore Delegate. - * @param jobStore Delegate. - * @param taskStore Delegate. - * @param quotaStore Delegate. - * @param attributeStore Delegate. - * @param jobUpdateStore Delegate. - */ - public WriteAheadStorage( - TransactionManager transactionManager, - SchedulerStore.Mutable schedulerStore, - CronJobStore.Mutable jobStore, - TaskStore.Mutable taskStore, - QuotaStore.Mutable quotaStore, - AttributeStore.Mutable attributeStore, - JobUpdateStore.Mutable jobUpdateStore, - Logger log, - EventSink eventSink) { - - this.transactionManager = requireNonNull(transactionManager); - this.schedulerStore = requireNonNull(schedulerStore); - this.jobStore = requireNonNull(jobStore); - this.taskStore = requireNonNull(taskStore); - this.quotaStore = requireNonNull(quotaStore); - this.attributeStore = requireNonNull(attributeStore); - this.jobUpdateStore = requireNonNull(jobUpdateStore); - this.log = requireNonNull(log); - this.eventSink = requireNonNull(eventSink); - } - - private void write(Op op) { - Preconditions.checkState( - transactionManager.hasActiveTransaction(), - "Mutating operations must be within a transaction."); - transactionManager.log(op); - } - - @Override - public void saveFrameworkId(final String frameworkId) { - requireNonNull(frameworkId); - - write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId))); - schedulerStore.saveFrameworkId(frameworkId); - } - - @Override - public void deleteTasks(final Set<String> taskIds) { - requireNonNull(taskIds); - - write(Op.removeTasks(new RemoveTasks(taskIds))); - taskStore.deleteTasks(taskIds); - } - - @Override - public void saveTasks(final Set<IScheduledTask> newTasks) { - requireNonNull(newTasks); - - write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks)))); - taskStore.saveTasks(newTasks); - } - - @Override - public Optional<IScheduledTask> mutateTask( - String taskId, - Function<IScheduledTask, IScheduledTask> mutator) { - - Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator); - log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus()); - write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder())))); - - return mutated; - } - - @Override - public void saveQuota(final String role, final IResourceAggregate quota) { - requireNonNull(role); - requireNonNull(quota); - - write(Op.saveQuota(new SaveQuota(role, quota.newBuilder()))); - quotaStore.saveQuota(role, quota); - } - - @Override - public boolean saveHostAttributes(final IHostAttributes attrs) { - requireNonNull(attrs); - - boolean changed = attributeStore.saveHostAttributes(attrs); - if (changed) { - write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder()))); - 
eventSink.post(new PubsubEvent.HostAttributesChanged(attrs)); - } - return changed; - } - - @Override - public void removeJob(final IJobKey jobKey) { - requireNonNull(jobKey); - - write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder()))); - jobStore.removeJob(jobKey); - } - - @Override - public void saveAcceptedJob(final IJobConfiguration jobConfig) { - requireNonNull(jobConfig); - - write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder()))); - jobStore.saveAcceptedJob(jobConfig); - } - - @Override - public void removeQuota(final String role) { - requireNonNull(role); - - write(Op.removeQuota(new RemoveQuota(role))); - quotaStore.removeQuota(role); - } - - @Override - public void saveJobUpdate(IJobUpdate update) { - requireNonNull(update); - - write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder()))); - jobUpdateStore.saveJobUpdate(update); - } - - @Override - public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) { - requireNonNull(key); - requireNonNull(event); - - write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder()))); - jobUpdateStore.saveJobUpdateEvent(key, event); - } - - @Override - public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) { - requireNonNull(key); - requireNonNull(event); - - write(Op.saveJobInstanceUpdateEvent( - new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder()))); - jobUpdateStore.saveJobInstanceUpdateEvent(key, event); - } - - @Override - public void removeJobUpdates(Set<IJobUpdateKey> keys) { - requireNonNull(keys); - - // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot - // read it. JobUpdates are only removed implicitly when a snapshot is taken. - jobUpdateStore.removeJobUpdates(keys); - } - - @Override - public void deleteAllTasks() { - throw new UnsupportedOperationException( - "Unsupported since casual storage users should never be doing this."); - } - - @Override - public void deleteHostAttributes() { - throw new UnsupportedOperationException( - "Unsupported since casual storage users should never be doing this."); - } - - @Override - public void deleteJobs() { - throw new UnsupportedOperationException( - "Unsupported since casual storage users should never be doing this."); - } - - @Override - public void deleteQuotas() { - throw new UnsupportedOperationException( - "Unsupported since casual storage users should never be doing this."); - } - - @Override - public void deleteAllUpdates() { - throw new UnsupportedOperationException( - "Unsupported since casual storage users should never be doing this."); - } - - @Override - public SchedulerStore.Mutable getSchedulerStore() { - return this; - } - - @Override - public CronJobStore.Mutable getCronJobStore() { - return this; - } - - @Override - public TaskStore.Mutable getUnsafeTaskStore() { - return this; - } - - @Override - public QuotaStore.Mutable getQuotaStore() { - return this; - } - - @Override - public AttributeStore.Mutable getAttributeStore() { - return this; - } - - @Override - public TaskStore getTaskStore() { - return this; - } - - @Override - public JobUpdateStore.Mutable getJobUpdateStore() { - return this; - } - - @Override - public Optional<String> fetchFrameworkId() { - return this.schedulerStore.fetchFrameworkId(); - } - - @Override - public Iterable<IJobConfiguration> fetchJobs() { - return this.jobStore.fetchJobs(); - } - - @Override - public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) { - return 
this.jobStore.fetchJob(jobKey); - } - - @Override - public Optional<IScheduledTask> fetchTask(String taskId) { - return this.taskStore.fetchTask(taskId); - } - - @Override - public Iterable<IScheduledTask> fetchTasks(Query.Builder query) { - return this.taskStore.fetchTasks(query); - } - - @Override - public Set<IJobKey> getJobKeys() { - return this.taskStore.getJobKeys(); - } - - @Override - public Optional<IResourceAggregate> fetchQuota(String role) { - return this.quotaStore.fetchQuota(role); - } - - @Override - public Map<String, IResourceAggregate> fetchQuotas() { - return this.quotaStore.fetchQuotas(); - } - - @Override - public Optional<IHostAttributes> getHostAttributes(String host) { - return this.attributeStore.getHostAttributes(host); - } - - @Override - public Set<IHostAttributes> getHostAttributes() { - return this.attributeStore.getHostAttributes(); - } - - @Override - public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) { - return this.jobUpdateStore.fetchJobUpdates(query); - } - - @Override - public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) { - return this.jobUpdateStore.fetchJobUpdate(key); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java new file mode 100644 index 0000000..5ae834a --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java @@ -0,0 +1,368 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.durability; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.gen.storage.RemoveJob; +import org.apache.aurora.gen.storage.RemoveQuota; +import org.apache.aurora.gen.storage.RemoveTasks; +import org.apache.aurora.gen.storage.SaveCronJob; +import org.apache.aurora.gen.storage.SaveFrameworkId; +import org.apache.aurora.gen.storage.SaveHostAttributes; +import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; +import org.apache.aurora.gen.storage.SaveJobUpdate; +import org.apache.aurora.gen.storage.SaveJobUpdateEvent; +import org.apache.aurora.gen.storage.SaveQuota; +import org.apache.aurora.gen.storage.SaveTasks; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.events.EventSink; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.CronJobStore; +import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.QuotaStore; +import org.apache.aurora.scheduler.storage.SchedulerStore; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.TaskStore; +import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; +import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdate; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; +import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.slf4j.Logger; + +import static java.util.Objects.requireNonNull; + +/** + * Mutable stores implementation that translates all operations to {@link Op}s (which are passed + * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable + * stores. + */ +public class WriteRecorder implements + MutableStoreProvider, + SchedulerStore.Mutable, + CronJobStore.Mutable, + TaskStore.Mutable, + QuotaStore.Mutable, + AttributeStore.Mutable, + JobUpdateStore.Mutable { + + private final TransactionManager transactionManager; + private final SchedulerStore.Mutable schedulerStore; + private final CronJobStore.Mutable jobStore; + private final TaskStore.Mutable taskStore; + private final QuotaStore.Mutable quotaStore; + private final AttributeStore.Mutable attributeStore; + private final JobUpdateStore.Mutable jobUpdateStore; + private final Logger log; + private final EventSink eventSink; + + /** + * Creates a new write-ahead storage that delegates to the providing default stores. + * + * @param transactionManager External controller for transaction operations. + * @param schedulerStore Delegate. 
+ * @param jobStore Delegate. + * @param taskStore Delegate. + * @param quotaStore Delegate. + * @param attributeStore Delegate. + * @param jobUpdateStore Delegate. + */ + public WriteRecorder( + TransactionManager transactionManager, + SchedulerStore.Mutable schedulerStore, + CronJobStore.Mutable jobStore, + TaskStore.Mutable taskStore, + QuotaStore.Mutable quotaStore, + AttributeStore.Mutable attributeStore, + JobUpdateStore.Mutable jobUpdateStore, + Logger log, + EventSink eventSink) { + + this.transactionManager = requireNonNull(transactionManager); + this.schedulerStore = requireNonNull(schedulerStore); + this.jobStore = requireNonNull(jobStore); + this.taskStore = requireNonNull(taskStore); + this.quotaStore = requireNonNull(quotaStore); + this.attributeStore = requireNonNull(attributeStore); + this.jobUpdateStore = requireNonNull(jobUpdateStore); + this.log = requireNonNull(log); + this.eventSink = requireNonNull(eventSink); + } + + private void write(Op op) { + Preconditions.checkState( + transactionManager.hasActiveTransaction(), + "Mutating operations must be within a transaction."); + transactionManager.log(op); + } + + @Override + public void saveFrameworkId(final String frameworkId) { + requireNonNull(frameworkId); + + write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId))); + schedulerStore.saveFrameworkId(frameworkId); + } + + @Override + public void deleteTasks(final Set<String> taskIds) { + requireNonNull(taskIds); + + write(Op.removeTasks(new RemoveTasks(taskIds))); + taskStore.deleteTasks(taskIds); + } + + @Override + public void saveTasks(final Set<IScheduledTask> newTasks) { + requireNonNull(newTasks); + + write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks)))); + taskStore.saveTasks(newTasks); + } + + @Override + public Optional<IScheduledTask> mutateTask( + String taskId, + Function<IScheduledTask, IScheduledTask> mutator) { + + Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator); + log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus()); + write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder())))); + + return mutated; + } + + @Override + public void saveQuota(final String role, final IResourceAggregate quota) { + requireNonNull(role); + requireNonNull(quota); + + write(Op.saveQuota(new SaveQuota(role, quota.newBuilder()))); + quotaStore.saveQuota(role, quota); + } + + @Override + public boolean saveHostAttributes(final IHostAttributes attrs) { + requireNonNull(attrs); + + boolean changed = attributeStore.saveHostAttributes(attrs); + if (changed) { + write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder()))); + eventSink.post(new PubsubEvent.HostAttributesChanged(attrs)); + } + return changed; + } + + @Override + public void removeJob(final IJobKey jobKey) { + requireNonNull(jobKey); + + write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder()))); + jobStore.removeJob(jobKey); + } + + @Override + public void saveAcceptedJob(final IJobConfiguration jobConfig) { + requireNonNull(jobConfig); + + write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder()))); + jobStore.saveAcceptedJob(jobConfig); + } + + @Override + public void removeQuota(final String role) { + requireNonNull(role); + + write(Op.removeQuota(new RemoveQuota(role))); + quotaStore.removeQuota(role); + } + + @Override + public void saveJobUpdate(IJobUpdate update) { + requireNonNull(update); + + write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder()))); + 
jobUpdateStore.saveJobUpdate(update); + } + + @Override + public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) { + requireNonNull(key); + requireNonNull(event); + + write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder()))); + jobUpdateStore.saveJobUpdateEvent(key, event); + } + + @Override + public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) { + requireNonNull(key); + requireNonNull(event); + + write(Op.saveJobInstanceUpdateEvent( + new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder()))); + jobUpdateStore.saveJobInstanceUpdateEvent(key, event); + } + + @Override + public void removeJobUpdates(Set<IJobUpdateKey> keys) { + requireNonNull(keys); + + // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot + // read it. JobUpdates are only removed implicitly when a snapshot is taken. + jobUpdateStore.removeJobUpdates(keys); + } + + @Override + public void deleteAllTasks() { + throw new UnsupportedOperationException( + "Unsupported since casual storage users should never be doing this."); + } + + @Override + public void deleteHostAttributes() { + throw new UnsupportedOperationException( + "Unsupported since casual storage users should never be doing this."); + } + + @Override + public void deleteJobs() { + throw new UnsupportedOperationException( + "Unsupported since casual storage users should never be doing this."); + } + + @Override + public void deleteQuotas() { + throw new UnsupportedOperationException( + "Unsupported since casual storage users should never be doing this."); + } + + @Override + public void deleteAllUpdates() { + throw new UnsupportedOperationException( + "Unsupported since casual storage users should never be doing this."); + } + + @Override + public SchedulerStore.Mutable getSchedulerStore() { + return this; + } + + @Override + public CronJobStore.Mutable getCronJobStore() { + return this; + } + + @Override + public TaskStore.Mutable getUnsafeTaskStore() { + return this; + } + + @Override + public QuotaStore.Mutable getQuotaStore() { + return this; + } + + @Override + public AttributeStore.Mutable getAttributeStore() { + return this; + } + + @Override + public TaskStore getTaskStore() { + return this; + } + + @Override + public JobUpdateStore.Mutable getJobUpdateStore() { + return this; + } + + @Override + public Optional<String> fetchFrameworkId() { + return this.schedulerStore.fetchFrameworkId(); + } + + @Override + public Iterable<IJobConfiguration> fetchJobs() { + return this.jobStore.fetchJobs(); + } + + @Override + public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) { + return this.jobStore.fetchJob(jobKey); + } + + @Override + public Optional<IScheduledTask> fetchTask(String taskId) { + return this.taskStore.fetchTask(taskId); + } + + @Override + public Iterable<IScheduledTask> fetchTasks(Query.Builder query) { + return this.taskStore.fetchTasks(query); + } + + @Override + public Set<IJobKey> getJobKeys() { + return this.taskStore.getJobKeys(); + } + + @Override + public Optional<IResourceAggregate> fetchQuota(String role) { + return this.quotaStore.fetchQuota(role); + } + + @Override + public Map<String, IResourceAggregate> fetchQuotas() { + return this.quotaStore.fetchQuotas(); + } + + @Override + public Optional<IHostAttributes> getHostAttributes(String host) { + return this.attributeStore.getHostAttributes(host); + } + + @Override + public Set<IHostAttributes> getHostAttributes() { + return 
this.attributeStore.getHostAttributes(); + } + + @Override + public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) { + return this.jobUpdateStore.fetchJobUpdates(query); + } + + @Override + public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) { + return this.jobUpdateStore.fetchJobUpdate(key); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java index e70e605..8ca3169 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java @@ -16,31 +16,19 @@ package org.apache.aurora.scheduler.storage.log; import java.io.IOException; import java.util.Date; import java.util.Iterator; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.inject.Inject; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.MoreExecutors; - import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; -import org.apache.aurora.common.application.ShutdownRegistry; -import org.apache.aurora.common.inject.TimedInterceptor.Timed; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; import org.apache.aurora.gen.storage.LogEntry; import org.apache.aurora.gen.storage.Op; import org.apache.aurora.gen.storage.Snapshot; -import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException; import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException; -import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; -import org.apache.aurora.scheduler.storage.SnapshotStore; -import org.apache.aurora.scheduler.storage.Storage.StorageException; +import org.apache.aurora.scheduler.storage.Snapshotter; import org.apache.aurora.scheduler.storage.durability.Persistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,42 +38,18 @@ import static java.util.Objects.requireNonNull; /** * Persistence layer that uses a replicated log. 
*/ -class LogPersistence implements Persistence, DistributedSnapshotStore { +class LogPersistence implements Persistence { private static final Logger LOG = LoggerFactory.getLogger(LogPersistence.class); private final LogManager logManager; - private final SnapshotStore<Snapshot> snapshotStore; - private final SchedulingService schedulingService; - private final Amount<Long, Time> snapshotInterval; + private final Snapshotter snapshotter; private StreamManager streamManager; @Inject - LogPersistence( - Settings settings, - LogManager logManager, - SnapshotStore<Snapshot> snapshotStore, - ShutdownRegistry shutdownRegistry) { - - this(new ScheduledExecutorSchedulingService( - shutdownRegistry, - settings.getShutdownGracePeriod()), - settings.getSnapshotInterval(), - logManager, - snapshotStore); - } - - @VisibleForTesting - LogPersistence( - SchedulingService schedulingService, - Amount<Long, Time> snapshotInterval, - LogManager logManager, - SnapshotStore<Snapshot> snapshotStore) { - - this.schedulingService = requireNonNull(schedulingService); - this.snapshotInterval = requireNonNull(snapshotInterval); + LogPersistence(LogManager logManager, Snapshotter snapshotter) { this.logManager = requireNonNull(logManager); - this.snapshotStore = requireNonNull(snapshotStore); + this.snapshotter = requireNonNull(snapshotter); } @Override @@ -98,6 +62,15 @@ class LogPersistence implements Persistence, DistributedSnapshotStore { } } + /** + * Saves a snapshot to the log stream. + * + * @param snapshot Snapshot to save. + */ + void persist(Snapshot snapshot) { + streamManager.snapshot(snapshot); + } + @Override public void persist(Stream<Op> mutations) throws PersistenceException { try { @@ -108,9 +81,7 @@ class LogPersistence implements Persistence, DistributedSnapshotStore { } @Override - public Stream<Op> recover() throws PersistenceException { - scheduleSnapshots(); - + public Stream<Edit> recover() throws PersistenceException { try { Iterator<LogEntry> entries = streamManager.readFromBeginning(); Iterable<LogEntry> iterableEntries = () -> entries; @@ -118,139 +89,26 @@ class LogPersistence implements Persistence, DistributedSnapshotStore { return entryStream .filter(entry -> entry.getSetField() != LogEntry._Fields.NOOP) - .filter(entry -> { - if (entry.getSetField() == LogEntry._Fields.SNAPSHOT) { - Snapshot snapshot = entry.getSnapshot(); - LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp())); - snapshotStore.applySnapshot(snapshot); - return false; - } - return true; - }) - .peek(entry -> { - if (entry.getSetField() != LogEntry._Fields.TRANSACTION) { - throw new IllegalStateException("Unknown log entry type: " + entry.getSetField()); + .flatMap(entry -> { + switch (entry.getSetField()) { + case SNAPSHOT: + Snapshot snapshot = entry.getSnapshot(); + LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp())); + return Stream.concat( + Stream.of(Edit.deleteAll()), + snapshotter.asStream(snapshot) + .map(Edit::op)); + + case TRANSACTION: + return entry.getTransaction().getOps().stream() + .map(Edit::op); + + default: + throw new IllegalStateException("Unknown log entry type: " + entry.getSetField()); } - }) - .flatMap(entry -> entry.getTransaction().getOps().stream()); + }); } catch (CodingException | InvalidPositionException | StreamAccessException e) { throw new PersistenceException(e); } } - - private void scheduleSnapshots() { - if (snapshotInterval.getValue() > 0) { - schedulingService.doEvery(snapshotInterval, () -> { - try { - snapshot(); - } catch 
(StorageException e) { - if (e.getCause() == null) { - LOG.warn("StorageException when attempting to snapshot.", e); - } else { - LOG.warn(e.getMessage(), e.getCause()); - } - } - }); - } - } - - @Override - public void snapshot() throws StorageException { - try { - doSnapshot(); - } catch (CodingException e) { - throw new StorageException("Failed to encode a snapshot", e); - } catch (InvalidPositionException e) { - throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e); - } catch (StreamAccessException e) { - throw new StorageException("Failed to create a snapshot", e); - } - } - - @Timed("scheduler_log_snapshot_persist") - @Override - public void snapshotWith(Snapshot snapshot) - throws CodingException, InvalidPositionException, StreamAccessException { - - streamManager.snapshot(snapshot); - } - - /** - * Forces a snapshot of the storage state. - * - * @throws CodingException If there is a problem encoding the snapshot. - * @throws InvalidPositionException If the log stream cursor is invalid. - * @throws StreamAccessException If there is a problem writing the snapshot to the log stream. - */ - @Timed("scheduler_log_snapshot") - void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException { - LOG.info("Creating snapshot."); - Snapshot snapshot = snapshotStore.createSnapshot(); - snapshotWith(snapshot); - LOG.info("Snapshot complete." - + " host attrs: " + snapshot.getHostAttributesSize() - + ", cron jobs: " + snapshot.getCronJobsSize() - + ", quota confs: " + snapshot.getQuotaConfigurationsSize() - + ", tasks: " + snapshot.getTasksSize() - + ", updates: " + snapshot.getJobUpdateDetailsSize()); - } - - /** - * A service that can schedule an action to be executed periodically. - */ - @VisibleForTesting - interface SchedulingService { - - /** - * Schedules an action to execute periodically. - * - * @param interval The time period to wait until running the {@code action} again. - * @param action The action to execute periodically. - */ - void doEvery(Amount<Long, Time> interval, Runnable action); - } - - private static class ScheduledExecutorSchedulingService implements SchedulingService { - private final ScheduledExecutorService scheduledExecutor; - - ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry, - Amount<Long, Time> shutdownGracePeriod) { - scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG); - shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination( - scheduledExecutor, - shutdownGracePeriod.getValue(), - shutdownGracePeriod.getUnit().getTimeUnit())); - } - - @Override - public void doEvery(Amount<Long, Time> interval, Runnable action) { - requireNonNull(interval); - requireNonNull(action); - - long delay = interval.getValue(); - TimeUnit timeUnit = interval.getUnit().getTimeUnit(); - scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit); - } - } - - /** - * Configuration settings for log persistence. 
- */ - public static class Settings { - private final Amount<Long, Time> shutdownGracePeriod; - private final Amount<Long, Time> snapshotInterval; - - Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) { - this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod); - this.snapshotInterval = requireNonNull(snapshotInterval); - } - - public Amount<Long, Time> getShutdownGracePeriod() { - return shutdownGracePeriod; - } - - public Amount<Long, Time> getSnapshotInterval() { - return snapshotInterval; - } - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java index 75ec42a..671593c 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java @@ -19,6 +19,7 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; +import com.google.inject.AbstractModule; import com.google.inject.PrivateModule; import com.google.inject.TypeLiteral; import com.google.inject.assistedinject.FactoryModuleBuilder; @@ -26,33 +27,28 @@ import com.google.inject.assistedinject.FactoryModuleBuilder; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.config.types.DataAmount; import org.apache.aurora.scheduler.config.types.TimeAmount; import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage; -import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; +import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.durability.DurableStorage; import org.apache.aurora.scheduler.storage.durability.Persistence; +import org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl; +import org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction; import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize; -import org.apache.aurora.scheduler.storage.log.LogPersistence.Settings; - -import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl; -import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction; -import static org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl; +import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl; +import org.apache.aurora.scheduler.storage.log.SnapshotService.Settings; /** * Bindings for scheduler distributed log based storage. 
*/ -public class LogStorageModule extends PrivateModule { +public class LogStorageModule extends AbstractModule { @Parameters(separators = "=") public static class Options { - @Parameter(names = "-dlog_shutdown_grace_period", - description = "Specifies the maximum time to wait for scheduled checkpoint and snapshot " - + "actions to complete before forcibly shutting down.") - public TimeAmount shutdownGracePeriod = new TimeAmount(2, Time.SECONDS); - @Parameter(names = "-dlog_snapshot_interval", description = "Specifies the frequency at which snapshots of local storage are taken and " + "written to the log.") @@ -73,34 +69,42 @@ public class LogStorageModule extends PrivateModule { @Override protected void configure() { - bind(Settings.class) - .toInstance(new Settings(options.shutdownGracePeriod, options.snapshotInterval)); - - bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class) - .toInstance(options.maxLogEntrySize); - bind(LogManager.class).in(Singleton.class); - bind(DurableStorage.class).in(Singleton.class); - - install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class)); - bind(LogPersistence.class).in(Singleton.class); - bind(Persistence.class).to(LogPersistence.class); - bind(DistributedSnapshotStore.class).to(LogPersistence.class); - expose(Persistence.class); - expose(Storage.class); - expose(NonVolatileStorage.class); - expose(DistributedSnapshotStore.class); - - bind(EntrySerializer.class).to(EntrySerializerImpl.class); - // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5 - // versus a faster error-detection checksum like CRC32 for large Snapshots. - @SuppressWarnings("deprecation") - HashFunction hashFunction = Hashing.md5(); - bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(hashFunction); - - bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class); - - install(new FactoryModuleBuilder() - .implement(StreamManager.class, StreamManagerImpl.class) - .build(StreamManagerFactory.class)); + install(new PrivateModule() { + @Override + protected void configure() { + bind(Settings.class).toInstance(new Settings(options.snapshotInterval)); + + bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class) + .toInstance(options.maxLogEntrySize); + bind(LogManager.class).in(Singleton.class); + bind(DurableStorage.class).in(Singleton.class); + + install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class)); + bind(LogPersistence.class).in(Singleton.class); + bind(Persistence.class).to(LogPersistence.class); + bind(SnapshotStore.class).to(SnapshotService.class); + bind(SnapshotService.class).in(Singleton.class); + expose(SnapshotService.class); + expose(Persistence.class); + expose(Storage.class); + expose(NonVolatileStorage.class); + expose(SnapshotStore.class); + + bind(EntrySerializer.class).to(EntrySerializerImpl.class); + // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5 + // versus a faster error-detection checksum like CRC32 for large Snapshots. 
+ @SuppressWarnings("deprecation") + HashFunction hashFunction = Hashing.md5(); + bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(hashFunction); + + bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class); + + install(new FactoryModuleBuilder() + .implement(StreamManager.class, StreamManagerImpl.class) + .build(StreamManagerFactory.class)); + } + }); + + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SnapshotService.class); } }
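----------------------------------------------------------------------

The net effect of the LogPersistence change above is that recovery is now expressed entirely through the Op stream: a SNAPSHOT log entry surfaces as a deleteAll() edit followed by the snapshot's Ops, and a TRANSACTION entry surfaces as its Ops, instead of the snapshot being applied out-of-band. Below is a minimal sketch of what a consumer of that recovered stream might look like; the real replay logic lives in the new Loader class, which is not shown in this hunk. The import paths assume Edit and PersistenceException are member types of Persistence, and the Edit accessors isDeleteAll()/getOp() plus the two store helpers are assumptions made purely for illustration.

import java.util.stream.Stream;

import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.scheduler.storage.durability.Persistence;
import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException;

// Sketch only: replay recovery strictly through the recovered Edit stream.
// Snapshots never bypass this loop; they arrive as deleteAll() followed by Ops.
final class RecoveryReplaySketch {
  private RecoveryReplaySketch() { }

  static void replay(Persistence persistence) throws PersistenceException {
    try (Stream<Edit> edits = persistence.recover()) {
      edits.forEach(edit -> {
        if (edit.isDeleteAll()) {        // assumed accessor on Edit
          resetStores();                 // hypothetical helper: wipe local stores
        } else {
          apply(edit.getOp());           // hypothetical helper: apply a single Op
        }
      });
    }
  }

  private static void resetStores() { /* elided */ }

  private static void apply(Op op) { /* elided */ }
}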

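----------------------------------------------------------------------

For reference, the record-then-delegate contract that WriteRecorder (formerly WriteAheadStorage) applies to every mutation can be reduced to a few lines. The class below is a hypothetical miniature, not part of the patch: it covers only the framework-id operations and leans on the same TransactionManager contract (hasActiveTransaction() / log(Op)) shown in the diff above.

import static java.util.Objects.requireNonNull;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;

import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.SaveFrameworkId;
import org.apache.aurora.scheduler.storage.SchedulerStore;
import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;

// Hypothetical miniature of the pattern: record the Op inside the active
// transaction first, then forward the mutation to the delegate store.
final class FrameworkIdRecorderSketch {
  private final TransactionManager transactionManager;
  private final SchedulerStore.Mutable delegate;

  FrameworkIdRecorderSketch(TransactionManager transactionManager, SchedulerStore.Mutable delegate) {
    this.transactionManager = requireNonNull(transactionManager);
    this.delegate = requireNonNull(delegate);
  }

  void saveFrameworkId(String frameworkId) {
    Preconditions.checkState(
        transactionManager.hasActiveTransaction(),
        "Mutating operations must be within a transaction.");
    transactionManager.log(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
    delegate.saveFrameworkId(frameworkId);
  }

  Optional<String> fetchFrameworkId() {
    // Reads are pure pass-throughs; nothing is recorded.
    return delegate.fetchFrameworkId();
  }
}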