Repository: qpid-proton Updated Branches: refs/heads/master 3a034abbd -> fca557871
PROTON-1100: protect against sending various frame types before the Open frame has been sent if the Connection object wasn't opened Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fca55787 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fca55787 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fca55787 Branch: refs/heads/master Commit: fca557871e7cfda7cca2d5f1ffadbe4264e319e5 Parents: 3a034ab Author: Robert Gemmell <[email protected]> Authored: Wed Jan 20 15:58:00 2016 +0000 Committer: Robert Gemmell <[email protected]> Committed: Wed Jan 20 16:08:40 2016 +0000 ---------------------------------------------------------------------- .../qpid/proton/engine/impl/TransportImpl.java | 12 +- .../proton/engine/impl/TransportImplTest.java | 151 ++++++++++++++++++- 2 files changed, 155 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fca55787/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index 7af8e82..6d3e83f 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -385,7 +385,7 @@ public class TransportImpl extends EndpointImpl private void processDetach() { - if(_connectionEndpoint != null) + if(_connectionEndpoint != null && _isOpenSent) { EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); while(endpoint != null) @@ -458,7 +458,7 @@ public class TransportImpl extends EndpointImpl private void processSenderFlow() { - if(_connectionEndpoint != null) + 
if(_connectionEndpoint != null && _isOpenSent) { EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); while(endpoint != null) @@ -673,7 +673,7 @@ public class TransportImpl extends EndpointImpl private void processReceiverFlow() { - if(_connectionEndpoint != null) + if(_connectionEndpoint != null && _isOpenSent) { EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); while(endpoint != null) @@ -721,7 +721,7 @@ public class TransportImpl extends EndpointImpl private void processAttach() { - if(_connectionEndpoint != null) + if(_connectionEndpoint != null && _isOpenSent) { EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); @@ -841,7 +841,7 @@ public class TransportImpl extends EndpointImpl private void processBegin() { - if(_connectionEndpoint != null) + if(_connectionEndpoint != null && _isOpenSent) { EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); while(endpoint != null) @@ -922,7 +922,7 @@ public class TransportImpl extends EndpointImpl private void processEnd() { - if(_connectionEndpoint != null) + if(_connectionEndpoint != null && _isOpenSent) { EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); while(endpoint != null) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fca55787/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java index bcf1ef9..a2e2c78 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java @@ -39,6 +39,9 @@ import org.apache.qpid.proton.amqp.transport.FrameBody; import org.apache.qpid.proton.amqp.transport.Open; import org.apache.qpid.proton.engine.Connection; import 
org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.TransportException; import org.apache.qpid.proton.framing.TransportFrame; @@ -290,7 +293,7 @@ public class TransportImplTest open.setIdleTimeOut(new UnsignedInteger(4000)); TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null); transport.handleFrame(openFrame); - while(transport.pending() > 0) transport.pop(transport.head().remaining()); + pumpMockTransport(transport); long deadline = transport.tick(0); assertEquals("Expected to be returned a deadline of 2000", 2000, deadline); // deadline = 4000 / 2 @@ -329,7 +332,7 @@ public class TransportImplTest transport.handleFrame(TRANSPORT_FRAME_OPEN); connection.open(); - while(transport.pending() > 0) transport.pop(transport.head().remaining()); + pumpMockTransport(transport); long deadline = transport.tick(0); assertEquals("Expected to be returned a deadline of 4000", 4000, deadline); @@ -360,4 +363,148 @@ public class TransportImplTest deadline = transport.tick(7000); assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState()); } + + /* + * No frames should be written until the Connection object is + * opened, at which point the Open, and Begin frames should + * be pipelined together. 
+ */ + @Test + public void testOpenSessionBeforeOpenConnection() + { + MockTransportImpl transport = new MockTransportImpl(); + Connection connection = Proton.connection(); + transport.bind(connection); + + Session session = connection.session(); + session.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size()); + + connection.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + } + + /* + * No frames should be written until the Connection object is + * opened, at which point the Open, Begin, and Attach frames + * should be pipelined together. + */ + @Test + public void testOpenReceiverBeforeOpenConnection() + { + doOpenLinkBeforeOpenConnectionTestImpl(true); + } + + /** + * No frames should be written until the Connection object is + * opened, at which point the Open, Begin, and Attach frames + * should be pipelined together. 
+ */ + @Test + public void testOpenSenderBeforeOpenConnection() + { + doOpenLinkBeforeOpenConnectionTestImpl(false); + } + + void doOpenLinkBeforeOpenConnectionTestImpl(boolean receiverLink) + { + MockTransportImpl transport = new MockTransportImpl(); + Connection connection = Proton.connection(); + transport.bind(connection); + + Session session = connection.session(); + session.open(); + + Link link = null; + if(receiverLink) + { + link = session.receiver("myReceiver"); + } + else + { + link = session.sender("mySender"); + } + link.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size()); + + // Now open the connection, expect the Open, Begin, and Attach frames + connection.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + } + + /* + * No frames should be written until the Connection object is + * opened, at which point the Open and Begin frames should + * be pipelined together. + */ + @Test + public void testReceiverFlowWithoutOpen() + { + MockTransportImpl transport = new MockTransportImpl(); + Connection connection = Proton.connection(); + transport.bind(connection); + + Session session = connection.session(); + session.open(); + + Receiver reciever = session.receiver("myReceiver"); + reciever.flow(5); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size()); + + // Now open the connection, expect the Open and Begin frames but + // nothing else as we haven't opened the receiver itself yet. 
+ connection.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + } + + private void pumpMockTransport(MockTransportImpl transport) + { + while(transport.pending() > 0) + { + transport.pop(transport.head().remaining()); + } + } + + private String getFrameTypesWritten(MockTransportImpl transport) + { + String result = ""; + for(FrameBody f : transport.writes) { + result += f.getClass().getSimpleName(); + result += ","; + } + + if(result.isEmpty()) { + return "no-frames-written"; + } else { + return result; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
