/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.aurora.scheduler.storage.durability;

import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.InstanceTaskConfig;
import org.apache.aurora.gen.JobConfiguration;
import org.apache.aurora.gen.JobInstanceUpdateEvent;
import org.apache.aurora.gen.JobUpdate;
import org.apache.aurora.gen.JobUpdateAction;
import org.apache.aurora.gen.JobUpdateEvent;
import org.apache.aurora.gen.JobUpdateInstructions;
import org.apache.aurora.gen.JobUpdateKey;
import org.apache.aurora.gen.JobUpdateSettings;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.gen.JobUpdateSummary;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.Range;
import org.apache.aurora.gen.ResourceAggregate;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
import org.apache.aurora.gen.storage.RemoveJob;
import org.apache.aurora.gen.storage.RemoveJobUpdates;
import org.apache.aurora.gen.storage.RemoveQuota;
import org.apache.aurora.gen.storage.RemoveTasks;
import org.apache.aurora.gen.storage.SaveCronJob;
import org.apache.aurora.gen.storage.SaveFrameworkId;
import org.apache.aurora.gen.storage.SaveHostAttributes;
import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
import org.apache.aurora.gen.storage.SaveJobUpdate;
import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
import org.apache.aurora.gen.storage.SaveQuota;
import org.apache.aurora.gen.storage.SaveTasks;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.resources.ResourceTestUtil;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Before;
import org.junit.Test;

import static org.apache.aurora.gen.Resource.diskMb;
import static org.apache.aurora.gen.Resource.numCpus;
import static org.apache.aurora.gen.Resource.ramMb;
import static org.apache.aurora.scheduler.base.TaskTestUtil.makeConfig;
import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Tests {@link DurableStorage} against a mocked {@link Persistence}, a mocked {@link EventSink}
 * and the mocked stores supplied by {@link StorageTestUtil}.
 */
public class DurableStorageTest extends EasyMockTest {

  private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "name");
  private static final IJobUpdateKey UPDATE_ID =
      IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "testUpdateId"));

  private DurableStorage durableStorage;
  private Persistence persistence;
  private StorageTestUtil storageUtil;
  private EventSink eventSink;

  @Before
  public void setUp() {
    persistence = createMock(Persistence.class);
    storageUtil = new StorageTestUtil(this);
    eventSink = createMock(EventSink.class);

    durableStorage = new DurableStorage(
        persistence,
        storageUtil.storage,
        storageUtil.schedulerStore,
        storageUtil.jobStore,
        storageUtil.taskStore,
        storageUtil.quotaStore,
        storageUtil.attributeStore,
        storageUtil.jobUpdateStore,
        eventSink,
        new ReentrantLock(),
        TaskTestUtil.THRIFT_BACKFILL);

    storageUtil.storage.prepare();
  }

  /**
   * Records one expected {@code storage.write(...)} call on the mocked underlying storage. When
   * the call happens, the work that was passed in is captured and applied to
   * {@code storageUtil.mutableStoreProvider}, and {@code null} is returned to the caller.
   */
  private void expectWriteAndApply() {
    Capture<MutateWork<Void, RuntimeException>> work = createCapture();
    expect(storageUtil.storage.write(capture(work))).andAnswer(
        () -> {
          work.getValue().apply(storageUtil.mutableStoreProvider);
          return null;
        });
  }

  @Test
  public void testStart() throws Exception {
    // We should initialize persistence.
    persistence.prepare();

    // Our start should recover persistence and then forward to the underlying storage start of the
    // supplied initialization logic.
    AtomicBoolean initialized = new AtomicBoolean(false);
    MutateWork.NoResult.Quiet initializationLogic = provider -> {
      // Creating a mock and expecting apply(storeProvider) does not work here for whatever
      // reason.
      initialized.set(true);
    };

    // First write: the recover-and-initialize work.
    expectWriteAndApply();
    // Second write: the initialization work.
    expectWriteAndApply();

    // Populate all Op types.
    buildReplayOps();

    control.replay();

    durableStorage.prepare();
    durableStorage.start(initializationLogic);
    assertTrue(initialized.get());

    // Assert all Transaction types have handlers defined.
    assertEquals(
        EnumSet.allOf(Op._Fields.class),
        EnumSet.copyOf(durableStorage.buildTransactionReplayActions().keySet()));
  }

  /**
   * Builds one {@link Op} of each kind, records the store call each one is expected to produce
   * on replay, and makes {@code persistence.recover()} return the resulting ops as a stream.
   */
  private void buildReplayOps() throws Exception {
    ImmutableSet.Builder<Op> builder = ImmutableSet.builder();

    builder.add(Op.saveFrameworkId(new SaveFrameworkId("bob")));
    storageUtil.schedulerStore.saveFrameworkId("bob");

    JobConfiguration actualJob = new JobConfiguration().setTaskConfig(nonBackfilledConfig());
    JobConfiguration expectedJob =
        new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder());
    SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob);
    builder.add(Op.saveCronJob(cronJob));
    storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob));

    RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
    builder.add(Op.removeJob(removeJob));
    storageUtil.jobStore.removeJob(JOB_KEY);

    ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder();
    actualTask.getAssignedTask().setTask(nonBackfilledConfig());
    IScheduledTask expectedTask = makeTask("id", JOB_KEY);
    SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask));
    builder.add(Op.saveTasks(saveTasks));
    storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask));

    RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1"));
    builder.add(Op.removeTasks(removeTasks));
    storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());

    ResourceAggregate nonBackfilled = new ResourceAggregate()
        .setNumCpus(1.0)
        .setRamMb(32)
        .setDiskMb(64);
    SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), nonBackfilled);
    builder.add(Op.saveQuota(saveQuota));
    storageUtil.quotaStore.saveQuota(
        saveQuota.getRole(),
        IResourceAggregate.build(nonBackfilled.deepCopy()
            .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))));

    builder.add(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole())));
    storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());

    // This entry lacks a slave ID, and should therefore be discarded.
    SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes()
        .setHost("host1")
        .setMode(MaintenanceMode.DRAINED));
    builder.add(Op.saveHostAttributes(hostAttributes1));

    SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes()
        .setHost("host2")
        .setSlaveId("slave2")
        .setMode(MaintenanceMode.DRAINED));
    builder.add(Op.saveHostAttributes(hostAttributes2));
    expect(storageUtil.attributeStore.saveHostAttributes(
        IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true);

    JobUpdate actualUpdate = new JobUpdate()
        .setSummary(new JobUpdateSummary().setKey(UPDATE_ID.newBuilder()))
        .setInstructions(new JobUpdateInstructions()
            .setInitialState(
                ImmutableSet.of(new InstanceTaskConfig().setTask(nonBackfilledConfig())))
            .setDesiredState(new InstanceTaskConfig().setTask(nonBackfilledConfig())));
    JobUpdate expectedUpdate = actualUpdate.deepCopy();
    expectedUpdate.getInstructions().getDesiredState().setTask(makeConfig(JOB_KEY).newBuilder());
    expectedUpdate.getInstructions().getInitialState()
        .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder()));
    SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate);
    builder.add(Op.saveJobUpdate(saveUpdate));
    storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate));

    SaveJobUpdateEvent saveUpdateEvent =
        new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder());
    builder.add(Op.saveJobUpdateEvent(saveUpdateEvent));
    storageUtil.jobUpdateStore.saveJobUpdateEvent(
        UPDATE_ID,
        IJobUpdateEvent.build(saveUpdateEvent.getEvent()));

    SaveJobInstanceUpdateEvent saveInstanceEvent = new SaveJobInstanceUpdateEvent(
        new JobInstanceUpdateEvent(),
        UPDATE_ID.newBuilder());
    builder.add(Op.saveJobInstanceUpdateEvent(saveInstanceEvent));
    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
        UPDATE_ID,
        IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()));

    builder.add(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L)));
    // No expectation - this op is ignored.

    builder.add(Op.removeJobUpdate(
        new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder()))));
    storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID));

    expect(persistence.recover()).andReturn(builder.build().stream());
  }

  private TaskConfig nonBackfilledConfig() {
    // When more fields have to be backfilled
    // modify this method.
    return makeConfig(JOB_KEY).newBuilder();
  }

  /**
   * Fixture that records the expectations for preparing and starting {@code durableStorage} with
   * an empty recovery stream, replays the mocks, and then runs the test-specific logic.
   * Subclasses add their own expectations in {@link #setupExpectations()} and exercise the
   * system in {@link #runTest()}.
   */
  abstract class AbstractStorageFixture {
    private final AtomicBoolean runCalled = new AtomicBoolean(false);

    AbstractStorageFixture() {
      // Prevent otherwise silent noop tests that forget to call run().
      addTearDown(new TearDown() {
        @Override
        public void tearDown() {
          assertTrue("Fixture was created but run() was never called.", runCalled.get());
        }
      });
    }

    void run() throws Exception {
      runCalled.set(true);

      // Expect basic start operations.

      // Initialize persistence.
      persistence.prepare();

      // Replay the ops and perform any supplied initializationWork.
      // Simulate NOOP initialization work
      // Creating a mock and expecting apply(storeProvider) does not work here for whatever
      // reason.
      MutateWork.NoResult.Quiet initializationLogic = storeProvider -> {
        // No-op.
      };

      // First write: the recover-and-initialize work.
      expectWriteAndApply();

      expect(persistence.recover()).andReturn(Stream.empty());

      // Second write: the (no-op) initialization work.
      expectWriteAndApply();

      // Setup custom test expectations.
      setupExpectations();

      control.replay();

      // Start the system.
      durableStorage.prepare();
      durableStorage.start(initializationLogic);

      // Exercise the system.
      runTest();
    }

    protected void setupExpectations() throws Exception {
      // Default to no expectations.
    }

    protected abstract void runTest();
  }

  /**
   * Fixture whose test body is a single {@code durableStorage.write(...)} that invokes
   * {@link #performMutations(MutableStoreProvider)}.
   */
  abstract class AbstractMutationFixture extends AbstractStorageFixture {
    @Override
    protected void runTest() {
      durableStorage.write((Quiet) AbstractMutationFixture.this::performMutations);
    }

    protected abstract void performMutations(MutableStoreProvider storeProvider);
  }

  /**
   * Expects one {@code persistence.persist(...)} call and, when it happens, asserts that the
   * persisted stream holds exactly {@code op} followed by {@code ops}, in that order.
   */
  private void expectPersist(Op op, Op... ops) {
    try {
      // Workaround for comparing streams.
      persistence.persist(anyObject());
      expectLastCall().andAnswer((IAnswer<Void>) () -> {
        // getCurrentArguments() is untyped; the only argument of persist() is the op stream.
        @SuppressWarnings("unchecked")
        Stream<Op> persisted = (Stream<Op>) EasyMock.getCurrentArguments()[0];
        assertEquals(
            ImmutableList.<Op>builder().add(op).add(ops).build(),
            persisted.collect(Collectors.toList()));

        return null;
      });
    } catch (Persistence.PersistenceException e) {
      throw new RuntimeException(e);
    }
  }

  @Test
  public void testSaveFrameworkId() throws Exception {
    String frameworkId = "bob";
    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.schedulerStore.saveFrameworkId(frameworkId);
        expectPersist(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getSchedulerStore().saveFrameworkId(frameworkId);
      }
    }.run();
  }

  @Test
  public void testSaveAcceptedJob() throws Exception {
    IJobConfiguration jobConfig =
        IJobConfiguration.build(new JobConfiguration().setKey(JOB_KEY.newBuilder()));
    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.jobStore.saveAcceptedJob(jobConfig);
        expectPersist(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getCronJobStore().saveAcceptedJob(jobConfig);
      }
    }.run();
  }

  @Test
  public void testRemoveJob() throws Exception {
    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.jobStore.removeJob(JOB_KEY);
        expectPersist(Op.removeJob(new RemoveJob().setJobKey(JOB_KEY.newBuilder())));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getCronJobStore().removeJob(JOB_KEY);
      }
    }.run();
  }

  @Test
  public void testSaveTasks() throws Exception {
    Set<IScheduledTask> tasks = ImmutableSet.of(task("a", ScheduleStatus.INIT));
    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.taskStore.saveTasks(tasks);
        expectPersist(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks))));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getUnsafeTaskStore().saveTasks(tasks);
      }
    }.run();
  }

  @Test
  public void testMutateTasks() throws Exception {
    String taskId = "fred";
    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
        expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
      }
    }.run();
  }

  @Test
  public void testNestedTransactions() throws Exception {
    String taskId = "fred";
    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
    ImmutableSet<String> tasksToRemove = ImmutableSet.of("b");

    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);

        storageUtil.taskStore.deleteTasks(tasksToRemove);

        expectPersist(
            Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))),
            Op.removeTasks(new RemoveTasks(tasksToRemove)));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));

        durableStorage.write((NoResult.Quiet)
            innerProvider -> innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove));
      }
    }.run();
  }

  @Test
  public void testSaveAndMutateTasks() throws Exception {
    String taskId = "fred";
    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
    Set<IScheduledTask> saved = ImmutableSet.of(task("a", ScheduleStatus.INIT));
    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));

    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.taskStore.saveTasks(saved);

        // Nested transaction with result.
        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);

        // Resulting stream operation.
        expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getUnsafeTaskStore().saveTasks(saved);
        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
      }
    }.run();
  }

  @Test
  public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception {
    String taskId = "fred";
    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
    Set<IScheduledTask> saved = ImmutableSet.of(task("b", ScheduleStatus.INIT));
    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));

    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.taskStore.saveTasks(saved);

        // Nested transaction with result.
        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);

        // Resulting stream operation.
        expectPersist(Op.saveTasks(new SaveTasks(
            ImmutableSet.<ScheduledTask>builder()
                .addAll(IScheduledTask.toBuildersList(saved))
                .add(mutated.get().newBuilder())
                .build())));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getUnsafeTaskStore().saveTasks(saved);
        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
      }
    }.run();
  }

  @Test
  public void testRemoveTasksQuery() throws Exception {
    IScheduledTask task = task("a", ScheduleStatus.FINISHED);
    Set<String> taskIds = Tasks.ids(task);
    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.taskStore.deleteTasks(taskIds);
        expectPersist(Op.removeTasks(new RemoveTasks(taskIds)));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
      }
    }.run();
  }

  @Test
  public void testRemoveTasksIds() throws Exception {
    Set<String> taskIds = ImmutableSet.of("42");
    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.taskStore.deleteTasks(taskIds);
        expectPersist(Op.removeTasks(new RemoveTasks(taskIds)));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
      }
    }.run();
  }

  @Test
  public void testSaveQuota() throws Exception {
    String role = "role";
    IResourceAggregate quota = ResourceTestUtil.aggregate(1.0, 128L, 1024L);

    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.quotaStore.saveQuota(role, quota);
        expectPersist(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getQuotaStore().saveQuota(role, quota);
      }
    }.run();
  }

  @Test
  public void testRemoveQuota() throws Exception {
    String role = "role";
    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.quotaStore.removeQuota(role);
        expectPersist(Op.removeQuota(new RemoveQuota(role)));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getQuotaStore().removeQuota(role);
      }
    }.run();
  }

  @Test
  public void testSaveHostAttributes() throws Exception {
    String host = "hostname";
    Set<Attribute> attributes =
        ImmutableSet.of(new Attribute().setName("attr").setValues(ImmutableSet.of("value")));
    Optional<IHostAttributes> hostAttributes = Optional.of(
        IHostAttributes.build(new HostAttributes()
            .setHost(host)
            .setAttributes(attributes)));

    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        expect(storageUtil.attributeStore.getHostAttributes(host))
            .andReturn(Optional.absent());

        expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);

        expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get())).andReturn(true);
        eventSink.post(new PubsubEvent.HostAttributesChanged(hostAttributes.get()));
        expectPersist(
            Op.saveHostAttributes(new SaveHostAttributes(hostAttributes.get().newBuilder())));

        expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get()))
            .andReturn(false);

        expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        AttributeStore.Mutable store = storeProvider.getAttributeStore();
        assertEquals(Optional.absent(), store.getHostAttributes(host));

        assertTrue(store.saveHostAttributes(hostAttributes.get()));

        assertEquals(hostAttributes, store.getHostAttributes(host));

        assertFalse(store.saveHostAttributes(hostAttributes.get()));

        assertEquals(hostAttributes, store.getHostAttributes(host));
      }
    }.run();
  }

  @Test
  public void testSaveUpdate() throws Exception {
    IJobUpdate update = IJobUpdate.build(new JobUpdate()
        .setSummary(new JobUpdateSummary()
            .setKey(UPDATE_ID.newBuilder())
            .setUser("user"))
        .setInstructions(new JobUpdateInstructions()
            .setDesiredState(new InstanceTaskConfig()
                .setTask(new TaskConfig())
                .setInstances(ImmutableSet.of(new Range(0, 3))))
            .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
                .setTask(new TaskConfig())
                .setInstances(ImmutableSet.of(new Range(0, 3)))))
            .setSettings(new JobUpdateSettings())));

    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.jobUpdateStore.saveJobUpdate(update);
        expectPersist(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getJobUpdateStore().saveJobUpdate(update);
      }
    }.run();
  }

  @Test
  public void testSaveJobUpdateEvent() throws Exception {
    IJobUpdateEvent event = IJobUpdateEvent.build(new JobUpdateEvent()
        .setStatus(JobUpdateStatus.ROLLING_BACK)
        .setTimestampMs(12345L));

    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.jobUpdateStore.saveJobUpdateEvent(UPDATE_ID, event);
        expectPersist(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(
            event.newBuilder(),
            UPDATE_ID.newBuilder())));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getJobUpdateStore().saveJobUpdateEvent(UPDATE_ID, event);
      }
    }.run();
  }

  @Test
  public void testSaveJobInstanceUpdateEvent() throws Exception {
    IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build(new JobInstanceUpdateEvent()
        .setAction(JobUpdateAction.INSTANCE_ROLLING_BACK)
        .setTimestampMs(12345L)
        .setInstanceId(0));

    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(UPDATE_ID, event);
        expectPersist(Op.saveJobInstanceUpdateEvent(
            new SaveJobInstanceUpdateEvent(
                event.newBuilder(),
                UPDATE_ID.newBuilder())));
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(UPDATE_ID, event);
      }
    }.run();
  }

  @Test
  public void testRemoveJobUpdates() throws Exception {
    IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey()
        .setJob(JOB_KEY.newBuilder())
        .setId("update-id"));

    new AbstractMutationFixture() {
      @Override
      protected void setupExpectations() throws Exception {
        storageUtil.expectWrite();
        storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(key));

        // No transaction is generated since this version is currently in 'read-only'
        // compatibility mode for this operation type.
      }

      @Override
      protected void performMutations(MutableStoreProvider storeProvider) {
        storeProvider.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key));
      }
    }.run();
  }

  /** Builds a minimal scheduled task with the given task ID and status and an empty config. */
  private static IScheduledTask task(String id, ScheduleStatus status) {
    return IScheduledTask.build(new ScheduledTask()
        .setStatus(status)
        .setAssignedTask(new AssignedTask()
            .setTaskId(id)
            .setTask(new TaskConfig())));
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java new file mode 100644 index 0000000..219576b --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java @@ -0,0 +1,222 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.Resource.diskMb;
+import static org.apache.aurora.gen.Resource.namedPort;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.gen.Resource.ramMb;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ThriftBackfill}: task backfill against a mocked {@link TierManager}, and
+ * the static resource-aggregate backfill.
+ */
+public class ThriftBackfillTest extends EasyMockTest {
+
+  private TierManager tierManager;
+  private ThriftBackfill thriftBackfill;
+
+  @Before
+  public void setUp() {
+    tierManager = createMock(TierManager.class);
+    thriftBackfill = new ThriftBackfill(tierManager);
+  }
+
+  /** Creates a fresh task config holding only the cpu, ram and disk resources used below. */
+  private static TaskConfig newTaskWithResources() {
+    return new TaskConfig()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+  }
+
+  /**
+   * A non-production config with an explicit tier is expected back unchanged when the tier
+   * manager answers with {@code TaskTestUtil.DEV_TIER}.
+   */
+  @Test
+  public void testFieldsToSetNoPorts() {
+    TaskConfig input = newTaskWithResources()
+        .setProduction(false)
+        .setTier("tierName");
+    TaskConfig unchanged = input.deepCopy();
+
+    expect(tierManager.getTier(ITaskConfig.build(unchanged))).andReturn(TaskTestUtil.DEV_TIER);
+
+    control.replay();
+
+    assertEquals(unchanged, thriftBackfill.backfillTask(input));
+  }
+
+  /** Scalar cpu/ram/disk fields are expected to be mirrored into the resource set. */
+  @Test
+  public void testResourceAggregateFieldsToSet() {
+    ResourceAggregate scalarsOnly = new ResourceAggregate()
+        .setNumCpus(1.0)
+        .setRamMb(32)
+        .setDiskMb(64);
+    ResourceAggregate withResourceSet = scalarsOnly.deepCopy()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+    control.replay();
+
+    assertEquals(
+        IResourceAggregate.build(withResourceSet),
+        ThriftBackfill.backfillResourceAggregate(scalarsOnly));
+  }
+
+  /** A resource set is expected to be mirrored into the scalar cpu/ram/disk fields. */
+  @Test
+  public void testResourceAggregateSetToFields() {
+    ResourceAggregate setOnly = new ResourceAggregate()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+    ResourceAggregate withScalars = setOnly.deepCopy()
+        .setNumCpus(1.0)
+        .setRamMb(32)
+        .setDiskMb(64);
+
+    control.replay();
+
+    assertEquals(
+        IResourceAggregate.build(withScalars),
+        ThriftBackfill.backfillResourceAggregate(setOnly));
+  }
+
+  /** A resource set with two cpu entries must be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testResourceAggregateTooManyResources() {
+    ResourceAggregate duplicatedCpu = new ResourceAggregate()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64), numCpus(2.0)));
+
+    control.replay();
+
+    ThriftBackfill.backfillResourceAggregate(duplicatedCpu);
+  }
+
+  /** A resource set containing a named port in place of disk must be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testResourceAggregateInvalidResources() {
+    ResourceAggregate withPort = new ResourceAggregate()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), namedPort("http")));
+
+    control.replay();
+
+    ThriftBackfill.backfillResourceAggregate(withPort);
+  }
+
+  /** A resource set without a disk entry must be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testResourceAggregateMissingResources() {
+    ResourceAggregate withoutDisk = new ResourceAggregate()
+        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32)));
+
+    control.replay();
+
+    ThriftBackfill.backfillResourceAggregate(withoutDisk);
+  }
+
+  /**
+   * A production config is expected back unchanged when the tier manager answers with
+   * {@code TaskTestUtil.PREFERRED_TIER}.
+   */
+  @Test
+  public void testBackfillTierProduction() {
+    TaskConfig input = newTaskWithResources()
+        .setProduction(true)
+        .setTier("tierName");
+    TaskConfig unchanged = input.deepCopy();
+
+    expect(tierManager.getTier(ITaskConfig.build(unchanged)))
+        .andReturn(TaskTestUtil.PREFERRED_TIER);
+
+    control.replay();
+
+    assertEquals(unchanged, thriftBackfill.backfillTask(input));
+  }
+
+  /**
+   * A config marked production is expected to come back with production cleared when the tier
+   * manager answers with {@code TaskTestUtil.DEV_TIER}.
+   */
+  @Test
+  public void testBackfillTierNotProduction() {
+    TaskConfig input = newTaskWithResources()
+        .setProduction(true)
+        .setTier("tierName");
+    TaskConfig lookedUp = input.deepCopy();
+    TaskConfig demoted = lookedUp.deepCopy().setProduction(false);
+
+    expect(tierManager.getTier(ITaskConfig.build(lookedUp))).andReturn(TaskTestUtil.DEV_TIER);
+
+    control.replay();
+
+    assertEquals(demoted, thriftBackfill.backfillTask(input));
+  }
+
+  /** With no tier and no production flag set, the tier is expected to become "preemptible". */
+  @Test
+  public void testBackfillTierSetsTierToPreemptible() {
+    TaskConfig input = newTaskWithResources();
+    TaskConfig withTier = input.deepCopy().setTier("preemptible");
+
+    expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
+
+    control.replay();
+
+    assertEquals(withTier, thriftBackfill.backfillTask(input));
+  }
+
+  /** With no tier but production set, the tier is expected to become "preferred". */
+  @Test
+  public void testBackfillTierSetsTierToPreferred() {
+    TaskConfig input = newTaskWithResources().setProduction(true);
+    TaskConfig withTier = input.deepCopy().setTier("preferred");
+
+    expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
+
+    control.replay();
+
+    assertEquals(withTier, thriftBackfill.backfillTask(input));
+  }
+
+  /** An empty tier map leaves nothing to backfill from and must fail. */
+  @Test(expected = IllegalStateException.class)
+  public void testBackfillTierBadTierConfiguration() {
+    TaskConfig input = newTaskWithResources();
+
+    expect(tierManager.getTiers()).andReturn(ImmutableMap.of());
+
+    control.replay();
+
+    thriftBackfill.backfillTask(input);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java new file mode 100644 index 0000000..cbad3eb --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java @@ -0,0 +1,78 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests how {@link TransactionRecorder} coalesces the ops that are added to it.
+ */
+public class TransactionRecorderTest {
+  @Test
+  public void testCoalesce() {
+    // No coalescing - different operation types.
+    assertEquals(
+        ImmutableList.of(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("4", "5"))),
+        record(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("4", "5"))));
+
+    // Two consecutive removes are recorded as one op covering all of their task IDs.
+    assertEquals(
+        ImmutableList.of(Op.removeTasks(createRemoveTasks("1", "2", "3", "4"))),
+        record(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.removeTasks(createRemoveTasks("3", "4"))));
+
+    // Two consecutive saves are recorded as one op; task "1" is in both and appears once.
+    assertEquals(
+        ImmutableList.of(Op.saveTasks(createSaveTasks("3", "2", "1"))),
+        record(
+            Op.saveTasks(createSaveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("1", "3"))));
+
+    // Removes carrying a different number of task IDs are merged the same way.
+    assertEquals(
+        ImmutableList.of(Op.removeTasks(createRemoveTasks("3", "4", "5"))),
+        record(
+            Op.removeTasks(createRemoveTasks("3")),
+            Op.removeTasks(createRemoveTasks("4", "5"))));
+  }
+
+  /**
+   * Adds the given ops, in order, to a fresh recorder.
+   *
+   * @param ops Ops to add.
+   * @return The ops the recorder reports afterwards.
+   */
+  private static List<Op> record(Op... ops) {
+    TransactionRecorder recorder = new TransactionRecorder();
+    Stream.of(ops).forEach(recorder::add);
+    return recorder.getOps();
+  }
+
+  /**
+   * Builds a save payload with one task per ID; each task carries nothing but its task ID.
+   *
+   * @param taskIds IDs of the tasks to include.
+   * @return The save payload.
+   */
+  private static SaveTasks createSaveTasks(String... taskIds) {
+    return new SaveTasks().setTasks(
+        Stream.of(taskIds)
+            .map(id -> new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(id)))
+            .collect(Collectors.toSet())
+    );
+  }
+
+  /**
+   * Builds a remove payload for the given task IDs.
+   *
+   * @param taskIds IDs of the tasks to remove.
+   * @return The remove payload.
+   */
+  private static RemoveTasks createRemoveTasks(String... taskIds) {
+    return new RemoveTasks(ImmutableSet.copyOf(taskIds));
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java new file mode 100644 index 0000000..e8b564b --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java @@ -0,0 +1,166 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link WriteAheadStorage}, run against mocked stores, a mocked
+ * {@link TransactionManager} and a mocked {@link EventSink}.
+ */
+public class WriteAheadStorageTest extends EasyMockTest {
+
+  private TransactionManager transactionManager;
+  private TaskStore.Mutable taskStore;
+  private AttributeStore.Mutable attributeStore;
+  private JobUpdateStore.Mutable jobUpdateStore;
+  private EventSink eventSink;
+  private WriteAheadStorage storage;
+
+  @Before
+  public void setUp() {
+    transactionManager = createMock(TransactionManager.class);
+    taskStore = createMock(TaskStore.Mutable.class);
+    attributeStore = createMock(AttributeStore.Mutable.class);
+    jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
+    eventSink = createMock(EventSink.class);
+
+    // No test sets expectations on these stores, so they do not need to be fields.
+    SchedulerStore.Mutable schedulerStore = createMock(SchedulerStore.Mutable.class);
+    CronJobStore.Mutable cronJobStore = createMock(CronJobStore.Mutable.class);
+    QuotaStore.Mutable quotaStore = createMock(QuotaStore.Mutable.class);
+
+    storage = new WriteAheadStorage(
+        transactionManager,
+        schedulerStore,
+        cronJobStore,
+        taskStore,
+        quotaStore,
+        attributeStore,
+        jobUpdateStore,
+        LoggerFactory.getLogger(WriteAheadStorageTest.class),
+        eventSink);
+  }
+
+  /** Expects {@code loggedOp} to be logged while a transaction is reported as active. */
+  private void expectOp(Op loggedOp) {
+    expect(transactionManager.hasActiveTransaction()).andReturn(true);
+    transactionManager.log(loggedOp);
+  }
+
+  /** Removing job updates is expected to reach the job update store without logging an op. */
+  @Test
+  public void testRemoveUpdates() {
+    IJobUpdateKey keyA = IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a"));
+    IJobUpdateKey keyB = IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b"));
+    Set<IJobUpdateKey> keys = ImmutableSet.of(keyA, keyB);
+
+    jobUpdateStore.removeJobUpdates(keys);
+    // No operation is written since this Op is in read-only compatibility mode.
+
+    control.replay();
+
+    storage.removeJobUpdates(keys);
+  }
+
+  /** Mutating a task is expected to log a save of the task the store hands back. */
+  @Test
+  public void testMutate() {
+    String id = "a";
+    Function<IScheduledTask, IScheduledTask> mutator =
+        createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
+    IScheduledTask task = TaskTestUtil.makeTask(id, TaskTestUtil.JOB);
+    Optional<IScheduledTask> storeResult = Optional.of(task);
+
+    expect(taskStore.mutateTask(id, mutator)).andReturn(storeResult);
+    expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(task.newBuilder()))));
+
+    control.replay();
+
+    assertEquals(storeResult, storage.mutateTask(id, mutator));
+  }
+
+  /**
+   * The first save is reported by the store as a change: an op is logged and an event posted.
+   * The second save is reported as no change, and nothing further is expected.
+   */
+  @Test
+  public void testSaveHostAttributes() {
+    Attribute attribute = new Attribute()
+        .setName("b")
+        .setValues(ImmutableSet.of("1", "2"));
+    HostAttributes builder = new HostAttributes()
+        .setHost("a")
+        .setMode(MaintenanceMode.DRAINING)
+        .setAttributes(ImmutableSet.of(attribute));
+    IHostAttributes hostAttributes = IHostAttributes.build(builder);
+
+    expect(attributeStore.saveHostAttributes(hostAttributes)).andReturn(true);
+    expectOp(Op.saveHostAttributes(
+        new SaveHostAttributes().setHostAttributes(hostAttributes.newBuilder())));
+    eventSink.post(new PubsubEvent.HostAttributesChanged(hostAttributes));
+
+    expect(attributeStore.saveHostAttributes(hostAttributes)).andReturn(false);
+
+    control.replay();
+
+    assertTrue(storage.saveHostAttributes(hostAttributes));
+
+    assertFalse(storage.saveHostAttributes(hostAttributes));
+  }
+
+  /** Bulk task deletion is expected to be rejected. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteAllTasks() {
+    control.replay();
+    storage.deleteAllTasks();
+  }
+
+  /** Bulk host attribute deletion is expected to be rejected. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteHostAttributes() {
+    control.replay();
+    storage.deleteHostAttributes();
+  }
+
+  /** Bulk job deletion is expected to be rejected. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteJobs() {
+    control.replay();
+    storage.deleteJobs();
+  }
+
+  /** Bulk quota deletion is expected to be rejected. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteQuotas() {
+    control.replay();
+    storage.deleteQuotas();
+  }
+
+  /** Bulk job update deletion is expected to be rejected. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testDeleteAllUpdatesAndEvents() {
+    control.replay();
+    storage.deleteAllUpdates();
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java index 3f44559..cb38f10 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java @@ -21,7 +21,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; -import java.util.function.Consumer; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; @@ -37,10 +36,8 @@ import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.Attribute; import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.storage.DeduplicatedSnapshot; import org.apache.aurora.gen.storage.Frame; import org.apache.aurora.gen.storage.FrameChunk; @@ -48,9 +45,7 @@ import org.apache.aurora.gen.storage.FrameHeader; import org.apache.aurora.gen.storage.LogEntry; import org.apache.aurora.gen.storage.Op; import org.apache.aurora.gen.storage.RemoveJob; -import org.apache.aurora.gen.storage.RemoveTasks; import org.apache.aurora.gen.storage.SaveFrameworkId; -import org.apache.aurora.gen.storage.SaveTasks; import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.gen.storage.Transaction; import 
org.apache.aurora.gen.storage.storageConstants; @@ -112,11 +107,11 @@ public class LogManagerTest extends EasyMockTest { public void testStreamManagerReadFromUnknownNone() throws CodingException { expect(stream.readAll()).andReturn(Collections.emptyIterator()); - Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { }); - control.replay(); - createNoMessagesStreamManager().readFromBeginning(reader); + assertEquals( + ImmutableList.of(), + ImmutableList.copyOf(createNoMessagesStreamManager().readFromBeginning())); } @Test @@ -127,12 +122,11 @@ public class LogManagerTest extends EasyMockTest { expect(entry1.contents()).andReturn(encode(transaction1)); expect(stream.readAll()).andReturn(Iterators.singletonIterator(entry1)); - Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { }); - reader.accept(transaction1); - control.replay(); - createNoMessagesStreamManager().readFromBeginning(reader); + assertEquals( + ImmutableList.of(transaction1), + ImmutableList.copyOf(createNoMessagesStreamManager().readFromBeginning())); } @Test @@ -214,50 +208,6 @@ public class LogManagerTest extends EasyMockTest { } @Test - public void testCoalesce() throws CodingException { - SaveTasks saveTasks1 = createSaveTasks("1", "2"); - createSaveTasks("2"); - SaveTasks saveTasks2 = createSaveTasks("1", "3"); - SaveTasks saveTasks3 = createSaveTasks("4", "5"); - - // saveTasks1 is unrepresented because both of its operations were trumped. - // saveTasks3 is unrepresented because its operations were deleted. 
- SaveTasks coalescedSaves = createSaveTasks("3", "2", "1"); - - RemoveTasks removeTasks1 = createRemoveTasks("1", "2"); - RemoveTasks removeTasks2 = createRemoveTasks("3"); - RemoveTasks removeTasks3 = createRemoveTasks("4", "5"); - - RemoveTasks coalescedRemoves = - new RemoveTasks(ImmutableSet.copyOf(Iterables.concat(removeTasks2.getTaskIds(), - removeTasks3.getTaskIds()))); - - expectAppend(position1, - createLogEntry( - Op.saveTasks(coalescedSaves), - Op.removeTasks(removeTasks1), - Op.saveTasks(saveTasks3), - Op.removeTasks(coalescedRemoves))); - - control.replay(); - - StreamTransaction streamTransaction = createNoMessagesStreamManager().startTransaction(); - - // The next 2 saves should coalesce - streamTransaction.add(Op.saveTasks(saveTasks1)); - streamTransaction.add(Op.saveTasks(saveTasks2)); - - streamTransaction.add(Op.removeTasks(removeTasks1)); - streamTransaction.add(Op.saveTasks(saveTasks3)); - - // The next 2 removes should coalesce - streamTransaction.add(Op.removeTasks(removeTasks2)); - streamTransaction.add(Op.removeTasks(removeTasks3)); - - assertEquals(position1, streamTransaction.commit()); - } - - @Test public void testTransactionSnapshot() throws CodingException { Snapshot snapshot = createSnapshot(); DeduplicatedSnapshot deduplicated = new SnapshotDeduplicatorImpl().deduplicate(snapshot); @@ -469,14 +419,12 @@ public class LogManagerTest extends EasyMockTest { expect(stream.readAll()).andReturn(entries.iterator()); - Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { }); - reader.accept(transaction1); - reader.accept(transaction2); - StreamManager streamManager = createStreamManager(message.chunkSize); control.replay(); - streamManager.readFromBeginning(reader); + assertEquals( + ImmutableList.of(transaction1, transaction2), + ImmutableList.copyOf(streamManager.readFromBeginning())); } @Test @@ -494,9 +442,6 @@ public class LogManagerTest extends EasyMockTest { 
expect(stream.readAll()).andReturn(ImmutableList.of(snapshotEntry).iterator()); - Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { }); - reader.accept(snapshotLogEntry); - control.replay(); HashFunction md5 = Hashing.md5(); @@ -506,7 +451,9 @@ public class LogManagerTest extends EasyMockTest { md5, new SnapshotDeduplicatorImpl()); streamManager.snapshot(snapshot); - streamManager.readFromBeginning(reader); + assertEquals( + ImmutableList.of(snapshotLogEntry), + ImmutableList.copyOf(streamManager.readFromBeginning())); } private Snapshot createSnapshot() { @@ -517,15 +464,6 @@ public class LogManagerTest extends EasyMockTest { .setTasks(ImmutableSet.of(TaskTestUtil.makeTask("task_id", TaskTestUtil.JOB).newBuilder())); } - private SaveTasks createSaveTasks(String... taskIds) { - return new SaveTasks(ImmutableSet.copyOf(Iterables.transform(ImmutableList.copyOf(taskIds), - taskId -> new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(taskId))))); - } - - private RemoveTasks createRemoveTasks(String... taskIds) { - return new RemoveTasks(ImmutableSet.copyOf(taskIds)); - } - private void expectFrames(Position position, Message message) throws CodingException { expect(stream.append(entryEq(message.header))).andReturn(position); for (LogEntry chunk : message.chunks) {
