Repository: qpid-proton Updated Branches: refs/heads/master 197d83f10 -> bcd08cc4a
PROTON-1114: protect against sending various frame types after the Close frame has been sent Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/bcd08cc4 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/bcd08cc4 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/bcd08cc4 Branch: refs/heads/master Commit: bcd08cc4a7d53665212ea8f66707501d2d1c0db3 Parents: f11723c Author: Robert Gemmell <[email protected]> Authored: Wed Jan 27 17:27:25 2016 +0000 Committer: Robert Gemmell <[email protected]> Committed: Wed Jan 27 17:31:25 2016 +0000 ---------------------------------------------------------------------- .../qpid/proton/engine/impl/TransportImpl.java | 10 +- .../proton/engine/impl/TransportImplTest.java | 507 +++++++++++++++++++ 2 files changed, 512 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bcd08cc4/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index d132508..ca3d465 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -459,7 +459,7 @@ public class TransportImpl extends EndpointImpl private void processSenderFlow() { - if(_connectionEndpoint != null && _isOpenSent) + if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent) { EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); while(endpoint != null) @@ -490,7 +490,7 @@ public class TransportImpl extends EndpointImpl private void processTransportWork() { - if(_connectionEndpoint != null && _isOpenSent) 
+ if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent) { DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead(); while(delivery != null) @@ -674,7 +674,7 @@ public class TransportImpl extends EndpointImpl private void processReceiverFlow() { - if(_connectionEndpoint != null && _isOpenSent) + if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent) { EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); while(endpoint != null) @@ -722,7 +722,7 @@ public class TransportImpl extends EndpointImpl private void processAttach() { - if(_connectionEndpoint != null && _isOpenSent) + if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent) { EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); @@ -842,7 +842,7 @@ public class TransportImpl extends EndpointImpl private void processBegin() { - if(_connectionEndpoint != null && _isOpenSent) + if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent) { EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); while(endpoint != null) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bcd08cc4/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java index a0e6766..69b46cf 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java @@ -32,14 +32,18 @@ import static org.junit.Assert.fail; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.UnsignedInteger; 
import org.apache.qpid.proton.amqp.UnsignedShort; import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.transport.Attach; import org.apache.qpid.proton.amqp.transport.Begin; +import org.apache.qpid.proton.amqp.transport.Close; import org.apache.qpid.proton.amqp.transport.Flow; import org.apache.qpid.proton.amqp.transport.FrameBody; import org.apache.qpid.proton.amqp.transport.Open; @@ -622,6 +626,7 @@ public class TransportImplTest assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + // Send the necessary responses to open/begin/attach then give sender credit transport.handleFrame(new TransportFrame(0, new Open(), null)); Begin begin = new Begin(); @@ -651,11 +656,13 @@ public class TransportImplTest assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + // Now pump the transport again and expect a transfer for the message pumpMockTransport(transport); assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer); + // Verify that we did, or did not, emit a flow event if(emitFlowEventOnSend) { assertEvents(collector, Event.Type.LINK_FLOW); @@ -666,6 +673,456 @@ public class TransportImplTest } } + /** + * Verify that no Begin frame is emitted by the Transport should a Session open + * after the Close frame was sent. 
+ */ + @Test + public void testSessionBeginAfterCloseSent() + { + MockTransportImpl transport = new MockTransportImpl(); + Connection connection = Proton.connection(); + transport.bind(connection); + + connection.open(); + + Session session = connection.session(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + + // Send the necessary response to Open + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size()); + + // Cause a Close frame to be sent + connection.close(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Close); + + // Open the session and verify the transport doesn't + // send any Begin frame, as a Close frame was sent already. + session.open(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size()); + } + + /** + * Verify that no End frame is emitted by the Transport should a Session close + * after the Close frame was sent. 
+ */ + @Test + public void testSessionEndAfterCloseSent() + { + MockTransportImpl transport = new MockTransportImpl(); + Connection connection = Proton.connection(); + transport.bind(connection); + + connection.open(); + + Session session = connection.session(); + session.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + + // Send the necessary responses to open/begin + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size()); + + // Cause a Close frame to be sent + connection.close(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Close); + + // Close the session and verify the transport doesn't + // send any End frame, as a Close frame was sent already. + session.close(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + } + + /** + * Verify that no Attach frame is emitted by the Transport should a Receiver + * be opened after the Close frame was sent. + */ + @Test + public void testReceiverAttachAfterCloseSent() + { + doLinkAttachAfterCloseSentTestImpl(true); + } + + /** + * Verify that no Attach frame is emitted by the Transport should a Sender + * be opened after the Close frame was sent. 
+ */ + @Test + public void testSenderAttachAfterCloseSent() + { + doLinkAttachAfterCloseSentTestImpl(false); + } + + void doLinkAttachAfterCloseSentTestImpl(boolean receiverLink) + { + MockTransportImpl transport = new MockTransportImpl(); + Connection connection = Proton.connection(); + transport.bind(connection); + + connection.open(); + + Session session = connection.session(); + session.open(); + + Link link = null; + if(receiverLink) + { + link = session.receiver("myReceiver"); + } + else + { + link = session.sender("mySender"); + } + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + + // Send the necessary responses to open/begin + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size()); + + // Cause a Close frame to be sent + connection.close(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Close); + + // Open the link and verify the transport doesn't + // send any Attach frame, as a Close frame was sent already. + link.open(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + } + + /** + * Verify that no Flow frame is emitted by the Transport should a Receiver + * have credit added after the Close frame was sent. 
+ */ + @Test + public void testReceiverFlowAfterCloseSent() + { + MockTransportImpl transport = new MockTransportImpl(); + Connection connection = Proton.connection(); + transport.bind(connection); + + connection.open(); + + Session session = connection.session(); + session.open(); + + String linkName = "myReceiver"; + Receiver receiver = session.receiver(linkName); + receiver.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + + // Send the necessary responses to open/begin/attach + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + Attach attach = new Attach(); + attach.setHandle(UnsignedInteger.ZERO); + attach.setRole(Role.RECEIVER); + attach.setName(linkName); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + transport.handleFrame(new TransportFrame(0, attach, null)); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + // Cause the Close frame to be sent + connection.close(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Close); + + // Grant new credit for the Receiver and verify the transport doesn't + // send any Flow frame, as a Close frame was sent already. 
+ receiver.flow(1); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + } + + /** + * Verify that no Flow frame is emitted by the Transport should a Sender + * have its credit drained after the Close frame was sent. + */ + @Test + public void testSenderFlowAfterCloseSent() + { + MockTransportImpl transport = new MockTransportImpl(); + + Connection connection = Proton.connection(); + transport.bind(connection); + + connection.open(); + + Collector collector = Collector.Factory.create(); + connection.collect(collector); + + Session session = connection.session(); + session.open(); + + String linkName = "mySender"; + Sender sender = session.sender(linkName); + sender.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + + assertFalse("Should not be in drain yet", sender.getDrain()); + + // Send the necessary responses to open/begin/attach then give sender credit and drain + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + Attach attach = new Attach(); + attach.setHandle(UnsignedInteger.ZERO); + attach.setRole(Role.RECEIVER); + attach.setName(linkName); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + transport.handleFrame(new TransportFrame(0, attach, null)); + + int credit = 10; + Flow flow = new Flow(); + flow.setHandle(UnsignedInteger.ZERO); + flow.setDeliveryCount(UnsignedInteger.ZERO); + flow.setNextIncomingId(UnsignedInteger.ONE); 
flow.setNextOutgoingId(UnsignedInteger.ZERO); + flow.setIncomingWindow(UnsignedInteger.valueOf(1024)); + flow.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + flow.setDrain(true); + flow.setLinkCredit(UnsignedInteger.valueOf(credit)); + + transport.handleFrame(new TransportFrame(0, flow, null)); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + assertTrue("Should not be in drain", sender.getDrain()); + assertEquals("Should have credit", credit, sender.getCredit()); + + // Cause the Close frame to be sent + connection.close(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Close); + + // Drain the credit and verify the transport doesn't + // send any Flow frame, as a Close frame was sent already. + int drained = sender.drained(); + assertEquals("Should have drained all credit", credit, drained); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + } + + /** + * Verify that no Disposition frame is emitted by the Transport should a Delivery + * have disposition applied after the Close frame was sent. 
+ */ + @Test + public void testDispositionAfterCloseSent() + { + MockTransportImpl transport = new MockTransportImpl(); + Connection connection = Proton.connection(); + transport.bind(connection); + + connection.open(); + + Session session = connection.session(); + session.open(); + + String linkName = "myReceiver"; + Receiver receiver = session.receiver(linkName); + receiver.flow(5); + receiver.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Flow); + + Delivery delivery = receiver.current(); + assertNull("Should not yet have a delivery", delivery); + + // Send the necessary responses to open/begin/attach as well as a transfer + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + begin.setNextOutgoingId(UnsignedInteger.ONE); + begin.setIncomingWindow(UnsignedInteger.valueOf(1024)); + begin.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + Attach attach = new Attach(); + attach.setHandle(UnsignedInteger.ZERO); + attach.setRole(Role.SENDER); + attach.setName(linkName); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + transport.handleFrame(new TransportFrame(0, attach, null)); + + String deliveryTag = "tag1"; + String messageContent = "content1"; + handleTransfer(transport, 1, deliveryTag, messageContent); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + + delivery = verifyDelivery(receiver, deliveryTag, 
messageContent); + assertNotNull("Should now have a delivery", delivery); + + // Cause the Close frame to be sent + connection.close(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Close); + + delivery.disposition(Released.getInstance()); + delivery.settle(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size()); + } + + /** + * Verify that no Transfer frame is emitted by the Transport should a Delivery + * be sendable after the Close frame was sent. + */ + @Test + public void testTransferAfterCloseSent() + { + MockTransportImpl transport = new MockTransportImpl(); + + Connection connection = Proton.connection(); + transport.bind(connection); + + connection.open(); + + Collector collector = Collector.Factory.create(); + connection.collect(collector); + + Session session = connection.session(); + session.open(); + + String linkName = "mySender"; + Sender sender = session.sender(linkName); + sender.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + + // Send the necessary responses to open/begin/attach then give sender credit + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + Attach attach = new Attach(); + attach.setHandle(UnsignedInteger.ZERO); + attach.setRole(Role.RECEIVER); + 
attach.setName(linkName); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + transport.handleFrame(new TransportFrame(0, attach, null)); + + Flow flow = new Flow(); + flow.setHandle(UnsignedInteger.ZERO); + flow.setDeliveryCount(UnsignedInteger.ZERO); + flow.setNextIncomingId(UnsignedInteger.ONE); + flow.setNextOutgoingId(UnsignedInteger.ZERO); + flow.setIncomingWindow(UnsignedInteger.valueOf(1024)); + flow.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + flow.setLinkCredit(UnsignedInteger.valueOf(10)); + + transport.handleFrame(new TransportFrame(0, flow, null)); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + // Cause the Close frame to be sent + connection.close(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Close); + + // Send a new message and verify the transport doesn't + // send any Transfer frame, as a Close frame was sent already. 
+ sendMessage(sender, "tag1", "content1"); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + } + private void assertNoEvents(Collector collector) { assertEvents(collector); @@ -737,4 +1194,54 @@ public class TransportImplTest return delivery; } + + private void handleTransfer(TransportImpl transport, int deliveryNumber, String deliveryTag, String messageContent) + { + byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8); + + Message m = Message.Factory.create(); + m.setBody(new AmqpValue(messageContent)); + + byte[] encoded = new byte[BUFFER_SIZE]; + int len = m.encode(encoded, 0, BUFFER_SIZE); + + assertTrue("given array was too small", len < BUFFER_SIZE); + + Transfer transfer = new Transfer(); + transfer.setDeliveryId(UnsignedInteger.valueOf(deliveryNumber)); + transfer.setHandle(UnsignedInteger.ZERO); + transfer.setDeliveryTag(new Binary(tag)); + transfer.setMessageFormat(UnsignedInteger.valueOf(DeliveryImpl.DEFAULT_MESSAGE_FORMAT)); + + transport.handleFrame(new TransportFrame(0, transfer, new Binary(encoded, 0, len))); + } + + private Delivery verifyDelivery(Receiver receiver, String deliveryTag, String messageContent) + { + Delivery delivery = receiver.current(); + + assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag())); + + assertNull(delivery.getLocalState()); + assertNull(delivery.getRemoteState()); + + assertFalse(delivery.isPartial()); + assertTrue(delivery.isReadable()); + + byte[] received = new byte[BUFFER_SIZE]; + int len = receiver.recv(received, 0, BUFFER_SIZE); + + assertTrue("given array was too small", len < BUFFER_SIZE); + + Message m = Proton.message(); + m.decode(received, 0, len); + + Object messageBody = ((AmqpValue)m.getBody()).getValue(); + assertEquals("Unexpected message content", messageContent, messageBody); + + boolean receiverAdvanced = receiver.advance(); + assertTrue("receiver has not advanced", 
receiverAdvanced); + + return delivery; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
