PROTON-1190: prevent attach/flow frames being sent for an opened link when the session isn't open, i.e. has no local channel number assigned
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9c7bdbd0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9c7bdbd0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9c7bdbd0 Branch: refs/heads/master Commit: 9c7bdbd07a9b0d633ad74584ca8d279168354bb5 Parents: 1505c3d Author: Robert Gemmell <[email protected]> Authored: Wed May 4 17:47:12 2016 +0100 Committer: Robert Gemmell <[email protected]> Committed: Wed May 4 17:53:04 2016 +0100 ---------------------------------------------------------------------- .../qpid/proton/engine/impl/TransportImpl.java | 10 +- .../proton/engine/impl/TransportImplTest.java | 135 +++++++++++++++++++ 2 files changed, 140 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c7bdbd0/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index ca3d465..b9bf1a4 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -685,7 +685,7 @@ public class TransportImpl extends EndpointImpl TransportLink<?> transportLink = getTransportState(receiver); TransportSession transportSession = getTransportState(receiver.getSession()); - if(receiver.getLocalState() == EndpointState.ACTIVE) + if(receiver.getLocalState() == EndpointState.ACTIVE && transportSession.isLocalChannelSet()) { int credits = receiver.clearUnsentCredits(); if(credits != 0 || receiver.getDrain() || @@ -707,7 +707,7 @@ public class TransportImpl extends EndpointImpl SessionImpl session = (SessionImpl) 
endpoint; TransportSession transportSession = getTransportState(session); - if(session.getLocalState() == EndpointState.ACTIVE) + if(session.getLocalState() == EndpointState.ACTIVE && transportSession.isLocalChannelSet()) { if(transportSession.getIncomingWindowSize().equals(UnsignedInteger.ZERO)) { @@ -733,15 +733,15 @@ public class TransportImpl extends EndpointImpl LinkImpl link = (LinkImpl) endpoint; TransportLink<?> transportLink = getTransportState(link); - if(link.getLocalState() != EndpointState.UNINITIALIZED && !transportLink.attachSent()) + SessionImpl session = link.getSession(); + TransportSession transportSession = getTransportState(session); + if(link.getLocalState() != EndpointState.UNINITIALIZED && !transportLink.attachSent() && transportSession.isLocalChannelSet()) { if( (link.getRemoteState() == EndpointState.ACTIVE && !transportLink.isLocalHandleSet()) || link.getRemoteState() == EndpointState.UNINITIALIZED) { - SessionImpl session = link.getSession(); - TransportSession transportSession = getTransportState(session); UnsignedInteger localHandle = transportSession.allocateLocalHandle(transportLink); if(link.getRemoteState() == EndpointState.UNINITIALIZED) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c7bdbd0/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java index 69b46cf..d7cec11 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java @@ -44,6 +44,7 @@ import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.transport.Attach; import org.apache.qpid.proton.amqp.transport.Begin; import 
org.apache.qpid.proton.amqp.transport.Close; +import org.apache.qpid.proton.amqp.transport.End; import org.apache.qpid.proton.amqp.transport.Flow; import org.apache.qpid.proton.amqp.transport.FrameBody; import org.apache.qpid.proton.amqp.transport.Open; @@ -471,6 +472,140 @@ public class TransportImplTest } /* + * No attach frame should be written before the Session begin is sent. + */ + @Test + public void testOpenReceiverBeforeOpenSession() + { + doOpenLinkBeforeOpenSessionTestImpl(true); + } + + /* + * No attach frame should be written before the Session begin is sent. + */ + @Test + public void testOpenSenderBeforeOpenSession() + { + doOpenLinkBeforeOpenSessionTestImpl(false); + } + + void doOpenLinkBeforeOpenSessionTestImpl(boolean receiverLink) + { + MockTransportImpl transport = new MockTransportImpl(); + Connection connection = Proton.connection(); + transport.bind(connection); + + // Open the connection + connection.open(); + + // Create but don't open the session + Session session = connection.session(); + + // Open the link + Link link = null; + if(receiverLink) + { + link = session.receiver("myReceiver"); + } + else + { + link = session.sender("mySender"); + } + link.open(); + + pumpMockTransport(transport); + + // Expect only an Open frame, no attach should be sent as the session isn't open + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + + // Now open the session, expect the Begin + session.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + // Note: an Attach wasn't sent because link is no longer 'modified' after earlier pump. 
It + // could easily be argued it should, given how the engine generally handles things. Seems + // unlikely to be of much real world concern. + //assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + } + + /* + * Verify that no Attach frame is emitted by the Transport should a Receiver + * be opened after the session End frame was sent. + */ + @Test + public void testReceiverAttachAfterEndSent() + { + doLinkAttachAfterEndSentTestImpl(true); + } + + /* + * Verify that no Attach frame is emitted by the Transport should a Sender + * be opened after the session End frame was sent. + */ + @Test + public void testSenderAttachAfterEndSent() + { + doLinkAttachAfterEndSentTestImpl(false); + } + + void doLinkAttachAfterEndSentTestImpl(boolean receiverLink) + { + MockTransportImpl transport = new MockTransportImpl(); + Connection connection = Proton.connection(); + transport.bind(connection); + + connection.open(); + + Session session = connection.session(); + session.open(); + + Link link = null; + if(receiverLink) + { + link = session.receiver("myReceiver"); + } + else + { + link = session.sender("mySender"); + } + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + + // Send the necessary responses to open/begin + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size()); + + // Cause a End frame to be sent + session.close(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof End); + + // Open the link and verify the transport doesn't + // send any Attach frame, as an End frame was sent already. + link.open(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + } + + /* * No frames should be written until the Connection object is * opened, at which point the Open and Begin frames should * be pipelined together. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
