Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 26df390fb -> 6f62113a3


[ARTEMIS-2175] Duplicate messages when JMS bridge is stopped and restarted

Issue: https://issues.apache.org/jira/browse/ARTEMIS-2175
(cherry picked from commit ff5f1213bbf3fd2cfd9419ce44a4baf77ed9597f)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/566ecbb4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/566ecbb4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/566ecbb4

Branch: refs/heads/2.6.x
Commit: 566ecbb4d29ec7f201bb7f6645252dbc0eb72228
Parents: 26df390
Author: Ingo Weiss <[email protected]>
Authored: Tue Nov 13 17:00:07 2018 +0000
Committer: Clebert Suconic <[email protected]>
Committed: Wed Nov 14 11:44:42 2018 -0500

----------------------------------------------------------------------
 .../artemis/jms/bridge/impl/JMSBridgeImpl.java  |  42 +++--
 .../core/server/impl/ServerConsumerImpl.java    |   5 +
 .../tests/extras/jms/bridge/JMSBridgeTest.java  | 182 +++++++++++++++++++
 3 files changed, 214 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/566ecbb4/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
 
b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
index da37079..b1e7e12 100644
--- 
a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
+++ 
b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
@@ -501,11 +501,13 @@ public final class JMSBridgeImpl implements JMSBridge {
             }
          }
 
-         try {
-            sourceConn.close();
-         } catch (Exception ignore) {
-            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-               ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source 
conn", ignore);
+         if (sourceConn != null) {
+            try {
+               sourceConn.close();
+            } catch (Exception ignore) {
+               if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                  ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source 
conn", ignore);
+               }
             }
          }
 
@@ -519,6 +521,12 @@ public final class JMSBridgeImpl implements JMSBridge {
             }
          }
 
+         if (messages.size() > 0) {
+            // Clear outstanding messages so they don't get retransmitted and 
duplicated on the other side of the bridge
+            ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before 
stopping...");
+            messages.clear();
+         }
+
          if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
             ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this);
          }
@@ -1189,11 +1197,13 @@ public final class JMSBridgeImpl implements JMSBridge {
 
    private void cleanup() {
       // Stop the source connection
-      try {
-         sourceConn.stop();
-      } catch (Throwable ignore) {
-         if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-            ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to stop source 
connection", ignore);
+      if (sourceConn != null) {
+         try {
+            sourceConn.stop();
+         } catch (Throwable ignore) {
+            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+               ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to stop source 
connection", ignore);
+            }
          }
       }
 
@@ -1217,11 +1227,13 @@ public final class JMSBridgeImpl implements JMSBridge {
       }
 
       // Close the old objects
-      try {
-         sourceConn.close();
-      } catch (Throwable ignore) {
-         if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-            ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source 
connection", ignore);
+      if (sourceConn != null) {
+         try {
+            sourceConn.close();
+         } catch (Throwable ignore) {
+            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+               ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source 
connection", ignore);
+            }
          }
       }
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/566ecbb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index fa81bfb..541808a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1195,6 +1195,11 @@ public class ServerConsumerImpl implements 
ServerConsumer, ReadyListener {
          this.chunkBytes = null;
       }
 
+      @Override
+      public String toString() {
+         return "ServerConsumerImpl$LargeMessageDeliverer[ref=[" + ref + "]]";
+      }
+
       private ByteBuffer acquireHeapBodyBuffer(int requiredCapacity) {
          if (this.chunkBytes == null || this.chunkBytes.capacity() != 
requiredCapacity) {
             this.chunkBytes = ByteBuffer.allocate(requiredCapacity);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/566ecbb4/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
----------------------------------------------------------------------
diff --git 
a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
 
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
index fed218b..33edaec 100644
--- 
a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
+++ 
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.extras.jms.bridge;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -35,9 +36,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
 import org.apache.activemq.artemis.api.jms.JMSFactoryType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
 import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
 import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl;
@@ -1766,10 +1770,188 @@ public class JMSBridgeTest extends BridgeTestBase {
       Assert.assertFalse(mbeanServer.isRegistered(objectName));
    }
 
+   @Test
+   public void testDuplicateMessagesWhenBridgeStops() throws Exception {
+      final int NUM_MESSAGES = 500;
+
+      JMSBridgeImpl bridge = new JMSBridgeImpl(cff0xa, cff1xa, 
sourceQueueFactory, targetQueueFactory, null, null,
+         null, null, null, 1000, 10,
+         QualityOfServiceMode.ONCE_AND_ONLY_ONCE, 10, 100, null, "ClientId123",
+         true)
+         .setBridgeName("test-bridge");
+      bridge.setTransactionManager(getNewTm());
+      createQueue(targetQueue.getQueueName(), 1);
+
+      final List<TextMessage> sentMessages = new ArrayList<>();
+      final List<TextMessage> receivedMessages = new ArrayList<>();
+
+      log.info("Starting bridge " + bridge);
+      bridge.start();
+      waitForComponent(bridge, 15);
+
+      Thread producerThread = new Thread(() -> {
+         Connection conn = null;
+         Session session = null;
+         int counter = 0;
+         try {
+            conn = cf0.createConnection();
+            session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(sourceQueue);
+            TextMessage msg = null;
+
+            while (counter < NUM_MESSAGES) {
+               msg = session.createTextMessage("message" + counter);
+               msg.setIntProperty("count", counter);
+               producer.send(msg);
+               sentMessages.add(msg);
+               log.info("Sent message with property counter: " + counter + ", 
messageId:" + msg.getJMSMessageID()
+                  + ((msg.getStringProperty("_AMQ_DUPL_ID") != null) ? ", 
_AMQ_DUPL_ID=" + msg.getStringProperty("_AMQ_DUPL_ID") : ""));
+               counter++;
+               Thread.sleep(200);
+            }
+
+            producer.close();
+         } catch (InterruptedException | JMSException e) {
+            log.error("Error while producing messages: ", e);
+         } finally {
+            try {
+               if (session != null) {
+                  session.close();
+               }
+
+               if (conn != null) {
+                  conn.close();
+               }
+            } catch (JMSException e) {
+               log.error("Error cleaning up the producer thread! ", e);
+            }
+         }
+      });
+
+      Thread consumerThread = new Thread(() -> {
+         Connection conn = null;
+         Session session = null;
+         try {
+            conn = cf1.createConnection();
+            conn.start();
+
+            session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            MessageConsumer consumer = session.createConsumer(targetQueue);
+            TextMessage msg = null;
+
+            boolean running = true;
+            while (running) {
+               msg = (TextMessage) consumer.receive(5000);
+               if (msg != null) {
+                  msg.acknowledge();
+                  receivedMessages.add(msg);
+                  log.info("Received message with messageId: " + 
msg.getJMSMessageID() +
+                     " and property counter " + msg.getIntProperty("count"));
+               } else {
+                  running = false;
+               }
+            }
+
+         } catch (JMSException e) {
+            log.error("Error while consuming messages: ", e);
+         } finally {
+            try {
+               if (session != null) {
+                  session.close();
+               }
+
+               if (conn != null) {
+                  conn.close();
+               }
+            } catch (JMSException e) {
+               log.error("Error cleaning up the consumer thread! ", e);
+            }
+         }
+      });
+
+      log.info("Starting producer thread...");
+      producerThread.start();
+
+      Assert.assertTrue(waitForMessages(server1, targetQueue.getQueueName(), 
NUM_MESSAGES / 100, 250000));
+
+      log.info("Stopping bridge " + bridge);
+      bridge.stop();
+      Thread.sleep(5000);
+
+      log.info("Starting bridge " + bridge + " again");
+      bridge.start();
+      waitForComponent(bridge, 15);
+
+      Assert.assertTrue(waitForMessages(server1, targetQueue.getQueueName(), 
NUM_MESSAGES, 300000));
+
+
+
+      log.info("Starting consumer thread...");
+      consumerThread.start();
+
+      log.info("Waiting for the consumer thread to die...");
+      consumerThread.join();
+
+      log.info("Waiting for the producer thread to die...");
+      producerThread.join();
+
+      bridge.stop();
+
+      server1.stop();
+      server0.stop();
+
+      Assert.assertEquals("Number of sent messages is different from received 
messages", sentMessages.size(), receivedMessages.size());
+   }
+
    public TransactionManager getNewTm() {
       return newTransactionManager();
    }
 
+   private static long countMessagesInQueue(ActiveMQServer server, String 
queueName) {
+      QueueControl queueControl = (QueueControl) 
server.getManagementService().getResource(ResourceNames.QUEUE + queueName);
+      Assert.assertNotNull(queueControl);
+      long count = -1;
+      int numberOfTries = 0;
+      int maxNumberOfTries = 10;
+      while (count == -1 && numberOfTries < maxNumberOfTries) {
+         try {
+            numberOfTries++;
+            count = queueControl.countMessages();
+            break;
+         } catch (Exception ex) {
+            if (numberOfTries > maxNumberOfTries - 1) {
+               throw new RuntimeException("countMessagesInQueue() failed for 
queue:" + queueName
+                  + " and server: " + server + ". Number of tries: " + 
numberOfTries, ex);
+            }
+            try {
+               Thread.sleep(2000);
+            } catch (InterruptedException e) {
+               log.error(e.getMessage(), e);
+            }
+         }
+      }
+      log.info("Number of messages in queue " + queueName + " on server: " + 
server + " is: " + count);
+      return count;
+   }
+
+   private static boolean waitForMessages(ActiveMQServer server, String 
queueName, long numberOfMessages, long timeout) throws Exception {
+
+      long startTime = System.currentTimeMillis();
+
+      long count = 0;
+      while ((count = countMessagesInQueue(server, queueName)) < 
numberOfMessages) {
+         log.info("Total number of messages in queue: " + queueName + " on 
server " + server + " is " + count);
+         Thread.sleep(5000);
+         if (System.currentTimeMillis() - startTime > timeout) {
+            log.warn(numberOfMessages + " not on server " + server + " in 
timeout " + timeout + "ms.");
+            return false;
+         }
+      }
+      return true;
+
+   }
+
    // Inner classes 
-------------------------------------------------------------------
 
    private static class StressSender implements Runnable {

Reply via email to