Updated Branches: refs/heads/trunk 8b06c44cc -> 7cf5c240a
Test to reproduce AMQ4920 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7cf5c240 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7cf5c240 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7cf5c240 Branch: refs/heads/trunk Commit: 7cf5c240a260c30572729b3a1b29863b64935a44 Parents: 8b06c44 Author: Kevin Earls <[email protected]> Authored: Thu Dec 5 12:50:55 2013 +0100 Committer: Kevin Earls <[email protected]> Committed: Thu Dec 5 12:50:55 2013 +0100 ---------------------------------------------------------------------- .../activemq/transport/amqp/AMQ4920Test.java | 162 +++++++++++++++++++ 1 file changed, 162 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7cf5c240/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java new file mode 100644 index 0000000..5d6d473 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class AMQ4920Test extends AmqpTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(AMQ4920Test.class); + private static final Integer ITERATIONS = 1 * 1000; + private static final Integer CONSUMER_COUNT = 4; // At least 2 consumers are required to reproduce the original issue + public static final String TEXT_MESSAGE = "TextMessage: "; + private CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS); + + @Before + public void setUp() throws Exception { + super.setUp(); + this.autoFailTestSupport.setAutoFail(false); + } + + @Test(timeout = 5 * 60 * 1000) + public void testSendWithMultipleConsumers() throws Exception { + ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin"); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String destinationName = "topic://AMQ4920Test" + System.currentTimeMillis(); + Destination destination = session.createTopic(destinationName); + connection.start(); + + ExecutorService executor = Executors.newCachedThreadPool(); + for (int i=0; i < CONSUMER_COUNT; i++) { + AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(destinationName, port, "Consumer-" + i, latch, ITERATIONS); + executor.submit(consumerTask); + } + connection.start(); + + LOG.debug("At start latch is " + latch.getCount()); + sendMessages(connection, destination, ITERATIONS, 10); + LOG.debug("After send latch is " + latch.getCount()); + + latch.await(15, TimeUnit.SECONDS); + LOG.debug("After await latch is " + latch.getCount()); + assertEquals(0, latch.getCount()); + + executor.shutdown(); + } + + public void sendMessages(Connection connection, Destination destination, int count, int sleepInterval) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + + for (int i = 0; i < count; i++) { + TextMessage message = session.createTextMessage(); + message.setText(TEXT_MESSAGE + i); + LOG.debug("Sending message [" + i + "]"); + producer.send(message); + if (sleepInterval > 0) { + Thread.sleep(sleepInterval); + } + } + + session.close(); + } +} + + +class AMQ4930ConsumerTask implements Callable<Boolean> { + protected static final Logger LOG = LoggerFactory.getLogger(AMQ4930ConsumerTask.class); + private String destinationName; + private String consumerName; + private CountDownLatch messagesReceived; + private int port; + private int expectedMessageCount; + + public AMQ4930ConsumerTask (String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) { + this.destinationName = destinationName; + this.port = port; + this.consumerName = consumerName; + this.messagesReceived = latch; + this.expectedMessageCount = expectedMessageCount; + } + + @Override + public Boolean call() throws Exception { + LOG.debug(consumerName + " starting"); + Connection connection=null; + try { + ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin"); + connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + connection.start(); + + int receivedCount = 0; + while(receivedCount < expectedMessageCount) { + Message message = consumer.receive(5 * 1000); + if (message == null) { + LOG.error("consumer {} got null message on iteration {}", consumerName, receivedCount); + return false; + } + if (!(message instanceof TextMessage)) { + LOG.error("consumer {} expected text message on iteration {} but got {}", consumerName, receivedCount, message.getClass().getCanonicalName()); + return false; + } + TextMessage tm = (TextMessage) message; + if (!tm.getText().equals(AMQ4920Test.TEXT_MESSAGE + receivedCount)) { + LOG.error("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText()); + return false; + } + LOG.debug("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText()); // TODO make debug + + messagesReceived.countDown(); + receivedCount++; + } + } catch (Exception e) { + LOG.error("UnexpectedException in " + consumerName, e); + } finally { + try { + connection.close(); + } catch (JMSException ignoreMe) { + } + } + + return true; + } +} +
