https://issues.apache.org/jira/browse/AMQ-6422
Add test for credit grants but no settles for a single receiver. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ca11674f Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ca11674f Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ca11674f Branch: refs/heads/activemq-5.14.x Commit: ca11674f37cf3a67a9215f341a8e8458ce7b0641 Parents: a5a4262 Author: Timothy Bish <[email protected]> Authored: Fri Sep 9 12:52:48 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Sep 27 12:14:31 2016 -0400 ---------------------------------------------------------------------- .../amqp/interop/AmqpSendReceiveTest.java | 53 ++++++++++++++++++++ 1 file changed, 53 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ca11674f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java index 8a4958f..752c341 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java @@ -22,7 +22,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; import java.util.LinkedList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -538,4 +540,55 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } + + 
/**
 * Runs the "receive beyond acked amount" scenario against a queue address.
 *
 * @throws Exception if any client operation or assertion fails
 */
@Test(timeout = 60000)
public void testReceiveMessageBeyondAckedAmountQueue() throws Exception {
    doTestReceiveMessageBeyondAckedAmount(Queue.class);
}

/**
 * Runs the "receive beyond acked amount" scenario against a topic address.
 *
 * @throws Exception if any client operation or assertion fails
 */
@Test(timeout = 60000)
public void testReceiveMessageBeyondAckedAmountTopic() throws Exception {
    doTestReceiveMessageBeyondAckedAmount(Topic.class);
}

/**
 * Sends a fixed batch of messages, then receives every one of them on a
 * single receiver by calling {@code flow(1)} before each receive while
 * deliberately not accepting anything until the whole batch has arrived.
 * Only after all messages were received are the held messages accepted.
 *
 * @param destType {@code Queue.class} selects a {@code queue://} address;
 *                 any other value selects a {@code topic://} address
 * @throws Exception if any client operation or assertion fails
 */
private void doTestReceiveMessageBeyondAckedAmount(Class<?> destType) throws Exception {
    final int MSG_COUNT = 50;

    AmqpClient client = createAmqpClient();

    AmqpConnection connection = client.connect();
    AmqpSession session = connection.createSession();

    final String address;
    if (Queue.class.equals(destType)) {
        address = "queue://" + getTestName();
    } else {
        address = "topic://" + getTestName();
    }

    // The receiver is created before anything is sent, so in the topic
    // variant the subscription already exists when the messages are published.
    AmqpReceiver receiver = session.createReceiver(address);
    AmqpSender sender = session.createSender(address);

    for (int i = 0; i < MSG_COUNT; i++) {
        AmqpMessage message = new AmqpMessage();
        message.setMessageId("msg" + i);
        sender.send(message);
    }
    sender.close();

    List<AmqpMessage> pendingAcks = new ArrayList<AmqpMessage>(MSG_COUNT);

    for (int i = 0; i < MSG_COUNT; i++) {
        // One flow(1) call per message, with nothing accepted in between.
        receiver.flow(1);
        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
        assertNotNull("Should have received message " + i + " of " + MSG_COUNT, received);
        // Hold on to the message so it can be accepted once the whole batch
        // has been received; without this the accept loop below never runs.
        pendingAcks.add(received);
    }

    for (AmqpMessage pendingAck : pendingAcks) {
        pendingAck.accept();
    }

    receiver.close();
    connection.close();
}
} // closes class AmqpSendReceiveTest, whose header lies outside this chunk
