https://issues.apache.org/jira/browse/AMQ-6422
Small fix to test and check for zero inflight on successive send to destination that should have no credit on the registered receiver.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/566e8261
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/566e8261
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/566e8261

Branch: refs/heads/activemq-5.14.x
Commit: 566e82614aa3cb31c80d44d155debe0e63cc2a3c
Parents: ca11674
Author: Timothy Bish <[email protected]>
Authored: Fri Sep 9 13:02:04 2016 -0400
Committer: Timothy Bish <[email protected]>
Committed: Tue Sep 27 12:14:43 2016 -0400

----------------------------------------------------------------------
 .../amqp/interop/AmqpSendReceiveTest.java | 26 +++++++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/566e8261/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index 752c341..f39fc3e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import javax.jms.Queue;
 import javax.jms.Topic;
 
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.junit.ActiveMQTestRunner;
 import org.apache.activemq.junit.Repeat;
@@ -43,6 +44,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.util.Wait;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -569,12 +571,18 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         AmqpReceiver receiver = session.createReceiver(address);
         AmqpSender sender = session.createSender(address);
 
+        final DestinationViewMBean destinationView;
+        if (Queue.class.equals(destType)) {
+            destinationView = getProxyToQueue(getTestName());
+        } else {
+            destinationView = getProxyToTopic(getTestName());
+        }
+
         for (int i = 0; i < MSG_COUNT; i++) {
             AmqpMessage message = new AmqpMessage();
             message.setMessageId("msg" + i);
             sender.send(message);
         }
-        sender.close();
 
         List<AmqpMessage> pendingAcks = new ArrayList<AmqpMessage>();
 
@@ -582,12 +590,28 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
             receiver.flow(1);
             AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
             assertNotNull(received);
+            pendingAcks.add(received);
         }
 
+        // Send one more to check in-flight stays at zero with no credit and all
+        // pending messages settled.
+        AmqpMessage message = new AmqpMessage();
+        message.setMessageId("msg-final");
+        sender.send(message);
+
         for (AmqpMessage pendingAck : pendingAcks) {
             pendingAck.accept();
         }
 
+        assertTrue("Should be no inflight messages: " + destinationView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return destinationView.getInFlightCount() == 0;
+            }
+        }));
+
+        sender.close();
         receiver.close();
         connection.close();
     }
