Extract a storage Persistence layer

This extracts the `Log`- and `Snapshot`-specific details from `LogStorage`, leaving `DurableStorage`. `DurableStorage` is useful as a general-purpose `Storage` mutation observer, while `Persistence` is the minimal behavior an underlying durability layer needs to provide.
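
For illustration, the new `Persistence` contract is small enough that a purely in-memory implementation fits in a few lines. The sketch below is hypothetical and not part of this change (the class name `InMemoryPersistence` and the deep-copy choice are assumptions); it only shows the shape an implementation takes: recover() replays previously persisted `Op`s, and persist() must make records durable before returning.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Stream;

import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.scheduler.storage.durability.Persistence;

// Hypothetical example, not part of this commit: the smallest useful Persistence is an
// append-only record of Ops that can be replayed in order.
public class InMemoryPersistence implements Persistence {
  private final List<Op> ops = new CopyOnWriteArrayList<>();

  @Override
  public void prepare() {
    // Nothing to open or advertise for an in-memory store.
  }

  @Override
  public Stream<Op> recover() {
    // Replay everything persisted so far, in the order it was saved.
    return ops.stream();
  }

  @Override
  public void persist(Stream<Op> records) {
    // Records must be durably saved before this method returns; here "durable" is just
    // the in-memory list. Deep-copy so later mutation of the thrift objects does not
    // alter what was recorded.
    records.forEach(op -> ops.add(op.deepCopy()));
  }
}

The real implementation added in this commit is `LogPersistence`, which backs the same contract with the replicated log and also takes over the snapshot duties that previously lived in `LogStorage`.
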
Reviewed at https://reviews.apache.org/r/64234/

Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/cea43db9
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/cea43db9
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/cea43db9
Branch: refs/heads/master
Commit: cea43db9ded1201f69a85a43fb67244c69cf5347
Parents: de8b375
Author: Bill Farner <[email protected]>
Authored: Sat Dec 2 19:59:03 2017 -0800
Committer: Bill Farner <[email protected]>
Committed: Sat Dec 2 19:59:03 2017 -0800

----------------------------------------------------------------------
 .../apache/aurora/codec/ThriftBinaryCodec.java | 2 +-
 .../aurora/scheduler/base/TaskTestUtil.java | 2 +-
 .../configuration/ConfigurationManager.java | 2 +-
 .../scheduler/resources/ResourceManager.java | 2 +-
 .../storage/CallOrderEnforcingStorage.java | 6 -
 .../storage/DistributedSnapshotStore.java | 15 +-
 .../aurora/scheduler/storage/Storage.java | 10 -
 .../scheduler/storage/backup/Recovery.java | 2 +-
 .../storage/backup/TemporaryStorage.java | 2 +-
 .../storage/durability/DurableStorage.java | 350 ++++++++
 .../storage/durability/Persistence.java | 64 ++
 .../storage/durability/ThriftBackfill.java | 175 ++++
 .../storage/durability/TransactionRecorder.java | 122 +++
 .../storage/durability/WriteAheadStorage.java | 368 ++++++++
 .../scheduler/storage/log/LogPersistence.java | 257 ++++++
 .../scheduler/storage/log/LogStorage.java | 576 ------------
 .../scheduler/storage/log/LogStorageModule.java | 13 +-
 .../storage/log/SnapshotStoreImpl.java | 1 +
 .../scheduler/storage/log/StreamManager.java | 15 +-
 .../storage/log/StreamManagerImpl.java | 46 +-
 .../scheduler/storage/log/ThriftBackfill.java | 175 ----
 .../storage/log/WriteAheadStorage.java | 369 --------
 .../thrift/SchedulerThriftInterface.java | 8 +-
 .../app/local/FakeNonVolatileStorage.java | 5 -
 .../scheduler/app/local/LocalSchedulerMain.java | 13 +-
 .../scheduler/storage/backup/RecoveryTest.java | 4 +-
 .../storage/durability/DurableStorageTest.java | 781 ++++++++++++++++
 .../storage/durability/ThriftBackfillTest.java | 222 +++++
 .../durability/TransactionRecorderTest.java | 78 ++
 .../durability/WriteAheadStorageTest.java | 166 ++++
 .../scheduler/storage/log/LogManagerTest.java | 86 +-
 .../scheduler/storage/log/LogStorageTest.java | 897 -------------------
 .../storage/log/NonVolatileStorageTest.java | 5 +-
 .../storage/log/SnapshotStoreImplIT.java | 1 +
 .../storage/log/ThriftBackfillTest.java | 222 -----
 .../storage/log/WriteAheadStorageTest.java | 165 ----
 .../thrift/SchedulerThriftInterfaceTest.java | 8 +-
 .../aurora/scheduler/thrift/ThriftIT.java | 2 +
 38 files changed, 2687 insertions(+), 2550 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java index 3c12532..cdbe359 100644 --- a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java +++ b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java @@ -217,7 +217,7 @@ public final class ThriftBinaryCodec { /** * Thrown when serialization or deserialization failed.
*/ - public static class CodingException extends Exception { + public static class CodingException extends RuntimeException { public CodingException(String message) { super(message); } http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java index 5fe7b9b..e1f20f4 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java +++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java @@ -47,10 +47,10 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager; import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings; import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; +import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.aurora.scheduler.storage.log.ThriftBackfill; import org.apache.mesos.v1.Protos; import org.apache.mesos.v1.Protos.ExecutorID; import org.apache.mesos.v1.Protos.ExecutorInfo; http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java index fa2f39c..f3e98f2 100644 --- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java +++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java @@ -40,6 +40,7 @@ import org.apache.aurora.scheduler.base.UserProvidedStrings; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.resources.ResourceType; +import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; import org.apache.aurora.scheduler.storage.entities.IConstraint; import org.apache.aurora.scheduler.storage.entities.IContainer; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; @@ -48,7 +49,6 @@ import org.apache.aurora.scheduler.storage.entities.IResource; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.entities.ITaskConstraint; import org.apache.aurora.scheduler.storage.entities.IValueConstraint; -import org.apache.aurora.scheduler.storage.log.ThriftBackfill; import static java.util.Objects.requireNonNull; http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java index f9dee22..d093753 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java +++ 
b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java @@ -26,12 +26,12 @@ import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import org.apache.aurora.gen.ResourceAggregate; +import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IResource; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.aurora.scheduler.storage.log.ThriftBackfill; import org.apache.mesos.v1.Protos.Resource; import static org.apache.aurora.scheduler.resources.ResourceType.BY_MESOS_NAME; http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java index 1b10ec5..25fd315 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java @@ -132,12 +132,6 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage { return wrapped.write(work); } - @Override - public void snapshot() throws StorageException { - checkState(State.READY); - wrapped.snapshot(); - } - /** * Creates a binding module that will wrap a storage class with {@link CallOrderEnforcingStorage}, * exposing the order-enforced storage as {@link Storage} and {@link NonVolatileStorage}. http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java index 4ddee40..0c6a955 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java @@ -15,18 +15,25 @@ package org.apache.aurora.scheduler.storage; import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; import org.apache.aurora.gen.storage.Snapshot; +import org.apache.aurora.scheduler.storage.Storage.StorageException; /** * A distributed snapshot store that supports persisting globally-visible snapshots. */ public interface DistributedSnapshotStore { + + /** + * Clean up the underlying storage by optimizing internal data structures. Does not change + * externally-visible state but might not run concurrently with write operations. + */ + void snapshot() throws StorageException; + /** - * Writes a snapshot to the distributed storage system. - * TODO(William Farner): Currently we're hiding some exceptions (which happen to be - * RuntimeExceptions). Clean these up to be checked, and throw another exception type here. + * Identical to {@link #snapshot()}, using a custom {@link Snapshot} rather than an + * internally-generated one based on the current state. * * @param snapshot Snapshot to write. * @throws CodingException If the snapshot could not be serialized. 
*/ - void persist(Snapshot snapshot) throws CodingException; + void snapshotWith(Snapshot snapshot) throws CodingException; } http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/Storage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java index 7d325b6..c9ea1de 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java @@ -196,10 +196,6 @@ public interface Storage { * Executes the unit of read-only {@code work}. The consistency model creates the possibility * for a reader to read uncommitted state from a concurrent writer. * <p> - * TODO(wfarner): Update this documentation once all stores are backed by - * {@link org.apache.aurora.scheduler.storage.db.DbStorage}, as the concurrency behavior will then - * be dictated by the {@link org.mybatis.guice.transactional.Transactional#isolation()} used. - * <p> * TODO(wfarner): This method no longer needs to exist now that there is no global locking for * reads. We could instead directly inject the individual stores where they are used, as long * as the stores have a layer to replicate what is currently done by @@ -253,12 +249,6 @@ public interface Storage { void start(MutateWork.NoResult.Quiet initializationLogic) throws StorageException; /** - * Clean up the underlying storage by optimizing internal data structures. Does not change - * externally-visible state but might not run concurrently with write operations. - */ - void snapshot() throws StorageException; - - /** * Prepares the underlying storage system for clean shutdown. 
*/ void stop(); http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java index 6cd5b2b..3a62f02 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java @@ -197,7 +197,7 @@ public interface Recovery { void commit() { primaryStorage.write((NoResult.Quiet) storeProvider -> { try { - distributedStore.persist(tempStorage.toSnapshot()); + distributedStore.snapshotWith(tempStorage.toSnapshot()); shutDownNow.execute(); } catch (CodingException e) { throw new IllegalStateException("Failed to encode snapshot.", e); http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java index 3000796..18296b0 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java @@ -27,9 +27,9 @@ import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; +import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl; -import org.apache.aurora.scheduler.storage.log.ThriftBackfill; import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import static java.util.Objects.requireNonNull; http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java new file mode 100644 index 0000000..85b2113 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java @@ -0,0 +1,350 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.durability; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; + +import org.apache.aurora.common.inject.TimedInterceptor.Timed; +import org.apache.aurora.common.stats.SlidingStats; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.gen.storage.SaveCronJob; +import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; +import org.apache.aurora.gen.storage.SaveJobUpdateEvent; +import org.apache.aurora.gen.storage.SaveQuota; +import org.apache.aurora.scheduler.base.SchedulerException; +import org.apache.aurora.scheduler.events.EventSink; +import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.CronJobStore; +import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.QuotaStore; +import org.apache.aurora.scheduler.storage.SchedulerStore; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; +import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; +import org.apache.aurora.scheduler.storage.TaskStore; +import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * A storage implementation that ensures storage mutations are written to a persistence layer. + * + * <p>In the classic write-ahead log usage we'd perform mutations as follows: + * <ol> + * <li>record op</li> + * <li>perform op locally</li> + * <li>persist ops</li> + * </ol> + * + * <p>Writing the operation to persistences ensures we have a record of our mutation in case we + * should need to recover state later after a crash or on a new host (assuming the scheduler is + * distributed). We then apply the mutation to a local (in-memory) data structure for serving fast + * read requests. + * + * <p>This implementation leverages a local transaction to handle this: + * <ol> + * <li>start local transaction</li> + * <li>perform op locally (uncommitted!)</li> + * <li>write op to persistence</li> + * </ol> + * + * <p>If the op fails to apply to local storage we will never persist the op, and if the op + * fails to persist, it'll throw and abort the local storage operation as well. + */ +public class DurableStorage implements NonVolatileStorage { + + /** + * A maintainer for context about open transactions. Assumes that an external entity is + * responsible for opening and closing transactions. + */ + interface TransactionManager { + + /** + * Checks whether there is an open transaction. + * + * @return {@code true} if there is an open transaction, {@code false} otherwise. + */ + boolean hasActiveTransaction(); + + /** + * Adds an operation to the existing transaction. + * + * @param op Operation to include in the existing transaction. 
+ */ + void log(Op op); + } + + private static final Logger LOG = LoggerFactory.getLogger(DurableStorage.class); + + private final Persistence persistence; + private final Storage writeBehindStorage; + private final SchedulerStore.Mutable writeBehindSchedulerStore; + private final CronJobStore.Mutable writeBehindJobStore; + private final TaskStore.Mutable writeBehindTaskStore; + private final QuotaStore.Mutable writeBehindQuotaStore; + private final AttributeStore.Mutable writeBehindAttributeStore; + private final JobUpdateStore.Mutable writeBehindJobUpdateStore; + private final ReentrantLock writeLock; + private final ThriftBackfill thriftBackfill; + + private final WriteAheadStorage writeAheadStorage; + + // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when + // recovering are controlled at this layer (they're all calls to Mutable store implementations). + // The more involved change is changing SnapshotStore to accept a Mutable store provider to + // avoid a call to Storage.write() when we replay a Snapshot. + private boolean recovered = false; + private TransactionRecorder transaction = null; + + private final SlidingStats writerWaitStats = new SlidingStats("storage_write_lock_wait", "ns"); + + private final Map<Op._Fields, Consumer<Op>> transactionReplayActions; + + @Inject + DurableStorage( + Persistence persistence, + @Volatile Storage delegateStorage, + @Volatile SchedulerStore.Mutable schedulerStore, + @Volatile CronJobStore.Mutable jobStore, + @Volatile TaskStore.Mutable taskStore, + @Volatile QuotaStore.Mutable quotaStore, + @Volatile AttributeStore.Mutable attributeStore, + @Volatile JobUpdateStore.Mutable jobUpdateStore, + EventSink eventSink, + ReentrantLock writeLock, + ThriftBackfill thriftBackfill) { + + this.persistence = requireNonNull(persistence); + + // DurableStorage has two distinct operating modes: pre- and post-recovery. When recovering, + // we write directly to the writeBehind stores since we are replaying what's already persisted. + // After that, all writes must succeed in Persistence before they may be considered successful. 
+ this.writeBehindStorage = requireNonNull(delegateStorage); + this.writeBehindSchedulerStore = requireNonNull(schedulerStore); + this.writeBehindJobStore = requireNonNull(jobStore); + this.writeBehindTaskStore = requireNonNull(taskStore); + this.writeBehindQuotaStore = requireNonNull(quotaStore); + this.writeBehindAttributeStore = requireNonNull(attributeStore); + this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore); + this.writeLock = requireNonNull(writeLock); + this.thriftBackfill = requireNonNull(thriftBackfill); + TransactionManager transactionManager = new TransactionManager() { + @Override + public boolean hasActiveTransaction() { + return transaction != null; + } + + @Override + public void log(Op op) { + transaction.add(op); + } + }; + this.writeAheadStorage = new WriteAheadStorage( + transactionManager, + schedulerStore, + jobStore, + taskStore, + quotaStore, + attributeStore, + jobUpdateStore, + LoggerFactory.getLogger(WriteAheadStorage.class), + eventSink); + + this.transactionReplayActions = buildTransactionReplayActions(); + } + + @VisibleForTesting + final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() { + return ImmutableMap.<Op._Fields, Consumer<Op>>builder() + .put( + Op._Fields.SAVE_FRAMEWORK_ID, + op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId())) + .put(Op._Fields.SAVE_CRON_JOB, op -> { + SaveCronJob cronJob = op.getSaveCronJob(); + writeBehindJobStore.saveAcceptedJob( + thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig())); + }) + .put( + Op._Fields.REMOVE_JOB, + op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()))) + .put( + Op._Fields.SAVE_TASKS, + op -> writeBehindTaskStore.saveTasks( + thriftBackfill.backfillTasks(op.getSaveTasks().getTasks()))) + .put( + Op._Fields.REMOVE_TASKS, + op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds())) + .put(Op._Fields.SAVE_QUOTA, op -> { + SaveQuota saveQuota = op.getSaveQuota(); + writeBehindQuotaStore.saveQuota( + saveQuota.getRole(), + ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota())); + }) + .put( + Op._Fields.REMOVE_QUOTA, + op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole())) + .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> { + HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes(); + // Prior to commit 5cf760b, the store would persist maintenance mode changes for + // unknown hosts. 5cf760b began rejecting these, but the storage may still + // contain entries with a null slave ID. 
+ if (attributes.isSetSlaveId()) { + writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes)); + } else { + LOG.info("Dropping host attributes with no agent ID: " + attributes); + } + }) + .put(Op._Fields.SAVE_JOB_UPDATE, op -> + writeBehindJobUpdateStore.saveJobUpdate( + thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate()))) + .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> { + SaveJobUpdateEvent event = op.getSaveJobUpdateEvent(); + writeBehindJobUpdateStore.saveJobUpdateEvent( + IJobUpdateKey.build(event.getKey()), + IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent())); + }) + .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> { + SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent(); + writeBehindJobUpdateStore.saveJobInstanceUpdateEvent( + IJobUpdateKey.build(event.getKey()), + IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent())); + }) + .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> { + LOG.info("Dropping prune operation. Updates will be pruned later."); + }) + .put(Op._Fields.REMOVE_JOB_UPDATE, op -> + writeBehindJobUpdateStore.removeJobUpdates( + IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys()))) + .build(); + } + + @Override + @Timed("scheduler_storage_prepare") + public synchronized void prepare() { + writeBehindStorage.prepare(); + persistence.prepare(); + } + + @Override + @Timed("scheduler_storage_start") + public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) { + write((NoResult.Quiet) unused -> { + // Must have the underlying storage started so we can query it. + // We replay these entries in the forwarded storage system's transactions but not ours - we + // do not want to re-record these ops. + recover(); + recovered = true; + + // Now that we're recovered we should persist any mutations done in initializationLogic, so + // run it in one of our transactions. + write(initializationLogic); + }); + } + + @Override + public void stop() { + // No-op. + } + + @Timed("scheduler_storage_recover") + void recover() throws RecoveryFailedException { + try { + persistence.recover().forEach(DurableStorage.this::replayOp); + } catch (PersistenceException e) { + throw new RecoveryFailedException(e); + } + } + + private static final class RecoveryFailedException extends SchedulerException { + RecoveryFailedException(Throwable cause) { + super(cause); + } + } + + private void replayOp(Op op) { + Op._Fields opField = op.getSetField(); + if (!transactionReplayActions.containsKey(opField)) { + throw new IllegalStateException("Unknown transaction op: " + opField); + } + + transactionReplayActions.get(opField).accept(op); + } + + private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work) + throws StorageException, E { + + // The transaction has already been set up so we just need to delegate with our store provider + // so any mutations may be persisted. 
+ if (transaction != null) { + return work.apply(writeAheadStorage); + } + + transaction = new TransactionRecorder(); + try { + return writeBehindStorage.write(unused -> { + T result = work.apply(writeAheadStorage); + List<Op> ops = transaction.getOps(); + if (!ops.isEmpty()) { + try { + persistence.persist(ops.stream()); + } catch (PersistenceException e) { + throw new StorageException("Failed to persist storage changes", e); + } + } + return result; + }); + } finally { + transaction = null; + } + } + + @Override + public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E { + long waitStart = System.nanoTime(); + writeLock.lock(); + try { + writerWaitStats.accumulate(System.nanoTime() - waitStart); + // We don't want to persist when recovering, we just want to update the underlying + // store - so pass mutations straight through to the underlying storage. + if (!recovered) { + return writeBehindStorage.write(work); + } + + return doInTransaction(work); + } finally { + writeLock.unlock(); + } + } + + @Override + public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E { + return writeBehindStorage.read(work); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java new file mode 100644 index 0000000..9eb862c --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java @@ -0,0 +1,64 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.durability; + +import java.util.stream.Stream; + +import org.apache.aurora.gen.storage.Op; + +/** + * Persistence layer for storage operations. + */ +public interface Persistence { + + /** + * Prepares the persistence layer. The implementation may use this, for example, to advertise as + * a replica to cohort schedulers, or begin syncing state for warm standby. + */ + void prepare(); + + /** + * Recovers previously-persisted records. + * + * @return All persisted records. + * @throws PersistenceException If recovery failed. + */ + Stream<Op> recover() throws PersistenceException; + + /** + * Saves new records. No records may be considered durably saved until this method returns + * successfully. + * + * @param records Records to save. + * @throws PersistenceException If the records could not be saved. + */ + void persist(Stream<Op> records) throws PersistenceException; + + /** + * Thrown when a persistence operation fails. 
+ */ + class PersistenceException extends Exception { + public PersistenceException(String msg) { + super(msg); + } + + public PersistenceException(Throwable cause) { + super(cause); + } + + public PersistenceException(String msg, Throwable cause) { + super(msg, cause); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java new file mode 100644 index 0000000..4425d02 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java @@ -0,0 +1,175 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.durability; + +import java.util.EnumSet; +import java.util.Set; +import java.util.stream.Collectors; + +import com.google.inject.Inject; + +import org.apache.aurora.GuavaUtils; +import org.apache.aurora.gen.JobConfiguration; +import org.apache.aurora.gen.JobUpdate; +import org.apache.aurora.gen.JobUpdateInstructions; +import org.apache.aurora.gen.Resource; +import org.apache.aurora.gen.ResourceAggregate; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.TierInfo; +import org.apache.aurora.scheduler.TierManager; +import org.apache.aurora.scheduler.quota.QuotaManager; +import org.apache.aurora.scheduler.resources.ResourceType; +import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; +import org.apache.aurora.scheduler.storage.entities.IJobUpdate; +import org.apache.aurora.scheduler.storage.entities.IResource; +import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; + +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; +import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; +import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; + +/** + * Helps migrating thrift schema by populating deprecated and/or replacement fields. 
+ */ +public final class ThriftBackfill { + + private final TierManager tierManager; + + @Inject + public ThriftBackfill(TierManager tierManager) { + this.tierManager = requireNonNull(tierManager); + } + + private static Resource getResource(Set<Resource> resources, ResourceType type) { + return resources.stream() + .filter(e -> ResourceType.fromResource(IResource.build(e)).equals(type)) + .findFirst() + .orElseThrow(() -> + new IllegalArgumentException("Missing resource definition for " + type)); + } + + /** + * Ensures TaskConfig.resources and correspondent task-level fields are all populated. + * + * @param config TaskConfig to backfill. + * @return Backfilled TaskConfig. + */ + public TaskConfig backfillTask(TaskConfig config) { + backfillTier(config); + return config; + } + + private void backfillTier(TaskConfig config) { + ITaskConfig taskConfig = ITaskConfig.build(config); + if (config.isSetTier()) { + TierInfo tier = tierManager.getTier(taskConfig); + config.setProduction(!tier.isPreemptible() && !tier.isRevocable()); + } else { + config.setTier(tierManager.getTiers() + .entrySet() + .stream() + .filter(e -> e.getValue().isPreemptible() == !taskConfig.isProduction() + && !e.getValue().isRevocable()) + .findFirst() + .orElseThrow(() -> new IllegalStateException( + format("No matching implicit tier for task of job %s", taskConfig.getJob()))) + .getKey()); + } + } + + /** + * Backfills JobConfiguration. See {@link #backfillTask(TaskConfig)}. + * + * @param jobConfig JobConfiguration to backfill. + * @return Backfilled JobConfiguration. + */ + public IJobConfiguration backfillJobConfiguration(JobConfiguration jobConfig) { + backfillTask(jobConfig.getTaskConfig()); + return IJobConfiguration.build(jobConfig); + } + + /** + * Backfills set of tasks. See {@link #backfillTask(TaskConfig)}. + * + * @param tasks Set of tasks to backfill. + * @return Backfilled set of tasks. + */ + public Set<IScheduledTask> backfillTasks(Set<ScheduledTask> tasks) { + return tasks.stream() + .map(t -> backfillScheduledTask(t)) + .map(IScheduledTask::build) + .collect(GuavaUtils.toImmutableSet()); + } + + /** + * Ensures ResourceAggregate.resources and correspondent deprecated fields are all populated. + * + * @param aggregate ResourceAggregate to backfill. + * @return Backfilled IResourceAggregate. 
+ */ + public static IResourceAggregate backfillResourceAggregate(ResourceAggregate aggregate) { + if (!aggregate.isSetResources() || aggregate.getResources().isEmpty()) { + aggregate.addToResources(Resource.numCpus(aggregate.getNumCpus())); + aggregate.addToResources(Resource.ramMb(aggregate.getRamMb())); + aggregate.addToResources(Resource.diskMb(aggregate.getDiskMb())); + } else { + EnumSet<ResourceType> quotaResources = QuotaManager.QUOTA_RESOURCE_TYPES; + if (aggregate.getResources().size() > quotaResources.size()) { + throw new IllegalArgumentException("Too many resource values in quota."); + } + + if (!quotaResources.equals(aggregate.getResources().stream() + .map(e -> ResourceType.fromResource(IResource.build(e))) + .collect(Collectors.toSet()))) { + + throw new IllegalArgumentException("Quota resources must be exactly: " + quotaResources); + } + aggregate.setNumCpus( + getResource(aggregate.getResources(), CPUS).getNumCpus()); + aggregate.setRamMb( + getResource(aggregate.getResources(), RAM_MB).getRamMb()); + aggregate.setDiskMb( + getResource(aggregate.getResources(), DISK_MB).getDiskMb()); + } + return IResourceAggregate.build(aggregate); + } + + private ScheduledTask backfillScheduledTask(ScheduledTask task) { + backfillTask(task.getAssignedTask().getTask()); + return task; + } + + /** + * Backfills JobUpdate. See {@link #backfillTask(TaskConfig)}. + * + * @param update JobUpdate to backfill. + * @return Backfilled job update. + */ + public IJobUpdate backFillJobUpdate(JobUpdate update) { + JobUpdateInstructions instructions = update.getInstructions(); + if (instructions.isSetDesiredState()) { + backfillTask(instructions.getDesiredState().getTask()); + } + + instructions.getInitialState().forEach(e -> backfillTask(e.getTask())); + + return IJobUpdate.build(update); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java new file mode 100644 index 0000000..1c811e3 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java @@ -0,0 +1,122 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.durability; + +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.gen.storage.RemoveTasks; +import org.apache.aurora.gen.storage.SaveHostAttributes; +import org.apache.aurora.gen.storage.SaveTasks; + +/** + * Records a sequence of mutations to the storage. 
+ */ +class TransactionRecorder { + private final List<Op> ops = Lists.newArrayList(); + + void add(Op op) { + Op prior = Iterables.getLast(ops, null); + if (prior == null || !coalesce(prior, op)) { + ops.add(op); + } + } + + List<Op> getOps() { + return ops; + } + + /** + * Tries to coalesce a new op into the prior to compact the binary representation and increase + * batching. + * + * @param prior The previous op. + * @param next The next op to be added. + * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise. + */ + private boolean coalesce(Op prior, Op next) { + if (!prior.isSet() && !next.isSet()) { + return false; + } + + Op._Fields priorType = prior.getSetField(); + if (!priorType.equals(next.getSetField())) { + return false; + } + + switch (priorType) { + case SAVE_FRAMEWORK_ID: + prior.setSaveFrameworkId(next.getSaveFrameworkId()); + return true; + case SAVE_TASKS: + coalesce(prior.getSaveTasks(), next.getSaveTasks()); + return true; + case REMOVE_TASKS: + coalesce(prior.getRemoveTasks(), next.getRemoveTasks()); + return true; + case SAVE_HOST_ATTRIBUTES: + return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes()); + default: + return false; + } + } + + private void coalesce(SaveTasks prior, SaveTasks next) { + if (next.isSetTasks()) { + if (prior.isSetTasks()) { + // It is an expected invariant that an operation may reference a task (identified by + // task ID) no more than one time. Therefore, to coalesce two SaveTasks operations, + // the most recent task definition overrides the prior operation. + Map<String, ScheduledTask> coalesced = Maps.newHashMap(); + for (ScheduledTask task : prior.getTasks()) { + coalesced.put(task.getAssignedTask().getTaskId(), task); + } + for (ScheduledTask task : next.getTasks()) { + coalesced.put(task.getAssignedTask().getTaskId(), task); + } + prior.setTasks(ImmutableSet.copyOf(coalesced.values())); + } else { + prior.setTasks(next.getTasks()); + } + } + } + + private void coalesce(RemoveTasks prior, RemoveTasks next) { + if (next.isSetTaskIds()) { + if (prior.isSetTaskIds()) { + prior.setTaskIds(ImmutableSet.<String>builder() + .addAll(prior.getTaskIds()) + .addAll(next.getTaskIds()) + .build()); + } else { + prior.setTaskIds(next.getTaskIds()); + } + } + } + + private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) { + if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) { + prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes()); + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java new file mode 100644 index 0000000..667db06 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java @@ -0,0 +1,368 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.durability; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.gen.storage.RemoveJob; +import org.apache.aurora.gen.storage.RemoveQuota; +import org.apache.aurora.gen.storage.RemoveTasks; +import org.apache.aurora.gen.storage.SaveCronJob; +import org.apache.aurora.gen.storage.SaveFrameworkId; +import org.apache.aurora.gen.storage.SaveHostAttributes; +import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; +import org.apache.aurora.gen.storage.SaveJobUpdate; +import org.apache.aurora.gen.storage.SaveJobUpdateEvent; +import org.apache.aurora.gen.storage.SaveQuota; +import org.apache.aurora.gen.storage.SaveTasks; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.events.EventSink; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.CronJobStore; +import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.QuotaStore; +import org.apache.aurora.scheduler.storage.SchedulerStore; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.TaskStore; +import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; +import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdate; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; +import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.slf4j.Logger; + +import static java.util.Objects.requireNonNull; + +/** + * Mutable stores implementation that translates all operations to {@link Op}s (which are passed + * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable + * stores. 
+ */ +public class WriteAheadStorage implements + MutableStoreProvider, + SchedulerStore.Mutable, + CronJobStore.Mutable, + TaskStore.Mutable, + QuotaStore.Mutable, + AttributeStore.Mutable, + JobUpdateStore.Mutable { + + private final TransactionManager transactionManager; + private final SchedulerStore.Mutable schedulerStore; + private final CronJobStore.Mutable jobStore; + private final TaskStore.Mutable taskStore; + private final QuotaStore.Mutable quotaStore; + private final AttributeStore.Mutable attributeStore; + private final JobUpdateStore.Mutable jobUpdateStore; + private final Logger log; + private final EventSink eventSink; + + /** + * Creates a new write-ahead storage that delegates to the providing default stores. + * + * @param transactionManager External controller for transaction operations. + * @param schedulerStore Delegate. + * @param jobStore Delegate. + * @param taskStore Delegate. + * @param quotaStore Delegate. + * @param attributeStore Delegate. + * @param jobUpdateStore Delegate. + */ + public WriteAheadStorage( + TransactionManager transactionManager, + SchedulerStore.Mutable schedulerStore, + CronJobStore.Mutable jobStore, + TaskStore.Mutable taskStore, + QuotaStore.Mutable quotaStore, + AttributeStore.Mutable attributeStore, + JobUpdateStore.Mutable jobUpdateStore, + Logger log, + EventSink eventSink) { + + this.transactionManager = requireNonNull(transactionManager); + this.schedulerStore = requireNonNull(schedulerStore); + this.jobStore = requireNonNull(jobStore); + this.taskStore = requireNonNull(taskStore); + this.quotaStore = requireNonNull(quotaStore); + this.attributeStore = requireNonNull(attributeStore); + this.jobUpdateStore = requireNonNull(jobUpdateStore); + this.log = requireNonNull(log); + this.eventSink = requireNonNull(eventSink); + } + + private void write(Op op) { + Preconditions.checkState( + transactionManager.hasActiveTransaction(), + "Mutating operations must be within a transaction."); + transactionManager.log(op); + } + + @Override + public void saveFrameworkId(final String frameworkId) { + requireNonNull(frameworkId); + + write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId))); + schedulerStore.saveFrameworkId(frameworkId); + } + + @Override + public void deleteTasks(final Set<String> taskIds) { + requireNonNull(taskIds); + + write(Op.removeTasks(new RemoveTasks(taskIds))); + taskStore.deleteTasks(taskIds); + } + + @Override + public void saveTasks(final Set<IScheduledTask> newTasks) { + requireNonNull(newTasks); + + write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks)))); + taskStore.saveTasks(newTasks); + } + + @Override + public Optional<IScheduledTask> mutateTask( + String taskId, + Function<IScheduledTask, IScheduledTask> mutator) { + + Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator); + log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus()); + write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder())))); + + return mutated; + } + + @Override + public void saveQuota(final String role, final IResourceAggregate quota) { + requireNonNull(role); + requireNonNull(quota); + + write(Op.saveQuota(new SaveQuota(role, quota.newBuilder()))); + quotaStore.saveQuota(role, quota); + } + + @Override + public boolean saveHostAttributes(final IHostAttributes attrs) { + requireNonNull(attrs); + + boolean changed = attributeStore.saveHostAttributes(attrs); + if (changed) { + write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder()))); + 
eventSink.post(new PubsubEvent.HostAttributesChanged(attrs)); + } + return changed; + } + + @Override + public void removeJob(final IJobKey jobKey) { + requireNonNull(jobKey); + + write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder()))); + jobStore.removeJob(jobKey); + } + + @Override + public void saveAcceptedJob(final IJobConfiguration jobConfig) { + requireNonNull(jobConfig); + + write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder()))); + jobStore.saveAcceptedJob(jobConfig); + } + + @Override + public void removeQuota(final String role) { + requireNonNull(role); + + write(Op.removeQuota(new RemoveQuota(role))); + quotaStore.removeQuota(role); + } + + @Override + public void saveJobUpdate(IJobUpdate update) { + requireNonNull(update); + + write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder()))); + jobUpdateStore.saveJobUpdate(update); + } + + @Override + public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) { + requireNonNull(key); + requireNonNull(event); + + write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder()))); + jobUpdateStore.saveJobUpdateEvent(key, event); + } + + @Override + public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) { + requireNonNull(key); + requireNonNull(event); + + write(Op.saveJobInstanceUpdateEvent( + new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder()))); + jobUpdateStore.saveJobInstanceUpdateEvent(key, event); + } + + @Override + public void removeJobUpdates(Set<IJobUpdateKey> keys) { + requireNonNull(keys); + + // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot + // read it. JobUpdates are only removed implicitly when a snapshot is taken. + jobUpdateStore.removeJobUpdates(keys); + } + + @Override + public void deleteAllTasks() { + throw new UnsupportedOperationException( + "Unsupported since casual storage users should never be doing this."); + } + + @Override + public void deleteHostAttributes() { + throw new UnsupportedOperationException( + "Unsupported since casual storage users should never be doing this."); + } + + @Override + public void deleteJobs() { + throw new UnsupportedOperationException( + "Unsupported since casual storage users should never be doing this."); + } + + @Override + public void deleteQuotas() { + throw new UnsupportedOperationException( + "Unsupported since casual storage users should never be doing this."); + } + + @Override + public void deleteAllUpdates() { + throw new UnsupportedOperationException( + "Unsupported since casual storage users should never be doing this."); + } + + @Override + public SchedulerStore.Mutable getSchedulerStore() { + return this; + } + + @Override + public CronJobStore.Mutable getCronJobStore() { + return this; + } + + @Override + public TaskStore.Mutable getUnsafeTaskStore() { + return this; + } + + @Override + public QuotaStore.Mutable getQuotaStore() { + return this; + } + + @Override + public AttributeStore.Mutable getAttributeStore() { + return this; + } + + @Override + public TaskStore getTaskStore() { + return this; + } + + @Override + public JobUpdateStore.Mutable getJobUpdateStore() { + return this; + } + + @Override + public Optional<String> fetchFrameworkId() { + return this.schedulerStore.fetchFrameworkId(); + } + + @Override + public Iterable<IJobConfiguration> fetchJobs() { + return this.jobStore.fetchJobs(); + } + + @Override + public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) { + return 
this.jobStore.fetchJob(jobKey); + } + + @Override + public Optional<IScheduledTask> fetchTask(String taskId) { + return this.taskStore.fetchTask(taskId); + } + + @Override + public Iterable<IScheduledTask> fetchTasks(Query.Builder query) { + return this.taskStore.fetchTasks(query); + } + + @Override + public Set<IJobKey> getJobKeys() { + return this.taskStore.getJobKeys(); + } + + @Override + public Optional<IResourceAggregate> fetchQuota(String role) { + return this.quotaStore.fetchQuota(role); + } + + @Override + public Map<String, IResourceAggregate> fetchQuotas() { + return this.quotaStore.fetchQuotas(); + } + + @Override + public Optional<IHostAttributes> getHostAttributes(String host) { + return this.attributeStore.getHostAttributes(host); + } + + @Override + public Set<IHostAttributes> getHostAttributes() { + return this.attributeStore.getHostAttributes(); + } + + @Override + public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) { + return this.jobUpdateStore.fetchJobUpdates(query); + } + + @Override + public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) { + return this.jobUpdateStore.fetchJobUpdate(key); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java new file mode 100644 index 0000000..a0a6b6c --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java @@ -0,0 +1,257 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.log; + +import java.io.IOException; +import java.util.Date; +import java.util.Iterator; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; + +import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; +import org.apache.aurora.common.application.ShutdownRegistry; +import org.apache.aurora.common.inject.TimedInterceptor.Timed; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.gen.storage.LogEntry; +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.gen.storage.Snapshot; +import org.apache.aurora.scheduler.base.AsyncUtil; +import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException; +import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException; +import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; +import org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.Storage.StorageException; +import org.apache.aurora.scheduler.storage.durability.Persistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * Persistence layer that uses a replicated log. + */ +class LogPersistence implements Persistence, DistributedSnapshotStore { + + private static final Logger LOG = LoggerFactory.getLogger(LogPersistence.class); + + private final LogManager logManager; + private final SnapshotStore<Snapshot> snapshotStore; + private final SchedulingService schedulingService; + private final Amount<Long, Time> snapshotInterval; + private StreamManager streamManager; + + @Inject + LogPersistence( + Settings settings, + LogManager logManager, + SnapshotStore<Snapshot> snapshotStore, + ShutdownRegistry shutdownRegistry) { + + this(new ScheduledExecutorSchedulingService( + shutdownRegistry, + settings.getShutdownGracePeriod()), + settings.getSnapshotInterval(), + logManager, + snapshotStore); + } + + @VisibleForTesting + LogPersistence( + SchedulingService schedulingService, + Amount<Long, Time> snapshotInterval, + LogManager logManager, + SnapshotStore<Snapshot> snapshotStore) { + + this.schedulingService = requireNonNull(schedulingService); + this.snapshotInterval = requireNonNull(snapshotInterval); + this.logManager = requireNonNull(logManager); + this.snapshotStore = requireNonNull(snapshotStore); + } + + @Override + public void prepare() { + // Open the log to make a log replica available to the scheduler group. 
+ try { + streamManager = logManager.open(); + } catch (IOException e) { + throw new IllegalStateException("Failed to open the log, cannot continue", e); + } + } + + @Override + public void persist(Stream<Op> mutations) throws PersistenceException { + StreamTransaction transaction = streamManager.startTransaction(); + mutations.forEach(transaction::add); + try { + transaction.commit(); + } catch (CodingException e) { + throw new PersistenceException(e); + } + } + + @Override + public Stream<Op> recover() throws PersistenceException { + scheduleSnapshots(); + + try { + Iterator<LogEntry> entries = streamManager.readFromBeginning(); + Iterable<LogEntry> iterableEntries = () -> entries; + Stream<LogEntry> entryStream = StreamSupport.stream(iterableEntries.spliterator(), false); + + return entryStream + .filter(entry -> entry.getSetField() != LogEntry._Fields.NOOP) + .filter(entry -> { + if (entry.getSetField() == LogEntry._Fields.SNAPSHOT) { + Snapshot snapshot = entry.getSnapshot(); + LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp())); + snapshotStore.applySnapshot(snapshot); + return false; + } + return true; + }) + .peek(entry -> { + if (entry.getSetField() != LogEntry._Fields.TRANSACTION) { + throw new IllegalStateException("Unknown log entry type: " + entry.getSetField()); + } + }) + .flatMap(entry -> entry.getTransaction().getOps().stream()); + } catch (CodingException | InvalidPositionException | StreamAccessException e) { + throw new PersistenceException(e); + } + } + + private void scheduleSnapshots() { + if (snapshotInterval.getValue() > 0) { + schedulingService.doEvery(snapshotInterval, () -> { + try { + snapshot(); + } catch (StorageException e) { + if (e.getCause() == null) { + LOG.warn("StorageException when attempting to snapshot.", e); + } else { + LOG.warn(e.getMessage(), e.getCause()); + } + } + }); + } + } + + @Override + public void snapshot() throws StorageException { + try { + doSnapshot(); + } catch (CodingException e) { + throw new StorageException("Failed to encode a snapshot", e); + } catch (InvalidPositionException e) { + throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e); + } catch (StreamAccessException e) { + throw new StorageException("Failed to create a snapshot", e); + } + } + + @Timed("scheduler_log_snapshot_persist") + @Override + public void snapshotWith(Snapshot snapshot) + throws CodingException, InvalidPositionException, StreamAccessException { + + streamManager.snapshot(snapshot); + } + + /** + * Forces a snapshot of the storage state. + * + * @throws CodingException If there is a problem encoding the snapshot. + * @throws InvalidPositionException If the log stream cursor is invalid. + * @throws StreamAccessException If there is a problem writing the snapshot to the log stream. + */ + @Timed("scheduler_log_snapshot") + void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException { + LOG.info("Creating snapshot."); + Snapshot snapshot = snapshotStore.createSnapshot(); + snapshotWith(snapshot); + LOG.info("Snapshot complete." + + " host attrs: " + snapshot.getHostAttributesSize() + + ", cron jobs: " + snapshot.getCronJobsSize() + + ", quota confs: " + snapshot.getQuotaConfigurationsSize() + + ", tasks: " + snapshot.getTasksSize() + + ", updates: " + snapshot.getJobUpdateDetailsSize()); + } + + /** + * A service that can schedule an action to be executed periodically. 
+ */ + @VisibleForTesting + interface SchedulingService { + + /** + * Schedules an action to execute periodically. + * + * @param interval The time period to wait until running the {@code action} again. + * @param action The action to execute periodically. + */ + void doEvery(Amount<Long, Time> interval, Runnable action); + } + + private static class ScheduledExecutorSchedulingService implements SchedulingService { + private final ScheduledExecutorService scheduledExecutor; + + ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry, + Amount<Long, Time> shutdownGracePeriod) { + scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG); + shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination( + scheduledExecutor, + shutdownGracePeriod.getValue(), + shutdownGracePeriod.getUnit().getTimeUnit())); + } + + @Override + public void doEvery(Amount<Long, Time> interval, Runnable action) { + requireNonNull(interval); + requireNonNull(action); + + long delay = interval.getValue(); + TimeUnit timeUnit = interval.getUnit().getTimeUnit(); + scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit); + } + } + + /** + * Configuration settings for log persistence. + */ + public static class Settings { + private final Amount<Long, Time> shutdownGracePeriod; + private final Amount<Long, Time> snapshotInterval; + + Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) { + this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod); + this.snapshotInterval = requireNonNull(snapshotInterval); + } + + public Amount<Long, Time> getShutdownGracePeriod() { + return shutdownGracePeriod; + } + + public Amount<Long, Time> getSnapshotInterval() { + return snapshotInterval; + } + } +}
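
The diffstat lists a small change to LogStorageModule.java whose body is not included above. As a rough, hypothetical sketch only (not the actual module change), the new pieces compose along these lines: `LogPersistence` provides both `Persistence` and `DistributedSnapshotStore`, and `DurableStorage` fronts the volatile stores as the `NonVolatileStorage` implementation. The module name below is invented, and the real wiring also involves `CallOrderEnforcingStorage` and the `@Volatile` store bindings, which are omitted here.

package org.apache.aurora.scheduler.storage.log;

import com.google.inject.AbstractModule;
import com.google.inject.Singleton;

import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.durability.DurableStorage;
import org.apache.aurora.scheduler.storage.durability.Persistence;

// Hypothetical sketch; not the actual LogStorageModule from this commit.
class DurabilityWiringSketch extends AbstractModule {
  @Override
  protected void configure() {
    // One log-backed object serves both roles: the Persistence that DurableStorage
    // writes through, and the DistributedSnapshotStore that owns snapshot()/snapshotWith().
    bind(LogPersistence.class).in(Singleton.class);
    bind(Persistence.class).to(LogPersistence.class);
    bind(DistributedSnapshotStore.class).to(LogPersistence.class);

    // DurableStorage replays Persistence.recover() during start() and persists each
    // write transaction's Ops before the write is considered committed.
    bind(NonVolatileStorage.class).to(DurableStorage.class).in(Singleton.class);
  }
}
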
