This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new f6401d8 ARTEMIS-2547 fix AMQP Client reconnect fails on broker stop
start. Add unit test. Add fix to clear clientids when server is stopped.
new 7168cc1 This closes #2891
f6401d8 is described below
commit f6401d81b55860e091afda052327ca6183bc6586
Author: michael.pearce <[email protected]>
AuthorDate: Mon Nov 11 12:33:13 2019 +0000
ARTEMIS-2547 fix AMQP Client reconnect fails on broker stop start
Add unit test
Add fix to clear clientids when server is stopped.
---
.../core/server/impl/ActiveMQServerImpl.java | 2 +
.../integration/amqp/JMSClientTestSupport.java | 24 +++++++++-
.../integration/amqp/JMSMessageConsumerTest.java | 54 ++++++++++++++++++++++
3 files changed, 78 insertions(+), 2 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index f73407a..91e71c4 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1249,6 +1249,8 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
scaledDownNodeIDs.clear();
+ connectedClientIds.clear();
+
for (ActiveMQComponent externalComponent : externalComponents) {
try {
if (externalComponent instanceof ServiceComponent) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
index 39af3e6..fbed4ec 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
@@ -75,7 +75,7 @@ public abstract class JMSClientTestSupport extends
AmqpClientTestSupport {
return "";
}
- protected URI getBrokerQpidJMSConnectionURI() {
+ protected String getBrokerQpidJMSConnectionString() {
try {
int port = AMQP_PORT;
@@ -100,7 +100,23 @@ public abstract class JMSClientTestSupport extends
AmqpClientTestSupport {
uri = uri + "?" + getJmsConnectionURIOptions();
}
- return new URI(uri);
+ return uri;
+ } catch (Exception e) {
+ throw new RuntimeException();
+ }
+ }
+
+ protected URI getBrokerQpidJMSConnectionURI() {
+ try {
+ return new URI(getBrokerQpidJMSConnectionString());
+ } catch (Exception e) {
+ throw new RuntimeException();
+ }
+ }
+
+ protected URI getBrokerQpidJMSFailoverConnectionURI() {
+ try {
+ return new URI("failover:(" + getBrokerQpidJMSConnectionString() +
")");
} catch (Exception e) {
throw new RuntimeException();
}
@@ -110,6 +126,10 @@ public abstract class JMSClientTestSupport extends
AmqpClientTestSupport {
return createConnection(getBrokerQpidJMSConnectionURI(), null, null,
null, true);
}
+ protected Connection createFailoverConnection() throws JMSException {
+ return createConnection(getBrokerQpidJMSFailoverConnectionURI(), null,
null, null, true);
+ }
+
protected Connection createConnection(boolean start) throws JMSException {
return createConnection(getBrokerQpidJMSConnectionURI(), null, null,
null, start);
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
index 7a83172..a634b44 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
@@ -866,4 +866,58 @@ public class JMSMessageConsumerTest extends
JMSClientTestSupport {
assertFalse(failedToSubscribe.get());
}
+
+ @Test(timeout = 30000)
+ public void testBrokerRestartAMQPProducerAMQPConsumer() throws Exception {
+ Connection connection = createFailoverConnection(); //AMQP
+ Connection connection2 = createFailoverConnection(); //AMQP
+ testBrokerRestart(connection, connection2);
+ }
+
+ private void testBrokerRestart(Connection connection1, Connection
connection2) throws Exception {
+ try {
+ Session session1 = connection1.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Session session2 = connection2.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ javax.jms.Queue queue1 = session1.createQueue(getQueueName());
+ javax.jms.Queue queue2 = session2.createQueue(getQueueName());
+
+ final MessageConsumer consumer2 = session2.createConsumer(queue2);
+
+ MessageProducer producer = session1.createProducer(queue1);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ connection1.start();
+
+ TextMessage message = session1.createTextMessage();
+ message.setText("hello");
+ producer.send(message);
+
+ Message received = consumer2.receive(100);
+
+ assertNotNull("Should have received a message by now.", received);
+ assertTrue("Should be an instance of TextMessage", received
instanceof TextMessage);
+ assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode());
+
+
+ server.stop();
+ Wait.waitFor(() -> !server.isStarted(), 1000);
+
+ server.start();
+
+ TextMessage message2 = session1.createTextMessage();
+ message2.setText("hello");
+ producer.send(message2);
+
+ Message received2 = consumer2.receive(100);
+
+ assertNotNull("Should have received a message by now.", received2);
+ assertTrue("Should be an instance of TextMessage", received2
instanceof TextMessage);
+ assertEquals(DeliveryMode.PERSISTENT, received2.getJMSDeliveryMode());
+
+
+ } finally {
+ connection1.close();
+ connection2.close();
+ }
+ }
}