Repository: aurora Updated Branches: refs/heads/master 4489dc345 -> 5f79f7ca7
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java new file mode 100644 index 0000000..b30de88 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java @@ -0,0 +1,121 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.log; + +import javax.inject.Inject; + +import com.google.common.util.concurrent.AbstractScheduledService; + +import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; +import org.apache.aurora.common.inject.TimedInterceptor.Timed; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.gen.storage.Snapshot; +import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException; +import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException; +import org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.Snapshotter; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; +import org.apache.aurora.scheduler.storage.Storage.StorageException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * A {@link SnapshotStore} that snapshots to the log, and automatically snapshots on + * a fixed interval. 
+ */ +class SnapshotService extends AbstractScheduledService implements SnapshotStore { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotService.class); + + private final Storage storage; + private final LogPersistence log; + private final Snapshotter snapshotter; + private final Amount<Long, Time> snapshotInterval; + + @Inject + SnapshotService(Storage storage, LogPersistence log, Snapshotter snapshotter, Settings settings) { + this.storage = requireNonNull(storage); + this.log = requireNonNull(log); + this.snapshotter = requireNonNull(snapshotter); + this.snapshotInterval = settings.getSnapshotInterval(); + } + + @Override + protected void runOneIteration() { + snapshot(); + } + + @Timed("scheduler_log_snapshot") + @Override + public void snapshot() throws StorageException { + try { + LOG.info("Creating snapshot"); + + // It's important to perform snapshot creation in a write lock to ensure all upstream callers + // are correctly synchronized (e.g. during backup creation). + storage.write((NoResult.Quiet) stores -> { + Snapshot snapshot = snapshotter.from(stores); + LOG.info("Saving snapshot"); + snapshotWith(snapshot); + + LOG.info("Snapshot complete." 
+ + " host attrs: " + snapshot.getHostAttributesSize() + + ", cron jobs: " + snapshot.getCronJobsSize() + + ", quota confs: " + snapshot.getQuotaConfigurationsSize() + + ", tasks: " + snapshot.getTasksSize() + + ", updates: " + snapshot.getJobUpdateDetailsSize()); + }); + } catch (CodingException e) { + throw new StorageException("Failed to encode a snapshot", e); + } catch (InvalidPositionException e) { + throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e); + } catch (StreamAccessException e) { + throw new StorageException("Failed to create a snapshot", e); + } + } + + @Timed("scheduler_log_snapshot_persist") + @Override + public void snapshotWith(Snapshot snapshot) + throws CodingException, InvalidPositionException, StreamAccessException { + + log.persist(snapshot); + } + + @Override + protected Scheduler scheduler() { + return Scheduler.newFixedDelaySchedule( + snapshotInterval.getValue(), + snapshotInterval.getValue(), + snapshotInterval.getUnit().getTimeUnit()); + } + + /** + * Configuration settings for log persistence. 
+ */ + public static class Settings { + private final Amount<Long, Time> snapshotInterval; + + Settings(Amount<Long, Time> snapshotInterval) { + this.snapshotInterval = requireNonNull(snapshotInterval); + } + + public Amount<Long, Time> getSnapshotInterval() { + return snapshotInterval; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java index 739fad7..5aefe5f 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java @@ -13,10 +13,11 @@ */ package org.apache.aurora.scheduler.storage.log; -import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.inject.Inject; @@ -24,36 +25,35 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Streams; import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.stats.SlidingStats; import org.apache.aurora.common.stats.SlidingStats.Timeable; import org.apache.aurora.common.util.BuildInfo; import org.apache.aurora.common.util.Clock; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.JobInstanceUpdateEvent; -import org.apache.aurora.gen.JobUpdateDetails; -import org.apache.aurora.gen.JobUpdateEvent; 
+import org.apache.aurora.gen.storage.Op; import org.apache.aurora.gen.storage.QuotaConfiguration; +import org.apache.aurora.gen.storage.SaveCronJob; +import org.apache.aurora.gen.storage.SaveFrameworkId; +import org.apache.aurora.gen.storage.SaveHostAttributes; +import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; +import org.apache.aurora.gen.storage.SaveJobUpdate; +import org.apache.aurora.gen.storage.SaveJobUpdateEvent; +import org.apache.aurora.gen.storage.SaveQuota; +import org.apache.aurora.gen.storage.SaveTasks; import org.apache.aurora.gen.storage.SchedulerMetadata; import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.gen.storage.StoredCronJob; import org.apache.aurora.gen.storage.StoredJobUpdateDetails; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.SnapshotStore; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.Storage.Volatile; -import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; +import org.apache.aurora.scheduler.storage.Snapshotter; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.slf4j.Logger; @@ -65,7 +65,7 @@ import static java.util.Objects.requireNonNull; * Snapshot store implementation that 
delegates to underlying snapshot stores by * extracting/applying fields in a snapshot thrift struct. */ -public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { +public class SnapshotStoreImpl implements Snapshotter { @VisibleForTesting static final String SNAPSHOT_SAVE = "snapshot_save_"; @@ -83,63 +83,62 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { @VisibleForTesting Set<String> snapshotFieldNames() { - return FluentIterable.from(snapshotFields) - .transform(SnapshotField::getName) - .toSet(); + return snapshotFields.stream() + .map(SnapshotField::getName) + .collect(Collectors.toSet()); } - private final Iterable<SnapshotField> snapshotFields = Arrays.asList( + private final List<SnapshotField> snapshotFields = ImmutableList.of( new SnapshotField() { @Override - public String getName() { + String getName() { return HOST_ATTRIBUTES_FIELD; } @Override - public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { snapshot.setHostAttributes( IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes())); } @Override - public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + Stream<Op> doStreamFrom(Snapshot snapshot) { if (snapshot.getHostAttributesSize() > 0) { - store.getAttributeStore().deleteHostAttributes(); - for (HostAttributes attributes : snapshot.getHostAttributes()) { - store.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes)); - } + return snapshot.getHostAttributes().stream() + .map(attributes -> Op.saveHostAttributes( + new SaveHostAttributes().setHostAttributes(attributes))); } + return Stream.empty(); } }, new SnapshotField() { @Override - public String getName() { + String getName() { return TASK_FIELD; } @Override - public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { snapshot.setTasks( 
IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped()))); } @Override - public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + Stream<Op> doStreamFrom(Snapshot snapshot) { if (snapshot.getTasksSize() > 0) { - store.getUnsafeTaskStore().deleteAllTasks(); - store.getUnsafeTaskStore() - .saveTasks(thriftBackfill.backfillTasks(snapshot.getTasks())); + return Stream.of(Op.saveTasks(new SaveTasks().setTasks(snapshot.getTasks()))); } + return Stream.empty(); } }, new SnapshotField() { @Override - public String getName() { + String getName() { return CRON_FIELD; } @Override - public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder(); for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) { @@ -149,46 +148,46 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { } @Override - public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + Stream<Op> doStreamFrom(Snapshot snapshot) { if (snapshot.getCronJobsSize() > 0) { - store.getCronJobStore().deleteJobs(); - for (StoredCronJob job : snapshot.getCronJobs()) { - store.getCronJobStore().saveAcceptedJob( - thriftBackfill.backfillJobConfiguration(job.getJobConfiguration())); - } + return snapshot.getCronJobs().stream() + .map(job -> Op.saveCronJob( + new SaveCronJob().setJobConfig(job.getJobConfiguration()))); } + return Stream.empty(); } }, new SnapshotField() { @Override - public String getName() { + String getName() { return SCHEDULER_METADATA_FIELD; } @Override - public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { // SchedulerMetadata is updated outside of the static list of SnapshotFields } @Override - public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + 
Stream<Op> doStreamFrom(Snapshot snapshot) { if (snapshot.isSetSchedulerMetadata() && snapshot.getSchedulerMetadata().isSetFrameworkId()) { // No delete necessary here since this is a single value. - store.getSchedulerStore() - .saveFrameworkId(snapshot.getSchedulerMetadata().getFrameworkId()); + return Stream.of(Op.saveFrameworkId( + new SaveFrameworkId().setId(snapshot.getSchedulerMetadata().getFrameworkId()))); } + return Stream.empty(); } }, new SnapshotField() { @Override - public String getName() { + String getName() { return QUOTA_FIELD; } @Override - public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder(); for (Map.Entry<String, IResourceAggregate> entry : store.getQuotaStore().fetchQuotas().entrySet()) { @@ -200,24 +199,24 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { } @Override - public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + Stream<Op> doStreamFrom(Snapshot snapshot) { if (snapshot.getQuotaConfigurationsSize() > 0) { - store.getQuotaStore().deleteQuotas(); - for (QuotaConfiguration quota : snapshot.getQuotaConfigurations()) { - store.getQuotaStore() - .saveQuota(quota.getRole(), IResourceAggregate.build(quota.getQuota())); - } + return snapshot.getQuotaConfigurations().stream() + .map(quota -> Op.saveQuota(new SaveQuota() + .setRole(quota.getRole()) + .setQuota(quota.getQuota()))); } + return Stream.empty(); } }, new SnapshotField() { @Override - public String getName() { + String getName() { return JOB_UPDATE_FIELD; } @Override - public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) { + void saveToSnapshot(StoreProvider store, Snapshot snapshot) { snapshot.setJobUpdateDetails( store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream() .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder())) 
@@ -225,112 +224,101 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> { } @Override - public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) { + Stream<Op> doStreamFrom(Snapshot snapshot) { if (snapshot.getJobUpdateDetailsSize() > 0) { - JobUpdateStore.Mutable updateStore = store.getJobUpdateStore(); - updateStore.deleteAllUpdates(); - for (StoredJobUpdateDetails storedDetails : snapshot.getJobUpdateDetails()) { - JobUpdateDetails details = storedDetails.getDetails(); - updateStore.saveJobUpdate(thriftBackfill.backFillJobUpdate(details.getUpdate())); - - if (details.getUpdateEventsSize() > 0) { - for (JobUpdateEvent updateEvent : details.getUpdateEvents()) { - updateStore.saveJobUpdateEvent( - IJobUpdateKey.build(details.getUpdate().getSummary().getKey()), - IJobUpdateEvent.build(updateEvent)); - } - } - - if (details.getInstanceEventsSize() > 0) { - for (JobInstanceUpdateEvent instanceEvent : details.getInstanceEvents()) { - updateStore.saveJobInstanceUpdateEvent( - IJobUpdateKey.build(details.getUpdate().getSummary().getKey()), - IJobInstanceUpdateEvent.build(instanceEvent)); - } - } - } + return snapshot.getJobUpdateDetails().stream() + .flatMap(details -> { + Stream<Op> parent = Stream.of(Op.saveJobUpdate( + new SaveJobUpdate().setJobUpdate(details.getDetails().getUpdate()))); + Stream<Op> jobEvents; + if (details.getDetails().getUpdateEventsSize() > 0) { + jobEvents = details.getDetails().getUpdateEvents().stream() + .map(event -> Op.saveJobUpdateEvent( + new SaveJobUpdateEvent() + .setKey(details.getDetails().getUpdate().getSummary().getKey()) + .setEvent(event))); + } else { + jobEvents = Stream.empty(); + } + + Stream<Op> instanceEvents; + if (details.getDetails().getInstanceEventsSize() > 0) { + instanceEvents = details.getDetails().getInstanceEvents().stream() + .map(event -> Op.saveJobInstanceUpdateEvent( + new SaveJobInstanceUpdateEvent() + .setKey(details.getDetails().getUpdate().getSummary().getKey()) + 
.setEvent(event))); + } else { + instanceEvents = Stream.empty(); + } + + return Streams.concat(parent, jobEvents, instanceEvents); + }); } + return Stream.empty(); } } ); private final BuildInfo buildInfo; private final Clock clock; - private final Storage storage; - private final ThriftBackfill thriftBackfill; @Inject - public SnapshotStoreImpl( - BuildInfo buildInfo, - Clock clock, - @Volatile Storage storage, - ThriftBackfill thriftBackfill) { - + public SnapshotStoreImpl(BuildInfo buildInfo, Clock clock) { this.buildInfo = requireNonNull(buildInfo); this.clock = requireNonNull(clock); - this.storage = requireNonNull(storage); - this.thriftBackfill = requireNonNull(thriftBackfill); } - private Snapshot createSnapshot(Storage anyStorage) { - // It's important to perform snapshot creation in a write lock to ensure all upstream callers - // are correctly synchronized (e.g. during backup creation). - return anyStorage.write(storeProvider -> { - Snapshot snapshot = new Snapshot(); - - // Capture timestamp to signify the beginning of a snapshot operation, apply after in case - // one of the field closures is mean and tries to apply a timestamp. - long timestamp = clock.nowMillis(); - for (SnapshotField field : snapshotFields) { - field.save(storeProvider, snapshot); - } + private Snapshot createSnapshot(StoreProvider storeProvider) { + Snapshot snapshot = new Snapshot(); + + // Capture timestamp to signify the beginning of a snapshot operation, apply after in case + // one of the field closures is mean and tries to apply a timestamp. 
+ long timestamp = clock.nowMillis(); + for (SnapshotField field : snapshotFields) { + field.save(storeProvider, snapshot); + } - SchedulerMetadata metadata = new SchedulerMetadata() - .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orNull()) - .setDetails(buildInfo.getProperties()); + SchedulerMetadata metadata = new SchedulerMetadata() + .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orNull()) + .setDetails(buildInfo.getProperties()); - snapshot.setSchedulerMetadata(metadata); - snapshot.setTimestamp(timestamp); - return snapshot; - }); + snapshot.setSchedulerMetadata(metadata); + snapshot.setTimestamp(timestamp); + return snapshot; } @Timed("snapshot_create") @Override - public Snapshot createSnapshot() { - return createSnapshot(storage); + public Snapshot from(StoreProvider stores) { + return createSnapshot(stores); } @Timed("snapshot_apply") @Override - public void applySnapshot(final Snapshot snapshot) { + public Stream<Op> asStream(Snapshot snapshot) { requireNonNull(snapshot); - storage.write((NoResult.Quiet) storeProvider -> { - LOG.info("Restoring snapshot."); - - for (SnapshotField field : snapshotFields) { - field.restore(storeProvider, snapshot); - } - }); + LOG.info("Restoring snapshot."); + return snapshotFields.stream() + .flatMap(field -> field.streamFrom(snapshot)); } abstract class SnapshotField { abstract String getName(); - abstract void saveToSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot); + abstract void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot); - abstract void restoreFromSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot); + abstract Stream<Op> doStreamFrom(Snapshot snapshot); - void save(MutableStoreProvider storeProvider, Snapshot snapshot) { + void save(StoreProvider storeProvider, Snapshot snapshot) { stats.getUnchecked(SNAPSHOT_SAVE + getName()) .time((Timeable.NoResult.Quiet) () -> saveToSnapshot(storeProvider, snapshot)); } - void 
restore(MutableStoreProvider storeProvider, Snapshot snapshot) { - stats.getUnchecked(SNAPSHOT_RESTORE + getName()) - .time((Timeable.NoResult.Quiet) () -> restoreFromSnapshot(storeProvider, snapshot)); + Stream<Op> streamFrom(Snapshot snapshot) { + return stats.getUnchecked(SNAPSHOT_RESTORE + getName()).time(() -> doStreamFrom(snapshot)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java index 159fb29..1b003ab 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java @@ -83,7 +83,7 @@ import org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.UUIDGenerator; -import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; +import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; @@ -170,7 +170,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { private final ConfigurationManager configurationManager; private final Thresholds thresholds; private final NonVolatileStorage storage; - private final DistributedSnapshotStore snapshotStore; + private final SnapshotStore snapshotStore; private final StorageBackup backup; private final Recovery recovery; private final MaintenanceController maintenance; @@ -200,7 +200,7 @@ class 
SchedulerThriftInterface implements AnnotatedAuroraAdmin { ConfigurationManager configurationManager, Thresholds thresholds, NonVolatileStorage storage, - DistributedSnapshotStore snapshotStore, + SnapshotStore snapshotStore, StorageBackup backup, Recovery recovery, CronJobManager cronJobManager, http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java index aeb8685..020f348 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java +++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java @@ -35,7 +35,7 @@ import org.apache.aurora.scheduler.config.CommandLine; import org.apache.aurora.scheduler.mesos.DriverFactory; import org.apache.aurora.scheduler.mesos.DriverSettings; import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory; -import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; +import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.mesos.SchedulerDriver; @@ -83,7 +83,7 @@ public final class LocalSchedulerMain { protected void configure() { bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class)); bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class); - bind(DistributedSnapshotStore.class).toInstance(new DistributedSnapshotStore() { + bind(SnapshotStore.class).toInstance(new SnapshotStore() { @Override public void snapshot() throws Storage.StorageException { // no-op http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java 
---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java index 5cb5310..53a2315 100644 --- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java +++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java @@ -171,7 +171,6 @@ public class CommandLineTest { expected.updater.enableAffinity = true; expected.updater.affinityExpiration = TEST_TIME; expected.state.taskAssignerModules = ImmutableList.of(NoopModule.class); - expected.logStorage.shutdownGracePeriod = TEST_TIME; expected.logStorage.snapshotInterval = TEST_TIME; expected.logStorage.maxLogEntrySize = TEST_DATA; expected.backup.backupInterval = TEST_TIME; @@ -318,7 +317,6 @@ public class CommandLineTest { "-enable_update_affinity=true", "-update_affinity_reservation_hold_time=42days", "-task_assigner_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule", - "-dlog_shutdown_grace_period=42days", "-dlog_snapshot_interval=42days", "-dlog_max_entry_size=42GB", "-backup_interval=42days", http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java index 09560f4..ba03ff9 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java @@ -31,8 +31,8 @@ import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.base.Tasks; -import 
org.apache.aurora.scheduler.storage.DistributedSnapshotStore; import org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.Snapshotter; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; @@ -49,6 +49,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; @@ -60,8 +61,8 @@ public class RecoveryTest extends EasyMockTest { private static final IScheduledTask TASK2 = TaskTestUtil.makeTask("task2", TaskTestUtil.JOB); private static final Snapshot SNAPSHOT1 = makeSnapshot(TASK1, TASK2); - private SnapshotStore<Snapshot> snapshotStore; - private DistributedSnapshotStore distributedStore; + private Snapshotter snapshotter; + private SnapshotStore distributedStore; private Storage primaryStorage; private MutableStoreProvider storeProvider; private Command shutDownNow; @@ -74,8 +75,8 @@ public class RecoveryTest extends EasyMockTest { @Before public void setUp() throws IOException { final File backupDir = temporaryFolder.newFolder(); - snapshotStore = createMock(new Clazz<SnapshotStore<Snapshot>>() { }); - distributedStore = createMock(DistributedSnapshotStore.class); + snapshotter = createMock(Snapshotter.class); + distributedStore = createMock(SnapshotStore.class); primaryStorage = createMock(Storage.class); storeProvider = createMock(MutableStoreProvider.class); shutDownNow = createMock(Command.class); @@ -84,7 +85,8 @@ public class RecoveryTest extends EasyMockTest { TemporaryStorageFactory factory = new TemporaryStorageFactory(TaskTestUtil.THRIFT_BACKFILL); storageBackup = new StorageBackupImpl( - snapshotStore, + primaryStorage, + snapshotter, clock, new BackupConfig(backupDir, 5, 
INTERVAL), executor); @@ -94,7 +96,7 @@ public class RecoveryTest extends EasyMockTest { @Test public void testRecover() throws Exception { - expect(snapshotStore.createSnapshot()).andReturn(SNAPSHOT1); + expect(snapshotter.from(anyObject())).andReturn(SNAPSHOT1); Capture<MutateWork<Object, Exception>> transaction = createCapture(); expect(primaryStorage.write(capture(transaction))).andReturn(null); Capture<Snapshot> snapshot = createCapture(); @@ -106,7 +108,7 @@ public class RecoveryTest extends EasyMockTest { assertEquals(ImmutableSet.of(), recovery.listBackups()); clock.advance(INTERVAL); - storageBackup.createSnapshot(); + storageBackup.from(storeProvider); String backup1 = storageBackup.createBackupName(); assertEquals(ImmutableSet.of(backup1), recovery.listBackups()); @@ -122,7 +124,7 @@ public class RecoveryTest extends EasyMockTest { @Test public void testModifySnapshotBeforeCommit() throws Exception { - expect(snapshotStore.createSnapshot()).andReturn(SNAPSHOT1); + expect(snapshotter.from(anyObject())).andReturn(SNAPSHOT1); Snapshot modified = SNAPSHOT1.deepCopy().setTasks(ImmutableSet.of(TASK1.newBuilder())); Capture<MutateWork<Object, Exception>> transaction = createCapture(); expect(primaryStorage.write(capture(transaction))).andReturn(null); @@ -133,7 +135,7 @@ public class RecoveryTest extends EasyMockTest { control.replay(); clock.advance(INTERVAL); - storageBackup.createSnapshot(); + storageBackup.from(storeProvider); String backup1 = storageBackup.createBackupName(); recovery.stage(backup1); assertEquals( http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java index fff376f..f0ba33f 100644 --- 
a/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java @@ -39,9 +39,12 @@ import org.apache.aurora.gen.storage.QuotaConfiguration; import org.apache.aurora.gen.storage.SchedulerMetadata; import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.gen.storage.StoredCronJob; -import org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.Snapshotter; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl; import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl.BackupConfig; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; import org.junit.Before; import org.junit.Rule; @@ -49,6 +52,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.aggregate; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -58,7 +62,8 @@ public class StorageBackupTest extends EasyMockTest { private static final int MAX_BACKUPS = 5; private static final Amount<Long, Time> INTERVAL = Amount.of(1L, Time.HOURS); - private SnapshotStore<Snapshot> delegate; + private Storage storage; + private Snapshotter delegate; private FakeClock clock; private BackupConfig config; private StorageBackupImpl storageBackup; @@ -67,29 +72,35 @@ public class StorageBackupTest extends EasyMockTest { @Before public void setUp() throws IOException { - delegate = createMock(new Clazz<SnapshotStore<Snapshot>>() { }); + storage = MemStorageModule.newEmptyStorage(); + delegate = 
createMock(Snapshotter.class); final File backupDir = temporaryFolder.newFolder(); ScheduledExecutorService executor = createMock(ScheduledExecutorService.class); clock = FakeScheduledExecutor.scheduleExecutor(executor); config = new BackupConfig(backupDir, MAX_BACKUPS, INTERVAL); clock.advance(Amount.of(365 * 30L, Time.DAYS)); - storageBackup = new StorageBackupImpl(delegate, clock, config, executor); + storageBackup = new StorageBackupImpl(storage, delegate, clock, config, executor); + } + + private void triggerSnapshot(Snapshot expectedResult) { + storage.write((NoResult.Quiet) stores -> + assertEquals(expectedResult, storageBackup.from(stores))); } @Test public void testBackup() throws Exception { Snapshot snapshot = makeSnapshot(); - expect(delegate.createSnapshot()).andReturn(snapshot).times(3); + expect(delegate.from(anyObject())).andReturn(snapshot).times(3); control.replay(); - assertEquals(snapshot, storageBackup.createSnapshot()); + triggerSnapshot(snapshot); assertBackupCount(0); clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS) - 1, Time.MILLISECONDS)); - assertEquals(snapshot, storageBackup.createSnapshot()); + triggerSnapshot(snapshot); assertBackupCount(0); clock.advance(Amount.of(1L, Time.MILLISECONDS)); - assertEquals(snapshot, storageBackup.createSnapshot()); + triggerSnapshot(snapshot); assertBackupCount(1); assertEquals(1, storageBackup.getSuccesses().get()); @@ -104,34 +115,34 @@ public class StorageBackupTest extends EasyMockTest { @Test public void testDirectoryMissing() { Snapshot snapshot = makeSnapshot(); - expect(delegate.createSnapshot()).andReturn(snapshot).times(1); + expect(delegate.from(anyObject())).andReturn(snapshot).times(1); control.replay(); clock.advance(INTERVAL); config.getDir().delete(); - assertEquals(snapshot, storageBackup.createSnapshot()); + triggerSnapshot(snapshot); assertEquals(1, storageBackup.getFailures().get()); } @Test public void testOldBackupsDeleted() { Snapshot snapshot = makeSnapshot(); - 
expect(delegate.createSnapshot()).andReturn(snapshot).times(MAX_BACKUPS + 1); + expect(delegate.from(anyObject())).andReturn(snapshot).times(MAX_BACKUPS + 1); control.replay(); ImmutableList.Builder<String> nameBuilder = ImmutableList.builder(); for (int i = 0; i < MAX_BACKUPS; i++) { clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS), Time.MILLISECONDS)); - assertEquals(snapshot, storageBackup.createSnapshot()); + triggerSnapshot(snapshot); nameBuilder.add(storageBackup.createBackupName()); assertBackupCount(i + 1); assertEquals(i + 1, storageBackup.getSuccesses().get()); } clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS), Time.MILLISECONDS)); - assertEquals(snapshot, storageBackup.createSnapshot()); + triggerSnapshot(snapshot); nameBuilder.add(storageBackup.createBackupName()); assertBackupCount(MAX_BACKUPS); assertEquals(MAX_BACKUPS + 1, storageBackup.getSuccesses().get()); @@ -150,17 +161,17 @@ public class StorageBackupTest extends EasyMockTest { public void testInterval() { // Ensures that a long initial interval does not result in shortened subsequent intervals. 
Snapshot snapshot = makeSnapshot(); - expect(delegate.createSnapshot()).andReturn(snapshot).times(3); + expect(delegate.from(anyObject())).andReturn(snapshot).times(3); control.replay(); - assertEquals(snapshot, storageBackup.createSnapshot()); + triggerSnapshot(snapshot); assertBackupCount(0); clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS) * 3, Time.MILLISECONDS)); - assertEquals(snapshot, storageBackup.createSnapshot()); + triggerSnapshot(snapshot); assertBackupCount(1); assertEquals(1, storageBackup.getSuccesses().get()); - assertEquals(snapshot, storageBackup.createSnapshot()); + triggerSnapshot(snapshot); assertBackupCount(1); assertEquals(1, storageBackup.getSuccesses().get()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java index 3ad40ad..a6bf330 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java @@ -13,7 +13,6 @@ */ package org.apache.aurora.scheduler.storage.durability; -import java.util.EnumSet; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; @@ -74,6 +73,7 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet; +import org.apache.aurora.scheduler.storage.durability.Persistence.Edit; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import 
org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; @@ -167,44 +167,49 @@ public class DurableStorageTest extends EasyMockTest { // Populate all Op types. buildReplayOps(); + storageUtil.expectStoreAccesses(); + control.replay(); durableStorage.prepare(); durableStorage.start(initializationLogic); assertTrue(initialized.get()); - - // Assert all Transaction types have handlers defined. - assertEquals( - EnumSet.allOf(Op._Fields.class), - EnumSet.copyOf(durableStorage.buildTransactionReplayActions().keySet())); } private void buildReplayOps() throws Exception { - ImmutableSet.Builder<Op> builder = ImmutableSet.builder(); + ImmutableSet.Builder<Edit> builder = ImmutableSet.builder(); - builder.add(Op.saveFrameworkId(new SaveFrameworkId("bob"))); + builder.add(Edit.op(Op.saveFrameworkId(new SaveFrameworkId("bob")))); storageUtil.schedulerStore.saveFrameworkId("bob"); JobConfiguration actualJob = new JobConfiguration().setTaskConfig(nonBackfilledConfig()); JobConfiguration expectedJob = new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder()); SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob); - builder.add(Op.saveCronJob(cronJob)); + builder.add(Edit.op(Op.saveCronJob(cronJob))); storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob)); RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder()); - builder.add(Op.removeJob(removeJob)); + builder.add(Edit.op(Op.removeJob(removeJob))); storageUtil.jobStore.removeJob(JOB_KEY); ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder(); actualTask.getAssignedTask().setTask(nonBackfilledConfig()); IScheduledTask expectedTask = makeTask("id", JOB_KEY); SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask)); - builder.add(Op.saveTasks(saveTasks)); + builder.add(Edit.op(Op.saveTasks(saveTasks))); storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask)); + // Side-effects 
from a storage reset, caused by a snapshot. + builder.add(Edit.deleteAll()); + storageUtil.jobStore.deleteJobs(); + storageUtil.taskStore.deleteAllTasks(); + storageUtil.quotaStore.deleteQuotas(); + storageUtil.attributeStore.deleteHostAttributes(); + storageUtil.jobUpdateStore.deleteAllUpdates(); + RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1")); - builder.add(Op.removeTasks(removeTasks)); + builder.add(Edit.op(Op.removeTasks(removeTasks))); storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds()); ResourceAggregate nonBackfilled = new ResourceAggregate() @@ -212,33 +217,33 @@ public class DurableStorageTest extends EasyMockTest { .setRamMb(32) .setDiskMb(64); SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), nonBackfilled); - builder.add(Op.saveQuota(saveQuota)); + builder.add(Edit.op(Op.saveQuota(saveQuota))); storageUtil.quotaStore.saveQuota( saveQuota.getRole(), IResourceAggregate.build(nonBackfilled.deepCopy() .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64))))); - builder.add(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole()))); + builder.add(Edit.op(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole())))); storageUtil.quotaStore.removeQuota(JOB_KEY.getRole()); // This entry lacks a slave ID, and should therefore be discarded. 
SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes() .setHost("host1") .setMode(MaintenanceMode.DRAINED)); - builder.add(Op.saveHostAttributes(hostAttributes1)); + builder.add(Edit.op(Op.saveHostAttributes(hostAttributes1))); SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes() .setHost("host2") .setSlaveId("slave2") .setMode(MaintenanceMode.DRAINED)); - builder.add(Op.saveHostAttributes(hostAttributes2)); + builder.add(Edit.op(Op.saveHostAttributes(hostAttributes2))); expect(storageUtil.attributeStore.saveHostAttributes( IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true); - builder.add(Op.saveLock(new SaveLock())); + builder.add(Edit.op(Op.saveLock(new SaveLock()))); // TODO(jly): Deprecated, this is a no-op to be removed in 0.21. See AURORA-1959. - builder.add(Op.removeLock(new RemoveLock())); + builder.add(Edit.op(Op.removeLock(new RemoveLock()))); // TODO(jly): Deprecated, this is a no-op to be removed in 0.21. See AURORA-1959. 
JobUpdate actualUpdate = new JobUpdate() @@ -252,12 +257,12 @@ public class DurableStorageTest extends EasyMockTest { expectedUpdate.getInstructions().getInitialState() .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder())); SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate); - builder.add(Op.saveJobUpdate(saveUpdate)); + builder.add(Edit.op(Op.saveJobUpdate(saveUpdate))); storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate)); SaveJobUpdateEvent saveUpdateEvent = new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder()); - builder.add(Op.saveJobUpdateEvent(saveUpdateEvent)); + builder.add(Edit.op(Op.saveJobUpdateEvent(saveUpdateEvent))); storageUtil.jobUpdateStore.saveJobUpdateEvent( UPDATE_ID, IJobUpdateEvent.build(saveUpdateEvent.getEvent())); @@ -265,16 +270,16 @@ public class DurableStorageTest extends EasyMockTest { SaveJobInstanceUpdateEvent saveInstanceEvent = new SaveJobInstanceUpdateEvent( new JobInstanceUpdateEvent(), UPDATE_ID.newBuilder()); - builder.add(Op.saveJobInstanceUpdateEvent(saveInstanceEvent)); + builder.add(Edit.op(Op.saveJobInstanceUpdateEvent(saveInstanceEvent))); storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent( UPDATE_ID, IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent())); - builder.add(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L))); + builder.add(Edit.op(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L)))); // No expectation - this op is ignored. 
- builder.add(Op.removeJobUpdate( - new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder())))); + builder.add(Edit.op(Op.removeJobUpdate( + new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder()))))); storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID)); expect(persistence.recover()).andReturn(builder.build().stream()); http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java deleted file mode 100644 index e8b564b..0000000 --- a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.storage.durability; - -import java.util.Set; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.gen.Attribute; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.JobUpdateKey; -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.storage.Op; -import org.apache.aurora.gen.storage.SaveHostAttributes; -import org.apache.aurora.gen.storage.SaveTasks; -import org.apache.aurora.scheduler.base.TaskTestUtil; -import org.apache.aurora.scheduler.events.EventSink; -import org.apache.aurora.scheduler.events.PubsubEvent; -import org.apache.aurora.scheduler.storage.AttributeStore; -import org.apache.aurora.scheduler.storage.CronJobStore; -import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.QuotaStore; -import org.apache.aurora.scheduler.storage.SchedulerStore; -import org.apache.aurora.scheduler.storage.TaskStore; -import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.LoggerFactory; - -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class WriteAheadStorageTest extends EasyMockTest { - - private TransactionManager transactionManager; - private TaskStore.Mutable taskStore; - private AttributeStore.Mutable attributeStore; - private JobUpdateStore.Mutable jobUpdateStore; - private EventSink eventSink; - private WriteAheadStorage 
storage; - - @Before - public void setUp() { - transactionManager = createMock(TransactionManager.class); - taskStore = createMock(TaskStore.Mutable.class); - attributeStore = createMock(AttributeStore.Mutable.class); - jobUpdateStore = createMock(JobUpdateStore.Mutable.class); - eventSink = createMock(EventSink.class); - - storage = new WriteAheadStorage( - transactionManager, - createMock(SchedulerStore.Mutable.class), - createMock(CronJobStore.Mutable.class), - taskStore, - createMock(QuotaStore.Mutable.class), - attributeStore, - jobUpdateStore, - LoggerFactory.getLogger(WriteAheadStorageTest.class), - eventSink); - } - - private void expectOp(Op op) { - expect(transactionManager.hasActiveTransaction()).andReturn(true); - transactionManager.log(op); - } - - @Test - public void testRemoveUpdates() { - Set<IJobUpdateKey> removed = ImmutableSet.of( - IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")), - IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b"))); - jobUpdateStore.removeJobUpdates(removed); - // No operation is written since this Op is in read-only compatibility mode. 
- - control.replay(); - - storage.removeJobUpdates(removed); - } - - @Test - public void testMutate() { - String taskId = "a"; - Function<IScheduledTask, IScheduledTask> mutator = - createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { }); - Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB)); - - expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated); - expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder())))); - - control.replay(); - - assertEquals(mutated, storage.mutateTask(taskId, mutator)); - } - - @Test - public void testSaveHostAttributes() { - IHostAttributes attributes = IHostAttributes.build( - new HostAttributes() - .setHost("a") - .setMode(MaintenanceMode.DRAINING) - .setAttributes(ImmutableSet.of( - new Attribute().setName("b").setValues(ImmutableSet.of("1", "2"))))); - - expect(attributeStore.saveHostAttributes(attributes)).andReturn(true); - expectOp(Op.saveHostAttributes( - new SaveHostAttributes().setHostAttributes(attributes.newBuilder()))); - eventSink.post(new PubsubEvent.HostAttributesChanged(attributes)); - - expect(attributeStore.saveHostAttributes(attributes)).andReturn(false); - - control.replay(); - - assertTrue(storage.saveHostAttributes(attributes)); - - assertFalse(storage.saveHostAttributes(attributes)); - } - - @Test(expected = UnsupportedOperationException.class) - public void testDeleteAllTasks() { - control.replay(); - storage.deleteAllTasks(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testDeleteHostAttributes() { - control.replay(); - storage.deleteHostAttributes(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testDeleteJobs() { - control.replay(); - storage.deleteJobs(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testDeleteQuotas() { - control.replay(); - storage.deleteQuotas(); - } - - @Test(expected = UnsupportedOperationException.class) - 
public void testDeleteAllUpdatesAndEvents() { - control.replay(); - storage.deleteAllUpdates(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java new file mode 100644 index 0000000..1a89e83 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java @@ -0,0 +1,166 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.durability; + +import java.util.Set; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.Attribute; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.JobUpdateKey; +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.gen.storage.SaveHostAttributes; +import org.apache.aurora.gen.storage.SaveTasks; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.events.EventSink; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.CronJobStore; +import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.QuotaStore; +import org.apache.aurora.scheduler.storage.SchedulerStore; +import org.apache.aurora.scheduler.storage.TaskStore; +import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class WriteRecorderTest extends EasyMockTest { + + private TransactionManager transactionManager; + private TaskStore.Mutable taskStore; + private AttributeStore.Mutable attributeStore; + private JobUpdateStore.Mutable jobUpdateStore; + private EventSink eventSink; + private WriteRecorder storage; + 
+ @Before + public void setUp() { + transactionManager = createMock(TransactionManager.class); + taskStore = createMock(TaskStore.Mutable.class); + attributeStore = createMock(AttributeStore.Mutable.class); + jobUpdateStore = createMock(JobUpdateStore.Mutable.class); + eventSink = createMock(EventSink.class); + + storage = new WriteRecorder( + transactionManager, + createMock(SchedulerStore.Mutable.class), + createMock(CronJobStore.Mutable.class), + taskStore, + createMock(QuotaStore.Mutable.class), + attributeStore, + jobUpdateStore, + LoggerFactory.getLogger(WriteRecorderTest.class), + eventSink); + } + + private void expectOp(Op op) { + expect(transactionManager.hasActiveTransaction()).andReturn(true); + transactionManager.log(op); + } + + @Test + public void testRemoveUpdates() { + Set<IJobUpdateKey> removed = ImmutableSet.of( + IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")), + IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b"))); + jobUpdateStore.removeJobUpdates(removed); + // No operation is written since this Op is in read-only compatibility mode. 
+ + control.replay(); + + storage.removeJobUpdates(removed); + } + + @Test + public void testMutate() { + String taskId = "a"; + Function<IScheduledTask, IScheduledTask> mutator = + createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { }); + Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB)); + + expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated); + expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder())))); + + control.replay(); + + assertEquals(mutated, storage.mutateTask(taskId, mutator)); + } + + @Test + public void testSaveHostAttributes() { + IHostAttributes attributes = IHostAttributes.build( + new HostAttributes() + .setHost("a") + .setMode(MaintenanceMode.DRAINING) + .setAttributes(ImmutableSet.of( + new Attribute().setName("b").setValues(ImmutableSet.of("1", "2"))))); + + expect(attributeStore.saveHostAttributes(attributes)).andReturn(true); + expectOp(Op.saveHostAttributes( + new SaveHostAttributes().setHostAttributes(attributes.newBuilder()))); + eventSink.post(new PubsubEvent.HostAttributesChanged(attributes)); + + expect(attributeStore.saveHostAttributes(attributes)).andReturn(false); + + control.replay(); + + assertTrue(storage.saveHostAttributes(attributes)); + + assertFalse(storage.saveHostAttributes(attributes)); + } + + @Test(expected = UnsupportedOperationException.class) + public void testDeleteAllTasks() { + control.replay(); + storage.deleteAllTasks(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testDeleteHostAttributes() { + control.replay(); + storage.deleteHostAttributes(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testDeleteJobs() { + control.replay(); + storage.deleteJobs(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testDeleteQuotas() { + control.replay(); + storage.deleteQuotas(); + } + + @Test(expected = UnsupportedOperationException.class) + 
public void testDeleteAllUpdatesAndEvents() { + control.replay(); + storage.deleteAllUpdates(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java new file mode 100644 index 0000000..3d6d555 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java @@ -0,0 +1,134 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.log; + +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; + +import org.apache.aurora.codec.ThriftBinaryCodec; +import org.apache.aurora.common.inject.Bindings; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.util.BuildInfo; +import org.apache.aurora.common.util.Clock; +import org.apache.aurora.common.util.testing.FakeBuildInfo; +import org.apache.aurora.common.util.testing.FakeClock; +import org.apache.aurora.gen.storage.LogEntry; +import org.apache.aurora.gen.storage.Op; +import org.apache.aurora.gen.storage.SaveTasks; +import org.apache.aurora.gen.storage.Snapshot; +import org.apache.aurora.gen.storage.Transaction; +import org.apache.aurora.scheduler.TierModule; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.events.EventSink; +import org.apache.aurora.scheduler.log.Log; +import org.apache.aurora.scheduler.log.Log.Entry; +import org.apache.aurora.scheduler.log.Log.Stream; +import org.apache.aurora.scheduler.storage.Snapshotter; +import org.apache.aurora.scheduler.storage.Storage.Volatile; +import org.apache.aurora.scheduler.storage.durability.Persistence; +import org.apache.aurora.scheduler.storage.durability.Persistence.Edit; +import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class LogPersistenceTest extends EasyMockTest { + + private Persistence 
persistence; + + private Log mockLog; + private Stream mockStream; + + @Before + public void setUp() { + mockLog = createMock(Log.class); + mockStream = createMock(Stream.class); + + Injector injector = Guice.createInjector( + new LogStorageModule(new Options()), + new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)), + new TierModule(TaskTestUtil.TIER_CONFIG), + new AbstractModule() { + @Override + protected void configure() { + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + bind(EventSink.class).toInstance(e -> { }); + bind(BuildInfo.class).toInstance(FakeBuildInfo.generateBuildInfo()); + bind(Clock.class).toInstance(new FakeClock()); + bind(Snapshotter.class).to(SnapshotStoreImpl.class); + bind(Log.class).toInstance(mockLog); + } + } + ); + + persistence = injector.getInstance(Persistence.class); + } + + @Test + public void testRecoverEmpty() throws Exception { + expect(mockLog.open()).andReturn(mockStream); + List<Entry> empty = ImmutableList.of(); + expect(mockStream.readAll()).andReturn(empty.iterator()); + + control.replay(); + + persistence.prepare(); + assertEquals(ImmutableList.of(), persistence.recover().collect(Collectors.toList())); + } + + @Test + public void testRecoverSnapshot() throws Exception { + expect(mockLog.open()).andReturn(mockStream); + + Op saveA = Op.saveTasks(new SaveTasks().setTasks(ImmutableSet.of( + TaskTestUtil.makeTask("a", TaskTestUtil.JOB).newBuilder()))); + Op saveB = Op.saveTasks(new SaveTasks().setTasks(ImmutableSet.of( + TaskTestUtil.makeTask("b", TaskTestUtil.JOB).newBuilder()))); + Op saveC = Op.saveTasks(new SaveTasks().setTasks(ImmutableSet.of( + TaskTestUtil.makeTask("c", TaskTestUtil.JOB).newBuilder()))); + + List<Entry> entries = ImmutableList.of( + logEntry(LogEntry.transaction(new Transaction().setOps(ImmutableList.of(saveA)))), + logEntry(LogEntry.snapshot(new Snapshot().setTasks(saveB.getSaveTasks().getTasks()))), + logEntry(LogEntry.transaction(new 
Transaction().setOps(ImmutableList.of(saveC))))); + + expect(mockStream.readAll()).andReturn(entries.iterator()); + + control.replay(); + + persistence.prepare(); + assertEquals( + ImmutableList.of( + Edit.op(saveA), + Edit.deleteAll(), + Edit.op(saveB), + Edit.op(saveC)), + persistence.recover().collect(Collectors.toList())); + } + + private static Entry logEntry(LogEntry entry) { + return () -> ThriftBinaryCodec.encodeNonNull(entry); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java index eb966d7..fdde73d 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.Lists; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; -import com.google.inject.TypeLiteral; import org.apache.aurora.common.application.ShutdownRegistry; import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl; @@ -35,15 +34,14 @@ import org.apache.aurora.common.util.BuildInfo; import org.apache.aurora.common.util.Clock; import org.apache.aurora.common.util.testing.FakeBuildInfo; import org.apache.aurora.common.util.testing.FakeClock; -import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.scheduler.TierModule; import org.apache.aurora.scheduler.config.types.DataAmount; import org.apache.aurora.scheduler.config.types.TimeAmount; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.log.Log; import org.apache.aurora.scheduler.resources.ResourceTestUtil; 
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; import org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.Snapshotter; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; @@ -62,7 +60,7 @@ public class NonVolatileStorageTest extends TearDownTestCase { private FakeLog log; private Runnable teardown = () -> { }; private NonVolatileStorage storage; - private DistributedSnapshotStore snapshotStore; + private SnapshotStore snapshotStore; @Before public void setUp() { @@ -92,12 +90,12 @@ public class NonVolatileStorageTest extends TearDownTestCase { bind(ShutdownRegistry.class).toInstance(shutdownRegistry); bind(StatsProvider.class).toInstance(new FakeStatsProvider()); bind(Log.class).toInstance(log); - bind(new TypeLiteral<SnapshotStore<Snapshot>>() { }).to(SnapshotStoreImpl.class); + bind(Snapshotter.class).to(SnapshotStoreImpl.class); } } ); storage = injector.getInstance(NonVolatileStorage.class); - snapshotStore = injector.getInstance(DistributedSnapshotStore.class); + snapshotStore = injector.getInstance(SnapshotStore.class); storage.prepare(); storage.start(w -> { }); http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java new file mode 100644 index 0000000..270453d --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java @@ -0,0 +1,174 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.log; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; + +import org.apache.aurora.GuavaUtils.ServiceManagerIface; +import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl; +import org.apache.aurora.common.application.ShutdownStage; +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.inject.Bindings; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.storage.Snapshot; +import org.apache.aurora.scheduler.SchedulerLifecycle.SchedulerActive; +import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.TierModule; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.config.types.TimeAmount; +import org.apache.aurora.scheduler.events.EventSink; +import org.apache.aurora.scheduler.log.Log; +import org.apache.aurora.scheduler.log.Log.Entry; +import org.apache.aurora.scheduler.log.Log.Position; +import org.apache.aurora.scheduler.log.Log.Stream; +import 
org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.Snapshotter; +import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; +import org.apache.aurora.scheduler.storage.Storage.Volatile; +import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.easymock.IAnswer; +import org.junit.Test; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; + +/** Tests snapshotting through a SnapshotStore obtained from an injector built from SchedulerServicesModule, LogStorageModule, MemStorageModule and TierModule, with Log and Snapshotter bound to mocks. */ public class SnapshotServiceTest extends EasyMockTest { + + private static final Snapshot SNAPSHOT = new Snapshot().setTasks( + ImmutableSet.of(TaskTestUtil.makeTask("a", TaskTestUtil.JOB).newBuilder())); + + private NonVolatileStorage storage; + private SnapshotStore snapshotStore; + private ServiceManagerIface serviceManager; + + private Snapshotter mockSnapshotter; + private Log mockLog; + private Stream mockStream; + private Position mockPosition; + + /** Creates the mocks, builds the injector with the given snapshot interval, and fetches the storage, snapshot store and SchedulerActive service manager from it. Each test calls this directly so that it can choose its own interval. */ private void setUp(Amount<Long, Time> snapshotInterval) { + mockSnapshotter = createMock(Snapshotter.class); + mockLog = createMock(Log.class); + mockStream = createMock(Stream.class); + mockPosition = createMock(Position.class); + + Options options = new Options(); + options.snapshotInterval = + new TimeAmount(snapshotInterval.getValue(), snapshotInterval.getUnit()); + + Injector injector = Guice.createInjector( + new SchedulerServicesModule(), + new LogStorageModule(options), + new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)), + new TierModule(TaskTestUtil.TIER_CONFIG), + new AbstractModule() { + @Override + protected void configure() { + bind(Key.get(Command.class, ShutdownStage.class)).to(ShutdownRegistryImpl.class); + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + bind(EventSink.class).toInstance(e -> { }); + 
bind(Snapshotter.class).toInstance(mockSnapshotter); + bind(Log.class).toInstance(mockLog); + } + } + ); + + storage = injector.getInstance(NonVolatileStorage.class); + snapshotStore = injector.getInstance(SnapshotStore.class); + serviceManager = + injector.getInstance(Key.get(ServiceManagerIface.class, SchedulerActive.class)); + } + + /** Records the expectations for storage start-up: mockLog.open() returns the mock stream and readAll() returns an empty iterator. */ private void expectStorageInitialized() throws Exception { + expect(mockLog.open()).andReturn(mockStream); + List<Entry> empty = ImmutableList.of(); + expect(mockStream.readAll()).andReturn(empty.iterator()); + } + + /** Records the expectations for persisting a snapshot: at least one append returning mockPosition and at least one truncateBefore(mockPosition). Every truncateBefore call counts the given latch down by one. */ private void expectSnapshotPersist(CountDownLatch latch) { + expect(mockStream.append(anyObject())).andReturn(mockPosition).atLeastOnce(); + mockStream.truncateBefore(mockPosition); + expectLastCall().andAnswer((IAnswer<Void>) () -> { + latch.countDown(); + return null; + }).atLeastOnce(); + } + + /** With a 1 ms snapshot interval, starts the services and blocks until truncateBefore has been called twice, then stops the services. NOTE(review): snapshotCalled.await() below has no timeout, so this test blocks indefinitely if fewer than two snapshots are persisted; consider a bounded await. */ @Test + public void testPeriodicSnapshots() throws Exception { + setUp(Amount.of(1L, Time.MILLISECONDS)); + + expectStorageInitialized(); + + expect(mockSnapshotter.from(anyObject())).andReturn(SNAPSHOT).atLeastOnce(); + + CountDownLatch snapshotCalled = new CountDownLatch(2); + expectSnapshotPersist(snapshotCalled); + + control.replay(); + + storage.prepare(); + storage.start(stores -> { }); + serviceManager.startAsync().awaitHealthy(); + + snapshotCalled.await(); + + serviceManager.stopAsync().awaitStopped(10, TimeUnit.SECONDS); + } + + /** With a 1 hour interval and without starting the service manager, calls snapshotStore.snapshot() after recording a single Snapshotter.from expectation and the persist expectations. */ @Test + public void testExplicitInternalSnapshot() throws Exception { + setUp(Amount.of(1L, Time.HOURS)); + + expectStorageInitialized(); + + expect(mockSnapshotter.from(anyObject())).andReturn(SNAPSHOT); + expectSnapshotPersist(new CountDownLatch(1)); + + control.replay(); + + storage.prepare(); + storage.start(stores -> { }); + snapshotStore.snapshot(); + } + + /** With a 1 hour interval, calls snapshotStore.snapshotWith(SNAPSHOT) after recording only the persist expectations; no Snapshotter.from expectation is recorded. */ @Test + public void testExplicitProvidedSnapshot() throws Exception { + setUp(Amount.of(1L, Time.HOURS)); + + expectStorageInitialized(); + expectSnapshotPersist(new CountDownLatch(1)); + + control.replay(); + + storage.prepare(); + 
storage.start(stores -> { }); + snapshotStore.snapshotWith(SNAPSHOT); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java index 5634f92..2ad4e84 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java @@ -18,12 +18,8 @@ import java.util.Map; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; import org.apache.aurora.common.stats.Stats; -import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.util.testing.FakeBuildInfo; import org.apache.aurora.common.util.testing.FakeClock; import org.apache.aurora.gen.Attribute; @@ -55,6 +51,9 @@ import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; +import org.apache.aurora.scheduler.storage.durability.Loader; +import org.apache.aurora.scheduler.storage.durability.Persistence.Edit; import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; @@ -65,10 +64,10 @@ import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import 
org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.mem.MemStorageModule; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.junit.Test; import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo; +import static org.apache.aurora.scheduler.base.TaskTestUtil.THRIFT_BACKFILL; import static org.apache.aurora.scheduler.resources.ResourceManager.aggregateFromBag; import static org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.SNAPSHOT_RESTORE; import static org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.SNAPSHOT_SAVE; @@ -80,35 +79,27 @@ public class SnapshotStoreImplIT { private static final long NOW = 10335463456L; private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "job"); - private SnapshotStoreImpl snapshotStore; + private Storage storage; + private SnapshotStoreImpl snapshotter; private void setUpStore() { - Injector injector = Guice.createInjector( - new MemStorageModule(), - new AbstractModule() { - @Override - protected void configure() { - bind(StatsProvider.class).toInstance(new FakeStatsProvider()); - } - }); - + storage = MemStorageModule.newEmptyStorage(); FakeClock clock = new FakeClock(); clock.setNowMillis(NOW); - snapshotStore = new SnapshotStoreImpl( - generateBuildInfo(), - clock, - injector.getInstance(Storage.class), - TaskTestUtil.THRIFT_BACKFILL); + snapshotter = new SnapshotStoreImpl(generateBuildInfo(), clock); Stats.flush(); } @Test public void testBackfill() { setUpStore(); - snapshotStore.applySnapshot(makeNonBackfilled()); + storage.write((NoResult.Quiet) stores -> + Loader.load( + stores, + THRIFT_BACKFILL, + snapshotter.asStream(makeNonBackfilled()).map(Edit::op))); - Snapshot backfilled = snapshotStore.createSnapshot(); - assertEquals(expected(), backfilled); + assertEquals(expected(), storage.write(snapshotter::from)); 
assertSnapshotRestoreStats(1L); assertSnapshotSaveStats(1L); } @@ -183,14 +174,14 @@ public class SnapshotStoreImplIT { } private void assertSnapshotSaveStats(long count) { - for (String stat : snapshotStore.snapshotFieldNames()) { + for (String stat : snapshotter.snapshotFieldNames()) { assertEquals(count, Stats.getVariable(SNAPSHOT_SAVE + stat + "_events").read()); assertNotNull(Stats.getVariable(SNAPSHOT_SAVE + stat + "_nanos_total")); } } private void assertSnapshotRestoreStats(long count) { - for (String stat : snapshotStore.snapshotFieldNames()) { + for (String stat : snapshotter.snapshotFieldNames()) { assertEquals(count, Stats.getVariable(SNAPSHOT_RESTORE + stat + "_events").read()); assertNotNull(Stats.getVariable(SNAPSHOT_RESTORE + stat + "_nanos_total")); } http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java index fd81bff..64fbb54 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java @@ -83,9 +83,9 @@ public class StorageTestUtil { } /** - * Expects any number of read or write operations. + * Expects any number of calls to fetch individual stores. 
*/ - public void expectOperations() { + public void expectStoreAccesses() { expect(storeProvider.getTaskStore()).andReturn(taskStore).anyTimes(); expect(storeProvider.getQuotaStore()).andReturn(quotaStore).anyTimes(); expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes(); @@ -99,6 +99,13 @@ public class StorageTestUtil { expect(mutableStoreProvider.getCronJobStore()).andReturn(jobStore).anyTimes(); expect(mutableStoreProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes(); expect(mutableStoreProvider.getJobUpdateStore()).andReturn(jobUpdateStore).anyTimes(); + } + + /** + * Expects any number of read or write operations. + */ + public void expectOperations() { + expectStoreAccesses(); expectRead().anyTimes(); expectWrite().anyTimes(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java index 919ac14..040baf9 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java @@ -84,7 +84,7 @@ import org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.UUIDGenerator; -import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; +import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage.StorageException; import org.apache.aurora.scheduler.storage.backup.Recovery; import org.apache.aurora.scheduler.storage.backup.StorageBackup; @@ -178,7 +178,7 @@ public class 
SchedulerThriftInterfaceTest extends EasyMockTest { ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k2", "v2")); private StorageTestUtil storageUtil; - private DistributedSnapshotStore snapshotStore; + private SnapshotStore snapshotStore; private StorageBackup backup; private Recovery recovery; private MaintenanceController maintenance; @@ -197,7 +197,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { public void setUp() throws Exception { storageUtil = new StorageTestUtil(this); storageUtil.expectOperations(); - snapshotStore = createMock(DistributedSnapshotStore.class); + snapshotStore = createMock(SnapshotStore.class); backup = createMock(StorageBackup.class); recovery = createMock(Recovery.class); maintenance = createMock(MaintenanceController.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java index bb0fd89..231fd8d 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java @@ -62,7 +62,7 @@ import org.apache.aurora.scheduler.quota.QuotaModule; import org.apache.aurora.scheduler.resources.ResourceTestUtil; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.stats.StatsModule; -import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; +import org.apache.aurora.scheduler.storage.SnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.backup.Recovery; @@ -136,7 +136,7 @@ public class ThriftIT extends EasyMockTest { bind(FrameworkInfoFactoryImpl.class).in(Singleton.class); 
bindMock(Recovery.class); bindMock(StorageBackup.class); - bindMock(DistributedSnapshotStore.class); + bindMock(SnapshotStore.class); bind(IServerInfo.class).toInstance(SERVER_INFO); }
