QPID-8038: [Broker-J] [AMQP 0-10] Add a protocol test supporting explicit 
message acquision


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a827841b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a827841b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a827841b

Branch: refs/heads/master
Commit: a827841b24b22cd21f0ca6523da00093f2a26669
Parents: a6408e1
Author: Keith Wall <kw...@apache.org>
Authored: Wed Feb 14 15:53:20 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Feb 14 15:53:20 2018 +0000

----------------------------------------------------------------------
 .../protocol/v0_10/MessageInteraction.java      | 20 ++++++
 .../qpid/tests/protocol/v0_10/MessageTest.java  | 71 ++++++++++++++++++++
 2 files changed, 91 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a827841b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
index 54ecf50..43bd2ec 100644
--- 
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
+++ 
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
@@ -25,6 +25,7 @@ import 
org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.Header;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAccept;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquire;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageCancel;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
@@ -42,6 +43,7 @@ public class MessageInteraction
     private MessageCancel _cancel;
     private MessageFlow _flow;
     private MessageAccept _accept;
+    private MessageAcquire _acquire;
 
     public MessageInteraction(final Interaction interaction)
     {
@@ -51,6 +53,7 @@ public class MessageInteraction
         _cancel = new MessageCancel();
         _flow = new MessageFlow();
         _accept = new MessageAccept();
+        _acquire = new MessageAcquire();
     }
 
     public MessageInteraction transferId(final int id)
@@ -199,4 +202,21 @@ public class MessageInteraction
         _accept.setTransfers(transfers);
         return this;
     }
+
+    public Interaction acquire() throws Exception
+    {
+        return _interaction.sendPerformative(_acquire);
+    }
+
+    public MessageInteraction acquireId(final int id)
+    {
+        _acquire.setId(id);
+        return this;
+    }
+
+    public MessageInteraction acquireTransfers(final RangeSet transfers)
+    {
+        _acquire.setTransfers(transfers);
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a827841b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
 
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
index 4088b9b..6747451 100644
--- 
a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
+++ 
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
@@ -35,6 +35,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.Acquired;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionResult;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
@@ -235,6 +237,75 @@ public class MessageTest extends BrokerAdminUsingTestBase
         }
     }
 
+    @Test
+    @SpecificationTest(section = "10.message.acquire",
+            description = "Acquires previously transferred messages for 
consumption. The acquired ids (if any) are "
+                          + "sent via message.acquired.")
+    public void acquireTransfer() throws Exception
+    {
+        String testMessageBody = "testMessage";
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, 
testMessageBody);
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "testSession".getBytes(UTF_8);
+            final String subscriberName = "testSubscriber";
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(sessionName)
+                       .message()
+                       .subscribeAcceptMode(MessageAcceptMode.EXPLICIT)
+                       .subscribeAcquireMode(MessageAcquireMode.NOT_ACQUIRED)
+                       .subscribeDestination(subscriberName)
+                       .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .subscribeId(0)
+                       .subscribe()
+                       .message()
+                       .flowId(1)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.MESSAGE)
+                       .flowValue(1)
+                       .flow()
+                       .message()
+                       .flowId(2)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.BYTE)
+                       .flowValue(-1)
+                       .flow();
+
+            MessageTransfer transfer = consumeResponse(interaction,
+                                                       MessageTransfer.class,
+                                                       SessionCompleted.class,
+                                                       
SessionCommandPoint.class,
+                                                       SessionConfirmed.class,
+                                                       SessionFlush.class);
+
+            
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), 
is(equalTo(1)));
+
+            RangeSet transfers = Range.newInstance(transfer.getId());
+            final ExecutionResult result = 
interaction.message().acquireId(3).acquireTransfers(transfers).acquire()
+                                                      
.consumeResponse(SessionFlush.class)
+                                                      
.consumeResponse().getLatestResponse(ExecutionResult.class);
+            final Acquired acquired = (Acquired) result.getValue();
+            assertThat(acquired.getTransfers().includes(transfer.getId()), 
is(equalTo(true)));
+
+            
interaction.message().acceptId(4).acceptTransfers(transfers).accept()
+                       .session().flushCompleted()
+                                 .flush();
+
+            SessionCompleted completed = consumeResponse(interaction,
+                                                         
SessionCompleted.class,
+                                                         
SessionCommandPoint.class,
+                                                         
SessionConfirmed.class,
+                                                         SessionFlush.class);
+
+            assertThat(completed.getCommands(), is(notNullValue()));
+            assertThat(completed.getCommands().includes(4), is(equalTo(true)));
+
+            
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), 
is(equalTo(0)));
+        }
+    }
+
     private <T extends Method> T consumeResponse(final Interaction interaction,
                                                  final Class<T> expected,
                                                  final Class<? extends 
Method>... ignore)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to