Repository: activemq
Updated Branches:
  refs/heads/master ffee8b442 -> 6c01b641b


https://issues.apache.org/jira/browse/AMQ-6422 - move delivery tracking to 
pumpoutbound and additional test that shows how the presettle case breaks. 
Thanks to Robbie Gemmell for the feedback


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6c01b641
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6c01b641
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6c01b641

Branch: refs/heads/master
Commit: 6c01b641b1850b384e57d74ad6471ea4c8fcf01f
Parents: ffee8b4
Author: gtully <gary.tu...@gmail.com>
Authored: Wed Sep 21 13:59:45 2016 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Wed Sep 21 13:59:45 2016 +0100

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpSender.java     |  2 +-
 .../amqp/interop/AmqpReceiverTest.java          | 41 ++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6c01b641/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 75f2371..455e0b0 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -292,7 +292,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
         }
 
         pumpOutbound();
-        logicalDeliveryCount++;
     }
 
     @Override
@@ -410,6 +409,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                         }
                         currentBuffer = null;
                         currentDelivery = null;
+                        logicalDeliveryCount++;
                     }
                 } else {
                     return;

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c01b641/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index c68e850..b73f087 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -225,6 +225,47 @@ public class AmqpReceiverTest extends 
AmqpClientTestSupport {
 
     @Test(timeout = 60000)
     @Repeat(repetitions = 1)
+    public void testPresettledReceiverReadsAllMessagesInNonFlowBatch() throws 
Exception {
+        final int MSG_COUNT = 100;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + 
getTestName(), null, false, true);
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+        assertEquals(0, queueView.getDispatchCount());
+
+        receiver.flow(20);
+        // consume less that flow
+        for (int j=0;j<10;j++) {
+            assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+        }
+
+        // flow more and consume all
+        receiver.flow(10);
+        for (int j=0;j<20;j++) {
+            assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+        }
+
+        // remainder
+        receiver.flow(70);
+        for (int j=0;j<70;j++) {
+            assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+        }
+
+        receiver.close();
+
+        assertEquals(0, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    @Repeat(repetitions = 1)
     public void 
testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws 
Exception {
         int MSG_COUNT = 4;
         sendMessages(getTestName(), MSG_COUNT, false);

Reply via email to