Repository: qpid-broker-j Updated Branches: refs/heads/master 8e78fbe60 -> 37365f918
QPID-8038: [Broker-J] [AMQP 0-x] Add protocol tests for basic.qos and channel.flow Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/988006b3 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/988006b3 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/988006b3 Branch: refs/heads/master Commit: 988006b38d8dd10914357779966731dc1aa90530 Parents: 8e78fbe Author: Keith Wall <[email protected]> Authored: Mon Dec 25 17:35:12 2017 +0000 Committer: Keith Wall <[email protected]> Committed: Mon Dec 25 17:36:58 2017 +0000 ---------------------------------------------------------------------- .../tests/protocol/v0_8/BasicInteraction.java | 6 + .../qpid/tests/protocol/v0_8/BasicTest.java | 132 +++++++++++++++++++ 2 files changed, 138 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/988006b3/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java index 2c1d1b3..c004e38 100644 --- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java +++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java @@ -243,6 +243,12 @@ public class BasicInteraction return _interaction.sendPerformative(new BasicAckBody(_ackDeliveryTag, _ackMultiple)); } + public BasicInteraction ackMultiple(final boolean multiple) + { + _ackMultiple = multiple; + return this; + } + public BasicInteraction ackDeliveryTag(final long deliveryTag) { _ackDeliveryTag = deliveryTag; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/988006b3/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java index 79988c0..f1300fb 100644 --- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java @@ -529,6 +529,138 @@ public class BasicTest extends BrokerAdminUsingTestBase } } + /** + * The Qpid JMS AMQP 0-x client relies on being able to raise and lower qos count during a channels lifetime + * to prevent channel starvation. This test supports this qos use-case. + */ + @Test + public void qosCountResized() throws Exception + { + getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "A", "B", "C", "D", "E", "F"); + + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + String consumerTag = "A"; + + interaction.openAnonymousConnection() + .channel().open() + .consumeResponse(ChannelOpenOkBody.class) + .channel().flow(true) + .consumeResponse(ChannelFlowOkBody.class) + .basic().qosPrefetchCount(3) + .qos() + .consumeResponse(BasicQosOkBody.class) + .basic().consumeConsumerTag(consumerTag) + .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME) + .consume() + .consumeResponse(BasicConsumeOkBody.class); + + final long deliveryTagA = receiveDeliveryHeaderAndBody(interaction, "A"); + receiveDeliveryHeaderAndBody(interaction, "B"); + final long deliveryTagC = receiveDeliveryHeaderAndBody(interaction, "C"); + + ensureSync(interaction); + + // Raise qos count by one, expect D to arrive + interaction.basic().qosPrefetchCount(4).qos() + .consumeResponse(BasicQosOkBody.class); + + long deliveryTagD = receiveDeliveryHeaderAndBody(interaction, "D"); + ensureSync(interaction); + + // Ack A, expect E to arrive + interaction.basic().ackDeliveryTag(deliveryTagA).ack(); + + receiveDeliveryHeaderAndBody(interaction, "E"); + ensureSync(interaction); + + // Lower qos back to 2 and ensure no more messages arrive (message credit will be negative at this point). + interaction.basic().qosPrefetchCount(2).qos() + .consumeResponse(BasicQosOkBody.class); + ensureSync(interaction); + + // Ack B and C and ensure still no more messages arrive (message credit will now be zero) + interaction.basic() + .ackMultiple(true).ackDeliveryTag(deliveryTagC).ack(); + ensureSync(interaction); + + // Ack D and ensure F delivery arrives + interaction.basic() + .ackMultiple(false).ackDeliveryTag(deliveryTagD).ack(); + + receiveDeliveryHeaderAndBody(interaction, "F"); + + interaction.channel().close().consumeResponse(ChannelCloseOkBody.class); + + assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2))); + } + } + + /** + * The Qpid JMS AMQP 0-x client is capable of polling fors message. It does this using a combination of + * basic.qos (count one) and regulating the flow using channel.flow. This test supports this use-case. + */ + @Test + public void pollingUsingFlow() throws Exception + { + getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "A", "B", "C"); + + try(FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + String consumerTag = "A"; + + interaction.openAnonymousConnection() + .channel().open() + .consumeResponse(ChannelOpenOkBody.class) + .basic().qosPrefetchCount(1) + .qos() + .consumeResponse(BasicQosOkBody.class) + .channel().flow(false) + .consumeResponse(ChannelFlowOkBody.class) + .basic().consumeConsumerTag(consumerTag) + .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME) + .consume() + .consumeResponse(BasicConsumeOkBody.class); + + ensureSync(interaction); + + interaction.channel().flow(true) + .consumeResponse(ChannelFlowOkBody.class); + + long deliveryTagA = receiveDeliveryHeaderAndBody(interaction, "A"); + + interaction.channel().flow(false) + .consumeResponse(ChannelFlowOkBody.class) + .basic().ackDeliveryTag(deliveryTagA).ack(); + + ensureSync(interaction); + + interaction.channel().flow(true) + .consumeResponse(ChannelFlowOkBody.class); + + long deliveryTagB = receiveDeliveryHeaderAndBody(interaction, "B"); + + interaction.channel().flow(false) + .consumeResponse(ChannelFlowOkBody.class) + .basic().ackDeliveryTag(deliveryTagB).ack() + .channel().close().consumeResponse(ChannelCloseOkBody.class); + + assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1))); + } + } + + private long receiveDeliveryHeaderAndBody(final Interaction interaction, String expectedMessageContent) throws Exception + { + BasicDeliverBody delivery = interaction.consumeResponse().getLatestResponse(BasicDeliverBody.class); + ContentBody content = interaction.consumeResponse(ContentHeaderBody.class) + .consumeResponse().getLatestResponse(ContentBody.class); + + assertThat(getContent(content), is(equalTo(expectedMessageContent))); + return delivery.getDeliveryTag(); + } + private void ensureSync(final Interaction interaction) throws Exception { interaction.exchange() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
