Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 8e78fbe60 -> 37365f918


QPID-8038: [Broker-J] [AMQP 0-x] Add protocol tests for basic.qos and 
channel.flow


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/988006b3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/988006b3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/988006b3

Branch: refs/heads/master
Commit: 988006b38d8dd10914357779966731dc1aa90530
Parents: 8e78fbe
Author: Keith Wall <[email protected]>
Authored: Mon Dec 25 17:35:12 2017 +0000
Committer: Keith Wall <[email protected]>
Committed: Mon Dec 25 17:36:58 2017 +0000

----------------------------------------------------------------------
 .../tests/protocol/v0_8/BasicInteraction.java   |   6 +
 .../qpid/tests/protocol/v0_8/BasicTest.java     | 132 +++++++++++++++++++
 2 files changed, 138 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/988006b3/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
 
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index 2c1d1b3..c004e38 100644
--- 
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++ 
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -243,6 +243,12 @@ public class BasicInteraction
         return _interaction.sendPerformative(new BasicAckBody(_ackDeliveryTag, 
_ackMultiple));
     }
 
+    public BasicInteraction ackMultiple(final boolean multiple)
+    {
+        _ackMultiple = multiple;
+        return this;
+    }
+
     public BasicInteraction ackDeliveryTag(final long deliveryTag)
     {
         _ackDeliveryTag = deliveryTag;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/988006b3/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
 
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index 79988c0..f1300fb 100644
--- 
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++ 
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -529,6 +529,138 @@ public class BasicTest extends BrokerAdminUsingTestBase
         }
     }
 
+    /**
+     * The Qpid JMS AMQP 0-x client relies on being able to raise and lower 
qos count during a channel's lifetime
+     * to prevent channel starvation. This test supports this qos use-case.
+     */
+    @Test
+    public void qosCountResized() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "A", 
"B", "C", "D", "E", "F");
+
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String consumerTag = "A";
+
+            interaction.openAnonymousConnection()
+                       .channel().open()
+                       .consumeResponse(ChannelOpenOkBody.class)
+                       .channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class)
+                       .basic().qosPrefetchCount(3)
+                       .qos()
+                       .consumeResponse(BasicQosOkBody.class)
+                       .basic().consumeConsumerTag(consumerTag)
+                       .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .consume()
+                       .consumeResponse(BasicConsumeOkBody.class);
+
+            final long deliveryTagA = 
receiveDeliveryHeaderAndBody(interaction, "A");
+            receiveDeliveryHeaderAndBody(interaction, "B");
+            final long deliveryTagC = 
receiveDeliveryHeaderAndBody(interaction, "C");
+
+            ensureSync(interaction);
+
+            // Raise qos count by one, expect D to arrive
+            interaction.basic().qosPrefetchCount(4).qos()
+                       .consumeResponse(BasicQosOkBody.class);
+
+            long deliveryTagD = receiveDeliveryHeaderAndBody(interaction, "D");
+            ensureSync(interaction);
+
+            // Ack A, expect E to arrive
+            interaction.basic().ackDeliveryTag(deliveryTagA).ack();
+
+            receiveDeliveryHeaderAndBody(interaction, "E");
+            ensureSync(interaction);
+
+            // Lower qos to 2 and ensure no more messages arrive (message 
credit will be negative at this point).
+            interaction.basic().qosPrefetchCount(2).qos()
+                       .consumeResponse(BasicQosOkBody.class);
+            ensureSync(interaction);
+
+            // Ack B and C and ensure still no more messages arrive (message 
credit will now be zero)
+            interaction.basic()
+                       .ackMultiple(true).ackDeliveryTag(deliveryTagC).ack();
+            ensureSync(interaction);
+
+            // Ack D and ensure F delivery arrives
+            interaction.basic()
+                       .ackMultiple(false).ackDeliveryTag(deliveryTagD).ack();
+
+            receiveDeliveryHeaderAndBody(interaction, "F");
+
+            
interaction.channel().close().consumeResponse(ChannelCloseOkBody.class);
+
+            
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), 
is(equalTo(2)));
+        }
+    }
+
+    /**
+     * The Qpid JMS AMQP 0-x client is capable of polling for messages.  It 
does this using a combination of
+     * basic.qos (count one) and regulating the flow using channel.flow. This 
test supports this use-case.
+     */
+    @Test
+    public void pollingUsingFlow() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "A", 
"B", "C");
+
+        try(FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String consumerTag = "A";
+
+            interaction.openAnonymousConnection()
+                       .channel().open()
+                       .consumeResponse(ChannelOpenOkBody.class)
+                       .basic().qosPrefetchCount(1)
+                       .qos()
+                       .consumeResponse(BasicQosOkBody.class)
+                       .channel().flow(false)
+                       .consumeResponse(ChannelFlowOkBody.class)
+                       .basic().consumeConsumerTag(consumerTag)
+                       .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .consume()
+                       .consumeResponse(BasicConsumeOkBody.class);
+
+            ensureSync(interaction);
+
+            interaction.channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class);
+
+            long deliveryTagA = receiveDeliveryHeaderAndBody(interaction, "A");
+
+            interaction.channel().flow(false)
+                       .consumeResponse(ChannelFlowOkBody.class)
+                       .basic().ackDeliveryTag(deliveryTagA).ack();
+
+            ensureSync(interaction);
+
+            interaction.channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class);
+
+            long deliveryTagB = receiveDeliveryHeaderAndBody(interaction, "B");
+
+            interaction.channel().flow(false)
+                       .consumeResponse(ChannelFlowOkBody.class)
+                       .basic().ackDeliveryTag(deliveryTagB).ack()
+                       
.channel().close().consumeResponse(ChannelCloseOkBody.class);
+
+            
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), 
is(equalTo(1)));
+        }
+    }
+
+    private long receiveDeliveryHeaderAndBody(final Interaction interaction, 
String expectedMessageContent) throws Exception
+    {
+        BasicDeliverBody delivery = 
interaction.consumeResponse().getLatestResponse(BasicDeliverBody.class);
+        ContentBody content = 
interaction.consumeResponse(ContentHeaderBody.class)
+                                         
.consumeResponse().getLatestResponse(ContentBody.class);
+
+        assertThat(getContent(content), is(equalTo(expectedMessageContent)));
+        return delivery.getDeliveryTag();
+    }
+
     private void ensureSync(final Interaction interaction) throws Exception
     {
         interaction.exchange()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to