Repository: activemq
Updated Branches:
  refs/heads/master 0a21c5f8f -> 5e7b70f11


https://issues.apache.org/jira/browse/AMQ-5413

Ensure that drain completion clears the currently tracked credit value, so
that the next flow updates it to the correct value.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5e7b70f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5e7b70f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5e7b70f1

Branch: refs/heads/master
Commit: 5e7b70f11fb53cb1a0a00edc5e61faf90bbdce78
Parents: 0a21c5f
Author: Timothy Bish <[email protected]>
Authored: Wed May 27 11:30:16 2015 -0400
Committer: Timothy Bish <[email protected]>
Committed: Wed May 27 11:30:29 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpSender.java     |  1 +
 .../transport/amqp/AmqpTestSupport.java         | 15 ++++++
 .../amqp/JMSClientTransactionTest.java          | 50 ++++++++++++++++++++
 3 files changed, 66 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5e7b70f1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 13826b3..1dd99d2 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -400,6 +400,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                     // It's the end of browse signal in response to a 
MessagePull
                     getEndpoint().drained();
                     draining = false;
+                    currentCredit = 0;
                 } else {
                     jms.setRedeliveryCounter(md.getRedeliveryCounter());
                     jms.setReadOnlyBody(true);

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e7b70f1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index b841ecf..6f00ab2 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.security.SecureRandom;
 import java.util.Set;
@@ -42,6 +43,7 @@ import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
 import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.spring.SpringSslContext;
@@ -332,6 +334,19 @@ public class AmqpTestSupport {
         return proxy;
     }
 
+    protected SubscriptionViewMBean getProxyToQueueSubscriber(String name) 
throws MalformedObjectNameException, JMSException, IOException {
+        ObjectName queueViewMBeanName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) 
brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, 
true);
+        SubscriptionViewMBean subscription = null;
+        for (ObjectName subscriber : proxy.getSubscriptions()) {
+            subscription = (SubscriptionViewMBean) 
brokerService.getManagementContext()
+                .newProxyInstance(subscriber, SubscriptionViewMBean.class, 
true);
+        }
+
+        return subscription;
+    }
+
     protected TopicViewMBean getProxyToTopic(String name) throws 
MalformedObjectNameException, JMSException {
         ObjectName queueViewMBeanName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
         TopicViewMBean proxy = (TopicViewMBean) 
brokerService.getManagementContext()

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e7b70f1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
index 47dc9ec..508638e 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
@@ -17,15 +17,19 @@
 package org.apache.activemq.transport.amqp;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,4 +117,50 @@ public class JMSClientTransactionTest extends 
JMSClientTestSupport {
 
         session.close();
     }
+
+    @Test(timeout = 60000)
+    public void testQueueTXRollbackAndCommit() throws Exception {
+        final int MSG_COUNT = 3;
+
+        connection = createConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        Queue destination = session.createQueue(getDestinationName());
+
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        for (int i = 1; i <= MSG_COUNT; i++) {
+            LOG.info("Sending message: {} to rollback", i);
+            TextMessage message = session.createTextMessage("Rolled back 
Message: " + i);
+            message.setIntProperty("MessageSequence", i);
+            producer.send(message);
+        }
+
+        session.rollback();
+
+        assertEquals(0, getProxyToQueue(getDestinationName()).getQueueSize());
+
+        for (int i = 1; i <= MSG_COUNT; i++) {
+            LOG.info("Sending message: {} to commit", i);
+            TextMessage message = session.createTextMessage("Commit Message: " 
+ i);
+            message.setIntProperty("MessageSequence", i);
+            producer.send(message);
+        }
+
+        session.commit();
+
+        assertEquals(MSG_COUNT, 
getProxyToQueue(getDestinationName()).getQueueSize());
+        SubscriptionViewMBean subscription = 
getProxyToQueueSubscriber(getDestinationName());
+        assertNotNull(subscription);
+        assertTrue(subscription.getPrefetchSize() > 0);
+
+        for (int i = 1; i <= MSG_COUNT; i++) {
+            LOG.info("Trying to receive message: {}", i);
+            TextMessage message = (TextMessage) consumer.receive(1000);
+            assertNotNull("Message " + i + "should be available", message);
+            assertEquals("Should get message: " + i, i , 
message.getIntProperty("MessageSequence"));
+        }
+    }
 }

Reply via email to