Repository: qpid-proton
Updated Branches:
  refs/heads/master 197d83f10 -> bcd08cc4a


PROTON-1114: protect against sending various frame types after the Close frame 
has been sent


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/bcd08cc4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/bcd08cc4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/bcd08cc4

Branch: refs/heads/master
Commit: bcd08cc4a7d53665212ea8f66707501d2d1c0db3
Parents: f11723c
Author: Robert Gemmell <[email protected]>
Authored: Wed Jan 27 17:27:25 2016 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Wed Jan 27 17:31:25 2016 +0000

----------------------------------------------------------------------
 .../qpid/proton/engine/impl/TransportImpl.java  |  10 +-
 .../proton/engine/impl/TransportImplTest.java   | 507 +++++++++++++++++++
 2 files changed, 512 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bcd08cc4/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index d132508..ca3d465 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -459,7 +459,7 @@ public class TransportImpl extends EndpointImpl
 
     private void processSenderFlow()
     {
-        if(_connectionEndpoint != null && _isOpenSent)
+        if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent)
         {
             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
             while(endpoint != null)
@@ -490,7 +490,7 @@ public class TransportImpl extends EndpointImpl
 
     private void processTransportWork()
     {
-        if(_connectionEndpoint != null && _isOpenSent)
+        if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent)
         {
             DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
             while(delivery != null)
@@ -674,7 +674,7 @@ public class TransportImpl extends EndpointImpl
 
     private void processReceiverFlow()
     {
-        if(_connectionEndpoint != null && _isOpenSent)
+        if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent)
         {
             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
             while(endpoint != null)
@@ -722,7 +722,7 @@ public class TransportImpl extends EndpointImpl
 
     private void processAttach()
     {
-        if(_connectionEndpoint != null && _isOpenSent)
+        if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent)
         {
             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
 
@@ -842,7 +842,7 @@ public class TransportImpl extends EndpointImpl
 
     private void processBegin()
     {
-        if(_connectionEndpoint != null && _isOpenSent)
+        if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent)
         {
             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
             while(endpoint != null)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bcd08cc4/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
 
b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index a0e6766..69b46cf 100644
--- 
a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ 
b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -32,14 +32,18 @@ import static org.junit.Assert.fail;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 
 import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedShort;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.transport.Attach;
 import org.apache.qpid.proton.amqp.transport.Begin;
+import org.apache.qpid.proton.amqp.transport.Close;
 import org.apache.qpid.proton.amqp.transport.Flow;
 import org.apache.qpid.proton.amqp.transport.FrameBody;
 import org.apache.qpid.proton.amqp.transport.Open;
@@ -622,6 +626,7 @@ public class TransportImplTest
         assertTrue("Unexpected frame type", transport.writes.get(1) instanceof 
Begin);
         assertTrue("Unexpected frame type", transport.writes.get(2) instanceof 
Attach);
 
+        // Send the necessary responses to open/begin/attach then give sender 
credit
         transport.handleFrame(new TransportFrame(0, new Open(), null));
 
         Begin begin = new Begin();
@@ -651,11 +656,13 @@ public class TransportImplTest
 
         assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
 
+        // Now pump the transport again and expect a transfer for the message
         pumpMockTransport(transport);
 
         assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
         assertTrue("Unexpected frame type", transport.writes.get(3) instanceof 
Transfer);
 
+        // Verify that we did, or did not, emit a flow event
         if(emitFlowEventOnSend)
         {
             assertEvents(collector, Event.Type.LINK_FLOW);
@@ -666,6 +673,456 @@ public class TransportImplTest
         }
     }
 
+    /**
+     * Verify that no Begin frame is emitted by the Transport should a Session 
open
+     * after the Close frame was sent.
+     */
+    @Test
+    public void testSessionBeginAfterCloseSent()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 1, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof 
Open);
+
+        // Send the necessary response to Open
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 1, transport.writes.size());
+
+        // Cause a Close frame to be sent
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 2, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof 
Close);
+
+        // Open the session and verify the transport doesn't
+        // send any Begin frame, as a Close frame was sent already.
+        session.open();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 2, transport.writes.size());
+    }
+
+    /**
+     * Verify that no End frame is emitted by the Transport should a Session 
close
+     * after the Close frame was sent.
+     */
+    @Test
+    public void testSessionEndAfterCloseSent()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 2, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof 
Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof 
Begin);
+
+        // Send the necessary responses to open/begin
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 2, transport.writes.size());
+
+        // Cause a Close frame to be sent
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof 
Close);
+
+        // Close the session and verify the transport doesn't
+        // send any End frame, as a Close frame was sent already.
+        session.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+    }
+
+    /**
+     * Verify that no Attach frame is emitted by the Transport should a 
Receiver
+     * be opened after the Close frame was sent.
+     */
+    @Test
+    public void testReceiverAttachAfterCloseSent()
+    {
+        doLinkAttachAfterCloseSentTestImpl(true);
+    }
+
+    /**
+     * Verify that no Attach frame is emitted by the Transport should a Sender
+     * be opened after the Close frame was sent.
+     */
+    @Test
+    public void testSenderAttachAfterCloseSent()
+    {
+        doLinkAttachAfterCloseSentTestImpl(false);
+    }
+
+    void doLinkAttachAfterCloseSentTestImpl(boolean receiverLink)
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        Link link = null;
+        if(receiverLink)
+        {
+            link = session.receiver("myReceiver");
+        }
+        else
+        {
+            link = session.sender("mySender");
+        }
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 2, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof 
Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof 
Begin);
+
+        // Send the necessary responses to open/begin
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 2, transport.writes.size());
+
+        // Cause a Close frame to be sent
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof 
Close);
+
+        // Open the link and verify the transport doesn't
+        // send any Attach frame, as a Close frame was sent already.
+        link.open();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+    }
+
+    /**
+     * Verify that no Flow frame is emitted by the Transport should a Receiver
+     * have credit added after the Close frame was sent.
+     */
+    @Test
+    public void testReceiverFlowAfterCloseSent()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "myReceiver";
+        Receiver receiver = session.receiver(linkName);
+        receiver.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof 
Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof 
Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof 
Attach);
+
+        // Send the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.RECEIVER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        // Cause the Close frame to be sent
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof 
Close);
+
+        // Grant new credit for the Receiver and verify the transport doesn't
+        // send any Flow frame, as a Close frame was sent already.
+        receiver.flow(1);
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
+    }
+
+    /**
+     * Verify that no Flow frame is emitted by the Transport should a Sender
+     * have its credit drained after the Close frame was sent.
+     */
+    @Test
+    public void testSenderFlowAfterCloseSent()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Collector collector = Collector.Factory.create();
+        connection.collect(collector);
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "mySender";
+        Sender sender = session.sender(linkName);
+        sender.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof 
Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof 
Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof 
Attach);
+
+        assertFalse("Should not be in drain yet", sender.getDrain());
+
+        // Send the necessary responses to open/begin/attach then give sender 
credit and drain
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.RECEIVER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        int credit = 10;
+        Flow flow = new Flow();
+        flow.setHandle(UnsignedInteger.ZERO);
+        flow.setDeliveryCount(UnsignedInteger.ZERO);
+        flow.setNextIncomingId(UnsignedInteger.ONE);
+        flow.setNextOutgoingId(UnsignedInteger.ZERO);
+        flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        flow.setDrain(true);
+        flow.setLinkCredit(UnsignedInteger.valueOf(credit));
+
+        transport.handleFrame(new TransportFrame(0, flow, null));
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Should not be in drain", sender.getDrain());
+        assertEquals("Should have credit", credit, sender.getCredit());
+
+        // Cause the Close frame to be sent
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof 
Close);
+
+        // Drain the credit and verify the transport doesn't
+        // send any Flow frame, as a Close frame was sent already.
+        int drained = sender.drained();
+        assertEquals("Should have drained all credit", credit, drained);
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
+    }
+
+    /**
+     * Verify that no Disposition frame is emitted by the Transport should a 
Delivery
+     * have disposition applied after the Close frame was sent.
+     */
+    @Test
+    public void testDispositionAfterCloseSent()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "myReceiver";
+        Receiver receiver = session.receiver(linkName);
+        receiver.flow(5);
+        receiver.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof 
Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof 
Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof 
Attach);
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof 
Flow);
+
+        Delivery delivery = receiver.current();
+        assertNull("Should not yet have a delivery", delivery);
+
+        // Send the necessary responses to open/begin/attach as well as a 
transfer
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.SENDER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        String deliveryTag = "tag1";
+        String messageContent = "content1";
+        handleTransfer(transport, 1, deliveryTag, messageContent);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        delivery = verifyDelivery(receiver, deliveryTag, messageContent);
+        assertNotNull("Should now have a delivery", delivery);
+
+        // Cause the Close frame to be sent
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 5, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(4) instanceof 
Close);
+
+        delivery.disposition(Released.getInstance());
+        delivery.settle();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 5, transport.writes.size());
+    }
+
+    /**
+     * Verify that no Transfer frame is emitted by the Transport should a 
Delivery
+     * be sendable after the Close frame was sent.
+     */
+    @Test
+    public void testTransferAfterCloseSent()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Collector collector = Collector.Factory.create();
+        connection.collect(collector);
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "mySender";
+        Sender sender = session.sender(linkName);
+        sender.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof 
Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof 
Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof 
Attach);
+
+        // Send the necessary responses to open/begin/attach then give sender 
credit
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.RECEIVER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        Flow flow = new Flow();
+        flow.setHandle(UnsignedInteger.ZERO);
+        flow.setDeliveryCount(UnsignedInteger.ZERO);
+        flow.setNextIncomingId(UnsignedInteger.ONE);
+        flow.setNextOutgoingId(UnsignedInteger.ZERO);
+        flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        flow.setLinkCredit(UnsignedInteger.valueOf(10));
+
+        transport.handleFrame(new TransportFrame(0, flow, null));
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        // Cause the Close frame to be sent
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof 
Close);
+
+        // Send a new message and verify the transport doesn't
+        // send any Transfer frame, as a Close frame was sent already.
+        sendMessage(sender, "tag1", "content1");
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
+    }
+
     private void assertNoEvents(Collector collector)
     {
         assertEvents(collector);
@@ -737,4 +1194,54 @@ public class TransportImplTest
 
         return delivery;
     }
+
+    private void handleTransfer(TransportImpl transport, int deliveryNumber, 
String deliveryTag, String messageContent)
+    {
+        byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
+
+        Message m = Message.Factory.create();
+        m.setBody(new AmqpValue(messageContent));
+
+        byte[] encoded = new byte[BUFFER_SIZE];
+        int len = m.encode(encoded, 0, BUFFER_SIZE);
+
+        assertTrue("given array was too small", len < BUFFER_SIZE);
+
+        Transfer transfer = new Transfer();
+        transfer.setDeliveryId(UnsignedInteger.valueOf(deliveryNumber));
+        transfer.setHandle(UnsignedInteger.ZERO);
+        transfer.setDeliveryTag(new Binary(tag));
+        
transfer.setMessageFormat(UnsignedInteger.valueOf(DeliveryImpl.DEFAULT_MESSAGE_FORMAT));
+
+        transport.handleFrame(new TransportFrame(0, transfer, new 
Binary(encoded, 0, len)));
+    }
+
+    private Delivery verifyDelivery(Receiver receiver, String deliveryTag, 
String messageContent)
+    {
+        Delivery delivery = receiver.current();
+
+        assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), 
delivery.getTag()));
+
+        assertNull(delivery.getLocalState());
+        assertNull(delivery.getRemoteState());
+
+        assertFalse(delivery.isPartial());
+        assertTrue(delivery.isReadable());
+
+        byte[] received = new byte[BUFFER_SIZE];
+        int len = receiver.recv(received, 0, BUFFER_SIZE);
+
+        assertTrue("given array was too small", len < BUFFER_SIZE);
+
+        Message m = Proton.message();
+        m.decode(received, 0, len);
+
+        Object messageBody = ((AmqpValue)m.getBody()).getValue();
+        assertEquals("Unexpected message content", messageContent, 
messageBody);
+
+        boolean receiverAdvanced = receiver.advance();
+        assertTrue("receiver has not advanced", receiverAdvanced);
+
+        return delivery;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to