This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch 7.1.x in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 119960d47d576833fc947fc05814ea7601b5a955 Author: Alex Rudyy <[email protected]> AuthorDate: Tue Jan 28 15:13:55 2020 +0000 QPID-8407: [Broker-J][BDB HA] Clean cached sequences and databases on BDB HA node transition into a Master role (cherry picked from commit 32878798a32c9dc1c762ecb9f870837ff365f350) --- .../store/berkeleydb/AbstractBDBMessageStore.java | 17 +--- .../replication/ReplicatedEnvironmentFacade.java | 28 ++++--- .../berkeleydb/replication/MultiNodeTest.java | 98 ++++++++++++++++++++-- 3 files changed, 112 insertions(+), 31 deletions(-) diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 05733ad..57b1856 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -1561,8 +1561,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore final List<QueueEntryKey> entries = new ArrayList<>(); try(Cursor cursor = getDeliveryDb().openCursor(null, null)) { - boolean searchCompletedSuccessfully = false; - DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); value.setPartial(0, 0, true); @@ -1570,19 +1568,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore CachingUUIDFactory uuidFactory = new CachingUUIDFactory(); QueueEntryBinding.objectToEntry(new QueueEntryKey(queue.getId(), 0L), key); - if (!searchCompletedSuccessfully && (searchCompletedSuccessfully = - cursor.getSearchKeyRange(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS)) - { - QueueEntryKey entry = QueueEntryBinding.entryToObject(uuidFactory, key); - if (entry.getQueueId().equals(queue.getId())) - { - entries.add(entry); - } - } - - if (searchCompletedSuccessfully) + if (cursor.getSearchKeyRange(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS) { - while (cursor.getNext(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS) + do { QueueEntryKey entry = QueueEntryBinding.entryToObject(uuidFactory, key); if (entry.getQueueId().equals(queue.getId())) @@ -1594,6 +1582,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore break; } } + while (cursor.getNext(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS); } } catch (RuntimeException e) diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index e3b053f..8006cb3 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -996,6 +996,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName); _joinTime = System.currentTimeMillis(); } + if (state == ReplicatedEnvironment.State.MASTER) + { + closeSequencesAndDatabasesSafely(); + } } StateChangeListener listener = _stateChangeListener.get(); @@ -1366,18 +1370,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan ReplicatedEnvironment environment = _environment.getAndSet(null); if (environment != null) { + closeSequencesAndDatabasesSafely(); try { - try - { - closeSequences(); - closeDatabases(); - } - catch(Exception e) - { - LOGGER.warn("Ignoring an exception whilst closing databases", e); - } - environment.close(); } catch (EnvironmentFailureException efe) @@ -1387,6 +1382,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + private void closeSequencesAndDatabasesSafely() + { + try + { + closeSequences(); + closeDatabases(); + } + catch(Exception e) + { + LOGGER.warn("Ignoring an exception whilst closing databases", e); + } + } + private void closeSequences() { RuntimeException firstThrownException = null; diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java index c2009b0..92e253a 100644 --- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java +++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java @@ -23,6 +23,7 @@ import static junit.framework.TestCase.assertEquals; import static org.apache.qpid.systests.Utils.INDEX; import static org.apache.qpid.systests.Utils.getReceiveTimeout; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertNotNull; @@ -311,23 +312,29 @@ public class MultiNodeTest extends GroupJmsTestBase final int inactiveBrokerPort, final int activeBrokerPort) throws Exception { + transferMasterToNodeWithAmqpPort(connection, inactiveBrokerPort); + + assertThat(Utils.produceConsume(connection, queue), is(equalTo(true))); + + getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA"); + } + + private void transferMasterToNodeWithAmqpPort(final Connection connection, final int nodeAmqpPort) + throws InterruptedException + { _failoverListener = new FailoverAwaitingListener(); getJmsProvider().addGenericConnectionListener(connection, _failoverListener); - Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort); + Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(nodeAmqpPort); assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); - getBrokerAdmin().setNodeAttributes(inactiveBrokerPort, + getBrokerAdmin().setNodeAttributes(nodeAmqpPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); LOGGER.info("Listener has finished"); - attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort); + attributes = getBrokerAdmin().getNodeAttributes(nodeAmqpPort); assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); - - assertThat(Utils.produceConsume(connection, queue), is(equalTo(true))); - - getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA"); } @Test @@ -870,6 +877,83 @@ public class MultiNodeTest extends GroupJmsTestBase } } + @Test + public void testAsynchronousRecoverer() throws Exception + { + configureAsynchronousRecoveryOnAllNodes(); + + final Connection connection = getConnectionBuilder().build(); + try + { + getJmsProvider().addGenericConnectionListener(connection, _failoverListener); + final Destination queue = createTestQueue(connection); + int brokerPort = getJmsProvider().getConnectedURI(connection).getPort(); + LOGGER.info("Sending message 'A' to the node with port {}", brokerPort); + Utils.sendTextMessage(connection, queue, "A"); + + final int anotherNodePort = getBrokerAdmin().getAmqpPort(brokerPort); + LOGGER.info("Changing mastership to the node with port {}", anotherNodePort); + transferMasterToNodeWithAmqpPort(connection, anotherNodePort); + getBrokerAdmin().awaitNodeRole(brokerPort, "REPLICA", "MASTER"); + + LOGGER.info("Sending message 'B' to the node with port {}", anotherNodePort); + Utils.sendTextMessage(connection, queue, "B"); + + LOGGER.info("Transfer mastership back to broker with port {}", brokerPort); + transferMasterToNodeWithAmqpPort(connection, brokerPort); + getBrokerAdmin().awaitNodeRole(anotherNodePort, "REPLICA", "MASTER"); + + LOGGER.info("Sending message 'C' to the node with port {}", anotherNodePort); + Utils.sendTextMessage(connection, queue, "C"); + + consumeTextMessages(connection, queue, "A", "B", "C"); + } + finally + { + connection.close(); + } + } + + private void configureAsynchronousRecoveryOnAllNodes() + { + final GroupBrokerAdmin brokerAdmin = getBrokerAdmin(); + for (int port : brokerAdmin.getGroupAmqpPorts()) + { + brokerAdmin.setNodeAttributes(port, Collections.singletonMap(BDBHAVirtualHostNode.CONTEXT, + Collections.singletonMap( + "use_async_message_store_recovery", + "true"))); + brokerAdmin.stopNode(port); + brokerAdmin.startNode(port); + brokerAdmin.awaitNodeRole(port, BDBHARemoteReplicationNode.ROLE, "REPLICA", "MASTER"); + } + + LOGGER.info("Asynchronous recoverer is configured on all group nodes"); + } + + private void consumeTextMessages(final Connection connection, final Destination queue, final String... expected) + throws JMSException + { + LOGGER.info("Trying to consume messages: {}", String.join(",", expected)); + connection.start(); + final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + try + { + final MessageConsumer consumer = session.createConsumer(queue); + + for (String m : expected) + { + final Message message = consumer.receive(getReceiveTimeout()); + assertThat(message, is(instanceOf(TextMessage.class))); + assertThat(((TextMessage) message).getText(), is(equalTo(m))); + } + } + finally + { + session.close(); + } + } + private final class FailoverAwaitingListener implements GenericConnectionListener { private final CountDownLatch _failoverCompletionLatch; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
