This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch 8.0.x in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit e92ad84df4eb89e842a3a6b2f6dc91d3ddfd6a64 Author: Dedeepya T <dedeepy...@yahoo.co.in> AuthorDate: Thu Jul 15 17:01:57 2021 +0530 QPID-8546:[Broker-J] Use special durability for non-sync commits in BDB HA This closes #99 --- .../store/berkeleydb/AbstractBDBMessageStore.java | 17 +- .../server/store/berkeleydb/EnvironmentFacade.java | 4 +- .../berkeleydb/StandardEnvironmentFacade.java | 13 +- .../replication/ReplicatedEnvironmentFacade.java | 56 +++++-- .../store/berkeleydb/BDBMessageStoreTest.java | 2 +- .../ReplicatedEnvironmentFacadeTest.java | 182 ++++++++++++++++++++- 6 files changed, 239 insertions(+), 35 deletions(-) diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 94d6fb1..84e1190 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -315,7 +315,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - void removeMessage(long messageId, boolean sync) throws StoreException + void removeMessage(long messageId) throws StoreException { boolean complete = false; Transaction tx = null; @@ -351,7 +351,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore getLogger().debug("Deleted content for message {}", messageId); - getEnvironmentFacade().commit(tx, sync); + getEnvironmentFacade().commitNoSync(tx); complete = true; tx = null; @@ -789,17 +789,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore * * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. */ - private void commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException + private void commitTranImpl(final Transaction tx) throws StoreException { if (tx == null) { throw new StoreException("Fatal internal error: transactional is null at commitTran"); } - getEnvironmentFacade().commit(tx, syncCommit); + getEnvironmentFacade().commit(tx); - getLogger().debug("commitTranImpl completed {} transaction {}", - syncCommit ? "synchronous" : "asynchronous", tx); + getLogger().debug("commitTranImpl completed {} transaction synchronous", tx); } @@ -1201,7 +1200,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore throw getEnvironmentFacade().handleDatabaseException("failed to begin transaction", e); } store(txn); - getEnvironmentFacade().commit(txn, false); + getEnvironmentFacade().commitAsync(txn, false); } } @@ -1214,7 +1213,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore _messages.remove(this); if(stored()) { - removeMessage(_messageId, false); + removeMessage(_messageId); storedSizeChangeOccurred(-getContentSize()); } if (!_messageDeleteListeners.isEmpty()) @@ -1378,7 +1377,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { checkMessageStoreOpen(); doPreCommitActions(); - AbstractBDBMessageStore.this.commitTranImpl(_txn, true); + AbstractBDBMessageStore.this.commitTranImpl(_txn); doPostCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index fcf6d78..bf5e9b7 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -67,7 +67,7 @@ public interface EnvironmentFacade Transaction beginTransaction(TransactionConfig transactionConfig); - void commit(Transaction tx, boolean sync); + void commit(Transaction tx); <X> ListenableFuture<X> commitAsync(Transaction tx, X val); RuntimeException handleDatabaseException(String contextMessage, RuntimeException e); @@ -98,4 +98,6 @@ public interface EnvironmentFacade Map<String,Object> getDatabaseStatistics(String database, boolean reset); void deleteDatabase(String databaseName); + + void commitNoSync(final Transaction tx); } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index 271c54c..3b55d24 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -166,7 +166,12 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } @Override - public void commit(com.sleepycat.je.Transaction tx, boolean syncCommit) + public void commit(Transaction tx) + { + commitInternal(tx, true); + } + + private void commitInternal(final Transaction tx, final boolean syncCommit) { try { @@ -184,6 +189,12 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } @Override + public void commitNoSync(final Transaction tx) + { + commitInternal(tx, false); + } + + @Override public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X val) { try diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 7b15c44..3e201ba 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -93,6 +93,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public static final String ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME = "qpid.bdb.ha.environment_restart_retry_limit"; public static final String EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.executor_shutdown_timeout"; public static final String DISABLE_COALESCING_COMMITTER_PROPERTY_NAME = "qpid.bdb.ha.disable_coalescing_committer"; + public static final String NO_SYNC_TX_DURABILITY_PROPERTY_NAME = "qpid.bdb.ha.noSyncTxDurablity"; private static final Logger LOGGER = LoggerFactory.getLogger(ReplicatedEnvironmentFacade.class); @@ -103,6 +104,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private static final int DEFAULT_ENVIRONMENT_RESTART_RETRY_LIMIT = 3; private static final int DEFAULT_EXECUTOR_SHUTDOWN_TIMEOUT = 5000; private static final boolean DEFAULT_DISABLE_COALESCING_COMMITTER = false; + private static final String DEFAULT_NO_SYNC_TX_DURABILITY_PROPERTY_NAME = "NO_SYNC,NO_SYNC,NONE"; + private static final String DEFAULT_SYNC_TX_DURABILITY_PROPERTY_NAME = "SYNC,NO_SYNC,NONE"; /** Length of time allowed for a master transfer to complete before the operation will timeout */ private final int _masterTransferTimeout; @@ -139,6 +142,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final boolean _disableCoalescingCommiter; + private final Durability _noSyncTxDurability; + private final int _logHandlerCleanerProtectedFilesLimit; static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.SYNC; @@ -284,6 +289,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _stateChangeExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new DaemonThreadFactory("StateChange-" + _prettyGroupNodeName))); _groupChangeExecutor = new ScheduledThreadPoolExecutor(2, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); _disableCoalescingCommiter = configuration.getFacadeParameter(Boolean.class,DISABLE_COALESCING_COMMITTER_PROPERTY_NAME, DEFAULT_DISABLE_COALESCING_COMMITTER); + _noSyncTxDurability = Durability.parse(configuration.getFacadeParameter(String.class, NO_SYNC_TX_DURABILITY_PROPERTY_NAME, getDefaultDurability(_disableCoalescingCommiter))); // create environment in a separate thread to avoid renaming of the current thread by JE EnvHomeRegistry.getInstance().registerHome(_environmentDirectory); @@ -318,6 +324,18 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + private String getDefaultDurability(final boolean disableCoalescingCommiter) + { + if (disableCoalescingCommiter) + { + return DEFAULT_SYNC_TX_DURABILITY_PROPERTY_NAME; + } + else + { + return DEFAULT_NO_SYNC_TX_DURABILITY_PROPERTY_NAME; + } + } + @Override public Transaction beginTransaction(TransactionConfig transactionConfig) { @@ -325,40 +343,48 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } @Override - public void commit(final Transaction tx, boolean syncCommit) + public void commit(final Transaction tx) { - try - { - // Using commit() instead of commitNoSync() for the HA store to allow - // the HA durability configuration to influence resulting behaviour. - tx.commit(_realMessageStoreDurability); - } - catch (DatabaseException de) - { - throw handleDatabaseException("Got DatabaseException on commit, closing environment", de); - } + commitInternal(tx, _realMessageStoreDurability); if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC) { - _coalescingCommiter.commit(tx, syncCommit); + _coalescingCommiter.commit(tx, true); } } - @Override - public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X val) + private void commitInternal(final Transaction tx, final Durability realMessageStoreDurability) { try { // Using commit() instead of commitNoSync() for the HA store to allow // the HA durability configuration to influence resulting behaviour. - tx.commit(_realMessageStoreDurability); + tx.commit(realMessageStoreDurability); } catch (DatabaseException de) { throw handleDatabaseException("Got DatabaseException on commit, closing environment", de); } + } + + @Override + public void commitNoSync(final Transaction tx) + { + commitInternal(tx, _noSyncTxDurability); + + if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC + && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC) + { + _coalescingCommiter.commit(tx, false); + } + } + + @Override + public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X val) + { + commitInternal(tx, _realMessageStoreDurability); if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC) diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index d3fea6e..8b03750 100644 --- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -116,7 +116,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore); long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - bdbStore.removeMessage(messageid_0_8, true); + bdbStore.removeMessage(messageid_0_8); //verify the removal using the BDB store implementation methods directly try diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 90c15f7..9094f02 100644 --- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -21,11 +21,11 @@ package org.apache.qpid.server.store.berkeleydb.replication; import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.JUL_LOGGER_LEVEL_OVERRIDE; -import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade - .LOG_HANDLER_CLEANER_PROTECTED_FILES_LIMIT_PROPERTY_NAME; +import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.LOG_HANDLER_CLEANER_PROTECTED_FILES_LIMIT_PROPERTY_NAME; import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.*; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -37,8 +37,8 @@ import static org.junit.Assume.assumeThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -69,6 +69,8 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.Durability; import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.LockMode; +import com.sleepycat.je.OperationStatus; import com.sleepycat.je.Transaction; import com.sleepycat.je.TransactionConfig; import com.sleepycat.je.rep.NoConsistencyRequiredPolicy; @@ -1196,18 +1198,172 @@ public class ReplicatedEnvironmentFacadeTest extends UnitTestBase masterListener.awaitForStateChange(State.MASTER, _timeout, TimeUnit.SECONDS)); } + @Test + public void testNodeCommitNoSyncWithCoalescing() throws Exception + { + DatabaseConfig createConfig = new DatabaseConfig(); + createConfig.setAllowCreate(true); + createConfig.setTransactional(true); + + TestStateChangeListener masterListener = new TestStateChangeListener(); + ReplicatedEnvironmentFacade node1 = + addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(), false,"NO_SYNC,NO_SYNC,NONE"); + assertTrue("Environment was not created", masterListener.awaitForStateChange(State.MASTER, + _timeout, TimeUnit.SECONDS)); + + String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable(); + String replicaName = TEST_NODE_NAME + 1; + ReplicatedEnvironmentFacade node2 = + createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener()); + + Database db = node1.openDatabase("mydb", createConfig); + + int key = 1; + String data = "value"; + // Put a record (using commitNoSync) + TransactionConfig transactionConfig = addTestKeyValueWithCommitNoSync(node1, db, key, data); + db.close(); + + node1.close(); + node2.close(); + + LOGGER.debug("RESTARTING " + TEST_NODE_NAME); + + node1 = addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(), false,"NO_SYNC,NO_SYNC,NONE"); + boolean awaitForStateChange = masterListener.awaitForStateChange(State.MASTER, + _timeout, TimeUnit.SECONDS); + LOGGER.debug("RESTARTING " + replicaName); + TestStateChangeListener node2StateChangeListener = new TestStateChangeListener(); + node2 = addNode(replicaName, + replicaNodeHostPort, + false, + node2StateChangeListener, + new NoopReplicationGroupListener()); + db = node1.openDatabase("mydb", DatabaseConfig.DEFAULT); + byte[] resultData = getTestKeyValue(node1, db, key, transactionConfig); + DatabaseEntry dbData = getDatabaseEntry(data); + assertArrayEquals(resultData, dbData.getData()); + assertEquals("value", StringBinding.entryToString(dbData)); + + db.close(); + + LOGGER.debug("CLOSING"); + node1.close(); + node2.close(); + } + + @Test + public void testNodeCommitSyncWithoutCoalescing() throws Exception + { + DatabaseConfig createConfig = new DatabaseConfig(); + createConfig.setAllowCreate(true); + createConfig.setTransactional(true); + + TestStateChangeListener masterListener = new TestStateChangeListener(); + ReplicatedEnvironmentFacade node1 = + addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(),true,"SYNC,NO_SYNC,NONE"); + + assertTrue("Environment was not created", masterListener.awaitForStateChange(State.MASTER, + _timeout, TimeUnit.SECONDS)); + + String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable(); + String replicaName = TEST_NODE_NAME + 1; + ReplicatedEnvironmentFacade node2 = + createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener()); + + Database db = node1.openDatabase("mydb", createConfig); + + int key = 1; + String data = "value"; + // Put a record (using commitNoSync) + TransactionConfig transactionConfig = addTestKeyValueWithCommitNoSync(node1, db, key, data); + db.close(); + + node1.close(); + node2.close(); + + LOGGER.debug("RESTARTING " + TEST_NODE_NAME); + + node1 = addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(),true,"SYNC,NO_SYNC,NONE"); + boolean awaitForStateChange = masterListener.awaitForStateChange(State.MASTER, + _timeout, TimeUnit.SECONDS); + LOGGER.debug("RESTARTING " + replicaName); + TestStateChangeListener node2StateChangeListener = new TestStateChangeListener(); + node2 = addNode(replicaName, + replicaNodeHostPort, + false, + node2StateChangeListener, + new NoopReplicationGroupListener()); + db = node1.openDatabase("mydb", DatabaseConfig.DEFAULT); + byte[] resultData = getTestKeyValue(node1, db, key, transactionConfig); + DatabaseEntry dbData = getDatabaseEntry(data); + assertArrayEquals(resultData, dbData.getData()); + assertEquals("value", StringBinding.entryToString(dbData)); + + db.close(); + + LOGGER.debug("CLOSING"); + node1.close(); + node2.close(); + } + + private DatabaseEntry getDatabaseEntry(final String data) + { + DatabaseEntry dbData = new DatabaseEntry(); + StringBinding.stringToEntry(data, dbData); + return dbData; + } + private DatabaseEntry getDatabaseEntry(final int data) + { + DatabaseEntry dbData = new DatabaseEntry(); + IntegerBinding.intToEntry(data, dbData); + return dbData; + } + + private byte[] getTestKeyValue(final ReplicatedEnvironmentFacade node1, + final Database db, + final int keyValue, + final TransactionConfig transactionConfig) + { + Transaction txn = node1.beginTransaction(transactionConfig); + + DatabaseEntry key = getDatabaseEntry(keyValue); + DatabaseEntry result = new DatabaseEntry(); + OperationStatus status = db.get(txn, key, result, LockMode.READ_UNCOMMITTED); + txn.commit(); + byte[] resultData = new byte[0]; + if (status == OperationStatus.SUCCESS) + { + resultData = result.getData(); + } + return resultData; + } + + private TransactionConfig addTestKeyValueWithCommitNoSync(final ReplicatedEnvironmentFacade node1, + final Database db, + final int keyValue, final String dataValue) + { + DatabaseEntry key = getDatabaseEntry(keyValue); + DatabaseEntry data = getDatabaseEntry(dataValue); + TransactionConfig transactionConfig = new TransactionConfig(); + transactionConfig.setDurability(node1.getRealMessageStoreDurability()); + + Transaction txn = node1.beginTransaction(null); + db.put(txn, key, data); + node1.commitNoSync(txn); + return transactionConfig; + } + + private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue, final String dataValue) { - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry data = new DatabaseEntry(); + DatabaseEntry key = getDatabaseEntry(keyValue); + DatabaseEntry data = getDatabaseEntry(dataValue); TransactionConfig transactionConfig = new TransactionConfig(); transactionConfig.setDurability(master.getRealMessageStoreDurability()); Transaction txn = master.beginTransaction(transactionConfig); - IntegerBinding.intToEntry(keyValue, key); - StringBinding.stringToEntry(dataValue, data); - db.put(txn, key, data); txn.commit(); } @@ -1293,6 +1449,16 @@ public class ReplicatedEnvironmentFacadeTest extends UnitTestBase return addNode(nodeName,nodeHostPort,designatedPrimary,stateChangeListener,replicationGroupListener,false); } + private ReplicatedEnvironmentFacade addNodeWithDurability(String nodeName, String nodeHostPort, boolean designatedPrimary, + StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, boolean disableCoalescing, String durability) + { + ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary,disableCoalescing); + when(config.getFacadeParameter(eq(String.class), + eq(NO_SYNC_TX_DURABILITY_PROPERTY_NAME), + anyString())).thenReturn(durability); + return createReplicatedEnvironmentFacade(nodeName, stateChangeListener, replicationGroupListener, config); + + } private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, ReplicatedEnvironmentConfiguration config) { ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config); ref.setStateChangeListener(stateChangeListener); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org