Repository: aurora Updated Branches: refs/heads/master cbc42c484 -> da48ad20b
DbStorage: avoid flushing for reentrant writes, remove extra @Transactional. Bugs closed: AURORA-1395 Reviewed at https://reviews.apache.org/r/37141/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/da48ad20 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/da48ad20 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/da48ad20 Branch: refs/heads/master Commit: da48ad20b07ba800911db85e5f65c4112fe18415 Parents: cbc42c4 Author: Bill Farner <[email protected]> Authored: Thu Aug 13 11:58:48 2015 -0400 Committer: Bill Farner <[email protected]> Committed: Thu Aug 13 11:58:48 2015 -0400 ---------------------------------------------------------------------- .../aurora/scheduler/storage/db/DbStorage.java | 32 +++++++++--- .../scheduler/storage/db/DbStorageTest.java | 51 ++++++++++++++++++++ 2 files changed, 75 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/da48ad20/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java index aac62e2..cf54a3f 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java @@ -140,14 +140,29 @@ class DbStorage extends AbstractIdleService implements Storage { } } + private final ThreadLocal<Boolean> inTransaction = new ThreadLocal<Boolean>() { + @Override + protected Boolean initialValue() { + return false; + } + }; + + @Transactional + <T, E extends Exception> T transactionedWrite(MutateWork<T, E> work) throws E { + return work.apply(storeProvider); + } + @Timed("db_storage_write_operation") @Override - @Transactional public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E { - T result; - try (SqlSession session = sessionFactory.openSession(false)) { - result = work.apply(storeProvider); - session.commit(); + // Only flush for the top-level write() call when calls are reentrant. + boolean shouldFlush = !inTransaction.get(); + if (shouldFlush) { + inTransaction.set(true); + } + + try { + return transactionedWrite(work); } catch (PersistenceException e) { throw new StorageException(e.getMessage(), e); } finally { @@ -157,10 +172,11 @@ class DbStorage extends AbstractIdleService implements Storage { // introduction of DbStorage, but should be revisited. // TODO(wfarner): Consider revisiting to execute async work only when the transaction is // successful. - postTransactionWork.flush(); + if (shouldFlush) { + postTransactionWork.flush(); + inTransaction.set(false); + } } - - return result; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/aurora/blob/da48ad20/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java index 3b05db9..9725314 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.scheduler.storage.db; +import java.util.concurrent.atomic.AtomicBoolean; + import com.twitter.common.testing.easymock.EasyMockTest; import org.apache.aurora.scheduler.async.FlushableWorkQueue; @@ -22,6 +24,7 @@ import org.apache.aurora.scheduler.storage.JobUpdateStore; import org.apache.aurora.scheduler.storage.LockStore; import org.apache.aurora.scheduler.storage.QuotaStore; import org.apache.aurora.scheduler.storage.SchedulerStore; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; import org.apache.aurora.scheduler.storage.Storage.StorageException; import org.apache.aurora.scheduler.storage.Storage.Work; @@ -30,11 +33,14 @@ import org.apache.ibatis.exceptions.PersistenceException; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.junit.Before; import org.junit.Test; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; public class DbStorageTest extends EasyMockTest { @@ -113,4 +119,49 @@ public class DbStorageTest extends EasyMockTest { storage.bulkLoad(writeWork); } + + @Test + public void testFlushWithReentrantWrites() { + final AtomicBoolean flushed = new AtomicBoolean(false); + flusher.flush(); + expectLastCall().andAnswer(new IAnswer<Void>() { + @Override + public Void answer() { + flushed.set(true); + return null; + } + }); + + control.replay(); + + storage.write(new MutateWork.NoResult.Quiet() { + @Override + public void execute(MutableStoreProvider storeProvider) { + noopWrite(); + + // Should not have flushed yet. + assertFalse("flush() should not be called until outer write() completes.", flushed.get()); + } + }); + } + + private void noopWrite() { + storage.write(new MutateWork.NoResult.Quiet() { + @Override + public void execute(MutableStoreProvider storeProvider) { + // No-op. + } + }); + } + + @Test + public void testFlushWithSeqentialWrites() { + flusher.flush(); + expectLastCall().times(2); + + control.replay(); + + noopWrite(); + noopWrite(); + } }
