Repository: aurora Updated Branches: refs/heads/master 98a2bc194 -> 3a3415e1c
Dropping bulkLoad() from Storage Bugs closed: AURORA-1324 Reviewed at https://reviews.apache.org/r/44368/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3a3415e1 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3a3415e1 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3a3415e1 Branch: refs/heads/master Commit: 3a3415e1c8b4b5442480454ed664ea44c62e862d Parents: 98a2bc1 Author: Maxim Khutornenko <[email protected]> Authored: Thu Mar 3 18:36:18 2016 -0800 Committer: Maxim Khutornenko <[email protected]> Committed: Thu Mar 3 18:36:18 2016 -0800 ---------------------------------------------------------------------- .../aurora/benchmark/StatusUpdateBenchmark.java | 8 ----- .../aurora/benchmark/ThriftApiBenchmarks.java | 2 +- .../storage/CallOrderEnforcingStorage.java | 8 ----- .../aurora/scheduler/storage/Storage.java | 13 +------ .../aurora/scheduler/storage/db/DbStorage.java | 32 ----------------- .../scheduler/storage/log/LogStorage.java | 19 +++------- .../app/local/FakeNonVolatileStorage.java | 7 ---- .../scheduler/storage/db/DbStorageTest.java | 37 +------------------- .../scheduler/storage/log/LogStorageTest.java | 24 ------------- 9 files changed, 8 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java index dc1ef82..f408453 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java +++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java @@ -142,14 +142,6 @@ public class StatusUpdateBenchmark { } @Override - public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) - throws StorageException, E { - - maybeSleep(); - underlyingStorage.bulkLoad(work); - } - - @Override public void prepare() throws StorageException { underlyingStorage.prepare(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java index 293b88f..6074638 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java @@ -167,7 +167,7 @@ public class ThriftApiBenchmarks { private static void bulkLoadTasks(Storage storage, final TestConfiguration config) { // Ideally we would use the API to populate the storage, but wiring in the writable thrift // interface requires considerably more binding setup. - storage.bulkLoad(storeProvider -> { + storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> { for (int roleId = 0; roleId < config.roles; roleId++) { String role = "role" + roleId; for (int envId = 0; envId < config.envs; envId++) { http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java index de4ada4..2a5ec9c 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java @@ -114,14 +114,6 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage { } @Override - public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) - throws StorageException, E { - - checkInState(State.PREPARED); - wrapped.bulkLoad(work); - } - - @Override public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E { checkInState(State.READY); http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/main/java/org/apache/aurora/scheduler/storage/Storage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java index 578bb37..5124d17 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java @@ -221,17 +221,6 @@ public interface Storage { <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E; /** - * Recovers the contents of the storage, using the provided operation. This may be done with - * relaxed transactional guarantees and/or rollback support. - * - * @param work Bulk load operation. - * @param <E> The type of exception this unit of work can throw. - * @throws StorageException if there was a problem reading from or writing to stable storage. - * @throws E bubbled transparently when the unit of work throws - */ - <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) throws StorageException, E; - - /** * Requests the underlying storage prepare its data set; ie: initialize schemas, begin syncing * out of date data, etc. This method should not block. * @@ -273,7 +262,7 @@ public interface Storage { @Retention(RetentionPolicy.RUNTIME) @Target({ ElementType.PARAMETER, ElementType.METHOD }) @Qualifier - public @interface Volatile { } + @interface Volatile { } /** * Utility functions for interacting with a Storage instance. http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java index c0f8d35..cca92dd 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java @@ -19,7 +19,6 @@ import java.nio.charset.StandardCharsets; import javax.sql.DataSource; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.common.io.CharStreams; import com.google.common.util.concurrent.AbstractIdleService; @@ -173,35 +172,6 @@ class DbStorage extends AbstractIdleService implements Storage { }); } - @VisibleForTesting - static final String DISABLE_UNDO_LOG = "DISABLE_UNDO_LOG"; - @VisibleForTesting - static final String ENABLE_UNDO_LOG = "ENABLE_UNDO_LOG"; - - // TODO(wfarner): Including @Transactional here seems to render the UNDO_LOG changes useless, - // resulting in no performance gain. Figure out why. - @Timed("db_storage_bulk_load_operation") - @Override - public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) - throws StorageException, E { - - gatedWorkQueue.closeDuring(() -> { - // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk - // insert. - try (SqlSession session = sessionFactory.openSession(false)) { - try { - session.update(DISABLE_UNDO_LOG); - work.apply(storeProvider); - } catch (PersistenceException e) { - throw new StorageException(e.getMessage(), e); - } finally { - session.update(ENABLE_UNDO_LOG); - } - } - return null; - }); - } - @Override public void prepare() { startAsync().awaitRunning(); @@ -233,8 +203,6 @@ class DbStorage extends AbstractIdleService implements Storage { CharStreams.toString(new InputStreamReader( DbStorage.class.getResourceAsStream("schema.sql"), StandardCharsets.UTF_8))); - addMappedStatement(configuration, DISABLE_UNDO_LOG, "SET UNDO_LOG 0;"); - addMappedStatement(configuration, ENABLE_UNDO_LOG, "SET UNDO_LOG 1;"); try (SqlSession session = sessionFactory.openSession()) { session.update(createStatementName); http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java index 243c8a0..5143668 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java @@ -432,13 +432,11 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore @Timed("scheduler_log_recover") void recover() throws RecoveryFailedException { - writeBehindStorage.bulkLoad(storeProvider -> { - try { - streamManager.readFromBeginning(LogStorage.this::replay); - } catch (CodingException | InvalidPositionException | StreamAccessException e) { - throw new RecoveryFailedException(e); - } - }); + try { + streamManager.readFromBeginning(LogStorage.this::replay); + } catch (CodingException | InvalidPositionException | StreamAccessException e) { + throw new RecoveryFailedException(e); + } } private static final class RecoveryFailedException extends SchedulerException { @@ -559,13 +557,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore } @Override - public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) - throws StorageException, E { - - throw new UnsupportedOperationException("Log storage may not be populated in bulk."); - } - - @Override public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E { return writeBehindStorage.read(work); } http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java index 0768ec3..3336f8c 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java +++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java @@ -51,13 +51,6 @@ class FakeNonVolatileStorage implements NonVolatileStorage { } @Override - public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) - throws StorageException, E { - - delegate.bulkLoad(work); - } - - @Override public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E { return delegate.write(work); } http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java index 420d444..f26529c 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java @@ -23,13 +23,11 @@ import org.apache.aurora.scheduler.storage.JobUpdateStore; import org.apache.aurora.scheduler.storage.LockStore; import org.apache.aurora.scheduler.storage.QuotaStore; import org.apache.aurora.scheduler.storage.SchedulerStore; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.StorageException; import org.apache.aurora.scheduler.storage.Storage.Work; import org.apache.aurora.scheduler.storage.TaskStore; import org.apache.ibatis.exceptions.PersistenceException; -import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.easymock.EasyMock; import org.easymock.IExpectationSetters; @@ -40,26 +38,19 @@ import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; public class DbStorageTest extends EasyMockTest { - - private SqlSessionFactory sessionFactory; - private SqlSession session; private GatedWorkQueue gatedWorkQueue; private Work.Quiet<String> readWork; - private MutateWork.NoResult.Quiet writeWork; private DbStorage storage; @Before public void setUp() { - sessionFactory = createMock(SqlSessionFactory.class); - session = createMock(SqlSession.class); gatedWorkQueue = createMock(GatedWorkQueue.class); readWork = createMock(new Clazz<Work.Quiet<String>>() { }); - writeWork = createMock(new Clazz<MutateWork.NoResult.Quiet>() { }); StatsProvider statsProvider = createMock(StatsProvider.class); storage = new DbStorage( - sessionFactory, + createMock(SqlSessionFactory.class), createMock(EnumValueMapper.class), gatedWorkQueue, createMock(CronJobStore.Mutable.class), @@ -99,32 +90,6 @@ public class DbStorageTest extends EasyMockTest { }); } - @Test(expected = StorageException.class) - public void testBulkLoadFails() throws Exception { - expect(sessionFactory.openSession(false)).andReturn(session); - expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andThrow(new PersistenceException()); - expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0); - expectGateClosed(); - - control.replay(); - - storage.bulkLoad(writeWork); - } - - @Test - public void testBulkLoad() throws Exception { - expect(sessionFactory.openSession(false)).andReturn(session); - expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andReturn(0); - expect(writeWork.apply(EasyMock.anyObject())).andReturn(null); - session.close(); - expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0); - expectGateClosed(); - - control.replay(); - - storage.bulkLoad(writeWork); - } - @Test public void testGateWithReentrantWrites() throws Exception { expectGateClosed().times(2); http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java index 7382eca..bf9479d 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java @@ -110,7 +110,6 @@ import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher; import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.easymock.Capture; -import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; @@ -369,12 +368,6 @@ public class LogStorageTest extends EasyMockTest { expect(entry.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(logEntry)); } - storageUtil.storage.bulkLoad(EasyMock.anyObject()); - expectLastCall().andAnswer(() -> { - NoResult work = (NoResult<?>) EasyMock.getCurrentArguments()[0]; - work.apply(storageUtil.mutableStoreProvider); - return null; - }); expect(stream.readAll()).andReturn(entryBuilder.build().iterator()); } @@ -414,12 +407,6 @@ public class LogStorageTest extends EasyMockTest { return null; }); - storageUtil.storage.bulkLoad(EasyMock.anyObject()); - expectLastCall().andAnswer(() -> { - NoResult work = (NoResult<?>) EasyMock.getCurrentArguments()[0]; - work.apply(storageUtil.mutableStoreProvider); - return null; - }); expect(stream.readAll()).andReturn(Collections.emptyIterator()); Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture(); expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer( @@ -964,17 +951,6 @@ public class LogStorageTest extends EasyMockTest { }.run(); } - @Test(expected = UnsupportedOperationException.class) - public void testBulkLoad() throws Exception { - expect(log.open()).andReturn(stream); - MutateWork.NoResult.Quiet load = createMock(new Clazz<NoResult.Quiet>() { }); - - control.replay(); - - logStorage.prepare(); - logStorage.bulkLoad(load); - } - private LogEntry createTransaction(Op... ops) { return LogEntry.transaction( new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));
