Repository: activemq Updated Branches: refs/heads/master 0a21c5f8f -> 5e7b70f11
https://issues.apache.org/jira/browse/AMQ-5413 Ensure that drain completion clears the currently tracked credit value, so that the next flow updates it to the correct value. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5e7b70f1 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5e7b70f1 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5e7b70f1 Branch: refs/heads/master Commit: 5e7b70f11fb53cb1a0a00edc5e61faf90bbdce78 Parents: 0a21c5f Author: Timothy Bish <[email protected]> Authored: Wed May 27 11:30:16 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed May 27 11:30:29 2015 -0400 ---------------------------------------------------------------------- .../transport/amqp/protocol/AmqpSender.java | 1 + .../transport/amqp/AmqpTestSupport.java | 15 ++++++ .../amqp/JMSClientTransactionTest.java | 50 ++++++++++++++++++++ 3 files changed, 66 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5e7b70f1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 13826b3..1dd99d2 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -400,6 +400,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { // It's the end of browse signal in response to a MessagePull getEndpoint().drained(); draining = false; + currentCredit = 0; } else { jms.setRedeliveryCounter(md.getRedeliveryCounter()); jms.setReadOnlyBody(true); 
http://git-wip-us.apache.org/repos/asf/activemq/blob/5e7b70f1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index b841ecf..6f00ab2 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp; import java.io.File; +import java.io.IOException; import java.net.URI; import java.security.SecureRandom; import java.util.Set; @@ -42,6 +43,7 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.spring.SpringSslContext; @@ -332,6 +334,19 @@ public class AmqpTestSupport { return proxy; } + protected SubscriptionViewMBean getProxyToQueueSubscriber(String name) throws MalformedObjectNameException, JMSException, IOException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name); + QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + SubscriptionViewMBean subscription = null; + for (ObjectName subscriber : proxy.getSubscriptions()) { + subscription = (SubscriptionViewMBean) brokerService.getManagementContext() + .newProxyInstance(subscriber, 
SubscriptionViewMBean.class, true); + } + + return subscription; + } + protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException { ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name); TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext() http://git-wip-us.apache.org/repos/asf/activemq/blob/5e7b70f1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java index 47dc9ec..508638e 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java @@ -17,15 +17,19 @@ package org.apache.activemq.transport.amqp; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,4 +117,50 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { session.close(); } + + @Test(timeout = 60000) + public void testQueueTXRollbackAndCommit() throws Exception { + final int MSG_COUNT = 3; + + connection = createConnection(); + connection.start(); + + Session session = 
connection.createSession(true, Session.SESSION_TRANSACTED); + Queue destination = session.createQueue(getDestinationName()); + + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + + for (int i = 1; i <= MSG_COUNT; i++) { + LOG.info("Sending message: {} to rollback", i); + TextMessage message = session.createTextMessage("Rolled back Message: " + i); + message.setIntProperty("MessageSequence", i); + producer.send(message); + } + + session.rollback(); + + assertEquals(0, getProxyToQueue(getDestinationName()).getQueueSize()); + + for (int i = 1; i <= MSG_COUNT; i++) { + LOG.info("Sending message: {} to commit", i); + TextMessage message = session.createTextMessage("Commit Message: " + i); + message.setIntProperty("MessageSequence", i); + producer.send(message); + } + + session.commit(); + + assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize()); + SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName()); + assertNotNull(subscription); + assertTrue(subscription.getPrefetchSize() > 0); + + for (int i = 1; i <= MSG_COUNT; i++) { + LOG.info("Trying to receive message: {}", i); + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull("Message " + i + "should be available", message); + assertEquals("Should get message: " + i, i , message.getIntProperty("MessageSequence")); + } + } }
