This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new f6401d8  ARTEMIS-2547 fix AMQP Client reconnect fails on broker stop start Add unit test Add fix to clear clientids when server is stopped.
     new 7168cc1  This closes #2891
f6401d8 is described below

commit f6401d81b55860e091afda052327ca6183bc6586
Author: michael.pearce <[email protected]>
AuthorDate: Mon Nov 11 12:33:13 2019 +0000

    ARTEMIS-2547 fix AMQP Client reconnect fails on broker stop start
    Add unit test
    Add fix to clear clientids when server is stopped.
---
 .../core/server/impl/ActiveMQServerImpl.java       |  2 +
 .../integration/amqp/JMSClientTestSupport.java     | 24 +++++++++-
 .../integration/amqp/JMSMessageConsumerTest.java   | 54 ++++++++++++++++++++++
 3 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index f73407a..91e71c4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1249,6 +1249,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       scaledDownNodeIDs.clear();
 
+      connectedClientIds.clear();
+
       for (ActiveMQComponent externalComponent : externalComponents) {
          try {
             if (externalComponent instanceof ServiceComponent) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
index 39af3e6..fbed4ec 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
@@ -75,7 +75,7 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
       return "";
    }
 
-   protected URI getBrokerQpidJMSConnectionURI() {
+   protected String getBrokerQpidJMSConnectionString() {
 
       try {
          int port = AMQP_PORT;
@@ -100,7 +100,23 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
             uri = uri + "?" + getJmsConnectionURIOptions();
          }
 
-         return new URI(uri);
+         return uri;
+      } catch (Exception e) {
+         throw new RuntimeException();
+      }
+   }
+
+   protected URI getBrokerQpidJMSConnectionURI() {
+      try {
+         return new URI(getBrokerQpidJMSConnectionString());
+      } catch (Exception e) {
+         throw new RuntimeException();
+      }
+   }
+
+   protected URI getBrokerQpidJMSFailoverConnectionURI() {
+      try {
+         return new URI("failover:(" + getBrokerQpidJMSConnectionString() + ")");
       } catch (Exception e) {
          throw new RuntimeException();
       }
@@ -110,6 +126,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
       return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, true);
    }
 
+   protected Connection createFailoverConnection() throws JMSException {
+      return createConnection(getBrokerQpidJMSFailoverConnectionURI(), null, null, null, true);
+   }
+
    protected Connection createConnection(boolean start) throws JMSException {
       return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, start);
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
index 7a83172..a634b44 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
@@ -866,4 +866,58 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
 
       assertFalse(failedToSubscribe.get());
    }
+
+   @Test(timeout = 30000)
+   public void testBrokerRestartAMQPProducerAMQPConsumer() throws Exception {
+      Connection connection = createFailoverConnection(); //AMQP
+      Connection connection2 = createFailoverConnection(); //AMQP
+      testBrokerRestart(connection, connection2);
+   }
+
+   private void testBrokerRestart(Connection connection1, Connection connection2) throws Exception {
+      try {
+         Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         javax.jms.Queue queue1 = session1.createQueue(getQueueName());
+         javax.jms.Queue queue2 = session2.createQueue(getQueueName());
+
+         final MessageConsumer consumer2 = session2.createConsumer(queue2);
+
+         MessageProducer producer = session1.createProducer(queue1);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         connection1.start();
+
+         TextMessage message = session1.createTextMessage();
+         message.setText("hello");
+         producer.send(message);
+
+         Message received = consumer2.receive(100);
+
+         assertNotNull("Should have received a message by now.", received);
+         assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
+         assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode());
+
+
+         server.stop();
+         Wait.waitFor(() -> !server.isStarted(), 1000);
+
+         server.start();
+
+         TextMessage message2 = session1.createTextMessage();
+         message2.setText("hello");
+         producer.send(message2);
+
+         Message received2 = consumer2.receive(100);
+
+         assertNotNull("Should have received a message by now.", received2);
+         assertTrue("Should be an instance of TextMessage", received2 instanceof TextMessage);
+         assertEquals(DeliveryMode.PERSISTENT, received2.getJMSDeliveryMode());
+
+
+      } finally {
+         connection1.close();
+         connection2.close();
+      }
+   }
 }

Reply via email to