Repository: activemq-artemis Updated Branches: refs/heads/2.6.x 26df390fb -> 6f62113a3
[ARTEMIS-2175] Duplicate messages when JMS bridge is stopped and restarted Issue: https://issues.apache.org/jira/browse/ARTEMIS-2175 (cherry picked from commit ff5f1213bbf3fd2cfd9419ce44a4baf77ed9597f) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/566ecbb4 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/566ecbb4 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/566ecbb4 Branch: refs/heads/2.6.x Commit: 566ecbb4d29ec7f201bb7f6645252dbc0eb72228 Parents: 26df390 Author: Ingo Weiss <[email protected]> Authored: Tue Nov 13 17:00:07 2018 +0000 Committer: Clebert Suconic <[email protected]> Committed: Wed Nov 14 11:44:42 2018 -0500 ---------------------------------------------------------------------- .../artemis/jms/bridge/impl/JMSBridgeImpl.java | 42 +++-- .../core/server/impl/ServerConsumerImpl.java | 5 + .../tests/extras/jms/bridge/JMSBridgeTest.java | 182 +++++++++++++++++++ 3 files changed, 214 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/566ecbb4/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java index da37079..b1e7e12 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java @@ -501,11 +501,13 @@ public final class JMSBridgeImpl implements JMSBridge { } } - try { - sourceConn.close(); - } catch (Exception ignore) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source conn", ignore); + if (sourceConn != null) { + try { + sourceConn.close(); + } catch (Exception ignore) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source conn", ignore); + } } } @@ -519,6 +521,12 @@ public final class JMSBridgeImpl implements JMSBridge { } } + if (messages.size() > 0) { + // Clear outstanding messages so they don't get retransmitted and duplicated on the other side of the bridge + ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before stopping..."); + messages.clear(); + } + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this); } @@ -1189,11 +1197,13 @@ public final class JMSBridgeImpl implements JMSBridge { private void cleanup() { // Stop the source connection - try { - sourceConn.stop(); - } catch (Throwable ignore) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to stop source connection", ignore); + if (sourceConn != null) { + try { + sourceConn.stop(); + } catch (Throwable ignore) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to stop source connection", ignore); + } } } @@ -1217,11 +1227,13 @@ public final class JMSBridgeImpl implements JMSBridge { } // Close the old objects - try { - sourceConn.close(); - } catch (Throwable ignore) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source connection", ignore); + if (sourceConn != null) { + try { + sourceConn.close(); + } catch (Throwable ignore) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source connection", ignore); + } } } try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/566ecbb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index fa81bfb..541808a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -1195,6 +1195,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { this.chunkBytes = null; } + @Override + public String toString() { + return "ServerConsumerImpl$LargeMessageDeliverer[ref=[" + ref + "]]"; + } + private ByteBuffer acquireHeapBodyBuffer(int requiredCapacity) { if (this.chunkBytes == null || this.chunkBytes.capacity() != requiredCapacity) { this.chunkBytes = ByteBuffer.allocate(requiredCapacity); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/566ecbb4/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java index fed218b..33edaec 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.extras.jms.bridge; import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -35,9 +36,12 @@ import java.util.List; import java.util.Map; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; import org.apache.activemq.artemis.api.jms.JMSFactoryType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory; import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode; import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl; @@ -1766,10 +1770,188 @@ public class JMSBridgeTest extends BridgeTestBase { Assert.assertFalse(mbeanServer.isRegistered(objectName)); } + @Test + public void testDuplicateMessagesWhenBridgeStops() throws Exception { + final int NUM_MESSAGES = 500; + + JMSBridgeImpl bridge = new JMSBridgeImpl(cff0xa, cff1xa, sourceQueueFactory, targetQueueFactory, null, null, + null, null, null, 1000, 10, + QualityOfServiceMode.ONCE_AND_ONLY_ONCE, 10, 100, null, "ClientId123", + true) + .setBridgeName("test-bridge"); + bridge.setTransactionManager(getNewTm()); + createQueue(targetQueue.getQueueName(), 1); + + final List<TextMessage> sentMessages = new ArrayList<>(); + final List<TextMessage> receivedMessages = new ArrayList<>(); + + log.info("Starting bridge " + bridge); + bridge.start(); + waitForComponent(bridge, 15); + + Thread producerThread = new Thread(() -> { + Connection conn = null; + Session session = null; + int counter = 0; + try { + conn = cf0.createConnection(); + session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(sourceQueue); + TextMessage msg = null; + + while (counter < NUM_MESSAGES) { + msg = session.createTextMessage("message" + counter); + msg.setIntProperty("count", counter); + producer.send(msg); + sentMessages.add(msg); + log.info("Sent message with property counter: " + counter + ", messageId:" + msg.getJMSMessageID() + + ((msg.getStringProperty("_AMQ_DUPL_ID") != null) ? ", _AMQ_DUPL_ID=" + msg.getStringProperty("_AMQ_DUPL_ID") : "")); + counter++; + Thread.sleep(200); + } + + producer.close(); + } catch (InterruptedException | JMSException e) { + log.error("Error while producing messages: ", e); + } finally { + try { + if (session != null) { + session.close(); + } + + if (conn != null) { + conn.close(); + } + } catch (JMSException e) { + log.error("Error cleaning up the producer thread! ", e); + } + } + }); + + Thread consumerThread = new Thread(() -> { + Connection conn = null; + Session session = null; + try { + conn = cf1.createConnection(); + conn.start(); + + session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + MessageConsumer consumer = session.createConsumer(targetQueue); + TextMessage msg = null; + + boolean running = true; + while (running) { + msg = (TextMessage) consumer.receive(5000); + if (msg != null) { + msg.acknowledge(); + receivedMessages.add(msg); + log.info("Received message with messageId: " + msg.getJMSMessageID() + + " and property counter " + msg.getIntProperty("count")); + } else { + running = false; + } + } + + } catch (JMSException e) { + log.error("Error while consuming messages: ", e); + } finally { + try { + if (session != null) { + session.close(); + } + + if (conn != null) { + conn.close(); + } + } catch (JMSException e) { + log.error("Error cleaning up the consumer thread! ", e); + } + } + }); + + log.info("Starting producer thread..."); + producerThread.start(); + + Assert.assertTrue(waitForMessages(server1, targetQueue.getQueueName(), NUM_MESSAGES / 100, 250000)); + + log.info("Stopping bridge " + bridge); + bridge.stop(); + Thread.sleep(5000); + + log.info("Starting bridge " + bridge + " again"); + bridge.start(); + waitForComponent(bridge, 15); + + Assert.assertTrue(waitForMessages(server1, targetQueue.getQueueName(), NUM_MESSAGES, 300000)); + + + + log.info("Starting consumer thread..."); + consumerThread.start(); + + log.info("Waiting for the consumer thread to die..."); + consumerThread.join(); + + log.info("Waiting for the producer thread to die..."); + producerThread.join(); + + bridge.stop(); + + server1.stop(); + server0.stop(); + + Assert.assertEquals("Number of sent messages is different from received messages", sentMessages.size(), receivedMessages.size()); + } + public TransactionManager getNewTm() { return newTransactionManager(); } + private static long countMessagesInQueue(ActiveMQServer server, String queueName) { + QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + queueName); + Assert.assertNotNull(queueControl); + long count = -1; + int numberOfTries = 0; + int maxNumberOfTries = 10; + while (count == -1 && numberOfTries < maxNumberOfTries) { + try { + numberOfTries++; + count = queueControl.countMessages(); + break; + } catch (Exception ex) { + if (numberOfTries > maxNumberOfTries - 1) { + throw new RuntimeException("countMessagesInQueue() failed for queue:" + queueName + + " and server: " + server + ". Number of tries: " + numberOfTries, ex); + } + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + log.info("Number of messages in queue " + queueName + " on server: " + server + " is: " + count); + return count; + } + + private static boolean waitForMessages(ActiveMQServer server, String queueName, long numberOfMessages, long timeout) throws Exception { + + long startTime = System.currentTimeMillis(); + + long count = 0; + while ((count = countMessagesInQueue(server, queueName)) < numberOfMessages) { + log.info("Total number of messages in queue: " + queueName + " on server " + server + " is " + count); + Thread.sleep(5000); + if (System.currentTimeMillis() - startTime > timeout) { + log.warn(numberOfMessages + " not on server " + server + " in timeout " + timeout + "ms."); + return false; + } + } + return true; + + } + // Inner classes ------------------------------------------------------------------- private static class StressSender implements Runnable {
