Repository: aurora
Updated Branches:
  refs/heads/master d10d2d171 -> ac8562489
Add a specific storage routine for bulk loading data. Reviewed at https://reviews.apache.org/r/33273/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ac856248 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ac856248 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ac856248 Branch: refs/heads/master Commit: ac8562489633c0dc0cd510757275a74e07559c58 Parents: d10d2d1 Author: Bill Farner <[email protected]> Authored: Tue Apr 21 15:45:48 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Tue Apr 21 15:45:48 2015 -0700 ---------------------------------------------------------------------- .../storage/CallOrderEnforcingStorage.java | 8 ++ .../aurora/scheduler/storage/Storage.java | 11 ++ .../aurora/scheduler/storage/db/DbStorage.java | 53 ++++++--- .../scheduler/storage/log/LogStorage.java | 30 ++++-- .../scheduler/storage/mem/MemStorage.java | 11 +- .../app/local/FakeNonVolatileStorage.java | 7 ++ .../scheduler/storage/db/DbStorageTest.java | 108 +++++++++++++++++++ .../scheduler/storage/log/LogStorageTest.java | 30 ++++++ 8 files changed, 234 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java index 07d81e4..64aa10d 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java @@ -116,6 +116,14 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage { } @Override + public <E extends Exception> void 
bulkLoad(MutateWork.NoResult<E> work) + throws StorageException, E { + + checkInState(State.PREPARED); + wrapped.bulkLoad(work); + } + + @Override public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E { checkInState(State.READY); http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/Storage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java index 972a3c1..21f6a64 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java @@ -212,6 +212,17 @@ public interface Storage { <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E; /** + * Recovers the contents of the storage, using the provided operation. This may be done with + * relaxed transactional guarantees and/or rollback support. + * + * @param work Bulk load operation. + * @param <E> The type of exception this unit of work can throw. + * @throws StorageException if there was a problem reading from or writing to stable storage. + * @throws E bubbled transparently when the unit of work throws + */ + <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) throws StorageException, E; + + /** * Requests the underlying storage prepare its data set; ie: initialize schemas, begin syncing * out of date data, etc. This method should not block. 
* http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java index 526df10..49db52d 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import com.google.common.annotations.VisibleForTesting; import com.google.common.io.CharStreams; import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.Inject; @@ -34,9 +35,7 @@ import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.TaskStore; import org.apache.ibatis.builder.StaticSqlSource; import org.apache.ibatis.exceptions.PersistenceException; -import org.apache.ibatis.logging.LogFactory; import org.apache.ibatis.mapping.MappedStatement.Builder; -import org.apache.ibatis.mapping.SqlCommandType; import org.apache.ibatis.session.Configuration; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; @@ -44,6 +43,8 @@ import org.mybatis.guice.transactional.Transactional; import static java.util.Objects.requireNonNull; +import static org.apache.ibatis.mapping.SqlCommandType.UPDATE; + /** * A storage implementation backed by a relational database. 
* <p> @@ -140,11 +141,41 @@ class DbStorage extends AbstractIdleService implements Storage { } } + @VisibleForTesting + static final String DISABLE_UNDO_LOG = "DISABLE_UNDO_LOG"; + @VisibleForTesting + static final String ENABLE_UNDO_LOG = "ENABLE_UNDO_LOG"; + + // TODO(wfarner): Including @Transactional here seems to render the UNDO_LOG changes useless, + // resulting in no performance gain. Figure out why. + @Override + public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) + throws StorageException, E { + + // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk + // insert. + try (SqlSession session = sessionFactory.openSession(false)) { + try { + session.update(DISABLE_UNDO_LOG); + work.apply(storeProvider); + } catch (PersistenceException e) { + throw new StorageException(e.getMessage(), e); + } finally { + session.update(ENABLE_UNDO_LOG); + } + } + } + @Override public void prepare() { startAsync().awaitRunning(); } + private static void addMappedStatement(Configuration configuration, String name, String sql) { + configuration.addMappedStatement( + new Builder(configuration, name, new StaticSqlSource(configuration, sql), UPDATE).build()); + } + /** * Creates the SQL schema during service start-up. * Note: This design assumes a volatile database engine. 
@@ -152,22 +183,18 @@ class DbStorage extends AbstractIdleService implements Storage { @Override @Transactional protected void startUp() throws IOException { - LogFactory.useJdkLogging(); - Configuration configuration = sessionFactory.getConfiguration(); String createStatementName = "create_tables"; configuration.setMapUnderscoreToCamelCase(true); - configuration.addMappedStatement(new Builder( + + addMappedStatement( configuration, createStatementName, - new StaticSqlSource( - configuration, - CharStreams.toString( - new InputStreamReader( - DbStorage.class.getResourceAsStream("schema.sql"), - StandardCharsets.UTF_8))), - SqlCommandType.UPDATE) - .build()); + CharStreams.toString(new InputStreamReader( + DbStorage.class.getResourceAsStream("schema.sql"), + StandardCharsets.UTF_8))); + addMappedStatement(configuration, DISABLE_UNDO_LOG, "SET UNDO_LOG 0;"); + addMappedStatement(configuration, ENABLE_UNDO_LOG, "SET UNDO_LOG 1;"); try (SqlSession session = sessionFactory.openSession()) { session.update(createStatementName); http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java index 63b5b1f..bb59cdf 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java @@ -538,16 +538,21 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore @Timed("scheduler_log_recover") void recover() throws RecoveryFailedException { - try { - streamManager.readFromBeginning(new Closure<LogEntry>() { - @Override - public void execute(LogEntry logEntry) { - replay(logEntry); + writeBehindStorage.bulkLoad(new MutateWork.NoResult.Quiet() { + @Override + protected void 
execute(MutableStoreProvider storeProvider) { + try { + streamManager.readFromBeginning(new Closure<LogEntry>() { + @Override + public void execute(LogEntry logEntry) { + replay(logEntry); + } + }); + } catch (CodingException | InvalidPositionException | StreamAccessException e) { + throw new RecoveryFailedException(e); } - }); - } catch (CodingException | InvalidPositionException | StreamAccessException e) { - throw new RecoveryFailedException(e); - } + } + }); } private static final class RecoveryFailedException extends SchedulerException { @@ -679,6 +684,13 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore } @Override + public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) + throws StorageException, E { + + throw new UnsupportedOperationException("Log storage may not be populated in bulk."); + } + + @Override public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E { return writeBehindStorage.read(work); } http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java index dafe1c4..c5ccccd 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java @@ -128,8 +128,7 @@ public class MemStorage implements Storage { @Timed("mem_storage_read_operation") @Override - public <T, E extends Exception> T read(final Work<T, E> work) - throws StorageException, E { + public <T, E extends Exception> T read(final Work<T, E> work) throws StorageException, E { return delegatedStore.read(new Work<T, E>() { @Override public T apply(StoreProvider provider) throws E { @@ -149,6 +148,14 @@ public class MemStorage implements 
Storage { }); } + @Timed("mem_storage_bulk_load_operation") + @Override + public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) + throws StorageException, E { + + delegatedStore.bulkLoad(work); + } + @Override public void prepare() throws StorageException { delegatedStore.prepare(); http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java index 3336f8c..0768ec3 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java +++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java @@ -51,6 +51,13 @@ class FakeNonVolatileStorage implements NonVolatileStorage { } @Override + public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) + throws StorageException, E { + + delegate.bulkLoad(work); + } + + @Override public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E { return delegate.write(work); } http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java new file mode 100644 index 0000000..743f5ba --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java @@ -0,0 +1,108 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db; + +import com.twitter.common.testing.easymock.EasyMockTest; + +import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.LockStore; +import org.apache.aurora.scheduler.storage.QuotaStore; +import org.apache.aurora.scheduler.storage.SchedulerStore; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.Storage.StorageException; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import org.apache.aurora.scheduler.storage.Storage.Work; +import org.apache.ibatis.exceptions.PersistenceException; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class DbStorageTest extends EasyMockTest { + + private SqlSessionFactory sessionFactory; + private SqlSession session; + private EnumValueMapper enumMapper; + private Work.Quiet<String> readWork; + private MutateWork.NoResult.Quiet writeWork; + + private DbStorage storage; + + @Before + public void setUp() { + sessionFactory = createMock(SqlSessionFactory.class); + session = createMock(SqlSession.class); + enumMapper = createMock(EnumValueMapper.class); + readWork = createMock(new 
Clazz<Work.Quiet<String>>() { }); + writeWork = createMock(new Clazz<MutateWork.NoResult.Quiet>() { }); + + storage = new DbStorage( + sessionFactory, + enumMapper, + createMock(SchedulerStore.Mutable.class), + createMock(AttributeStore.Mutable.class), + createMock(LockStore.Mutable.class), + createMock(QuotaStore.Mutable.class), + createMock(JobUpdateStore.Mutable.class)); + } + + @Test(expected = StorageException.class) + public void testReadFails() { + expect(readWork.apply(EasyMock.<StoreProvider>anyObject())) + .andThrow(new PersistenceException()); + + control.replay(); + + storage.read(readWork); + } + + @Test + public void testRead() { + expect(readWork.apply(EasyMock.<StoreProvider>anyObject())).andReturn("hi"); + + control.replay(); + + assertEquals("hi", storage.read(readWork)); + } + + @Test(expected = StorageException.class) + public void testBulkLoadFails() { + expect(sessionFactory.openSession(false)).andReturn(session); + expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andThrow(new PersistenceException()); + expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0); + + control.replay(); + + storage.bulkLoad(writeWork); + } + + @Test + public void testBulkLoad() { + expect(sessionFactory.openSession(false)).andReturn(session); + expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andReturn(0); + writeWork.apply(EasyMock.<MutableStoreProvider>anyObject()); + session.close(); + expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0); + + control.replay(); + + storage.bulkLoad(writeWork); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java index cb6ba25..cbc2d38 100644 --- 
a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java @@ -109,6 +109,7 @@ import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher; import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.easymock.Capture; +import org.easymock.EasyMock; import org.easymock.IAnswer; import org.junit.Before; import org.junit.Test; @@ -422,6 +423,15 @@ public class LogStorageTest extends EasyMockTest { expect(entry.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(logEntry)); } + storageUtil.storage.bulkLoad(EasyMock.<MutateWork.NoResult<?>>anyObject()); + expectLastCall().andAnswer(new IAnswer<MutateWork.NoResult<?>>() { + @Override + public NoResult<?> answer() throws Throwable { + MutateWork.NoResult work = (MutateWork.NoResult<?>) EasyMock.getCurrentArguments()[0]; + work.apply(storageUtil.mutableStoreProvider); + return null; + } + }); expect(stream.readAll()).andReturn(entryBuilder.build().iterator()); } @@ -467,6 +477,15 @@ public class LogStorageTest extends EasyMockTest { } }); + storageUtil.storage.bulkLoad(EasyMock.<MutateWork.NoResult<?>>anyObject()); + expectLastCall().andAnswer(new IAnswer<MutateWork.NoResult<?>>() { + @Override + public NoResult<?> answer() throws Throwable { + MutateWork.NoResult work = (MutateWork.NoResult<?>) EasyMock.getCurrentArguments()[0]; + work.apply(storageUtil.mutableStoreProvider); + return null; + } + }); expect(stream.readAll()).andReturn(Iterators.<Entry>emptyIterator()); final Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture(); expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer( @@ -1033,6 +1052,17 @@ public class LogStorageTest extends EasyMockTest { }.run(); } + @Test(expected = UnsupportedOperationException.class) + public void testBulkLoad() throws Exception { + 
expect(log.open()).andReturn(stream); + MutateWork.NoResult.Quiet load = createMock(new Clazz<NoResult.Quiet>() { }); + + control.replay(); + + logStorage.prepare(); + logStorage.bulkLoad(load); + } + private LogEntry createTransaction(Op... ops) { return LogEntry.transaction( new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));
