Add a storage recovery tool. This tool was originally intended as a migration path between Persistence backends. As it turns out, the same model also works well for recovering from a backup.
I propose we drop our current recovery mechanism in favor of this tool. The existing recovery-via-scheduler-RPC mechanism is somewhat nonsensical, as it assumes a healthy scheduler. When an operator decides it is necessary to recover from a backup, we should assume the scheduler state may be broken. Furthermore, starting an empty scheduler to bootstrap can have undesirable effects, such as advertising false state to clients and establishing a new empty framework with the master. Testing Done: end-to-end tests pass (and exercise recovery tool) Reviewed at https://reviews.apache.org/r/64625/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/2e1ca428 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/2e1ca428 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/2e1ca428 Branch: refs/heads/master Commit: 2e1ca42887bc8ea1e8c6cddebe9d1cf29268c714 Parents: 6fd765b Author: Bill Farner <[email protected]> Authored: Fri Dec 15 12:07:37 2017 -0800 Committer: Bill Farner <[email protected]> Committed: Fri Dec 15 12:07:37 2017 -0800 ---------------------------------------------------------------------- build.gradle | 13 + config/checkstyle/suppressions.xml | 2 + .../aurora/benchmark/SnapshotBenchmarks.java | 10 +- .../aurora/scheduler/app/SchedulerMain.java | 12 +- .../aurora/scheduler/config/CliOptions.java | 6 +- .../discovery/ServiceDiscoveryBindings.java | 2 +- .../scheduler/storage/backup/BackupReader.java | 56 ++++ .../scheduler/storage/backup/Recovery.java | 35 +- .../storage/backup/TemporaryStorage.java | 4 +- .../durability/DurableStorageModule.java | 35 ++ .../scheduler/storage/durability/Recovery.java | 119 +++++++ .../storage/durability/RecoveryTool.java | 196 +++++++++++ .../storage/log/LogPersistenceModule.java | 78 +++++ .../scheduler/storage/log/LogStorageModule.java | 110 ------ .../scheduler/storage/log/SnapshotModule.java | 54 +++ .../storage/log/SnapshotStoreImpl.java | 332 ------------------- 
.../scheduler/storage/log/SnapshotterImpl.java | 332 +++++++++++++++++++ .../aurora/scheduler/app/SchedulerIT.java | 12 +- .../scheduler/config/CommandLineTest.java | 4 +- .../storage/durability/RecoveryTest.java | 110 ++++++ .../storage/log/LogPersistenceTest.java | 6 +- .../storage/log/NonVolatileStorageTest.java | 12 +- .../storage/log/SnapshotServiceTest.java | 7 +- .../storage/log/SnapshotStoreImplIT.java | 189 ----------- .../storage/log/SnapshotterImplIT.java | 189 +++++++++++ .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 37 +++ 26 files changed, 1284 insertions(+), 678 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 4674513..64af7ae 100644 --- a/build.gradle +++ b/build.gradle @@ -623,3 +623,16 @@ startScripts { unixScript.text = unixScript.text.replace('CLASSPATH=', "CLASSPATH=${environmentClasspathPrefix}:") } } + +// Include a script to run the recovery tool. +task moreStartScripts(type: CreateStartScripts) { + mainClassName = 'org.apache.aurora.scheduler.storage.durability.RecoveryTool' + applicationName = 'recovery-tool' + outputDir = new File(project.buildDir, 'scripts') + classpath = jar.outputs.files + project.configurations.runtime +} + +applicationDistribution.into('bin') { + from(moreStartScripts) + fileMode = 0755 +} http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/config/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/config/checkstyle/suppressions.xml b/config/checkstyle/suppressions.xml index c4081b9..03f57c8 100644 --- a/config/checkstyle/suppressions.xml +++ b/config/checkstyle/suppressions.xml @@ -21,6 +21,8 @@ limitations under the License. <!-- Allow use of System.exit() in main. 
--> <suppress files="org/apache/aurora/scheduler/config/CommandLine.java" checks="RegexpSinglelineJava"/> + <suppress files="org/apache/aurora/scheduler/storage/durability/RecoveryTool.java" + checks="RegexpSinglelineJava"/> <suppress files="org/apache/aurora/scheduler/storage/db/migration/.*" checks="TypeName" /> <suppress files="org/apache/aurora/scheduler/storage/db/testmigration/.*" checks="TypeName" /> </suppressions> http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java index 4f99f80..e3ed3f2 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java @@ -27,7 +27,7 @@ import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.util.Clock; import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl; +import org.apache.aurora.scheduler.storage.log.SnapshotterImpl; import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.apache.thrift.TException; import org.openjdk.jmh.annotations.Benchmark; @@ -56,7 +56,7 @@ public class SnapshotBenchmarks { @Threads(1) @State(Scope.Thread) public static class RestoreSnapshotWithUpdatesBenchmark { - private SnapshotStoreImpl snapshotStore; + private SnapshotterImpl snapshotStore; private Snapshot snapshot; private Storage storage; @@ -80,21 +80,21 @@ public class SnapshotBenchmarks { return System.currentTimeMillis() % 5 == 0; } - private SnapshotStoreImpl getSnapshotStore() { + private SnapshotterImpl getSnapshotStore() { Injector injector = Guice.createInjector( new AbstractModule() { @Override protected void 
configure() { bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK); bind(StatsProvider.class).toInstance(new FakeStatsProvider()); - bind(SnapshotStoreImpl.class).in(Singleton.class); + bind(SnapshotterImpl.class).in(Singleton.class); } }, new MemStorageModule()); storage = injector.getInstance(Key.get(Storage.class, Storage.Volatile.class)); storage.prepare(); - return injector.getInstance(SnapshotStoreImpl.class); + return injector.getInstance(SnapshotterImpl.class); } private Snapshot createSnapshot(int updates, int events, int instanceEvents) { http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java index 2bf7e7b..3ce9bc2 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java +++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java @@ -59,9 +59,11 @@ import org.apache.aurora.scheduler.mesos.LibMesosLoadingModule; import org.apache.aurora.scheduler.stats.StatsModule; import org.apache.aurora.scheduler.storage.Storage.Volatile; import org.apache.aurora.scheduler.storage.backup.BackupModule; +import org.apache.aurora.scheduler.storage.durability.DurableStorageModule; import org.apache.aurora.scheduler.storage.entities.IServerInfo; -import org.apache.aurora.scheduler.storage.log.LogStorageModule; -import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl; +import org.apache.aurora.scheduler.storage.log.LogPersistenceModule; +import org.apache.aurora.scheduler.storage.log.SnapshotModule; +import org.apache.aurora.scheduler.storage.log.SnapshotterImpl; import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,7 +212,7 @@ public class SchedulerMain { new ServiceDiscoveryModule( 
FlaggedZooKeeperConfig.create(options.zk), options.main.serversetPath), - new BackupModule(options.backup, SnapshotStoreImpl.class), + new BackupModule(options.backup, SnapshotterImpl.class), new ExecutorModule(options.executor), new AbstractModule() { @Override @@ -249,8 +251,10 @@ public class SchedulerMain { .add( new CommandLineDriverSettingsModule(options.driver, options.main.allowGpuResource), new LibMesosLoadingModule(options.main.driverImpl), + new DurableStorageModule(), new MesosLogStreamModule(options.mesosLog, FlaggedZooKeeperConfig.create(options.zk)), - new LogStorageModule(options.logStorage), + new LogPersistenceModule(options.logPersistence), + new SnapshotModule(options.snapshot), new TierModule(options.tiers), new WebhookModule(options.webhook) ) http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java index b7f43e0..e4e5358 100644 --- a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java +++ b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java @@ -48,7 +48,8 @@ import org.apache.aurora.scheduler.state.StateModule; import org.apache.aurora.scheduler.stats.AsyncStatsModule; import org.apache.aurora.scheduler.stats.StatsModule; import org.apache.aurora.scheduler.storage.backup.BackupModule; -import org.apache.aurora.scheduler.storage.log.LogStorageModule; +import org.apache.aurora.scheduler.storage.log.LogPersistenceModule; +import org.apache.aurora.scheduler.storage.log.SnapshotModule; import org.apache.aurora.scheduler.thrift.aop.AopModule; import org.apache.aurora.scheduler.updater.UpdaterModule; @@ -64,7 +65,8 @@ public class CliOptions { public final FlaggedZooKeeperConfig.Options zk = new FlaggedZooKeeperConfig.Options(); public final 
UpdaterModule.Options updater = new UpdaterModule.Options(); public final StateModule.Options state = new StateModule.Options(); - public final LogStorageModule.Options logStorage = new LogStorageModule.Options(); + public final LogPersistenceModule.Options logPersistence = new LogPersistenceModule.Options(); + public final SnapshotModule.Options snapshot = new SnapshotModule.Options(); public final BackupModule.Options backup = new BackupModule.Options(); public final AopModule.Options aop = new AopModule.Options(); public final PruningModule.Options pruning = new PruningModule.Options(); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java index b574c13..a57a77c 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java @@ -44,7 +44,7 @@ public final class ServiceDiscoveryBindings { /** * A binding key for the ZooKeeper cluster endpoints. 
*/ - static final Key<Iterable<InetSocketAddress>> ZOO_KEEPER_CLUSTER_KEY = + public static final Key<Iterable<InetSocketAddress>> ZOO_KEEPER_CLUSTER_KEY = Key.get(new TypeLiteral<Iterable<InetSocketAddress>>() { }, ZooKeeper.class); /** http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java new file mode 100644 index 0000000..82d712c --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java @@ -0,0 +1,56 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.backup; + +import java.io.File; +import java.util.stream.Stream; + +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.scheduler.storage.Snapshotter; +import org.apache.aurora.scheduler.storage.durability.Persistence; + +import static java.util.Objects.requireNonNull; + +/** + * A persistence implementation to be used as a migration source. 
+ */ +public class BackupReader implements Persistence { + + private final File backupFile; + private final Snapshotter snapshotter; + + public BackupReader(File backupFile, Snapshotter snapshotter) { + this.backupFile = requireNonNull(backupFile); + this.snapshotter = requireNonNull(snapshotter); + } + + @Override + public Stream<Edit> recover() throws PersistenceException { + if (!backupFile.exists()) { + throw new PersistenceException("Backup " + backupFile + " does not exist."); + } + + return snapshotter.asStream(Recovery.load(backupFile)).map(Edit::op); + } + + @Override + public void prepare() { + // no-op + } + + @Override + public void persist(Stream<Op> records) { + throw new UnsupportedOperationException("Backups are read-only"); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java index 79899a0..3c2ea50 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java @@ -136,22 +136,7 @@ public interface Recovery { @Override public void stage(String backupName) throws RecoveryException { - File backupFile = new File(backupDir, backupName); - if (!backupFile.exists()) { - throw new RecoveryException("Backup " + backupName + " does not exist."); - } - - Snapshot snapshot = new Snapshot(); - try { - TBinaryProtocol prot = new TBinaryProtocol( - new TIOStreamTransport(new BufferedInputStream(new FileInputStream(backupFile)))); - - snapshot.read(prot); - } catch (TException e) { - throw new RecoveryException("Failed to decode backup " + e, e); - } catch (IOException e) { - throw new RecoveryException("Failed to read backup " + e, e); - } + Snapshot snapshot 
= load(new File(backupDir, backupName)); boolean applied = recovery.compareAndSet(null, new PendingRecovery(tempStorageFactory.apply(snapshot))); if (!applied) { @@ -214,4 +199,22 @@ public interface Recovery { } } } + + static Snapshot load(File backupFile) throws RecoveryException { + if (!backupFile.exists()) { + throw new RecoveryException("Backup " + backupFile + " does not exist."); + } + + try { + Snapshot snapshot = new Snapshot(); + TBinaryProtocol prot = new TBinaryProtocol( + new TIOStreamTransport(new BufferedInputStream(new FileInputStream(backupFile)))); + snapshot.read(prot); + return snapshot; + } catch (TException e) { + throw new RecoveryException("Failed to decode backup " + e, e); + } catch (IOException e) { + throw new RecoveryException("Failed to read backup " + e, e); + } + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java index 0305d9d..5641738 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java @@ -31,7 +31,7 @@ import org.apache.aurora.scheduler.storage.durability.Loader; import org.apache.aurora.scheduler.storage.durability.Persistence.Edit; import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl; +import org.apache.aurora.scheduler.storage.log.SnapshotterImpl; import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import static java.util.Objects.requireNonNull; @@ -84,7 +84,7 @@ interface TemporaryStorage { BuildInfo buildInfo = 
generateBuildInfo(); FakeClock clock = new FakeClock(); clock.setNowMillis(snapshot.getTimestamp()); - Snapshotter snapshotter = new SnapshotStoreImpl(buildInfo, clock); + Snapshotter snapshotter = new SnapshotterImpl(buildInfo, clock); storage.write((NoResult.Quiet) stores -> { Loader.load(stores, thriftBackfill, snapshotter.asStream(snapshot).map(Edit::op)); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java new file mode 100644 index 0000000..6bb134a --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java @@ -0,0 +1,35 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.durability; + +import javax.inject.Singleton; + +import com.google.inject.PrivateModule; + +import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; + +/** + * Binding module for a durable storage layer. 
+ */ +public class DurableStorageModule extends PrivateModule { + @Override + protected void configure() { + install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class)); + bind(DurableStorage.class).in(Singleton.class); + expose(Storage.class); + expose(NonVolatileStorage.class); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java new file mode 100644 index 0000000..819d70e --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java @@ -0,0 +1,119 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.durability; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import com.google.common.collect.Lists; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.scheduler.storage.durability.Persistence.Edit; +import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class to clone a persistence. + */ +final class Recovery { + + private static final Logger LOG = LoggerFactory.getLogger(Recovery.class); + + private Recovery() { + // utility class. + } + + /** + * Copies all state from one persistence to another, batching into calls to + * {@link Persistence#persist(Stream)}. + * + * @param from Source. + * @param to Destination. + * @param batchSize Maximum number of entries to include in any given persist. + */ + static void copy(Persistence from, Persistence to, int batchSize) { + requireEmpty(to); + + long start = System.nanoTime(); + AtomicLong count = new AtomicLong(); + AtomicInteger batchNumber = new AtomicInteger(); + List<Op> batch = Lists.newArrayListWithExpectedSize(batchSize); + Runnable saveBatch = () -> { + LOG.info("Saving batch " + batchNumber.incrementAndGet()); + try { + to.persist(batch.stream()); + } catch (PersistenceException e) { + throw new RuntimeException(e); + } + batch.clear(); + }; + + AtomicBoolean dataBegin = new AtomicBoolean(false); + try { + from.recover() + .filter(edit -> { + if (edit.isDeleteAll()) { + // Suppress any storage reset instructions. + // Persistence implementations may recover with these, but do not support persisting + // them. 
As a result, we require that the recovery source produces a reset + // instruction at the beginning of the stream, if at all. + + if (dataBegin.get()) { + throw new IllegalStateException( + "A storage reset instruction arrived after the beginning of data"); + } + return false; + } else { + dataBegin.set(true); + } + return true; + }) + .forEach(edit -> { + count.incrementAndGet(); + batch.add(edit.getOp()); + if (batch.size() == batchSize) { + saveBatch.run(); + LOG.info("Fetching batch"); + } + }); + } catch (PersistenceException e) { + throw new RuntimeException(e); + } + + if (!batch.isEmpty()) { + saveBatch.run(); + } + long end = System.nanoTime(); + LOG.info("Recovery finished"); + LOG.info("Copied " + count.get() + " ops in " + + Amount.of(end - start, Time.NANOSECONDS).as(Time.MILLISECONDS) + " ms"); + } + + private static void requireEmpty(Persistence persistence) { + LOG.info("Ensuring recovery destination is empty"); + try (Stream<Edit> edits = persistence.recover()) { + if (edits.findFirst().isPresent()) { + throw new IllegalStateException("Refusing to recover into non-empty persistence"); + } + } catch (PersistenceException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java new file mode 100644 index 0000000..7cb4c52 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java @@ -0,0 +1,196 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.durability; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.Map; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.IStringConverterFactory; +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; + +import org.apache.aurora.common.util.BuildInfo; +import org.apache.aurora.common.util.Clock; +import org.apache.aurora.scheduler.TierModule; +import org.apache.aurora.scheduler.app.LifecycleModule; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.config.converters.DataAmountConverter; +import org.apache.aurora.scheduler.config.converters.InetSocketAddressConverter; +import org.apache.aurora.scheduler.config.converters.TimeAmountConverter; +import org.apache.aurora.scheduler.config.types.DataAmount; +import org.apache.aurora.scheduler.config.types.TimeAmount; +import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig; +import org.apache.aurora.scheduler.discovery.ServiceDiscoveryBindings; +import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule; +import org.apache.aurora.scheduler.storage.Snapshotter; +import org.apache.aurora.scheduler.storage.backup.BackupReader; +import 
org.apache.aurora.scheduler.storage.log.LogPersistenceModule; +import org.apache.aurora.scheduler.storage.log.SnapshotterImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A utility to recover the contents of one persistence into another. + */ +public final class RecoveryTool { + + private static final Logger LOG = LoggerFactory.getLogger(RecoveryTool.class); + + private RecoveryTool() { + // Main-only class. + } + + interface RecoveryEndpoint { + Iterable<Object> getOptions(); + + Persistence create(); + } + + private static class Log implements RecoveryEndpoint { + private final FlaggedZooKeeperConfig.Options zkOptions = new FlaggedZooKeeperConfig.Options(); + private final MesosLogStreamModule.Options logOptions = new MesosLogStreamModule.Options(); + private final LogPersistenceModule.Options options = new LogPersistenceModule.Options(); + + @Override + public Iterable<Object> getOptions() { + return ImmutableList.of(logOptions, options, zkOptions); + } + + @Override + public Persistence create() { + Injector injector = Guice.createInjector( + new TierModule(TaskTestUtil.TIER_CONFIG), + new MesosLogStreamModule(logOptions, FlaggedZooKeeperConfig.create(zkOptions)), + new LogPersistenceModule(options), + new LifecycleModule(), + new AbstractModule() { + @Override + protected void configure() { + bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY) + .toInstance(zkOptions.zkEndpoints); + bind(Snapshotter.class).to(SnapshotterImpl.class); + bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK); + bind(BuildInfo.class).toInstance(new BuildInfo()); + } + }); + return injector.getInstance(Persistence.class); + } + } + + private static class Backup implements RecoveryEndpoint { + @Parameters(separators = "=") + private static class Options { + @Parameter(names = "-backup", description = "Backup file to load") + File backup; + } + + private final Options options = new Options(); + + @Override + public Iterable<Object> getOptions() { + return 
ImmutableList.of(options); + } + + @Override + public Persistence create() { + return new BackupReader( + options.backup, + new SnapshotterImpl(new BuildInfo(), Clock.SYSTEM_CLOCK)); + } + } + + enum Endpoint { + LOG(new Log()), + BACKUP(new Backup()); + + private final RecoveryEndpoint impl; + + Endpoint(RecoveryEndpoint impl) { + this.impl = impl; + } + } + + @Parameters(separators = "=") + private static class Options { + @Parameter(names = "-from", + required = true, + description = "Persistence to read state from") + Endpoint from; + + @Parameter(names = "-to", + required = true, + description = "Persistence to write recovered state into") + Endpoint to; + + @Parameter(names = "-batch-size", + description = "Write in batches of this may ops.") + int batchSize = 50; + + @Parameter(names = "--help", description = "Print usage", help = true) + boolean help; + } + + private static JCommander configure(Options options, String... args) { + JCommander.Builder builder = JCommander.newBuilder().programName(RecoveryTool.class.getName()); + builder.addConverterFactory(new IStringConverterFactory() { + private Map<Class<?>, Class<? extends IStringConverter<?>>> classConverters = + ImmutableMap.<Class<?>, Class<? extends IStringConverter<?>>>builder() + .put(DataAmount.class, DataAmountConverter.class) + .put(InetSocketAddress.class, InetSocketAddressConverter.class) + .put(TimeAmount.class, TimeAmountConverter.class) + .build(); + + @SuppressWarnings("unchecked") + @Override + public <T> Class<? 
extends IStringConverter<T>> getConverter(Class<T> forType) { + return (Class<IStringConverter<T>>) classConverters.get(forType); + } + }); + + builder.addObject(options); + for (Endpoint endpoint : Endpoint.values()) { + endpoint.impl.getOptions().forEach(builder::addObject); + } + + JCommander parser = builder.build(); + parser.parse(args); + return parser; + } + + public static void main(String[] args) { + Options options = new Options(); + JCommander parser = configure(options, args); + if (options.help) { + parser.usage(); + System.exit(1); + } + + LOG.info("Recovering from " + options.from + " to " + options.to); + Persistence from = options.from.impl.create(); + Persistence to = options.to.impl.create(); + + from.prepare(); + to.prepare(); + + Recovery.copy(from, to, options.batchSize); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java new file mode 100644 index 0000000..ffe3cbf --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java @@ -0,0 +1,78 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.log; + +import javax.inject.Singleton; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import com.google.inject.PrivateModule; +import com.google.inject.TypeLiteral; +import com.google.inject.assistedinject.FactoryModuleBuilder; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Data; +import org.apache.aurora.scheduler.config.types.DataAmount; +import org.apache.aurora.scheduler.storage.durability.Persistence; +import org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl; +import org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction; +import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize; +import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl; + +/** + * Bindings for scheduler distributed log based persistence. + */ +public class LogPersistenceModule extends PrivateModule { + + @Parameters(separators = "=") + public static class Options { + @Parameter(names = "-dlog_max_entry_size", + description = + "Specifies the maximum entry size to append to the log. 
Larger entries will be " + + "split across entry Frames.") + public DataAmount maxLogEntrySize = new DataAmount(512, Data.KB); + } + + private final Options options; + + public LogPersistenceModule(Options options) { + this.options = options; + } + + @Override + protected void configure() { + bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class) + .toInstance(options.maxLogEntrySize); + bind(LogManager.class).in(Singleton.class); + bind(LogPersistence.class).in(Singleton.class); + bind(Persistence.class).to(LogPersistence.class); + expose(Persistence.class); + expose(LogPersistence.class); + + bind(EntrySerializer.class).to(EntrySerializerImpl.class); + // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5 + // versus a faster error-detection checksum like CRC32 for large Snapshots. + @SuppressWarnings("deprecation") + HashFunction hashFunction = Hashing.md5(); + bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(hashFunction); + + bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class); + + install(new FactoryModuleBuilder() + .implement(StreamManager.class, StreamManagerImpl.class) + .build(StreamManagerFactory.class)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java deleted file mode 100644 index 671593c..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.log; - -import javax.inject.Singleton; - -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; -import com.google.inject.AbstractModule; -import com.google.inject.PrivateModule; -import com.google.inject.TypeLiteral; -import com.google.inject.assistedinject.FactoryModuleBuilder; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Data; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.scheduler.SchedulerServicesModule; -import org.apache.aurora.scheduler.config.types.DataAmount; -import org.apache.aurora.scheduler.config.types.TimeAmount; -import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage; -import org.apache.aurora.scheduler.storage.SnapshotStore; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; -import org.apache.aurora.scheduler.storage.durability.DurableStorage; -import org.apache.aurora.scheduler.storage.durability.Persistence; -import org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl; -import org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction; -import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize; -import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl; -import org.apache.aurora.scheduler.storage.log.SnapshotService.Settings; 
- -/** - * Bindings for scheduler distributed log based storage. - */ -public class LogStorageModule extends AbstractModule { - - @Parameters(separators = "=") - public static class Options { - @Parameter(names = "-dlog_snapshot_interval", - description = "Specifies the frequency at which snapshots of local storage are taken and " - + "written to the log.") - public TimeAmount snapshotInterval = new TimeAmount(1, Time.HOURS); - - @Parameter(names = "-dlog_max_entry_size", - description = - "Specifies the maximum entry size to append to the log. Larger entries will be " - + "split across entry Frames.") - public DataAmount maxLogEntrySize = new DataAmount(512, Data.KB); - } - - private final Options options; - - public LogStorageModule(Options options) { - this.options = options; - } - - @Override - protected void configure() { - install(new PrivateModule() { - @Override - protected void configure() { - bind(Settings.class).toInstance(new Settings(options.snapshotInterval)); - - bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class) - .toInstance(options.maxLogEntrySize); - bind(LogManager.class).in(Singleton.class); - bind(DurableStorage.class).in(Singleton.class); - - install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class)); - bind(LogPersistence.class).in(Singleton.class); - bind(Persistence.class).to(LogPersistence.class); - bind(SnapshotStore.class).to(SnapshotService.class); - bind(SnapshotService.class).in(Singleton.class); - expose(SnapshotService.class); - expose(Persistence.class); - expose(Storage.class); - expose(NonVolatileStorage.class); - expose(SnapshotStore.class); - - bind(EntrySerializer.class).to(EntrySerializerImpl.class); - // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5 - // versus a faster error-detection checksum like CRC32 for large Snapshots. 
- @SuppressWarnings("deprecation") - HashFunction hashFunction = Hashing.md5(); - bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(hashFunction); - - bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class); - - install(new FactoryModuleBuilder() - .implement(StreamManager.class, StreamManagerImpl.class) - .build(StreamManagerFactory.class)); - } - }); - - SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SnapshotService.class); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java new file mode 100644 index 0000000..8c0fc12 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java @@ -0,0 +1,54 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.log; + +import javax.inject.Singleton; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.inject.AbstractModule; + +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.config.types.TimeAmount; +import org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.log.SnapshotService.Settings; + +/** + * Binding for a snapshot store and period snapshotting service. + */ +public class SnapshotModule extends AbstractModule { + + @Parameters(separators = "=") + public static class Options { + @Parameter(names = "-dlog_snapshot_interval", + description = "Specifies the frequency at which snapshots of local storage are taken and " + + "written to the log.") + public TimeAmount snapshotInterval = new TimeAmount(1, Time.HOURS); + } + + private final Options options; + + public SnapshotModule(Options options) { + this.options = options; + } + + @Override + protected void configure() { + bind(Settings.class).toInstance(new Settings(options.snapshotInterval)); + bind(SnapshotStore.class).to(SnapshotService.class); + bind(SnapshotService.class).in(Singleton.class); + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SnapshotService.class); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java deleted file mode 100644 index 50553f8..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java +++ /dev/null @@ -1,332 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the 
"License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.log; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Streams; - -import org.apache.aurora.common.inject.TimedInterceptor.Timed; -import org.apache.aurora.common.stats.SlidingStats; -import org.apache.aurora.common.stats.SlidingStats.Timeable; -import org.apache.aurora.common.util.BuildInfo; -import org.apache.aurora.common.util.Clock; -import org.apache.aurora.gen.storage.Op; -import org.apache.aurora.gen.storage.QuotaConfiguration; -import org.apache.aurora.gen.storage.SaveCronJob; -import org.apache.aurora.gen.storage.SaveFrameworkId; -import org.apache.aurora.gen.storage.SaveHostAttributes; -import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; -import org.apache.aurora.gen.storage.SaveJobUpdate; -import org.apache.aurora.gen.storage.SaveJobUpdateEvent; -import org.apache.aurora.gen.storage.SaveQuota; -import org.apache.aurora.gen.storage.SaveTasks; -import org.apache.aurora.gen.storage.SchedulerMetadata; -import org.apache.aurora.gen.storage.Snapshot; -import 
org.apache.aurora.gen.storage.StoredCronJob; -import org.apache.aurora.gen.storage.StoredJobUpdateDetails; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.Snapshotter; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.util.Objects.requireNonNull; - -/** - * Snapshot store implementation that delegates to underlying snapshot stores by - * extracting/applying fields in a snapshot thrift struct. - */ -public class SnapshotStoreImpl implements Snapshotter { - - @VisibleForTesting - static final String SNAPSHOT_SAVE = "snapshot_save_"; - @VisibleForTesting - static final String SNAPSHOT_RESTORE = "snapshot_restore_"; - - private static final Logger LOG = LoggerFactory.getLogger(SnapshotStoreImpl.class); - - private static final String HOST_ATTRIBUTES_FIELD = "hosts"; - private static final String QUOTA_FIELD = "quota"; - private static final String TASK_FIELD = "tasks"; - private static final String CRON_FIELD = "crons"; - private static final String JOB_UPDATE_FIELD = "job_updates"; - private static final String SCHEDULER_METADATA_FIELD = "scheduler_metadata"; - - @VisibleForTesting - Set<String> snapshotFieldNames() { - return snapshotFields.stream() - .map(SnapshotField::getName) - .collect(Collectors.toSet()); - } - - private final List<SnapshotField> snapshotFields = ImmutableList.of( - new SnapshotField() { - @Override - String getName() { - return HOST_ATTRIBUTES_FIELD; - } - - @Override - void saveToSnapshot(StoreProvider store, Snapshot snapshot) { - snapshot.setHostAttributes( - 
IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes())); - } - - @Override - Stream<Op> doStreamFrom(Snapshot snapshot) { - if (snapshot.getHostAttributesSize() > 0) { - return snapshot.getHostAttributes().stream() - .map(attributes -> Op.saveHostAttributes( - new SaveHostAttributes().setHostAttributes(attributes))); - } - return Stream.empty(); - } - }, - new SnapshotField() { - @Override - String getName() { - return TASK_FIELD; - } - - @Override - void saveToSnapshot(StoreProvider store, Snapshot snapshot) { - snapshot.setTasks( - IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped()))); - } - - @Override - Stream<Op> doStreamFrom(Snapshot snapshot) { - if (snapshot.getTasksSize() > 0) { - return Stream.of(Op.saveTasks(new SaveTasks().setTasks(snapshot.getTasks()))); - } - return Stream.empty(); - } - }, - new SnapshotField() { - @Override - String getName() { - return CRON_FIELD; - } - - @Override - void saveToSnapshot(StoreProvider store, Snapshot snapshot) { - ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder(); - - for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) { - jobs.add(new StoredCronJob(config.newBuilder())); - } - snapshot.setCronJobs(jobs.build()); - } - - @Override - Stream<Op> doStreamFrom(Snapshot snapshot) { - if (snapshot.getCronJobsSize() > 0) { - return snapshot.getCronJobs().stream() - .map(job -> Op.saveCronJob( - new SaveCronJob().setJobConfig(job.getJobConfiguration()))); - } - return Stream.empty(); - } - }, - new SnapshotField() { - @Override - String getName() { - return SCHEDULER_METADATA_FIELD; - } - - @Override - void saveToSnapshot(StoreProvider store, Snapshot snapshot) { - // SchedulerMetadata is updated outside of the static list of SnapshotFields - } - - @Override - Stream<Op> doStreamFrom(Snapshot snapshot) { - if (snapshot.isSetSchedulerMetadata() - && snapshot.getSchedulerMetadata().isSetFrameworkId()) { - // No delete necessary here since this 
is a single value. - - return Stream.of(Op.saveFrameworkId( - new SaveFrameworkId().setId(snapshot.getSchedulerMetadata().getFrameworkId()))); - } - return Stream.empty(); - } - }, - new SnapshotField() { - @Override - String getName() { - return QUOTA_FIELD; - } - - @Override - void saveToSnapshot(StoreProvider store, Snapshot snapshot) { - ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder(); - for (Map.Entry<String, IResourceAggregate> entry - : store.getQuotaStore().fetchQuotas().entrySet()) { - - quotas.add(new QuotaConfiguration(entry.getKey(), entry.getValue().newBuilder())); - } - - snapshot.setQuotaConfigurations(quotas.build()); - } - - @Override - Stream<Op> doStreamFrom(Snapshot snapshot) { - if (snapshot.getQuotaConfigurationsSize() > 0) { - return snapshot.getQuotaConfigurations().stream() - .map(quota -> Op.saveQuota(new SaveQuota() - .setRole(quota.getRole()) - .setQuota(quota.getQuota()))); - } - return Stream.empty(); - } - }, - new SnapshotField() { - @Override - String getName() { - return JOB_UPDATE_FIELD; - } - - @Override - void saveToSnapshot(StoreProvider store, Snapshot snapshot) { - snapshot.setJobUpdateDetails( - store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream() - .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder())) - .collect(Collectors.toSet())); - } - - @Override - Stream<Op> doStreamFrom(Snapshot snapshot) { - if (snapshot.getJobUpdateDetailsSize() > 0) { - return snapshot.getJobUpdateDetails().stream() - .flatMap(details -> { - Stream<Op> parent = Stream.of(Op.saveJobUpdate( - new SaveJobUpdate().setJobUpdate(details.getDetails().getUpdate()))); - Stream<Op> jobEvents; - if (details.getDetails().getUpdateEventsSize() > 0) { - jobEvents = details.getDetails().getUpdateEvents().stream() - .map(event -> Op.saveJobUpdateEvent( - new SaveJobUpdateEvent() - .setKey(details.getDetails().getUpdate().getSummary().getKey()) - .setEvent(event))); - } else { - jobEvents = 
Stream.empty(); - } - - Stream<Op> instanceEvents; - if (details.getDetails().getInstanceEventsSize() > 0) { - instanceEvents = details.getDetails().getInstanceEvents().stream() - .map(event -> Op.saveJobInstanceUpdateEvent( - new SaveJobInstanceUpdateEvent() - .setKey(details.getDetails().getUpdate().getSummary().getKey()) - .setEvent(event))); - } else { - instanceEvents = Stream.empty(); - } - - return Streams.concat(parent, jobEvents, instanceEvents); - }); - } - return Stream.empty(); - } - } - ); - - private final BuildInfo buildInfo; - private final Clock clock; - - @Inject - public SnapshotStoreImpl(BuildInfo buildInfo, Clock clock) { - this.buildInfo = requireNonNull(buildInfo); - this.clock = requireNonNull(clock); - } - - private Snapshot createSnapshot(StoreProvider storeProvider) { - Snapshot snapshot = new Snapshot(); - - // Capture timestamp to signify the beginning of a snapshot operation, apply after in case - // one of the field closures is mean and tries to apply a timestamp. 
- long timestamp = clock.nowMillis(); - for (SnapshotField field : snapshotFields) { - field.save(storeProvider, snapshot); - } - - SchedulerMetadata metadata = new SchedulerMetadata() - .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orElse(null)) - .setDetails(buildInfo.getProperties()); - - snapshot.setSchedulerMetadata(metadata); - snapshot.setTimestamp(timestamp); - return snapshot; - } - - @Timed("snapshot_create") - @Override - public Snapshot from(StoreProvider stores) { - return createSnapshot(stores); - } - - @Timed("snapshot_apply") - @Override - public Stream<Op> asStream(Snapshot snapshot) { - requireNonNull(snapshot); - - LOG.info("Restoring snapshot."); - return snapshotFields.stream() - .flatMap(field -> field.streamFrom(snapshot)); - } - - abstract class SnapshotField { - - abstract String getName(); - - abstract void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot); - - abstract Stream<Op> doStreamFrom(Snapshot snapshot); - - void save(StoreProvider storeProvider, Snapshot snapshot) { - stats.getUnchecked(SNAPSHOT_SAVE + getName()) - .time((Timeable.NoResult.Quiet) () -> saveToSnapshot(storeProvider, snapshot)); - } - - Stream<Op> streamFrom(Snapshot snapshot) { - return stats.getUnchecked(SNAPSHOT_RESTORE + getName()).time(() -> doStreamFrom(snapshot)); - } - } - - private final LoadingCache<String, SlidingStats> stats = CacheBuilder.newBuilder().build( - new CacheLoader<String, SlidingStats>() { - @Override - public SlidingStats load(String name) throws Exception { - return new SlidingStats(name, "nanos"); - } - }); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java new file mode 100644 index 
0000000..4b52be0 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java @@ -0,0 +1,332 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.log; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Streams; + +import org.apache.aurora.common.inject.TimedInterceptor.Timed; +import org.apache.aurora.common.stats.SlidingStats; +import org.apache.aurora.common.stats.SlidingStats.Timeable; +import org.apache.aurora.common.util.BuildInfo; +import org.apache.aurora.common.util.Clock; +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.gen.storage.QuotaConfiguration; +import org.apache.aurora.gen.storage.SaveCronJob; +import org.apache.aurora.gen.storage.SaveFrameworkId; +import org.apache.aurora.gen.storage.SaveHostAttributes; +import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; +import org.apache.aurora.gen.storage.SaveJobUpdate; +import org.apache.aurora.gen.storage.SaveJobUpdateEvent; +import 
org.apache.aurora.gen.storage.SaveQuota; +import org.apache.aurora.gen.storage.SaveTasks; +import org.apache.aurora.gen.storage.SchedulerMetadata; +import org.apache.aurora.gen.storage.Snapshot; +import org.apache.aurora.gen.storage.StoredCronJob; +import org.apache.aurora.gen.storage.StoredJobUpdateDetails; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.Snapshotter; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; +import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * Snapshot store implementation that delegates to underlying snapshot stores by + * extracting/applying fields in a snapshot thrift struct. 
+ */ +public class SnapshotterImpl implements Snapshotter { + + @VisibleForTesting + static final String SNAPSHOT_SAVE = "snapshot_save_"; + @VisibleForTesting + static final String SNAPSHOT_RESTORE = "snapshot_restore_"; + + private static final Logger LOG = LoggerFactory.getLogger(SnapshotterImpl.class); + + private static final String HOST_ATTRIBUTES_FIELD = "hosts"; + private static final String QUOTA_FIELD = "quota"; + private static final String TASK_FIELD = "tasks"; + private static final String CRON_FIELD = "crons"; + private static final String JOB_UPDATE_FIELD = "job_updates"; + private static final String SCHEDULER_METADATA_FIELD = "scheduler_metadata"; + + @VisibleForTesting + Set<String> snapshotFieldNames() { + return snapshotFields.stream() + .map(SnapshotField::getName) + .collect(Collectors.toSet()); + } + + private final List<SnapshotField> snapshotFields = ImmutableList.of( + new SnapshotField() { + @Override + String getName() { + return HOST_ATTRIBUTES_FIELD; + } + + @Override + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + snapshot.setHostAttributes( + IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes())); + } + + @Override + Stream<Op> doStreamFrom(Snapshot snapshot) { + if (snapshot.getHostAttributesSize() > 0) { + return snapshot.getHostAttributes().stream() + .map(attributes -> Op.saveHostAttributes( + new SaveHostAttributes().setHostAttributes(attributes))); + } + return Stream.empty(); + } + }, + new SnapshotField() { + @Override + String getName() { + return TASK_FIELD; + } + + @Override + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + snapshot.setTasks( + IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped()))); + } + + @Override + Stream<Op> doStreamFrom(Snapshot snapshot) { + if (snapshot.getTasksSize() > 0) { + return Stream.of(Op.saveTasks(new SaveTasks().setTasks(snapshot.getTasks()))); + } + return Stream.empty(); + } + }, + new SnapshotField() { + 
@Override + String getName() { + return CRON_FIELD; + } + + @Override + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder(); + + for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) { + jobs.add(new StoredCronJob(config.newBuilder())); + } + snapshot.setCronJobs(jobs.build()); + } + + @Override + Stream<Op> doStreamFrom(Snapshot snapshot) { + if (snapshot.getCronJobsSize() > 0) { + return snapshot.getCronJobs().stream() + .map(job -> Op.saveCronJob( + new SaveCronJob().setJobConfig(job.getJobConfiguration()))); + } + return Stream.empty(); + } + }, + new SnapshotField() { + @Override + String getName() { + return SCHEDULER_METADATA_FIELD; + } + + @Override + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + // SchedulerMetadata is updated outside of the static list of SnapshotFields + } + + @Override + Stream<Op> doStreamFrom(Snapshot snapshot) { + if (snapshot.isSetSchedulerMetadata() + && snapshot.getSchedulerMetadata().isSetFrameworkId()) { + // No delete necessary here since this is a single value. 
+ + return Stream.of(Op.saveFrameworkId( + new SaveFrameworkId().setId(snapshot.getSchedulerMetadata().getFrameworkId()))); + } + return Stream.empty(); + } + }, + new SnapshotField() { + @Override + String getName() { + return QUOTA_FIELD; + } + + @Override + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder(); + for (Map.Entry<String, IResourceAggregate> entry + : store.getQuotaStore().fetchQuotas().entrySet()) { + + quotas.add(new QuotaConfiguration(entry.getKey(), entry.getValue().newBuilder())); + } + + snapshot.setQuotaConfigurations(quotas.build()); + } + + @Override + Stream<Op> doStreamFrom(Snapshot snapshot) { + if (snapshot.getQuotaConfigurationsSize() > 0) { + return snapshot.getQuotaConfigurations().stream() + .map(quota -> Op.saveQuota(new SaveQuota() + .setRole(quota.getRole()) + .setQuota(quota.getQuota()))); + } + return Stream.empty(); + } + }, + new SnapshotField() { + @Override + String getName() { + return JOB_UPDATE_FIELD; + } + + @Override + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { + snapshot.setJobUpdateDetails( + store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream() + .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder())) + .collect(Collectors.toSet())); + } + + @Override + Stream<Op> doStreamFrom(Snapshot snapshot) { + if (snapshot.getJobUpdateDetailsSize() > 0) { + return snapshot.getJobUpdateDetails().stream() + .flatMap(details -> { + Stream<Op> parent = Stream.of(Op.saveJobUpdate( + new SaveJobUpdate().setJobUpdate(details.getDetails().getUpdate()))); + Stream<Op> jobEvents; + if (details.getDetails().getUpdateEventsSize() > 0) { + jobEvents = details.getDetails().getUpdateEvents().stream() + .map(event -> Op.saveJobUpdateEvent( + new SaveJobUpdateEvent() + .setKey(details.getDetails().getUpdate().getSummary().getKey()) + .setEvent(event))); + } else { + jobEvents = Stream.empty(); + } + + 
Stream<Op> instanceEvents; + if (details.getDetails().getInstanceEventsSize() > 0) { + instanceEvents = details.getDetails().getInstanceEvents().stream() + .map(event -> Op.saveJobInstanceUpdateEvent( + new SaveJobInstanceUpdateEvent() + .setKey(details.getDetails().getUpdate().getSummary().getKey()) + .setEvent(event))); + } else { + instanceEvents = Stream.empty(); + } + + return Streams.concat(parent, jobEvents, instanceEvents); + }); + } + return Stream.empty(); + } + } + ); + + private final BuildInfo buildInfo; + private final Clock clock; + + @Inject + public SnapshotterImpl(BuildInfo buildInfo, Clock clock) { + this.buildInfo = requireNonNull(buildInfo); + this.clock = requireNonNull(clock); + } + + private Snapshot createSnapshot(StoreProvider storeProvider) { + Snapshot snapshot = new Snapshot(); + + // Capture timestamp to signify the beginning of a snapshot operation, apply after in case + // one of the field closures is mean and tries to apply a timestamp. + long timestamp = clock.nowMillis(); + for (SnapshotField field : snapshotFields) { + field.save(storeProvider, snapshot); + } + + SchedulerMetadata metadata = new SchedulerMetadata() + .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orElse(null)) + .setDetails(buildInfo.getProperties()); + + snapshot.setSchedulerMetadata(metadata); + snapshot.setTimestamp(timestamp); + return snapshot; + } + + @Timed("snapshot_create") + @Override + public Snapshot from(StoreProvider stores) { + return createSnapshot(stores); + } + + @Timed("snapshot_apply") + @Override + public Stream<Op> asStream(Snapshot snapshot) { + requireNonNull(snapshot); + + LOG.info("Restoring snapshot."); + return snapshotFields.stream() + .flatMap(field -> field.streamFrom(snapshot)); + } + + abstract class SnapshotField { + + abstract String getName(); + + abstract void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot); + + abstract Stream<Op> doStreamFrom(Snapshot snapshot); + + void 
save(StoreProvider storeProvider, Snapshot snapshot) { + stats.getUnchecked(SNAPSHOT_SAVE + getName()) + .time((Timeable.NoResult.Quiet) () -> saveToSnapshot(storeProvider, snapshot)); + } + + Stream<Op> streamFrom(Snapshot snapshot) { + return stats.getUnchecked(SNAPSHOT_RESTORE + getName()).time(() -> doStreamFrom(snapshot)); + } + } + + private final LoadingCache<String, SlidingStats> stats = CacheBuilder.newBuilder().build( + new CacheLoader<String, SlidingStats>() { + @Override + public SlidingStats load(String name) throws Exception { + return new SlidingStats(name, "nanos"); + } + }); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index 77fa904..63c338e 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -76,12 +76,14 @@ import org.apache.aurora.scheduler.mesos.DriverSettings; import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory; import org.apache.aurora.scheduler.mesos.TestExecutorSettings; import org.apache.aurora.scheduler.storage.backup.BackupModule; +import org.apache.aurora.scheduler.storage.durability.DurableStorageModule; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.IServerInfo; import org.apache.aurora.scheduler.storage.log.EntrySerializer; -import org.apache.aurora.scheduler.storage.log.LogStorageModule; -import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl; +import org.apache.aurora.scheduler.storage.log.LogPersistenceModule; +import org.apache.aurora.scheduler.storage.log.SnapshotModule; +import 
org.apache.aurora.scheduler.storage.log.SnapshotterImpl; import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher; import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher; import org.apache.mesos.Protos; @@ -198,7 +200,7 @@ public class SchedulerIT extends BaseZooKeeperTest { BackupModule.Options backupOptions = new BackupModule.Options(); backupOptions.backupDir = backupDir; - install(new BackupModule(backupOptions, SnapshotStoreImpl.class)); + install(new BackupModule(backupOptions, SnapshotterImpl.class)); bind(IServerInfo.class).toInstance( IServerInfo.build( @@ -217,7 +219,9 @@ public class SchedulerIT extends BaseZooKeeperTest { ImmutableList.<Module>builder() .add(SchedulerMain.getUniversalModule(new CliOptions())) .add(new TierModule(TaskTestUtil.TIER_CONFIG)) - .add(new LogStorageModule(new LogStorageModule.Options())) + .add(new DurableStorageModule()) + .add(new LogPersistenceModule(new LogPersistenceModule.Options())) + .add(new SnapshotModule(new SnapshotModule.Options())) .add(new ServiceDiscoveryModule(zkClientConfig, SERVERSET_PATH)) .add(testModule) .build() http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java index 53a2315..f685d2e 100644 --- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java +++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java @@ -171,8 +171,8 @@ public class CommandLineTest { expected.updater.enableAffinity = true; expected.updater.affinityExpiration = TEST_TIME; expected.state.taskAssignerModules = ImmutableList.of(NoopModule.class); - expected.logStorage.snapshotInterval = TEST_TIME; - expected.logStorage.maxLogEntrySize = TEST_DATA; + 
expected.snapshot.snapshotInterval = TEST_TIME; + expected.logPersistence.maxLogEntrySize = TEST_DATA; expected.backup.backupInterval = TEST_TIME; expected.backup.maxSavedBackups = 42; expected.backup.backupDir = new File("testing"); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java new file mode 100644 index 0000000..4cbd0cf --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java @@ -0,0 +1,110 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.durability; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.common.collect.Lists; + +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.gen.storage.SaveQuota; +import org.apache.aurora.gen.storage.SaveTasks; +import org.apache.aurora.scheduler.storage.durability.Persistence.Edit; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class RecoveryTest { + + @Test + public void testRecover() { + ListPersistence from = new ListPersistence( + Edit.op(Op.saveQuota(new SaveQuota())), + Edit.op(Op.saveTasks(new SaveTasks()))); + ListPersistence to = new ListPersistence(); + + Recovery.copy(from, to, 100); + + assertEquals(from.edits, to.edits); + } + + @Test + public void testRecoverWithDeleteAll() { + ListPersistence from = new ListPersistence( + Edit.deleteAll(), + Edit.op(Op.saveQuota(new SaveQuota())), + Edit.op(Op.saveTasks(new SaveTasks()))); + ListPersistence to = new ListPersistence(); + + Recovery.copy(from, to, 100); + + assertEquals(from.edits.subList(1, from.edits.size()), to.edits); + } + + @Test + public void testRequiresEmptyTarget() { + ListPersistence from = new ListPersistence(); + ListPersistence to = new ListPersistence(Edit.op(Op.saveQuota(new SaveQuota()))); + + try { + Recovery.copy(from, to, 100); + fail(); + } catch (IllegalStateException e) { + // expected. 
+ } + } + + @Test + public void testDeleteAllAfterFirstPosition() { + ListPersistence from = new ListPersistence( + Edit.op(Op.saveQuota(new SaveQuota())), + Edit.deleteAll(), + Edit.op(Op.saveTasks(new SaveTasks()))); + ListPersistence to = new ListPersistence(); + + try { + Recovery.copy(from, to, 100); + fail(); + } catch (IllegalStateException e) { + // expected + } + } + + private static class ListPersistence implements Persistence { + + private final List<Edit> edits; + + ListPersistence(Edit... edits) { + this.edits = Lists.newArrayList(edits); + } + + @Override + public void prepare() { + // no-op + } + + @Override + public Stream<Edit> recover() { + return edits.stream(); + } + + @Override + public void persist(Stream<Op> records) throws PersistenceException { + edits.addAll(records.map(Edit::op).collect(Collectors.toList())); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java index 3d6d555..a84e408 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java @@ -45,7 +45,7 @@ import org.apache.aurora.scheduler.storage.Snapshotter; import org.apache.aurora.scheduler.storage.Storage.Volatile; import org.apache.aurora.scheduler.storage.durability.Persistence; import org.apache.aurora.scheduler.storage.durability.Persistence.Edit; -import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options; +import org.apache.aurora.scheduler.storage.log.LogPersistenceModule.Options; import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import 
org.junit.Before; @@ -67,7 +67,7 @@ public class LogPersistenceTest extends EasyMockTest { mockStream = createMock(Stream.class); Injector injector = Guice.createInjector( - new LogStorageModule(new Options()), + new LogPersistenceModule(new Options()), new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)), new TierModule(TaskTestUtil.TIER_CONFIG), new AbstractModule() { @@ -77,7 +77,7 @@ public class LogPersistenceTest extends EasyMockTest { bind(EventSink.class).toInstance(e -> { }); bind(BuildInfo.class).toInstance(FakeBuildInfo.generateBuildInfo()); bind(Clock.class).toInstance(new FakeClock()); - bind(Snapshotter.class).to(SnapshotStoreImpl.class); + bind(Snapshotter.class).to(SnapshotterImpl.class); bind(Log.class).toInstance(mockLog); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java index ffd4167..3bd8bf6 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java @@ -27,7 +27,6 @@ import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImp import org.apache.aurora.common.collections.Pair; import org.apache.aurora.common.inject.Bindings; import org.apache.aurora.common.quantity.Data; -import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.testing.TearDownTestCase; import org.apache.aurora.common.util.BuildInfo; @@ -36,7 +35,6 @@ import org.apache.aurora.common.util.testing.FakeBuildInfo; import org.apache.aurora.common.util.testing.FakeClock; import org.apache.aurora.scheduler.TierModule; 
import org.apache.aurora.scheduler.config.types.DataAmount; -import org.apache.aurora.scheduler.config.types.TimeAmount; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.log.Log; import org.apache.aurora.scheduler.resources.ResourceTestUtil; @@ -46,8 +44,9 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.Storage.Volatile; +import org.apache.aurora.scheduler.storage.durability.DurableStorageModule; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; -import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options; +import org.apache.aurora.scheduler.storage.log.LogPersistenceModule.Options; import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.junit.Before; @@ -74,12 +73,13 @@ public class NonVolatileStorageTest extends TearDownTestCase { Options options = new Options(); options.maxLogEntrySize = new DataAmount(1, Data.GB); - options.snapshotInterval = new TimeAmount(1, Time.DAYS); ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl(); Injector injector = Guice.createInjector( new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)), - new LogStorageModule(options), + new DurableStorageModule(), + new LogPersistenceModule(options), + new SnapshotModule(new SnapshotModule.Options()), new TierModule(new TierModule.Options()), new AbstractModule() { @Override @@ -90,7 +90,7 @@ public class NonVolatileStorageTest extends TearDownTestCase { bind(ShutdownRegistry.class).toInstance(shutdownRegistry); bind(StatsProvider.class).toInstance(new FakeStatsProvider()); bind(Log.class).toInstance(log); - bind(Snapshotter.class).to(SnapshotStoreImpl.class); + 
bind(Snapshotter.class).to(SnapshotterImpl.class); } } ); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java index 270453d..e37d566 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java @@ -48,7 +48,8 @@ import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Snapshotter; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.Storage.Volatile; -import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options; +import org.apache.aurora.scheduler.storage.durability.DurableStorageModule; +import org.apache.aurora.scheduler.storage.log.SnapshotModule.Options; import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.IAnswer; @@ -84,7 +85,9 @@ public class SnapshotServiceTest extends EasyMockTest { Injector injector = Guice.createInjector( new SchedulerServicesModule(), - new LogStorageModule(options), + new LogPersistenceModule(new LogPersistenceModule.Options()), + new SnapshotModule(options), + new DurableStorageModule(), new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)), new TierModule(TaskTestUtil.TIER_CONFIG), new AbstractModule() {
